Repository: incubator-wave Updated Branches: refs/heads/master ee6f996eb -> 4c0d78ee8
Added Atmosphere framework as replacement of Socket.IO by Pablo Ojanguren. Fix for #WAVE-405. https://reviews.apache.org/r/19355/ Project: http://git-wip-us.apache.org/repos/asf/incubator-wave/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-wave/commit/4c0d78ee Tree: http://git-wip-us.apache.org/repos/asf/incubator-wave/tree/4c0d78ee Diff: http://git-wip-us.apache.org/repos/asf/incubator-wave/diff/4c0d78ee Branch: refs/heads/master Commit: 4c0d78ee8afdf92fc3a8ef6f7454726b790cc445 Parents: ee6f996 Author: Vicente J. Ruiz Jurado <[email protected]> Authored: Wed Apr 23 16:14:02 2014 +0200 Committer: Vicente J. Ruiz Jurado <[email protected]> Committed: Wed Apr 23 16:14:02 2014 +0200 ---------------------------------------------------------------------- .gitignore | 2 + build.xml | 31 ++- .../box/server/rpc/ServerRpcProvider.java | 259 +++++++++++++++++-- .../rpc/atmosphere/AtmosphereChannel.java | 116 +++++++++ .../atmosphere/AtmosphereClientInterceptor.java | 103 ++++++++ .../rpc/atmosphere/GuiceAtmosphereFactory.java | 71 +++++ .../box/webclient/WebClient.gwt.xml | 1 - .../box/webclient/client/WaveSocketFactory.java | 53 ++-- .../webclient/client/WaveWebSocketClient.java | 2 +- .../client/atmosphere/AtmosphereConnection.java | 53 ++++ .../atmosphere/AtmosphereConnectionImpl.java | 232 +++++++++++++++++ .../AtmosphereConnectionListener.java | 48 ++++ .../atmosphere/AtmosphereConnectionState.java | 53 ++++ 13 files changed, 967 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index daf83b6..f9e6192 100755 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,5 @@ war/WEB-INF *.old *.log* /third_party/ +/src/org/waveprotocol/box/server/rpc/atmosphere/atmosphere-min.js +/src/org/waveprotocol/box/server/rpc/atmosphere/atmosphere.js http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 55ce93f..8f1fc77 100644 --- a/build.xml +++ b/build.xml @@ -56,7 +56,7 @@ <delete dir="war/waveharness"/> <delete dir="war/webclient"/> </target> - + <target name="clean-lib" description="Cleans up the third party libraries"> <delete dir="${lib.test}"/> <delete dir="${lib.codegen}"/> @@ -255,6 +255,7 @@ <!-- copy non binary resources in src tree to output classpath. Why? --> <copy todir="${build.src.dir}"> <fileset dir="${src.dir}"> + <include name="**/*.js"/> <include name="**/*.xml"/> <include name="**/*.html"/> <include name="**/*.properties"/> @@ -675,7 +676,9 @@ <fileset dir="${staging.dir}"> <include name="META-INF/services/*" /> <include name="com/mongodb/**/*" /> - <include name="com/glines/socketio/**/*" /> + <include name="org/slf4j/**/*" /> + <include name="org/atmosphere/**/*" /> + <include name="com/glines/socketio/**/*" /> <include name="com/google/common/**/*" /> <include name="com/google/gson/**/*" /> <include name="com/google/gxp/**/*" /> @@ -987,7 +990,27 @@ </unzip> </target> - <target name="get-third-party-runtime" depends="get-jetty-dep" description="Download run time third party dependencies"> + <target name="get-atmosphere-dep" description="Download atmosphere framework runtime, java and javascript dependencies"> + <get dest="${lib.runtime}" usetimestamp="true" skipexisting="true"> + <url url="https://oss.sonatype.org/service/local/repositories/releases/content/org/atmosphere/atmosphere-runtime/2.1.0/atmosphere-runtime-2.1.0.jar" /> + <url url="https://oss.sonatype.org/service/local/repositories/releases/content/org/atmosphere/atmosphere-guice/0.8.3/atmosphere-guice-0.8.3.jar" /> + <url url="https://oss.sonatype.org/service/local/repositories/releases/content/eu/infomas/annotation-detector/3.0.0/annotation-detector-3.0.0.jar" /> + <url url="https://oss.sonatype.org/service/local/repositories/releases/content/org/slf4j/slf4j-simple/1.6.1/slf4j-simple-1.6.1.jar" /> + <url url="https://oss.sonatype.org/service/local/repositories/releases/content/org/slf4j/slf4j-api/1.6.1/slf4j-api-1.6.1.jar" /> + <url url="https://oss.sonatype.org/service/local/repositories/releases/content/org/atmosphere/client/javascript/2.1.5/javascript-2.1.5.war" /> + </get> + <!-- Add Atmosphere Framework javascript client to the server classpath --> + <unzip src="${lib.runtime}/javascript-2.1.5.war" dest="${lib.runtime}/javascript-2.1.5" > + <patternset> + <include name="javascript/atmosphere-min.js"/> + <include name="javascript/atmosphere.js"/> + </patternset> + </unzip> + <copy file="${lib.runtime}/javascript-2.1.5/javascript/atmosphere.js" todir="${src.dir}/org/waveprotocol/box/server/rpc/atmosphere" /> + <copy file="${lib.runtime}/javascript-2.1.5/javascript/atmosphere-min.js" todir="${src.dir}/org/waveprotocol/box/server/rpc/atmosphere" /> + </target> + + <target name="get-third-party-runtime" depends="get-jetty-dep, get-atmosphere-dep" description="Download run time third party dependencies"> <get dest="${lib.runtime}" usetimestamp="true" skipexisting="true"> <url url="http://central.maven.org/maven2/aopalliance/aopalliance/1.0/aopalliance-1.0.jar" /> <url url="http://central.maven.org/maven2/org/bouncycastle/bcprov-jdk16/1.45/bcprov-jdk16-1.45.jar" /> @@ -1015,7 +1038,7 @@ <url url="http://people.apache.org/repo/m1-ibiblio-rsync-repository/javax.jdo/jars/jdo2-api-2.2.jar" /> <url url="http://central.maven.org/maven2/org/jdom/jdom/1.1.3/jdom-1.1.3.jar" /> <url url="http://central.maven.org/maven2/com/google/code/findbugs/jsr305/2.0.1/jsr305-2.0.1.jar" /> - + <url url="http://central.maven.org/maven2/jline/jline/0.9.94/jline-0.9.94.jar" /> <url url="http://central.maven.org/maven2/joda-time/joda-time/1.6/joda-time-1.6.jar" /> <url url="http://central.maven.org/maven2/org/apache/lucene/lucene-core/3.5.0/lucene-core-3.5.0.jar" /> http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/src/org/waveprotocol/box/server/rpc/ServerRpcProvider.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/rpc/ServerRpcProvider.java b/src/org/waveprotocol/box/server/rpc/ServerRpcProvider.java index 9b0f2a9..ca2f6b8 100755 --- a/src/org/waveprotocol/box/server/rpc/ServerRpcProvider.java +++ b/src/org/waveprotocol/box/server/rpc/ServerRpcProvider.java @@ -46,19 +46,49 @@ import com.glines.socketio.server.transport.XHRPollingTransport; import com.glines.socketio.server.transport.jetty.JettyWebSocketTransport; import org.apache.commons.lang.StringUtils; +import org.atmosphere.cache.UUIDBroadcasterCache; +import org.atmosphere.config.service.AtmosphereHandlerService; +import org.atmosphere.cpr.AtmosphereHandler; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.AtmosphereResourceEvent; +import org.atmosphere.cpr.AtmosphereResourceSession; +import org.atmosphere.cpr.AtmosphereResourceSessionFactory; +import org.atmosphere.cpr.AtmosphereResponse; +import org.atmosphere.guice.AtmosphereGuiceServlet; +import org.atmosphere.util.IOUtils; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.session.HashSessionManager; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlets.GzipFilter; import org.eclipse.jetty.util.resource.ResourceCollection; +import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.webapp.WebAppContext; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.mortbay.jetty.nio.SelectChannelConnector; +import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticate; +import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticationResult; +import org.waveprotocol.box.server.CoreSettings; +import org.waveprotocol.box.server.authentication.SessionManager; +import org.waveprotocol.box.server.persistence.file.FileUtils; +import org.waveprotocol.box.server.rpc.atmosphere.AtmosphereChannel; +import org.waveprotocol.box.server.rpc.atmosphere.AtmosphereClientInterceptor; +import org.waveprotocol.box.server.util.NetUtils; +import org.waveprotocol.wave.model.util.Pair; +import org.waveprotocol.wave.model.wave.ParticipantId; +import org.waveprotocol.wave.util.logging.Log; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -79,24 +109,6 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; -import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; -import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; -import org.eclipse.jetty.websocket.servlet.WebSocketCreator; -import org.eclipse.jetty.websocket.servlet.WebSocketServlet; -import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; - -import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticate; -import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticationResult; -import org.waveprotocol.box.server.CoreSettings; -import org.waveprotocol.box.server.authentication.SessionManager; -import org.waveprotocol.box.server.persistence.file.FileUtils; -import org.waveprotocol.box.server.util.NetUtils; -import org.waveprotocol.wave.model.util.Pair; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.util.logging.Log; - /** * ServerRpcProvider can provide instances of type Service over an incoming * network socket and service incoming RPCs to these services and their methods. @@ -185,6 +197,32 @@ public class ServerRpcProvider { } } + static class AtmosphereConnection extends Connection { + + private final AtmosphereChannel atmosphereChannel; + + public AtmosphereConnection(ParticipantId loggedInUser, ServerRpcProvider provider) { + super(loggedInUser, provider); + + atmosphereChannel = new AtmosphereChannel(this); + expectMessages(atmosphereChannel); + + } + + @Override + protected void sendMessage(int sequenceNo, Message message) { + atmosphereChannel.sendMessage(sequenceNo, message);; + } + + public AtmosphereChannel getAtmosphereChannel() { + return atmosphereChannel; + } + + + } + + + static abstract class Connection implements ProtoCallback { private final Map<Integer, ServerRpcController> activeRpcs = new ConcurrentHashMap<Integer, ServerRpcController>(); @@ -428,9 +466,16 @@ public class ServerRpcProvider { String flashPolicyServerHost = "localhost"; StringBuilder flashPolicyAllowedPorts = new StringBuilder(); - // Servlet where the socketio connection is served from. - // TODO(akaplanov): add servlet when https://github.com/vjrj/Socket.IO-Java will updated to Jetty v9. - // https://issues.apache.org/jira/browse/WAVE-405 + + // Atmosphere framework. Replacement of Socket.IO + // See https://issues.apache.org/jira/browse/WAVE-405 + ServletHolder atholder = addServlet("/atmosphere*", AtmosphereGuiceServlet.class); + // Enable guice. See + // https://github.com/Atmosphere/atmosphere/wiki/Configuring-Atmosphere%27s-Classes-Creation-and-Injection + atholder.setInitParameter("org.atmosphere.cpr.objectFactory", + "org.waveprotocol.box.server.rpc.atmosphere.GuiceAtmosphereFactory"); + atholder.setAsyncSupported(true); + atholder.setInitOrder(0); /* * Loop through addresses, collect list of ports, and determine if we are to use "localhost" @@ -636,6 +681,178 @@ public class ServerRpcProvider { } /** + * Manange atmosphere connections and dispatch messages to + * wave channels. + * + * @author [email protected] <Pablo Ojanguren> + * + */ + @Singleton + @AtmosphereHandlerService(path = "/atmosphere", + interceptors = {AtmosphereClientInterceptor.class}, + broadcasterCache = UUIDBroadcasterCache.class) + public static class WaveAtmosphereService implements AtmosphereHandler { + + + private static final Log LOG = Log.get(WaveAtmosphereService.class); + + private static final String WAVE_CHANNEL_ATTRIBUTE = "WAVE_CHANNEL_ATTRIBUTE"; + private static final String MSG_SEPARATOR = "|"; + private static final String MSG_CHARSET = "UTF-8"; + + @Inject + public ServerRpcProvider provider; + + + @Override + public void onRequest(AtmosphereResource resource) throws IOException { + + AtmosphereResourceSession resourceSession = + AtmosphereResourceSessionFactory.getDefault().getSession(resource); + + AtmosphereChannel resourceChannel = + resourceSession.getAttribute(WAVE_CHANNEL_ATTRIBUTE, AtmosphereChannel.class); + + if (resourceChannel == null) { + + ParticipantId loggedInUser = + provider.sessionManager.getLoggedInUser(resource.getRequest().getSession(false)); + + AtmosphereConnection connection = new AtmosphereConnection(loggedInUser, provider); + resourceChannel = connection.getAtmosphereChannel(); + resourceSession.setAttribute(WAVE_CHANNEL_ATTRIBUTE, resourceChannel); + resourceChannel.onConnect(resource); + } + + resource.setBroadcaster(resourceChannel.getBroadcaster()); // on every + // request + + if (resource.getRequest().getMethod().equalsIgnoreCase("GET")) { + + resource.suspend(); + + } + + + if (resource.getRequest().getMethod().equalsIgnoreCase("POST")) { + + StringBuilder b = IOUtils.readEntirely(resource); + resourceChannel.onMessage(b.toString()); + + } + + } + + + @Override + public void onStateChange(AtmosphereResourceEvent event) throws IOException { + + + AtmosphereResponse response = event.getResource().getResponse(); + AtmosphereResource resource = event.getResource(); + + if (event.isSuspended()) { + + // Set content type before do response.getWriter() + // http://docs.oracle.com/javaee/5/api/javax/servlet/ServletResponse.html#setContentType(java.lang.String) + response.setContentType("text/plain; charset=UTF-8"); + response.setCharacterEncoding("UTF-8"); + + + if (event.getMessage().getClass().isArray()) { + + LOG.fine("SEND MESSAGE ARRAY " + event.getMessage().toString()); + + List<Object> list = Arrays.asList(event.getMessage()); + + response.getOutputStream().write(MSG_SEPARATOR.getBytes(MSG_CHARSET)); + for (Object object : list) { + String message = (String) object; + message += MSG_SEPARATOR; + response.getOutputStream().write(message.getBytes(MSG_CHARSET)); + } + + } else if (event.getMessage() instanceof List) { + + LOG.fine("SEND MESSAGE LIST " + event.getMessage().toString()); + + @SuppressWarnings("unchecked") + List<Object> list = List.class.cast(event.getMessage()); + + response.getOutputStream().write(MSG_SEPARATOR.getBytes(MSG_CHARSET)); + for (Object object : list) { + String message = (String) object; + message += MSG_SEPARATOR; + response.getOutputStream().write(message.getBytes(MSG_CHARSET)); + } + + } else if (event.getMessage() instanceof String) { + + LOG.fine("SEND MESSAGE " + event.getMessage().toString()); + + String message = (String) event.getMessage(); + response.getOutputStream().write(message.getBytes(MSG_CHARSET)); + } + + + + try { + + response.flushBuffer(); + + switch (resource.transport()) { + case JSONP: + case LONG_POLLING: + event.getResource().resume(); + break; + case WEBSOCKET: + case STREAMING: + case SSE: + response.getOutputStream().flush(); + break; + default: + LOG.info("Unknown transport"); + break; + } + } catch (IOException e) { + LOG.info("Error resuming resource response", e); + } + + + } else if (event.isResuming()) { + + LOG.fine("RESUMING"); + + } else if (event.isResumedOnTimeout()) { + + LOG.fine("RESUMED ON TIMEOUT"); + + } else if (event.isClosedByApplication() || event.isClosedByClient()) { + + LOG.fine("CONNECTION CLOSED"); + + AtmosphereResourceSession resourceSession = + AtmosphereResourceSessionFactory.getDefault().getSession(resource); + + AtmosphereChannel resourceChannel = + resourceSession.getAttribute(WAVE_CHANNEL_ATTRIBUTE, AtmosphereChannel.class); + + if (resourceChannel != null) { + resourceChannel.onDisconnect(); + } + } + } + + @Override + public void destroy() { + // Nothing to do + + } + + + } + + /** * Returns the socket the WebSocket server is listening on. */ public SocketAddress getWebSocketAddress() { http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/src/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereChannel.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereChannel.java b/src/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereChannel.java new file mode 100644 index 0000000..4bc9adb --- /dev/null +++ b/src/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereChannel.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.waveprotocol.box.server.rpc.atmosphere; + + +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.Broadcaster; +import org.atmosphere.cpr.BroadcasterFactory; +import org.waveprotocol.box.server.rpc.ProtoCallback; +import org.waveprotocol.box.server.rpc.WebSocketChannel; +import org.waveprotocol.wave.util.logging.Log; + +import java.io.IOException; + +/** + * An atmosphere wrapper for the WebSocketChannel type. + * + * @author [email protected] (Pablo Ojanguren) + */ +public class AtmosphereChannel extends WebSocketChannel { + + + private static final Log LOG = Log.get(AtmosphereChannel.class); + + /* The object needed to send messages out */ + private Broadcaster broadcaster; + + + + /** + * Creates a new AtmosphereChannel using the callback for incoming messages. + * + * @param callback A ProtoCallback instance called with incoming messages. + */ + public AtmosphereChannel(ProtoCallback callback) { + super(callback); + broadcaster = BroadcasterFactory.getDefault().get(); + + } + + /** + * A new resource connection has been associated with + * this channel + * @param resource the Atmosphere resource object + */ + public void onConnect(AtmosphereResource resource) { + + // Create a new broadcaster to publish to this resource + broadcaster.addAtmosphereResource(resource); + } + + + public Broadcaster getBroadcaster() { + return broadcaster; + } + + /** + * The atmosphere resource has received a new post message + * @param message the message + */ + public void onMessage(String message) { + + handleMessageString(message); + } + + /** + * The atmosphere resource has been closed + */ + public void onDisconnect() { + + broadcaster = null; + } + + + + /** + * Send the given data String + * + * @param data + * @throws IOException + */ + @Override + protected void sendMessageString(String data) throws IOException { + + if (broadcaster == null || broadcaster.isDestroyed()) { + // Just drop the message. It's rude to throw an exception since the + // caller had no way of knowing. + LOG.warning("Atmosphere Channel is not connected"); + } else { + + LOG.fine("BROADCAST "+data); + broadcaster.broadcast(data); + } + } + + + + +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/src/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereClientInterceptor.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereClientInterceptor.java b/src/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereClientInterceptor.java new file mode 100644 index 0000000..12468d3 --- /dev/null +++ b/src/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereClientInterceptor.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.waveprotocol.box.server.rpc.atmosphere; + + + +import com.google.common.io.ByteStreams; + +import org.atmosphere.config.service.AtmosphereInterceptorService; +import org.atmosphere.cpr.Action; +import org.atmosphere.cpr.AtmosphereConfig; +import org.atmosphere.cpr.AtmosphereInterceptor; +import org.atmosphere.cpr.AtmosphereRequest; +import org.atmosphere.cpr.AtmosphereResource; +import org.waveprotocol.wave.util.logging.Log; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + + /** + * Serve atmosphere.js file to GWT clients This class allows to serve the + * Atmosphere Javascript client from the same /atmosphere servlet path which is + * used to process any Atmosphere request instead of place it on the /static + * path. In addition, the Javascript file is put together with the rest of + * Atmosphere related source code, during the build process as any other third + * party dependency. + * + * + * @author [email protected] (Pablo Ojanguren) + */ + @AtmosphereInterceptorService + public class AtmosphereClientInterceptor implements AtmosphereInterceptor { + + private static final Log LOG = Log.get(AtmosphereClientInterceptor.class); + + @Override + public void configure(AtmosphereConfig config) { + // Nothing to do + } + + @Override + public Action inspect(AtmosphereResource resource) { + + AtmosphereRequest request = resource.getRequest(); + + try { + // Find the first context parameter + String path = request.getPathInfo(); + + if (path == null || path.isEmpty()) + return Action.CONTINUE; + + if (path.startsWith("/")) { + path = path.substring(1); + } + String[] parts = path.split("/"); + + // Serve the file + if (parts.length > 0 && "GET".equals(resource.getRequest().getMethod()) && "atmosphere.js".equals(parts[0])) { + resource.getResponse().setContentType("text/javascript"); + InputStream is = + this.getClass().getClassLoader() + .getResourceAsStream("org/waveprotocol/box/server/rpc/atmosphere/atmosphere.js"); + OutputStream os = resource.getResponse().getOutputStream(); + ByteStreams.copy(is, os); + return Action.CANCELLED; + + } + + + } catch (IOException e) { + LOG.severe("Error sending atmosphere.js",e); + } + + + return Action.CONTINUE; + } + + + @Override + public void postInspect(AtmosphereResource resource) { + // Nothing to do + } + + } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/src/org/waveprotocol/box/server/rpc/atmosphere/GuiceAtmosphereFactory.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/rpc/atmosphere/GuiceAtmosphereFactory.java b/src/org/waveprotocol/box/server/rpc/atmosphere/GuiceAtmosphereFactory.java new file mode 100644 index 0000000..3611caf --- /dev/null +++ b/src/org/waveprotocol/box/server/rpc/atmosphere/GuiceAtmosphereFactory.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.waveprotocol.box.server.rpc.atmosphere; + +import com.google.inject.Injector; + +import org.atmosphere.cpr.AtmosphereFramework; +import org.atmosphere.cpr.AtmosphereObjectFactory; +import org.waveprotocol.wave.util.logging.Log; + +/** + * Custom factory to use wave's guice injector in Atmosphere + * + * @author [email protected] (Pablo Ojanguren) + * + */ +public class GuiceAtmosphereFactory implements AtmosphereObjectFactory { + + private static final Log LOG = Log.get(GuiceAtmosphereFactory.class); + + private static Injector injector; + + @SuppressWarnings("unchecked") + @Override + public <T, U extends T> U newClassInstance(AtmosphereFramework framework, Class<T> classType, Class<U> classToInstantiate) throws InstantiationException, IllegalAccessException { + initInjector(framework); + + + if (injector == null) { + return classToInstantiate.newInstance(); + } else { + return injector.getInstance(classToInstantiate); + } + } + + public String toString() { + return "Guice ObjectFactory"; + } + + private void initInjector(AtmosphereFramework framework) { + if (injector == null) { + com.google.inject.Injector servletInjector = (com.google.inject.Injector) + framework.getServletContext().getAttribute( + com.google.inject.Injector.class.getName()); + + if (servletInjector != null) { + injector = servletInjector; + LOG.fine("Existing injector found to create Atmosphere instances"); + } else { + LOG.fine("Not injector not found to create Atmosphere instances"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/src/org/waveprotocol/box/webclient/WebClient.gwt.xml ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/webclient/WebClient.gwt.xml b/src/org/waveprotocol/box/webclient/WebClient.gwt.xml index 387d0c7..63516ac 100644 --- a/src/org/waveprotocol/box/webclient/WebClient.gwt.xml +++ b/src/org/waveprotocol/box/webclient/WebClient.gwt.xml @@ -25,7 +25,6 @@ <inherits name='com.google.gwt.user.User'/> <inherits name="com.google.gwt.event.EventBase"/> - <inherits name='com.glines.socketio.client.Client'/> <inherits name='com.google.gwt.json.JSON'/> <inherits name='com.google.gwt.websockets.WebSockets'/> http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/src/org/waveprotocol/box/webclient/client/WaveSocketFactory.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/webclient/client/WaveSocketFactory.java b/src/org/waveprotocol/box/webclient/client/WaveSocketFactory.java index 4a1788f..d79ce4e 100644 --- a/src/org/waveprotocol/box/webclient/client/WaveSocketFactory.java +++ b/src/org/waveprotocol/box/webclient/client/WaveSocketFactory.java @@ -22,73 +22,66 @@ package org.waveprotocol.box.webclient.client; import com.google.gwt.websockets.client.WebSocket; import com.google.gwt.websockets.client.WebSocketCallback; -import com.glines.socketio.client.common.SocketIOConnection; -import com.glines.socketio.client.common.SocketIOConnectionListener; -import com.glines.socketio.client.gwt.GWTSocketIOConnectionFactory; -import com.glines.socketio.common.DisconnectReason; -import com.glines.socketio.common.SocketIOException; +import org.waveprotocol.box.webclient.client.atmosphere.AtmosphereConnection; +import org.waveprotocol.box.webclient.client.atmosphere.AtmosphereConnectionImpl; +import org.waveprotocol.box.webclient.client.atmosphere.AtmosphereConnectionListener; + /** * Factory to create proxy wrappers around either {@link com.google.gwt.websockets.client.WebSocket} - * or {@link com.glines.socketio.client.SocketIOConnection}. - * + * or {@link org.waveprotocol.box.webclient.client.atmosphere.AtmosphereConnection}. + * * @author [email protected] (Tad Glines) */ public class WaveSocketFactory { - + /** * Create a WaveSocket instance that wraps a concrete socket implementation. - * If useSocketIO is true an instance of {@link com.glines.socketio.client.SocketIOConnection} + * If useWebSocketAlt is true an instance of {@link org.waveprotocol.box.webclient.client.atmosphere.AtmosphereConnection} * is wrapped, otherwise an instance of {@link com.google.gwt.websockets.client.WebSocket} is * wrapped. */ - public static WaveSocket create(boolean useSocketIO, final String urlBase, + public static WaveSocket create(boolean useWebSocketAlt, final String urlBase, final WaveSocket.WaveSocketCallback callback) { - if (useSocketIO) { + if (useWebSocketAlt) { return new WaveSocket() { - /* - * TODO: The urlBase is ignored for now. The default is identical to what is currently - * provided in urlBase. When the GWTSocketIOConnectionFactory.create() is updated, - * parse the urlBAse and pass the host and port. - */ - private final SocketIOConnection socket = GWTSocketIOConnectionFactory.INSTANCE.create( - new SocketIOConnectionListener() { + + private final AtmosphereConnection socket + = new AtmosphereConnectionImpl(new AtmosphereConnectionListener() { + @Override public void onConnect() { callback.onConnect(); } @Override - public void onDisconnect(DisconnectReason reason, String errorMEssage) { + public void onDisconnect() { callback.onDisconnect(); } @Override - public void onMessage(int messageType, String message) { + public void onMessage(String message) { callback.onMessage(message); - } - }, null, (short)0); + }}, urlBase); @Override public void connect() { socket.connect(); + } @Override public void disconnect() { - socket.disconnect(); + socket.close(); } @Override public void sendMessage(String message) { - try { - socket.sendMessage(message); - } catch (SocketIOException e) { - // Ignore because the GWT implementation doesn't throw this Exception. - } + socket.sendMessage(message); } - - }; + + }; + } else { return new WaveSocket() { final WebSocket socket = new WebSocket(new WebSocketCallback() { http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/src/org/waveprotocol/box/webclient/client/WaveWebSocketClient.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/webclient/client/WaveWebSocketClient.java b/src/org/waveprotocol/box/webclient/client/WaveWebSocketClient.java index 65746d2..0b68d71 100644 --- a/src/org/waveprotocol/box/webclient/client/WaveWebSocketClient.java +++ b/src/org/waveprotocol/box/webclient/client/WaveWebSocketClient.java @@ -113,7 +113,7 @@ public class WaveWebSocketClient implements WaveSocket.WaveSocketCallback { @Override public boolean execute() { if (!connectedAtLeastOnce && !websocketNotAvailable && connectTry > MAX_INITIAL_FAILURES) { - // Let's try to use socketio, seems that websocket it's not working + // Let's try to use websocket alternative, seems that websocket it's not working // (we are under a proxy or similar) socket = WaveSocketFactory.create(true, urlBase, WaveWebSocketClient.this); } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnection.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnection.java b/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnection.java new file mode 100644 index 0000000..29b1f24 --- /dev/null +++ b/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnection.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.waveprotocol.box.webclient.client.atmosphere; + +/** + * The atmosphere connection interface for wrapping the javascript + * client in GWT + * + * @author [email protected] (Pablo Ojanguren) + * + */ +public interface AtmosphereConnection { + + + /** + * Initiate a connection attempt. + * + */ + void connect(); + + /** + * Initiate an orderly close of the connection. + * + */ + void close(); + + + /** + * Send a message. + * + * @param message + */ + void sendMessage(String message); + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnectionImpl.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnectionImpl.java b/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnectionImpl.java new file mode 100644 index 0000000..c405f18 --- /dev/null +++ b/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnectionImpl.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.waveprotocol.box.webclient.client.atmosphere; + +import com.google.gwt.core.client.Callback; +import com.google.gwt.core.client.JavaScriptObject; +import com.google.gwt.core.client.ScriptInjector; + +/** + * The wrapper implementation of the atmosphere javascript client. + * + * https://github.com/Atmosphere/atmosphere/wiki/jQuery.atmosphere.js-atmosphere + * .js-API + * + * More info about transports + * https://github.com/Atmosphere/atmosphere/wiki/Supported + * -WebServers-and-Browsers + * + * It tries to use Server-Sent Events first and fallback transport to + * long-polling. We ignore Websockets by now because they are the default + * transport on WiAB and atmosphere is the fallback. + * + * http://stackoverflow.com/questions/9397528/server-sent-events-vs-polling + * + * + * @author [email protected] (Pablo Ojanguren) + * + */ +public class AtmosphereConnectionImpl implements AtmosphereConnection { + + + private static final class AtmosphereSocket extends JavaScriptObject { + public static native AtmosphereSocket create(AtmosphereConnectionImpl impl, String urlBase) /*-{ + + var client = $wnd.atmosphere; + + var atsocket = { + request: null, + socket: null }; + + var connectionUrl = window.location.protocol + "//" + $wnd.__websocket_address + "/"; + + connectionUrl += 'atmosphere'; + + //console.log("Connection URL is "+urlBase); + atsocket.request = new client.AtmosphereRequest(); + atsocket.request.url = connectionUrl; + atsocket.request.contenType = 'text/plain;charset=UTF-8'; + atsocket.request.transport = 'sse'; + atsocket.request.fallbackTransport = 'long-polling'; + + atsocket.request.onOpen = $entry(function() { + im...@org.waveprotocol.box.webclient.client.atmosphere.AtmosphereConnectionImpl::onConnect()(); + }); + + atsocket.request.onMessage = $entry(function(response) { + + var r = response.responseBody; + + if (r.indexOf('|') == 0) { + + while (r.indexOf('|') == 0 && r.length > 1) { + + r = r.substring(1); + var marker = r.indexOf('}|'); + im...@org.waveprotocol.box.webclient.client.atmosphere.AtmosphereConnectionImpl::onMessage(Ljava/lang/String;)(r.substring(0, marker+1)); + r = r.substring(marker+1); + + } + + } + else { + + im...@org.waveprotocol.box.webclient.client.atmosphere.AtmosphereConnectionImpl::onMessage(Ljava/lang/String;)(r); + + } + + }); + + atsocket.request.onClose = $entry(function(response) { + + client.util.info("Connection closed"); + + im...@org.waveprotocol.box.webclient.client.atmosphere.AtmosphereConnectionImpl::onDisconnect(Ljava/lang/String;)(response); + }); + + + + atsocket.request.onTransportFailure = function(errorMsg, request) { + + client.util.info(errorMsg); + + }; + + + atsocket.request.onReconnect = function(request, response) { + + client.util.info("Reconnected to the server"); + + }; + + atsocket.request.onError = function(response) { + + client.util.info("Unexpected Error"); + + }; + + + return atsocket; + + + }-*/; + + protected AtmosphereSocket() { + } + + + public native void close() /*-{ + this.socket.unsubscribe(); + }-*/; + + public native AtmosphereSocket connect() /*-{ + this.socket = $wnd.atmosphere.subscribe(this.request); + + }-*/; + + public native void send(String data) /*-{ + this.socket.push(data); + }-*/; + } + + + private final AtmosphereConnectionListener listener; + private String urlBase; + private AtmosphereConnectionState state; + private AtmosphereSocket socket = null; + + public AtmosphereConnectionImpl(AtmosphereConnectionListener listener, + String urlBase) { + this.listener = listener; + this.urlBase = urlBase; + + } + + + @Override + public void connect() { + if (socket == null) { + + ScriptInjector.fromUrl("/atmosphere/atmosphere.js").setCallback( + new Callback<Void, Exception>() { + public void onFailure(Exception reason) { + throw new IllegalStateException("atmosphere.js load failed!"); + } + public void onSuccess(Void result) { + + socket = AtmosphereSocket.create(AtmosphereConnectionImpl.this, urlBase); + socket.connect(); + } + }).setWindow(ScriptInjector.TOP_WINDOW).inject(); + } else { + + + if (AtmosphereConnectionState.CLOSED.equals(this.state)) + socket.connect(); + + } + } + + @Override + public void close() { + if (!AtmosphereConnectionState.CLOSED.equals(this.state)) + socket.close(); + + } + + + @Override + public void sendMessage(String message) { + this.state = AtmosphereConnectionState.MESSAGE_PUBLISHED; + socket.send(message); + } + + + + @SuppressWarnings("unused") + private void onConnect() { + this.state = AtmosphereConnectionState.OPENED; + listener.onConnect(); + } + + /** + * This method is called when an Atmosphere onClose event happens: + * + * when an error occurs. when the server or a proxy closes the connection. + * when an expected exception occurs. when the specified transport is not + * supported or fail to connect. + * + * @param response + * + */ + @SuppressWarnings("unused") + private void onDisconnect(String response) { + this.state = AtmosphereConnectionState.CLOSED; + listener.onDisconnect(); + } + + @SuppressWarnings("unused") + private void onMessage(String message) { + this.state = AtmosphereConnectionState.MESSAGE_RECEIVED; + listener.onMessage(message); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnectionListener.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnectionListener.java b/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnectionListener.java new file mode 100644 index 0000000..aff7bfd --- /dev/null +++ b/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnectionListener.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.waveprotocol.box.webclient.client.atmosphere; + +/** + * The listener interface for the atmosphere client wrapper. + * + * @author [email protected] (Pablo Ojanguren) + * + */ +public interface AtmosphereConnectionListener { + + /** + * Called when a successful connection with server is performed + */ + void onConnect(); + + /** + * Called when connection is closed properly or by error + */ + void onDisconnect(); + + /** + * Called when a new message is received from the server + * @param message + */ + void onMessage(String message); + + + +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/4c0d78ee/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnectionState.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnectionState.java b/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnectionState.java new file mode 100644 index 0000000..a6b3c94 --- /dev/null +++ b/src/org/waveprotocol/box/webclient/client/atmosphere/AtmosphereConnectionState.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.waveprotocol.box.webclient.client.atmosphere; + +/** + * Connection state definition inspired by former SocketIO/WebSocket + * implementation + * + * @author [email protected] (Pablo Ojanguren) + * + */ +public enum AtmosphereConnectionState { + + MESSAGE_RECEIVED("messageReceived"), + MESSAGE_PUBLISHED("messagePublished"), OPENED("opened"), + CLOSED("closed"), UNKNOWN("unknown"); + + private String value; + private AtmosphereConnectionState(String v) { this.value = v; } + public String value() { return value; } + + public static AtmosphereConnectionState fromString(String val) { + + if (val.equals("messageReceived")) { + return MESSAGE_RECEIVED; + } else if (val.equals("messagePublished")) { + return MESSAGE_PUBLISHED; + } else if (val.equals("opened")) { + return OPENED; + } else if (val.equals("closed")) { + return CLOSED; + } + + return UNKNOWN; + } +}
