[CXF-5855] enable atmosphere's sse handling; update the sample

Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/a529d270
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/a529d270
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/a529d270

Branch: refs/heads/3.1.x-fixes
Commit: a529d270668dabd1cfa6b040769cb25af97973db
Parents: 91197fd
Author: Akitoshi Yoshida <a...@apache.org>
Authored: Thu Mar 17 00:11:58 2016 +0100
Committer: Akitoshi Yoshida <a...@apache.org>
Committed: Tue Apr 5 13:36:57 2016 +0200

----------------------------------------------------------------------
 .../release/samples/jax_rs/websocket/README.txt |  26 ++-
 .../release/samples/jax_rs/websocket/pom.xml    |  10 +-
 .../java/demo/jaxrs/server/CustomerService.java | 105 ++++++++-
 .../websocket/src/test/resources/client.js      | 232 +++++++++++++++++++
 .../websocket/atmosphere/AtmosphereUtils.java   |  18 ++
 .../AtmosphereWebSocketJettyDestination.java    |   3 +-
 .../AtmosphereWebSocketServletDestination.java  |   3 +-
 .../atmosphere/DefaultProtocolInterceptor.java  |  90 ++++++-
 8 files changed, 463 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/distribution/src/main/release/samples/jax_rs/websocket/README.txt
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/websocket/README.txt 
b/distribution/src/main/release/samples/jax_rs/websocket/README.txt
index 09c43e8..f1d3455 100644
--- a/distribution/src/main/release/samples/jax_rs/websocket/README.txt
+++ b/distribution/src/main/release/samples/jax_rs/websocket/README.txt
@@ -6,6 +6,13 @@ This is a websocket transport version of JAX-RS Basic Demo.
 A RESTful customer service is provided on URL ws://localhost:9000/customers. 
 Users access this URI to operate on customer.
 
+This sample includes two convenient clients: a plain javascript browser client
+and a node.js client based on atmosphere.
+
+
+Connecting to the server
+---------------------------------------
+
 Open a websocket to ws://localhost:9000/ and send requests over the websocket.
 
 A GET request to path /customerservice/customers/123
@@ -102,7 +109,6 @@ Please review the README in the samples directory before
 continuing.
 
 
-
 Building and running the demo using maven
 ---------------------------------------
 
@@ -118,6 +124,8 @@ Using either UNIX or Windows:
 
 To remove the target dir, run mvn clean".
 
+Using Javascript client in Browser
+--------
 Using a web browser that natively supports WebSocket (Safari, Chrome, Firefox):
 After starting the server (see above), open the index.html page located at
 
@@ -140,3 +148,19 @@ Content-Type: text/xml; charset="utf-8"
   <name>Jack</name>
 </Customer>
 ------------------------------------------------------------------------
+
+
+Using Node.js client 
+--------
+
+Go to samples/jax_rs/websocket/src/test/resources and at the console
+
+Assuming node (>=v4) and npm are installed, execute the following shell 
commands.
+
+% npm install atmosphere.js
+% node client.js
+
+This client program supports websocket and sse and allows
+you to choose your preferred protocol.
+
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/distribution/src/main/release/samples/jax_rs/websocket/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/websocket/pom.xml 
b/distribution/src/main/release/samples/jax_rs/websocket/pom.xml
index abe9d3b..353e544 100644
--- a/distribution/src/main/release/samples/jax_rs/websocket/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/websocket/pom.xml
@@ -32,6 +32,7 @@
         <cxf.version>${project.version}</cxf.version>
         <!-- TODO remove these local entries after making the referenced 
dependency managed in parent/pom.xml -->
         <cxf.ahc.version>1.8.5</cxf.ahc.version>
+        <cxf.atmosphere.version>2.3.7</cxf.atmosphere.version>
         <cxf.jetty8.version>8.1.15.v20140411</cxf.jetty8.version>
         <cxf.jetty9.version>9.2.2.v20140723</cxf.jetty9.version>
         <cxf.jetty.version>${cxf.jetty8.version}</cxf.jetty.version>
@@ -207,6 +208,13 @@
             <groupId>org.springframework</groupId>
             <artifactId>spring-core</artifactId>
         </dependency>
-        
+
+        <!-- add atmosphere -->
+        <dependency>
+            <groupId>org.atmosphere</groupId>
+            <artifactId>atmosphere-runtime</artifactId>
+            <version>${cxf.atmosphere.version}</version>
+        </dependency>
+                
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java
 
b/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java
index 967e978..cc039ff 100644
--- 
a/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java
+++ 
b/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java
@@ -25,6 +25,9 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.http.HttpServletResponse;
 
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
@@ -43,13 +46,15 @@ import 
org.apache.cxf.transport.websocket.WebSocketConstants;
 @Path("/customerservice/")
 @Produces("text/xml")
 public class CustomerService {
+    private static final int MAX_ERROR_COUNT = 5;
     private static ExecutorService executor = 
Executors.newSingleThreadExecutor();
 
     long currentId = 123;
     Map<Long, Customer> customers = new HashMap<Long, Customer>();
     Map<Long, Order> orders = new HashMap<Long, Order>();
-    Map<String, OutputStream> monitors = new HashMap<String, OutputStream>();
-    
+    Map<String, WriterHolder<OutputStream>> monitors = new HashMap<String, 
WriterHolder<OutputStream>>();
+    Map<String, WriterHolder<HttpServletResponse>> monitors2 = new 
HashMap<String, WriterHolder<HttpServletResponse>>();
+
     public CustomerService() {
         init();
     }
@@ -60,7 +65,9 @@ public class CustomerService {
         System.out.println("----invoking getCustomer, Customer id is: " + id);
         long idNumber = Long.parseLong(id);
         Customer customer = customers.get(idNumber);
-        sendCustomerEvent("retrieved", customer);
+        if (customer != null) {
+            sendCustomerEvent("retrieved", customer);
+        }
         return customer;
     }
 
@@ -129,35 +136,83 @@ public class CustomerService {
         final String key = reqid == null ? "*" : reqid; 
         return new StreamingOutput() {
             public void write(final OutputStream out) throws IOException, 
WebApplicationException {
-                monitors.put(key, out);
+                monitors.put(key, new WriterHolder(out, MAX_ERROR_COUNT));
                 out.write(("Subscribed at " + new 
java.util.Date()).getBytes());
             }
+            
         };
     }
 
     @GET
+    @Path("/monitor2")
+    @Produces("text/*")
+    public void monitorCustomers2(
+            final @javax.ws.rs.core.Context HttpServletResponse httpResponse,
+            @HeaderParam(WebSocketConstants.DEFAULT_REQUEST_ID_KEY) String 
reqid) {
+        final String key = reqid == null ? "*" : reqid; 
+        monitors2.put(key, new WriterHolder(httpResponse, MAX_ERROR_COUNT));
+        try {
+            httpResponse.getOutputStream().write(("Subscribed at " + new 
java.util.Date()).getBytes());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @GET
     @Path("/unmonitor/{key}")
     @Produces("text/*")
     public String unmonitorCustomers(@PathParam("key") String key) {
         return (monitors.remove(key) != null ? "Removed: " : "Already removed: 
") + key; 
     }
 
+    @GET
+    @Path("/unmonitor2/{key}")
+    @Produces("text/*")
+    public String unmonitorCustomers2(@PathParam("key") String key) {
+        return (monitors2.remove(key) != null ? "Removed: " : "Already 
removed: ") + key; 
+    }
+
     private void sendCustomerEvent(final String msg, final Customer customer) {
         executor.execute(new Runnable() {
             public void run() {
                 try {
                     String t = msg + ": " + customer.getId() + "/" + 
customer.getName();
-                    for (Iterator<OutputStream> it = 
monitors.values().iterator(); it.hasNext();) {
-                        OutputStream out = it.next();
+                    for (Iterator<WriterHolder<OutputStream>> it = 
monitors.values().iterator(); it.hasNext();) {
+                        WriterHolder<OutputStream> wh = it.next();
                         try {
-                            out.write(t.getBytes());
+                            wh.getValue().write(t.getBytes());
+                            wh.getValue().flush();
+                            wh.reset();
                         } catch (IOException e) {
-                            try {
-                                out.close();
-                            } catch (IOException e2) {
-                                // ignore;
+                            System.out.println("----error writing to " + 
wh.getValue() + " " + wh.get());
+                            if (wh.increment()) {
+                                try {
+                                    wh.getValue().close();
+                                } catch (IOException e2) {
+                                    // ignore;
+                                }
+                                it.remove();
+                                System.out.println("----purged " + 
wh.getValue());
+                            }
+                        }
+                    }
+                    for (Iterator<WriterHolder<HttpServletResponse>> it = 
monitors2.values().iterator(); it.hasNext();) {
+                        WriterHolder<HttpServletResponse> wh = it.next();
+                        try {
+                            
wh.getValue().getOutputStream().write(t.getBytes());
+                            wh.getValue().getOutputStream().flush();
+                            wh.reset();
+                        } catch (IOException e) {
+                            System.out.println("----error writing to " + 
wh.getValue() + " " + wh.get());
+                            if (wh.increment()) {
+                                try {
+                                    wh.getValue().getOutputStream().close();
+                                } catch (IOException e2) {
+                                    // ignore;
+                                }
+                                it.remove();
+                                System.out.println("----purged " + 
wh.getValue());
                             }
-                            it.remove();
                         }
                     }
                 } catch (Exception e) {
@@ -183,4 +238,30 @@ public class CustomerService {
         orders.put(o.getId(), o);
     }
 
+    private static class WriterHolder<T> {
+        final private T value;
+        final private int max; 
+        final private AtomicInteger errorCount;
+
+        public WriterHolder(T object, int max) {
+            this.value = object;
+            this.max = max;
+            this.errorCount = new AtomicInteger();
+        }
+
+        public T getValue() {
+            return value;
+        }
+
+        public int get() {
+            return errorCount.get();
+        }
+        public boolean increment() {
+            return max < errorCount.getAndIncrement();
+        }
+
+        public void reset() {
+            errorCount.getAndSet(0);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/distribution/src/main/release/samples/jax_rs/websocket/src/test/resources/client.js
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/websocket/src/test/resources/client.js
 
b/distribution/src/main/release/samples/jax_rs/websocket/src/test/resources/client.js
new file mode 100644
index 0000000..7eb55f3
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/websocket/src/test/resources/client.js
@@ -0,0 +1,232 @@
+/**
+ * client.js
+ * 
+ * A client program to interact with samples/jax_rs/websocket's server.
+ * 
+ * 
+ */
+
+"use strict";
+
+var HOST_URL = 'http://localhost:9100/';
+
+var reader = require('readline');
+var prompt = reader.createInterface(process.stdin, process.stdout);
+
+var atmosphere = require('atmosphere.js');
+
+var request = { url: HOST_URL,
+                transport : 'websocket',
+                trackMessageLength: false,
+                dropHeaders: false,
+                reconnectInterval : 5000};
+var isopen = false;
+
+const TRANSPORT_NAMES = ["websocket", "sse"];
+
+const COMMAND_LIST = 
+    [["add name",       "Add a new consumer and return the customer 
instance."],
+     ["delete id",      "Delete the customer."],
+     ["get id",         "Return the customere."],
+     ["quit",           "Quit the application."],
+     ["subscribe",      "Subscribe to the customer updatese."],
+     ["unsubscribe",    "Unsubscribe from the customer updatese."],
+     ["update id name", "Update the customer."]];
+
+function selectOption(c, opts) {
+    var i = parseInt(c);
+    if (!(i >= 0 && i < opts.length)) {
+        console.log('Invalid selection: ' + c + '; Using ' + opts[0]);
+        i = 0;
+    }
+    return opts[i];
+}
+
+function getArgs(name, msg) {
+    var sp = name.length;
+    if (msg.length > name.length && msg.charAt(name.length) != ' ') {
+        // remove the command suffix
+        sp = msg.indexOf(' ', name.length);
+        if (sp < 0) {
+            sp = msg.length;
+        }
+    }
+    return msg.substring(sp).trim().split(' ');
+}
+
+function createAddCustomerPayload(name) {
+    return "<?xml version=\"1.0\"?>\n<Customer>\n    <name>" + name + 
"</name>\n</Customer>\n";
+}
+
+function createUpdateCustomerPayload(id, name) {
+    return "<?xml version=\"1.0\"?>\n<Customer>\n    <name>" + name + 
"</name>\n    <id>" + id + "</id>\n</Customer>\n";
+}
+
+///
+
+function doHelp() {
+    console.log('Available commands');
+    for (var i = 0; i < COMMAND_LIST.length; i++) { 
+        var c = COMMAND_LIST[i][0];
+        console.log(c + "                    ".substring(0, 20 - c.length) + 
COMMAND_LIST[i][1]);
+    }
+}
+
+function doAdd(v) {
+    var req;
+    if (transport == 'websocket') {
+        req = "POST /customerservice/customers\r\nContent-Type: text/xml; 
charset='utf-8'\r\nAccept: text/xml\r\n\r\n" 
+            + createAddCustomerPayload(v[0]);
+    } else if (transport == 'sse') {
+        req = {"method": "POST", "url": HOST_URL + 
"customerservice/customers", "headers": {"content-type": "text/xml; 
charset=utf-8", "accept": "text/xml"}, "data": createAddCustomerPayload(v[0])}
+    }
+    console.log("TRACE: sending ", req);
+    subSocket.push(req);
+}
+
+function doDelete(v) {
+    var req;
+    if (transport == 'websocket') {
+        req = "DELETE /customerservice/customers/" + v[0];
+    } else if (transport == 'sse') {
+        req = {"method": "DELETE", "url": HOST_URL + 
"customerservice/customers/" + v[0]}
+    }
+    console.log("TRACE: sending ", req);
+    subSocket.push(req);
+}
+
+function doGet(v) {
+    var req;
+    if (transport == 'websocket') {
+        req = "GET /customerservice/customers/" + v[0];
+    } else if (transport == 'sse') {
+        req = {"method": "GET", "url": HOST_URL + "customerservice/customers/" 
+ v[0]}
+    }
+    console.log("TRACE: sending ", req);
+    subSocket.push(req);
+}
+
+function doSubscribe() {
+    var req;
+    if (transport == 'websocket') {
+        req = "GET /customerservice/monitor\r\nAccept: text/plain\r\n";
+    } else if (transport == 'sse') {
+        req = {"method": "GET", "url": HOST_URL + "customerservice/monitor2", 
"headers": {"accept": "text/plain"}}
+    }
+    console.log("TRACE: sending ", req);
+    subSocket.push(req);
+}
+
+function doUnsubscribe() {
+    var req;
+    if (transport == 'websocket') {
+        req = "GET /customerservice/unmonitor/*\r\nAccept: text/plain\r\n";
+    } else if (transport == 'sse') {
+        req = {"method": "GET", "url": HOST_URL + 
"customerservice/unmonitor2/*", "headers": {"accept": "text/plain"}}
+    }
+    console.log("TRACE: sending ", req);
+    subSocket.push(req);
+}
+
+function doUpdate(v) {
+    var req;
+    if (transport == 'websocket') {
+        req = "PUT /customerservice/customers\r\nContent-Type: text/xml; 
charset='utf-8'\r\nAccept: text/xml\r\n\r\n" 
+            + createUpdateCustomerPayload(v[0], v[1]);
+    } else if (transport == 'sse') {
+        req = {"method": "PUT", "url": HOST_URL + "customerservice/customers", 
"headers": {"content-type": "text/xml; charset=utf-8", "accept": "text/xml"}, 
"data": createUpdateCustomerPayload(v[0], v[1])}
+    }
+    console.log("TRACE: sending ", req);
+    subSocket.push(req);
+}
+
+function doQuit() {
+    subSocket.close();
+    process.exit(0);
+}
+
+///
+
+request.onOpen = function(response) {
+    isopen = true;
+    console.log('Connected using ' + response.transport);
+    prompt.setPrompt("> ", 2);
+    prompt.prompt();
+};
+
+request.onMessage = function (response) {
+    var message = response.responseBody;
+    console.log('Received: ', message);
+    
console.log('------------------------------------------------------------------------');
+    prompt.setPrompt("> ", 2);
+    prompt.prompt();
+};
+
+request.onReconnect = function(response) {
+    console.log('Reconnecting ...');
+}
+
+request.onReopen = function(response) {
+    isopen = true;
+    console.log('Reconnected using ' + response.transport);
+    prompt.setPrompt("> ", 2);
+    prompt.prompt();
+}
+
+request.onClose = function(response) {
+    isopen = false;
+}
+
+request.onError = function(response) {
+    console.log("Sorry, something went wrong: " + response.responseBody);
+};
+
+var transport = null;
+var subSocket = null;
+var author = null;
+
+console.log("Select transport ...");
+for (var i = 0; i < TRANSPORT_NAMES.length; i++) { 
+    console.log(i + ": " + TRANSPORT_NAMES[i]);
+}
+prompt.setPrompt("select: ", 6);
+prompt.prompt();
+
+prompt.
+on('line', function(line) {
+    var msg = line.trim();
+    if (transport == null) {
+        transport = selectOption(msg, TRANSPORT_NAMES);
+        request.transport = transport;
+        subSocket = atmosphere.subscribe(request);
+        console.log("Connecting using " + transport);
+        setTimeout(function() {
+            if (!isopen) {
+                console.log("Unable to open a connection. Terminated.");
+                process.exit(0);
+            }
+        }, 3000);
+    } else if (msg.length == 0) {
+        doHelp();
+    } else if (msg.indexOf("add") == 0) {
+        doAdd(getArgs("add", msg));
+    } else if (msg.indexOf("del") == 0) {
+        doDelete(getArgs("del", msg));
+    } else if (msg.indexOf("get") == 0) {
+        doGet(getArgs("get", msg));
+    } else if (msg.indexOf("quit") == 0) {
+        doQuit();
+    } else if (msg.indexOf("sub") == 0) {
+        doSubscribe(getArgs("sub", msg));
+    } else if (msg.indexOf("unsub") == 0) {
+        doUnsubscribe(getArgs("unsub", msg));
+    } else if (msg.indexOf("update") == 0) {
+        doUpdate(getArgs("update", msg));
+    }
+    prompt.setPrompt("> ", 2);
+    prompt.prompt();
+}).
+on('close', function() {
+    console.log("close");
+    process.exit(0);
+});

http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java
 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java
index 1a4a9b5..079792c 100644
--- 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java
+++ 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereUtils.java
@@ -21,10 +21,18 @@ package org.apache.cxf.transport.websocket.atmosphere;
 
 import java.util.List;
 
+import javax.servlet.http.HttpServletRequest;
+
 import org.apache.cxf.Bus;
 import org.apache.cxf.helpers.CastUtils;
 import org.atmosphere.cpr.AtmosphereFramework;
 import org.atmosphere.cpr.AtmosphereInterceptor;
+import org.atmosphere.cpr.HeaderConfig;
+import org.atmosphere.interceptor.CacheHeadersInterceptor;
+import org.atmosphere.interceptor.HeartbeatInterceptor;
+import org.atmosphere.interceptor.JavaScriptProtocol;
+import org.atmosphere.interceptor.SSEAtmosphereInterceptor;
+import org.atmosphere.util.Utils;
 
 /**
  * 
@@ -36,6 +44,10 @@ public final class AtmosphereUtils {
 
     public static void addInterceptors(AtmosphereFramework framework, Bus bus) 
{
         Object ais = bus.getProperty("atmosphere.interceptors");
+        // pre-install those atmosphere default interceptors before the custom 
interceptors.
+        framework.interceptor(new CacheHeadersInterceptor()).interceptor(new 
HeartbeatInterceptor())
+        .interceptor(new SSEAtmosphereInterceptor()).interceptor(new 
JavaScriptProtocol());
+
         if (ais == null || ais instanceof AtmosphereInterceptor) {
             framework.interceptor(ais == null 
                 ? new DefaultProtocolInterceptor() : 
(AtmosphereInterceptor)ais);
@@ -43,9 +55,15 @@ public final class AtmosphereUtils {
         }
         if (ais instanceof List<?>) {
             List<AtmosphereInterceptor> icps = CastUtils.cast((List<?>)ais);
+            // add the custom interceptors
             for (AtmosphereInterceptor icp : icps) {
                 framework.interceptor(icp);
             }
         }
     }
+    
+    public static boolean useAtmosphere(HttpServletRequest req) {
+        return Utils.webSocketEnabled(req) 
+            || req.getParameter(HeaderConfig.X_ATMOSPHERE_TRANSPORT) != null; 
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
index 0e02923..d0cc806 100644
--- 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
+++ 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
@@ -45,7 +45,6 @@ import org.atmosphere.cpr.AtmosphereRequest;
 import org.atmosphere.cpr.AtmosphereResource;
 import org.atmosphere.cpr.AtmosphereResponse;
 import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
-import org.atmosphere.util.Utils;
 import org.eclipse.jetty.server.Request;
 
 
@@ -119,7 +118,7 @@ public class AtmosphereWebSocketJettyDestination extends 
JettyHTTPDestination im
         @Override
         public void handle(String target, Request baseRequest, 
HttpServletRequest request,
                            HttpServletResponse response) throws IOException, 
ServletException {
-            if (Utils.webSocketEnabled(request)) {
+            if (AtmosphereUtils.useAtmosphere(request)) {
                 try {
                     framework.doCometSupport(AtmosphereRequest.wrap(request), 
                                              
AtmosphereResponse.wrap(response));

http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
index 6459150..a4e702c 100644
--- 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
+++ 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
@@ -41,7 +41,6 @@ import org.atmosphere.cpr.AtmosphereRequest;
 import org.atmosphere.cpr.AtmosphereResource;
 import org.atmosphere.cpr.AtmosphereResponse;
 import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
-import org.atmosphere.util.Utils;
 
 /**
  * 
@@ -69,7 +68,7 @@ public class AtmosphereWebSocketServletDestination extends 
ServletDestination im
     @Override
     public void invoke(ServletConfig config, ServletContext context, 
HttpServletRequest req,
                        HttpServletResponse resp) throws IOException {
-        if (Utils.webSocketEnabled(req)) {
+        if (AtmosphereUtils.useAtmosphere(req)) {
             try {
                 framework.doCometSupport(AtmosphereRequest.wrap(req), 
                                          AtmosphereResponse.wrap(resp));

http://git-wip-us.apache.org/repos/asf/cxf/blob/a529d270/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
index 1a2cd9a..2631d51 100644
--- 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
+++ 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
@@ -37,8 +37,8 @@ import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.transport.websocket.InvalidPathException;
 import org.apache.cxf.transport.websocket.WebSocketConstants;
 import org.apache.cxf.transport.websocket.WebSocketUtils;
-import org.atmosphere.config.service.AtmosphereInterceptorService;
 import org.atmosphere.cpr.Action;
+import org.atmosphere.cpr.ApplicationConfig;
 import org.atmosphere.cpr.AsyncIOInterceptor;
 import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
 import org.atmosphere.cpr.AsyncIOWriter;
@@ -48,20 +48,24 @@ import org.atmosphere.cpr.AtmosphereInterceptorAdapter;
 import org.atmosphere.cpr.AtmosphereInterceptorWriter;
 import org.atmosphere.cpr.AtmosphereRequest;
 import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResourceEvent;
+import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter;
 import org.atmosphere.cpr.AtmosphereResponse;
 import org.atmosphere.cpr.FrameworkConfig;
 
 /**
  * DefaultProtocolInterceptor provides the default CXF's WebSocket protocol 
that uses.
  * 
+ * This interceptor is automatically engaged when no atmosphere interceptor is 
configured.  
  */
-@AtmosphereInterceptorService
 public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
     private static final Logger LOG = 
LogUtils.getL7dLogger(DefaultProtocolInterceptor.class);
 
     private static final String REQUEST_DISPATCHED = "request.dispatched";
     private static final String RESPONSE_PARENT = "response.parent";
 
+    private Map<String, AtmosphereResponse> suspendedResponses = new 
HashMap<String, AtmosphereResponse>();
+
     private final AsyncIOInterceptor interceptor = new Interceptor();
 
     private Pattern includedheaders;
@@ -102,10 +106,77 @@ public class DefaultProtocolInterceptor extends 
AtmosphereInterceptorAdapter {
         this.excludedheaders = excludedheaders;
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     public Action inspect(final AtmosphereResource r) {
         LOG.log(Level.FINE, "inspect");
+        if (AtmosphereResource.TRANSPORT.WEBSOCKET != r.transport() 
+            && AtmosphereResource.TRANSPORT.SSE != r.transport()
+            && AtmosphereResource.TRANSPORT.POLLING != r.transport()) {
+            LOG.fine("Skipping ignorable request");
+            return Action.CONTINUE;
+        }
+        if (AtmosphereResource.TRANSPORT.POLLING == r.transport()) {
+            final String saruuid = (String)r.getRequest()
+                
.getAttribute(ApplicationConfig.SUSPENDED_ATMOSPHERE_RESOURCE_UUID);
+            final AtmosphereResponse suspendedResponse = 
suspendedResponses.get(saruuid);
+            LOG.fine("Attaching a proxy writer to suspended response");
+            r.getResponse().asyncIOWriter(new AtmosphereInterceptorWriter() {
+                @Override
+                public AsyncIOWriter write(AtmosphereResponse r, String data) 
throws IOException {
+                    suspendedResponse.write(data);
+                    suspendedResponse.flushBuffer();
+                    return this;
+                }
+
+                @Override
+                public AsyncIOWriter write(AtmosphereResponse r, byte[] data) 
throws IOException {
+                    suspendedResponse.write(data);
+                    suspendedResponse.flushBuffer();
+                    return this;
+                }
+
+                @Override
+                public AsyncIOWriter write(AtmosphereResponse r, byte[] data, 
int offset, int length) 
+                    throws IOException {
+                    suspendedResponse.write(data, offset, length);
+                    suspendedResponse.flushBuffer();
+                    return this;
+                }
+            });
+            // REVISIT we need to keep this response's asyncwriter alive so 
that data can be written to the 
+            //   suspended response, but investigate if there is a better 
alternative. 
+            r.getResponse().destroyable(false);
+            return Action.CONTINUE;
+        }
+
+        r.addEventListener(new AtmosphereResourceEventListenerAdapter() {
+            @Override
+            public void onSuspend(AtmosphereResourceEvent event) {
+                final String srid = (String)event.getResource().getRequest()
+                    
.getAttribute(ApplicationConfig.SUSPENDED_ATMOSPHERE_RESOURCE_UUID);
+                LOG.log(Level.FINE, "Registrering suspended resource: {}", 
srid);
+                suspendedResponses.put(srid, 
event.getResource().getResponse());
+
+                AsyncIOWriter writer = 
event.getResource().getResponse().getAsyncIOWriter();
+                if (writer == null) {
+                    writer = new AtmosphereInterceptorWriter();
+                    r.getResponse().asyncIOWriter(writer);
+                }
+                if (writer instanceof AtmosphereInterceptorWriter) {
+                    
((AtmosphereInterceptorWriter)writer).interceptor(interceptor);
+                }
+            }
+
+            @Override
+            public void onDisconnect(AtmosphereResourceEvent event) {
+                super.onDisconnect(event);
+                final String srid = (String)event.getResource().getRequest()
+                    
.getAttribute(ApplicationConfig.SUSPENDED_ATMOSPHERE_RESOURCE_UUID);
+                LOG.log(Level.FINE, "Unregistrering suspended resource: {}", 
srid);
+                suspendedResponses.remove(srid);
+            }
+
+        });
         AtmosphereRequest request = r.getRequest();
 
         if (request.getAttribute(REQUEST_DISPATCHED) == null) {
@@ -115,6 +186,11 @@ public class DefaultProtocolInterceptor extends 
AtmosphereInterceptorAdapter {
             try {
                 byte[] data = 
WebSocketUtils.readBody(request.getInputStream());
                 if (data.length == 0) {
+                    if (AtmosphereResource.TRANSPORT.WEBSOCKET == 
r.transport() 
+                        || AtmosphereResource.TRANSPORT.SSE == r.transport()) {
+                        r.suspend();
+                        return Action.SUSPEND;
+                    }
                     return Action.CANCELLED;
                 }
                 
@@ -124,10 +200,10 @@ public class DefaultProtocolInterceptor extends 
AtmosphereInterceptorAdapter {
                 try {
                     AtmosphereRequest ar = createAtmosphereRequest(request, 
data);
                     response = new WrappedAtmosphereResponse(r.getResponse(), 
ar);
-                    ar.attributes().put(REQUEST_DISPATCHED, "true");
+                    ar.localAttributes().put(REQUEST_DISPATCHED, "true");
                     String refid = 
ar.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
                     if (refid != null) {
-                        
ar.attributes().put(WebSocketConstants.DEFAULT_REQUEST_ID_KEY, refid);
+                        
ar.localAttributes().put(WebSocketConstants.DEFAULT_REQUEST_ID_KEY, refid);
                     }
                     // This is a new request, we must clean the Websocket 
AtmosphereResource.
                     
request.removeAttribute(FrameworkConfig.INJECTED_ATMOSPHERE_RESOURCE);
@@ -156,7 +232,6 @@ public class DefaultProtocolInterceptor extends 
AtmosphereInterceptorAdapter {
                 return Action.CANCELLED;
             } catch (IOException e) {
                 LOG.log(Level.WARNING, "Error during protocol processing", e);
-                return Action.CONTINUE;
             }           
         } else {
             request.destroyable(false);
@@ -229,6 +304,9 @@ public class DefaultProtocolInterceptor extends 
AtmosphereInterceptorAdapter {
         AtmosphereRequest request = response.request();
         String refid = 
(String)request.getAttribute(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
 
+        if (AtmosphereResource.TRANSPORT.WEBSOCKET != 
response.resource().transport()) {
+            return payload; 
+        }
         Map<String, String> headers = new HashMap<String, String>();
         if (refid != null) {
             response.addHeader(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, 
refid);

Reply via email to