WAVE-438 - Removes the XMPP federation implementation along with related resources and unit tests. Fixes an issue with a test when running with Java 1.8.
Project: http://git-wip-us.apache.org/repos/asf/incubator-wave/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-wave/commit/ed4feb70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-wave/tree/ed4feb70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-wave/diff/ed4feb70 Branch: refs/heads/master Commit: ed4feb7018f35242fe5ecb00bf98b9462d7836ff Parents: 29f3d34 Author: Yuri Zelikov <[email protected]> Authored: Thu May 5 14:50:30 2016 +0300 Committer: Yuri Zelikov <[email protected]> Committed: Fri May 6 14:29:26 2016 +0300 ---------------------------------------------------------------------- README.md | 11 - wave/build.gradle | 7 +- wave/config/reference.conf | 31 +- wave/config/server-config.xml | 65 -- .../box/server/gxp/AuthenticationPage.gxp | 1 - .../org/waveprotocol/box/server/ServerMain.java | 14 +- .../server/executor/ExecutorAnnotations.java | 5 - .../box/server/executor/ExecutorsModule.java | 9 - .../wave/federation/xmpp/Base64Util.java | 67 -- .../xmpp/ComponentPacketTransport.java | 149 ---- .../federation/xmpp/IncomingPacketHandler.java | 38 -- .../xmpp/OutgoingPacketTransport.java | 38 -- .../wave/federation/xmpp/PacketCallback.java | 34 - .../wave/federation/xmpp/RemoteDisco.java | 430 ------------ .../federation/xmpp/SuccessFailCallback.java | 30 - .../wave/federation/xmpp/XmppDisco.java | 212 ------ .../federation/xmpp/XmppFederationHost.java | 446 ------------ .../xmpp/XmppFederationHostForDomain.java | 173 ----- .../federation/xmpp/XmppFederationModule.java | 61 -- .../federation/xmpp/XmppFederationRemote.java | 633 ----------------- .../xmpp/XmppFederationTransport.java | 50 -- .../wave/federation/xmpp/XmppManager.java | 474 ------------- .../wave/federation/xmpp/XmppNamespace.java | 43 -- .../wave/federation/xmpp/XmppUtil.java | 182 ----- .../persistence/file/AccountStoreTest.java | 3 +- .../persistence/file/AttachmentStoreTest.java | 3 +- .../persistence/file/CertPathStoreTest.java | 4 +- 
.../server/persistence/file/DeltaStoreTest.java | 5 +- .../wave/federation/xmpp/MockDisco.java | 108 --- .../xmpp/MockOutgoingPacketTransport.java | 73 -- .../wave/federation/xmpp/RemoteDiscoTest.java | 138 ---- .../wave/federation/xmpp/RoundTripTest.java | 378 ----------- .../wave/federation/xmpp/XmppDiscoTest.java | 674 ------------------- .../xmpp/XmppFederationHostForDomainTest.java | 329 --------- .../xmpp/XmppFederationRemoteTest.java | 497 -------------- .../SimpleWantedEvaluationSetTest.java | 2 +- 36 files changed, 16 insertions(+), 5401 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index b33916a..4572e41 100644 --- a/README.md +++ b/README.md @@ -160,17 +160,6 @@ Note: - if a jar is unable to be unzipped with wave:extractApi then delete the jar from your cache and try again. You may need to restart. If problem persists let the newsgroup know or create an issue on Jira. -To config your server a default configuration is provided by reference.conf, -this can be overwritten by application.conf with custom values. - -To enable federation the following must be run. - -To create a simple configuration run: - `./gradlew prosody-config` - -To override default values pass them to the ant script. -For example, to override wave\_server\_domain run: -`./gradlew prosody-config -Dwave_server_domain=example.com` Take a look at the reference.conf to learn about configuration and possible/default values. 
The server can be started (on Linux/MacOS) by running http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/build.gradle ---------------------------------------------------------------------- diff --git a/wave/build.gradle b/wave/build.gradle index 71ee962..f09b059 100644 --- a/wave/build.gradle +++ b/wave/build.gradle @@ -30,7 +30,7 @@ apply plugin: 'com.google.protobuf' /* Meta Data Info */ def title = 'Apache Wave Server' def vendor = 'The Apache Software Foundation' -version = "0.4.1" +version = "0.4.2" mainClassName = "org.waveprotocol.box.server.ServerMain" applicationDefaultJvmArgs = [ "-Xmx1024M", @@ -166,9 +166,6 @@ dependencies { [group: "org.eclipse.jetty.websocket", name: "websocket-common", version: "9.2.14.v20151106"], // [?, ?] [group: "org.eclipse.jetty.websocket", name: "websocket-server", version: "9.2.14.v20151106"], // [?, ?] [group: "org.eclipse.jetty.websocket", name: "websocket-servlet", version: "9.2.14.v20151106"], // [?, ?] - [group: "org.gnu.inet", name: "libidn", version: "1.15"], // [?, ?] - [group: "org.igniterealtime", name: "tinder", version: "1.2.3"], // [1/2016, 6/2016] - [group: "org.igniterealtime.whack", name: "core", version: "2.0.0"], // [1/2016, 6/2016] [group: "org.jdom", name: "jdom", version: "1.1.3"], // [?, ?] [group: "org.mongodb", name: "mongo-java-driver", version: "2.11.2"], // [?, ?] [group: "org.slf4j", name: "slf4j-api", version: "1.6.1"], // [?, ?] 
@@ -507,8 +504,6 @@ testGwt.mustRunAfter compileGwt, testLarge testMongo.mustRunAfter compileJava, test testLarge.mustRunAfter test -ant.importBuild 'config/server-config.xml' - //============================================================================= // Custom UberJar Implementation // Author Note: this custom implementation should be replaced by the shadow http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/config/reference.conf ---------------------------------------------------------------------- diff --git a/wave/config/reference.conf b/wave/config/reference.conf index 425a576..30cf51f 100644 --- a/wave/config/reference.conf +++ b/wave/config/reference.conf @@ -207,13 +207,11 @@ security { clientauth_cert_domain : "" } +# Please note that currently Wave in a Box server has no Federation implementation. federation { # Federation Configuration for the Wave in a Box server enable_federation : false - # These will probably need to be changed - xmpp_server_secret : secret - # The PKCS#8-PEM-encoded private key. certificate_private_key : "local.net.key" @@ -228,33 +226,6 @@ federation { # The domain for which the certificate was issued. certificate_domain : "local.net" - xmpp_component_name : wave - - # This server's local JID - xmpp_jid : "wave.local.net" - - xmpp_server_description : "Wave in a Box" - - disco_info_category : "collaboration" - - disco_info_type : "apache-wave" - - xmpp_server_hostname : "local.net" - - xmpp_server_component_port : 5275 - - # How long to cache failed disco results. - xmpp_disco_failed_expiry : 300s - - # How long to cache successful disco results. 
- xmpp_disco_successful_expiry : 7200s - - disco_expiration : 6h - - # Set XMPP_SERVER_IP to localhost if the XMPP and Wave in a Box servers are - # running on the same host - xmpp_server_ip : localhost - # Set true to disable the verification of signed deltas waveserver_disable_verification : true http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/config/server-config.xml ---------------------------------------------------------------------- diff --git a/wave/config/server-config.xml b/wave/config/server-config.xml deleted file mode 100644 index deb5f3b..0000000 --- a/wave/config/server-config.xml +++ /dev/null @@ -1,65 +0,0 @@ -<!-- - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- --> -<project name="server config" basedir="../" default="prosody-config"> - <description>Creates the server configuration file.</description> - - <property name="wave_server_domain" value="local.net" /> - <property name="xmpp_server_secret" value="opensesame" /> - <property name="certificate_private_key" value="${wave_server_domain}.key" /> - <property name="certificate_files" value="${wave_server_domain}.crt,sub.class1.server.ca.pem,ca.pem" /> - <property name="certificate_domain" value="${wave_server_domain}" /> - <property name="xmpp_component_name" value="wave" /> - <property name="xmpp_jid" value="${xmpp_component_name}.${wave_server_domain}" /> - <property name="xmpp_server_description" value=""Wave in a Box"" /> - <property name="xmpp_server_hostname" value="${wave_server_domain}" /> - <property name="xmpp_server_component_port" value="5275" /> - <property name="xmpp_server_to_server_port" value="5269" /> - <property name="xmpp_server_ping" value="wavesandbox.com" /> - <property name="xmpp_server_ip" value="${xmpp_server_hostname}" /> - <property name="waveserver_disable_verification" value="false" /> - <property name="waveserver_disable_signer_verification" value="false" /> - - - <target name="prosody-config" - description="Run to create the prosody configuration files. 
- ant -f server-config.xml prosody-config"> - <echo>Generating ${certificate_domain}.cfg.lua</echo> - <copy file="${certificate_domain}.cfg.lua" - tofile="${certificate_domain}.cfg.lua.old" - overwrite="true" - failonerror="false" /> - <copy file="prosody.cfg.lua.example" tofile="${certificate_domain}.cfg.lua" overwrite="true"> - <filterchain> - <replacetokens> - <token key="BASEDIR" value="${basedir}" /> - <token key="XMPP_SERVER_SECRET" value="${xmpp_server_secret}" /> - <token key="CERTIFICATE_PRIVATE_KEY" value="${certificate_private_key}" /> - <token key="CERTIFICATE_DOMAIN" value="${certificate_domain}" /> - <token key="XMPP_JID" value="${xmpp_jid}" /> - <token key="XMPP_SERVER_DESCRIPTION" value="${xmpp_server_description}" /> - <token key="XMPP_SERVER_COMPONENT_PORT" value="${xmpp_server_component_port}" /> - <token key="XMPP_SERVER_TO_SERVER_PORT" value="${xmpp_server_to_server_port}" /> - </replacetokens> - </filterchain> - </copy> - <echo>Please, manually copy ${certificate_domain}.cfg.lua to your prosody configuration directory.</echo> - <echo>E.g. 
sudo cp ${certificate_domain}.cfg.lua /etc/prosody/conf.d/${certificate_domain}.cfg.lua</echo> - <echo>Additionally, ensure your ${certificate_domain} SRV record points to port ${xmpp_server_to_server_port}</echo> - </target> -</project> http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp ---------------------------------------------------------------------- diff --git a/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp b/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp index d8791d4..d188b05 100644 --- a/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp +++ b/wave/src/main/gxp/org/waveprotocol/box/server/gxp/AuthenticationPage.gxp @@ -106,7 +106,6 @@ <li> This project lets developers and enterprise users run wave servers and host waves on their own hardware. - And then share those waves with other wave servers. </li> </ul> </p> http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java b/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java index d258f9c..727ad44 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java +++ b/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java @@ -61,7 +61,6 @@ import org.waveprotocol.box.stat.StatService; import org.waveprotocol.wave.crypto.CertPathStore; import org.waveprotocol.wave.federation.FederationTransport; import org.waveprotocol.wave.federation.noop.NoOpFederationModule; -import org.waveprotocol.wave.federation.xmpp.XmppFederationModule; import org.waveprotocol.wave.model.version.HashedVersionFactory; import org.waveprotocol.wave.model.wave.ParticipantIdUtil; import org.waveprotocol.wave.util.logging.Log; @@ 
-108,10 +107,9 @@ public class ServerMain { injector = injector.createChildInjector(profilingModule, executorsModule); Config config = injector.getInstance(Config.class); - boolean enableFederation = config.getBoolean("federation.enable_federation"); Module serverModule = injector.getInstance(ServerModule.class); - Module federationModule = buildFederationModule(injector, enableFederation); + Module federationModule = buildFederationModule(injector); Module robotApiModule = new RobotApiModule(); PersistenceModule persistenceModule = injector.getInstance(PersistenceModule.class); Module searchModule = injector.getInstance(SearchModule.class); @@ -140,15 +138,9 @@ public class ServerMain { server.startWebSocketServer(injector); } - private static Module buildFederationModule(Injector settingsInjector, boolean enableFederation) + private static Module buildFederationModule(Injector settingsInjector) throws ConfigurationException { - Module federationModule; - if (enableFederation) { - federationModule = settingsInjector.getInstance(XmppFederationModule.class); - } else { - federationModule = settingsInjector.getInstance(NoOpFederationModule.class); - } - return federationModule; + return settingsInjector.getInstance(NoOpFederationModule.class); } private static void initializeServer(Injector injector, String waveDomain) http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java b/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java index a002c76..5621610 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java +++ b/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorAnnotations.java @@ -80,11 +80,6 @@ public interface ExecutorAnnotations { 
public @interface RobotGatewayExecutor { } - @Retention(RUNTIME) - @BindingAnnotation - public @interface XmppExecutor { - } - @BindingAnnotation public @interface SolrExecutor { } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java b/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java index bfcd345..2d8e65e 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java +++ b/wave/src/main/java/org/waveprotocol/box/server/executor/ExecutorsModule.java @@ -137,15 +137,6 @@ public class ExecutorsModule extends AbstractModule { @Provides @Singleton - @XmppExecutor - protected ScheduledExecutorService provideXmppExecutor( - Provider<ScheduledRequestScopeExecutor> executorProvider) { - return provideScheduledThreadPoolExecutor(executorProvider, 1, XmppExecutor.class - .getSimpleName()); - } - - @Provides - @Singleton @SolrExecutor protected Executor provideSolrExecutor(Provider<RequestScopeExecutor> executorProvider, Config config) { http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/Base64Util.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/Base64Util.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/Base64Util.java deleted file mode 100644 index 6ab6569..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/Base64Util.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.wave.federation.xmpp; - -import com.google.protobuf.AbstractMessageLite; -import com.google.protobuf.ByteString; - -import org.apache.commons.codec.binary.Base64; - -import java.nio.charset.Charset; - -/** - * Utility class for encoding and decoding ByteStrings, byte arrays and encoding - * generic protocol buffers. - * - * @author [email protected] (Anthony Baxter) - * @author [email protected] (Sam Thorogood) - */ -public final class Base64Util { - - // Character set for all encoding and decoding. Base64 can be correctly - // represented using UTF-8. - private static final Charset CHAR_SET = Charset.forName("UTF-8"); - - /** - * Utility class only, cannot be instantiated. 
- */ - private Base64Util() { - } - - public static String encode(ByteString bs) { - return new String(Base64.encodeBase64(bs.toByteArray()), CHAR_SET); - } - - public static String encode(byte[] ba) { - return new String(Base64.encodeBase64(ba), CHAR_SET); - } - - public static String encode(AbstractMessageLite message) { - return new String(Base64.encodeBase64(message.toByteArray()), CHAR_SET); - } - - public static byte[] decodeFromArray(String str) { - return Base64.decodeBase64(str.getBytes(CHAR_SET)); - } - - public static ByteString decode(String str) { - return ByteString.copyFrom(Base64.decodeBase64(str.getBytes(CHAR_SET))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/ComponentPacketTransport.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/ComponentPacketTransport.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/ComponentPacketTransport.java deleted file mode 100644 index a98dbbc..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/ComponentPacketTransport.java +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.wave.federation.xmpp; - -import com.google.inject.Inject; -import com.typesafe.config.Config; -import org.jivesoftware.whack.ExternalComponentManager; -import org.xmpp.component.Component; -import org.xmpp.component.ComponentException; -import org.xmpp.component.ComponentManager; -import org.xmpp.packet.JID; -import org.xmpp.packet.Packet; - -import java.util.LinkedList; -import java.util.Queue; -import java.util.logging.Logger; - -/** - * Talks to a XMPP server using the Jabber Component Protocol (XEP-0114). - * - * Implements {@link OutgoingPacketTransport} allowing users to send packets, - * and accepts an {@link IncomingPacketHandler} which can process incoming - * packets. - * - * @author [email protected] (Sam Thorogood) - */ -public class ComponentPacketTransport implements Component, OutgoingPacketTransport { - private static final Logger LOG = - Logger.getLogger(ComponentPacketTransport.class.getCanonicalName()); - - private final IncomingPacketHandler handler; - private final String componentName; - private final String serverDomain; - private final String serverSecret; - private final String serverAddress; - private final int serverPort; - - // Contains packets queued but not sent (while offline). - private final Queue<Packet> queuedPackets; - - // Object used to lock around online/offline state changes. 
- private final Object connectionLock = new Object(); - - private ExternalComponentManager componentManager = null; - private boolean connected = false; - - @Inject - public ComponentPacketTransport(IncomingPacketHandler handler, Config config) { - this.handler = handler; - this.componentName = config.getString("federation.xmpp_component_name"); - this.serverDomain = config.getString("federation.xmpp_server_hostname"); - this.serverSecret = config.getString("federation.xmpp_server_secret"); - this.serverAddress = config.getString("federation.xmpp_server_ip"); - this.serverPort = config.getInt("federation.xmpp_server_component_port"); - - queuedPackets = new LinkedList<>(); - } - - /** - * Bind the component to the XMPP server. - * - * @throws ComponentException if the component couldn't talk to the server - */ - public void run() throws ComponentException { - componentManager = new ExternalComponentManager(serverAddress, serverPort); - componentManager.setDefaultSecretKey(serverSecret); - componentManager.setServerName(serverDomain); - - // Register this component with the manager. - componentManager.addComponent(componentName, this); - } - - @Override - public void sendPacket(Packet packet) { - synchronized (connectionLock) { - if (connected) { - componentManager.sendPacket(this, packet); - } else { - queuedPackets.add(packet); - } - } - } - - @Override - public String getDescription() { - return "Wave in a Box Server"; - } - - @Override - public String getName() { - return componentName; - } - - @Override - public void initialize(JID jid, ComponentManager componentManager) { - // TODO(thorogood): According to XEP-0114, the only valid JID here is the - // same JID we attempt to connect to the XMPP server with. - LOG.info("Initializing with JID: " + jid); - } - - /** - * {@inheritDoc} - * - * Pass the incoming on-the-wire packet onto the incoming handler. 
- */ - @Override - public void processPacket(Packet packet) { - handler.receivePacket(packet); - } - - @Override - public void shutdown() { - synchronized (connectionLock) { - LOG.info("Disconnected from XMPP server."); - componentManager = null; - connected = false; - } - } - - @Override - public void start() { - synchronized (connectionLock) { - connected = true; - LOG.info("Connected to XMPP server with JID: " + componentName + "." + serverDomain); - - // Send all queued outgoing packets. - while (!queuedPackets.isEmpty()) { - componentManager.sendPacket(this, queuedPackets.poll()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/IncomingPacketHandler.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/IncomingPacketHandler.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/IncomingPacketHandler.java deleted file mode 100644 index cb60ac0..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/IncomingPacketHandler.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.wave.federation.xmpp; - -import org.xmpp.packet.Packet; - -/** - * Generic incoming XMPP packet handler interface. This should only be - * implemented by {@link XmppManager}, regardless of which wire transport is in - * use. - * - * @author [email protected] (Sam Thorogood) - */ -public interface IncomingPacketHandler { - - /** - * Accept a generic XMPP packet from on-the-wire. - */ - void receivePacket(Packet packet); - -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/OutgoingPacketTransport.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/OutgoingPacketTransport.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/OutgoingPacketTransport.java deleted file mode 100644 index 4899df0..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/OutgoingPacketTransport.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.wave.federation.xmpp; - -import org.xmpp.packet.Packet; - -/** - * Generic outgoing XMPP packet transport interface. Should be implemented by - * the handling XMPP transport (e.g. component system). - * - * @author [email protected] (Sam Thorogood) - */ -public interface OutgoingPacketTransport { - - /** - * Send a packet over-the-wire to its prescribed destination address. Provides - * no guarantees of delivery or callback. - */ - void sendPacket(Packet packet); - -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/PacketCallback.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/PacketCallback.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/PacketCallback.java deleted file mode 100644 index e86a091..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/PacketCallback.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.wave.federation.xmpp; - -import org.xmpp.packet.Packet; -import org.waveprotocol.wave.federation.FederationErrorProto.FederationError; - -/** - * Simple callback type used for sending and receiving reliable XMPP packet - * messages. This allows for clearly defined success and failure states. - * - * @author [email protected] (Sam Thorogood) - */ -public interface PacketCallback { - void run(Packet packet); - void error(FederationError error); -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/RemoteDisco.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/RemoteDisco.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/RemoteDisco.java deleted file mode 100644 index b6d5868..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/RemoteDisco.java +++ /dev/null @@ -1,430 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.wave.federation.xmpp; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.MapMaker; -import com.google.common.collect.Sets; - -import org.dom4j.Attribute; -import org.dom4j.Element; -import org.joda.time.DateTimeUtils; -import org.waveprotocol.wave.federation.FederationErrors; -import org.waveprotocol.wave.federation.FederationErrorProto.FederationError; -import org.xmpp.packet.IQ; -import org.xmpp.packet.Packet; - -import java.security.SecureRandom; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.Nullable; - -/** - * Represents XMPP disco status for a specific remote domain. This class only - * exposes one public method; {@link #discoverRemoteJID}. 
- * - * @author [email protected] (Sam Thorogood) - */ -public class RemoteDisco { - private static final Logger LOG = Logger.getLogger(RemoteDisco.class.getCanonicalName()); - - static final int MAXIMUM_DISCO_ATTEMPTS = 5; - static final int MINIMUM_REXMIT_MS = 15000; - static final int REXMIT_JITTER_MS = 2000; - static final int DISCO_INFO_TIMEOUT = 20; - - private final long creationTimeMillis; - private final long failExpirySecs; - private final long successExpirySecs; - - enum Status { - INIT, PENDING, COMPLETE - } - - private final Random random = new SecureRandom(); - private final XmppManager manager; - private final String remoteDomain; - private final AtomicReference<Status> status; - private final Queue<SuccessFailCallback<String, String>> pending; - - // Result JID field that will be available on COMPLETE status. - private String remoteJid; - - // Error field that will be available on COMPLETE status. - private FederationError error; - - - // These two values are used for tracking success and failure counts. - // Not yet exposed in the fedone waveserver. - public static final LoadingCache<String, AtomicLong> statDiscoSuccess = - CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() { - @Override - public AtomicLong load(String domain) { - return new AtomicLong(); - } - }); - - public static final LoadingCache<String, AtomicLong> statDiscoFailed = - CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() { - @Override - public AtomicLong load(String domain) { - return new AtomicLong(); - } - }); - - /** - * Construct a new RemoteDisco targeting the given domain. This will not kick - * off the disco request itself. 
- * @param manager XmppManager object, used to send packets - * @param remoteDomain the name of the remote domain (not JID) - * @param failExpirySecs how long to keep alive a failed disco result - * @param successExpirySecs how long to keep alive a successful disco result - */ - public RemoteDisco(XmppManager manager, String remoteDomain, long failExpirySecs, - long successExpirySecs) { - this.manager = manager; - status = new AtomicReference<Status>(Status.INIT); - pending = new ConcurrentLinkedQueue<SuccessFailCallback<String, String>>(); - this.remoteDomain = remoteDomain; - this.creationTimeMillis = DateTimeUtils.currentTimeMillis(); - this.failExpirySecs = failExpirySecs; - this.successExpirySecs = successExpirySecs; - } - - /** - * Construct a new RemoteDisco - purely for testing - with an already - * determined result. Either jid or error must be passed. - * - * @param remoteDomain the name of the remote domain (not JID) - * @param jid the domain's remote JID - * @param error the error from disco - */ - @VisibleForTesting - RemoteDisco(String remoteDomain, String jid, FederationError error) { - Preconditions.checkArgument((jid != null)^(error != null)); - - manager = null; - status = new AtomicReference<Status>(Status.COMPLETE); - pending = null; - this.remoteDomain = remoteDomain; - this.remoteJid = jid; - this.error = error; - // defaults for testing - this.creationTimeMillis = DateTimeUtils.currentTimeMillis(); - this.failExpirySecs = 2 * 60; - this.successExpirySecs = 2 * 60 * 60; - } - - /** - * Check whether the request is currently PENDING. Visible only for tests. - * @return true if pending else false - */ - @VisibleForTesting - boolean isRequestPending() { - return status.get().equals(Status.PENDING); - } - - /** - * Attempt to discover the remote JID for this domain. If the JID has already - * been discovered, then this method will invoke the callback immediately. - * Otherwise, the callback is guaranteed to be invoked at a later point. 
- * - * @param callback a callback to be invoked when disco is complete - */ - public void discoverRemoteJID(SuccessFailCallback<String, String> callback) { - if (status.get().equals(Status.COMPLETE)) { - complete(callback); - } else if (status.compareAndSet(Status.INIT, Status.PENDING)) { - pending.add(callback); - startDisco(); - } else { - pending.add(callback); - - // If we've become complete since the start of this method, complete - // all possible callbacks. - if (status.get().equals(Status.COMPLETE)) { - SuccessFailCallback<String, String> item; - while ((item = pending.poll()) != null) { - complete(item); - } - } - } - } - - /** - * Returns true if this RemoteDisco's time to live is exceeded. - * - * We can't use MapMaker's expiration code as it won't let us have different expiry for - * successful and failed cases. - * - * @return whether this object should be deleted and recreated - */ - public boolean ttlExceeded() { - if (status.get() == Status.COMPLETE) { - if (remoteJid == null) { - // Failed disco case - if (DateTimeUtils.currentTimeMillis() > - (creationTimeMillis + (1000 * failExpirySecs))) { - return true; - } - } else { - // Successful disco case - if (DateTimeUtils.currentTimeMillis() > - (creationTimeMillis + (1000 * successExpirySecs))) { - return true; - } - } - } - return false; - } - - /** - * Complete any specific callback (in the current thread). Requires the status - * to be COMPLETE. - * - * TODO(thorogood): thread model for completing callbacks - * @param callback the callback to complete - */ - private void complete(SuccessFailCallback<String, String> callback) { - Preconditions.checkState(status.get().equals(Status.COMPLETE)); - if (remoteJid != null) { - callback.onSuccess(remoteJid); - } else { - // TODO(thorogood): better toString, or change failure type to FederationError - callback.onFailure(error.toString()); - } - } - - /** - * Start XMPP discovery. 
Kicks off a retrying call to dial-up the remote - * server and discover its available disco items. - * - * This should only be called by a method holding the PENDING state. - */ - private void startDisco() { - final IQ request = manager.createRequestIQ(remoteDomain); - request.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS); - - final Runnable requester = new Runnable() { - int attempt = 0; - - final PacketCallback callback = new PacketCallback() { - @Override - public void run(Packet result) { - Preconditions.checkArgument(result instanceof IQ, "Manager must provide response IQ"); - processDiscoItemsResult((IQ) result); - } - - @Override - public void error(FederationError error) { - if (error.getErrorCode().equals(FederationError.Code.REMOTE_SERVER_TIMEOUT)) { - retry(); - } else { - LOG.info("Remote server " + remoteDomain + " failed on disco items: " - + error.getErrorCode()); - processDiscoItemsResult(null); - } - } - }; - - void retry() { - attempt += 1; - if (attempt > MAXIMUM_DISCO_ATTEMPTS) { - finish(null, FederationErrors - .newFederationError(FederationError.Code.REMOTE_SERVER_TIMEOUT)); - } else { - // TODO(thorogood): fix ms/seconds! - int timeout = nextDiscoRetransmitTimeout(attempt) / 1000; - request.setID(XmppUtil.generateUniqueId()); - LOG.info("Sending disco items request for: " + remoteDomain + ", timeout " + timeout - + " seconds"); - manager.send(request, callback, timeout); - } - } - - @Override - public void run() { - retry(); - } - }; - - // Kick off requester! - requester.run(); - } - - /** - * Calculate the requested timeout for any given request number. Introduces - * random jitter. - * - * @param attempt the attempt count - * @return request timeout in ms - */ - private int nextDiscoRetransmitTimeout(int attempt) { - Preconditions.checkArgument(attempt > 0); - return MINIMUM_REXMIT_MS * (1 << (attempt - 1)) + random.nextInt(REXMIT_JITTER_MS); - } - - /** - * Process a returned set of disco items. 
Invoke a query for each item in - * parallel, searching for any item which supports Wave. - * - * @param result IQ stanza provided from disco items, if null try default items - */ - private void processDiscoItemsResult(@Nullable IQ result) { - Set<String> candidates = Sets.newHashSet(); - - // Traverse the source list, finding possible JID candidates. - if (result != null) { - List<Element> items = XmppUtil.toSafeElementList(result.getChildElement().elements("item")); - for (Element item : items) { - Attribute jid = item.attribute("jid"); - if (jid != null) { - candidates.add(jid.getValue()); - } - } - } - - // Returned nothing for the items list. Try the domain itself. - if (candidates.isEmpty()) { - candidates.add(remoteDomain); - } - - // Always query 'wave.', as an automatic fallback. - candidates.add("wave." + remoteDomain); - - // Iterate over all candidates, requesting information in parallel. - AtomicInteger sharedLatch = new AtomicInteger(candidates.size()); - for (String candidate : candidates) { - requestDiscoInfo(candidate, sharedLatch); - } - } - - /** - * Request disco info from a specific target JID. Accepts a target JID as well - * as a shared latch: on a result, the latch should be decremented and if it - * reaches zero, finish() must be invoked with an error. 
- * - * @param target the target JID - * @param sharedLatch a shared latch - */ - private void requestDiscoInfo(String target, final AtomicInteger sharedLatch) { - final IQ request = manager.createRequestIQ(target); - request.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_INFO); - - PacketCallback callback = new PacketCallback() { - @Override - public void error(FederationError error) { - int currentCount = sharedLatch.decrementAndGet(); - Preconditions.checkState(currentCount >= 0, - "Info latch should not count down past zero for domain: %s", remoteDomain); - if (currentCount == 0) { - finish(null, error); - } - } - - @Override - public void run(Packet packet) { - Preconditions.checkArgument(packet instanceof IQ); - IQ result = (IQ) packet; - - List<Element> features = - XmppUtil.toSafeElementList(result.getChildElement().elements("feature")); - for (Element feature : features) { - Attribute var = feature.attribute("var"); - if (var != null && var.getValue().equals(XmppNamespace.NAMESPACE_WAVE_SERVER)) { - String targetJID = packet.getFrom().toString(); - finish(targetJID, null); - - // Decrement the latch *after* finishing, so we don't allow an error - // callback to be kicked off. - Preconditions.checkState(sharedLatch.decrementAndGet() >= 0, - "Info latch should not count down past zero for domain: %s", remoteDomain); - return; - } - } - - // This result didn't contain a useful result JID, so cause an error. - error(FederationErrors.newFederationError(FederationError.Code.ITEM_NOT_FOUND)); - } - }; - - LOG.info("Sending disco info request for: " + target); - manager.send(request, callback, DISCO_INFO_TIMEOUT); - } - - /** - * Finish this disco attempt with either a success or error result. This - * method should only be called on a thread that owns the PENDING state and - * will (if successful) result in a transition to COMPLETE. If the disco - * attempt is already complete, return false and do nothing (safe operation). 
- * - * @param jid success JID, or null - * @param error error proto, or null - * @return true if successful, false if already finished - */ - @VisibleForTesting - boolean finish(String jid, FederationError error) { - Preconditions.checkArgument((jid != null)^(error != null)); - if (!status.compareAndSet(Status.PENDING, Status.COMPLETE)) { - return false; - } - - // Set either the result JID or error state. - - try { - if (jid != null) { - this.remoteJid = jid; - LOG.info("Discovered remote JID: " + jid + " for " + remoteDomain); - statDiscoSuccess.get(remoteDomain).incrementAndGet(); - } else if (error != null) { - this.error = error; - LOG.info("Could not discover remote JID: " + error + " for " + remoteDomain); - statDiscoFailed.get(remoteDomain).incrementAndGet(); - } else { - throw new IllegalArgumentException("At least one of jid/error must be set"); - } - } catch (ExecutionException ex) { - throw new RuntimeException(ex); - } - - // Complete all available callbacks. - SuccessFailCallback<String, String> item; - while ((item = pending.poll()) != null) { - complete(item); - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/SuccessFailCallback.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/SuccessFailCallback.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/SuccessFailCallback.java deleted file mode 100644 index d88a141..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/SuccessFailCallback.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.wave.federation.xmpp; - -/** - * A generic onSuccess/onFailure callback interface. - * - * @author [email protected] (Ben Kalman) - */ -public interface SuccessFailCallback<SuccessValue, FailureValue> { - void onSuccess(SuccessValue response); - void onFailure(FailureValue reason); -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppDisco.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppDisco.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppDisco.java deleted file mode 100644 index 3e03ef7..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppDisco.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.wave.federation.xmpp; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.inject.Inject; -import com.typesafe.config.Config; -import org.dom4j.Element; -import org.waveprotocol.wave.federation.FederationErrorProto.FederationError; -import org.waveprotocol.wave.federation.FederationErrors; -import org.xmpp.packet.IQ; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Implementation of XMPP Discovery. Provides public methods to respond to incoming disco requests - * (via {@link XmppManager}), as well as outgoing disco via {{@link #discoverRemoteJid}. - * - * @author [email protected] (Anthony Baxter) - * @author [email protected] (Sam Thorogood) - */ -public class XmppDisco { - - @SuppressWarnings("unused") - private static final Logger LOG = Logger.getLogger(XmppDisco.class.getCanonicalName()); - - // This tracks the number of disco attempts started. 
- public static final LoadingCache<String, AtomicLong> statDiscoStarted = - CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicLong>() { - @Override - public AtomicLong load(@SuppressWarnings("NullableProblems") String domain) { - return new AtomicLong(); - } - }); - - private final LoadingCache<String, RemoteDisco> discoRequests; - private final String serverDescription; - - private XmppManager manager = null; - // Accessed by XmppFederationHostForDomain. - final long failExpirySecs; - final long successExpirySecs; - final long discoExpirationHours; - final String discoInfoCategory; - final String discoInfoType; - - /** - * Constructor. Note that {@link #setManager} must be called before this class is ready to use. - */ - @Inject - public XmppDisco(Config config) { - this.serverDescription = config.getString("federation.xmpp_server_description"); - this.discoInfoCategory = config.getString("federation.disco_info_category"); - this.discoInfoType = config.getString("federation.disco_info_type"); - this.failExpirySecs = config.getDuration("federation.xmpp_disco_failed_expiry", TimeUnit.SECONDS); - this.successExpirySecs = config.getDuration("federation.xmpp_disco_successful_expiry", TimeUnit.SECONDS); - this.discoExpirationHours = config.getDuration("federation.disco_expiration", TimeUnit.HOURS); - - //noinspection NullableProblems - discoRequests = - CacheBuilder.newBuilder().expireAfterWrite( - discoExpirationHours, TimeUnit.HOURS).build( - new CacheLoader<String, RemoteDisco>() { - - @Override - public RemoteDisco load(String domain) throws Exception { - statDiscoStarted.get(domain).incrementAndGet(); - return new RemoteDisco(manager, domain, failExpirySecs, successExpirySecs); - } - }); - } - - /** - * Set the manager instance for this class. Must be invoked before any other - * methods are used. 
- * @param manager an XmppManager instance - */ - public void setManager(XmppManager manager) { - this.manager = manager; - } - - /** - * Handles a disco info get from a foreign source. A remote server is trying to ask us what we - * support. Send back a message identifying as a wave component. - * - * @param iq the IQ packet. - * @param responseCallback callback used to send response - */ - void processDiscoInfoGet(IQ iq, PacketCallback responseCallback) { - IQ response = IQ.createResultIQ(iq); - Element query = response.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_INFO); - - query.addElement("identity") - .addAttribute("category", discoInfoCategory) - .addAttribute("type", discoInfoType) - .addAttribute("name", serverDescription); - - query.addElement("feature") - .addAttribute("var", XmppNamespace.NAMESPACE_WAVE_SERVER); - - responseCallback.run(response); - } - - - /** - * Handles a disco items get from a foreign XMPP agent. No useful responses, since we're not a - * domain on it's own: just the wave component. - * - * @param iq the IQ packet. - * @param responseCallback callback used to send response - */ - void processDiscoItemsGet(IQ iq, PacketCallback responseCallback) { - IQ response = IQ.createResultIQ(iq); - response.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS); - responseCallback.run(response); - } - - /** - * Attempt to discover the remote JID for this domain. Hands control to {@link RemoteDisco}. - * - * @param remoteDomain the domain to discover - * @param callback a callback to trigger when disco completes - */ - public void discoverRemoteJid(String remoteDomain, SuccessFailCallback<String, String> callback) { - Preconditions.checkNotNull("Must call setManager first", manager); - RemoteDisco disco = discoRequests.getIfPresent(remoteDomain); - if (disco != null) { - // This is a race condition, but we don't care if we lose it, because the ttl timestamp - // won't be exceeded in that case. 
- if (disco.ttlExceeded()) { - if (LOG.isLoggable(Level.FINE)) { - LOG.info("discoverRemoteJid for " + remoteDomain + ": result ttl exceeded."); - } - // TODO(arb): should we expose the disco cache somehow for debugging? - discoRequests.invalidate(remoteDomain); - } - } - try { - discoRequests.get(remoteDomain).discoverRemoteJID(callback); - } catch (ExecutionException ex) { - throw new RuntimeException(ex); - } - } - - /** - * Inject a predetermined result into the disco results map. If the passed jid is null, generate - * an error/not-found case. - * - * @param domain remote domain - * @param jid remote JID - * @throws IllegalStateException if there is already a result for this domain - */ - @VisibleForTesting - void testInjectInDomainToJidMap(String domain, String jid) { - FederationError error = null; - if (jid == null) { - error = FederationErrors.badRequest("Fake injected error"); - } - RemoteDisco disco = discoRequests.getIfPresent(domain); - Preconditions.checkState(disco == null); - discoRequests.put(domain, new RemoteDisco(domain, jid, error)); - } - - /** - * Determine whether a request for the given domain is pending. - * - * @param domain remote domain - * @return true/false - */ - @VisibleForTesting - boolean isDiscoRequestPending(String domain) throws ExecutionException { - RemoteDisco disco = discoRequests.getIfPresent(domain); - return disco != null && disco.isRequestPending(); - } - - /** - * Determine whether the disco request for the given domain has been touched or is at all - * available. 
- * - * @param domain remote domain - * @return true/false - */ - @VisibleForTesting - boolean isDiscoRequestAvailable(String domain) { - return discoRequests.getIfPresent(domain) != null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHost.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHost.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHost.java deleted file mode 100644 index 09c76d1..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHost.java +++ /dev/null @@ -1,446 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.wave.federation.xmpp; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.inject.Inject; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.typesafe.config.Config; -import org.dom4j.Element; -import org.waveprotocol.wave.federation.FederationErrorProto.FederationError; -import org.waveprotocol.wave.federation.FederationErrors; -import org.waveprotocol.wave.federation.FederationHostBridge; -import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion; -import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta; -import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo; -import org.waveprotocol.wave.federation.WaveletFederationListener; -import org.waveprotocol.wave.federation.WaveletFederationProvider; -import org.waveprotocol.wave.federation.xmpp.XmppUtil.UnknownSignerType; -import org.waveprotocol.wave.model.id.URIEncoderDecoder.EncodingException; -import org.waveprotocol.wave.model.id.WaveletName; -import org.xmpp.packet.IQ; - -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.logging.Logger; - -/** - * This class encapsulates the incoming packet processing portion of the - * Federation Host. Messages arrive on this class from a foreign Federation - * Remote for wavelets hosted by the local wave server. - */ -public class XmppFederationHost implements WaveletFederationListener.Factory { - @SuppressWarnings("unused") - private static final Logger LOG = Logger.getLogger(XmppFederationHost.class.getCanonicalName()); - - private final WaveletFederationProvider waveletProvider; - - private XmppManager manager = null; - - // A map of update listeners. There is one per remote domain we are sending updates to. 
- // The name 'listener' refers to them listening for updates from the waveserver to send to the - // network. - private final LoadingCache<String, WaveletFederationListener> listeners; - - /** - * Constructor. Note that {@link #setManager} must be called before this class - * is ready to use. - * - * @param waveletProvider used for communicating back to the Host part of the - * wavelet server. - * @param disco used for discovery - */ - @Inject - public XmppFederationHost(@FederationHostBridge WaveletFederationProvider waveletProvider, - final XmppDisco disco, final Config config) { - this.waveletProvider = waveletProvider; - listeners = CacheBuilder.newBuilder().build(new CacheLoader<String, WaveletFederationListener>() { - @Override - public WaveletFederationListener load(@SuppressWarnings("NullableProblems") String domain) { - return new XmppFederationHostForDomain(domain, manager, disco, config); - } - }); - } - - /** - * Set the manager instance for this class. Must be invoked before any other - * methods are used. - * @param manager the XmppManager object, used to send packets. - */ - public void setManager(XmppManager manager) { - this.manager = manager; - } - - /** - * Parse to a ProtocolHashedVersion from a given string version/base64-hash combination. - * - * @param startVersion the starting version - * @param base64Hash the base64 hash - * @throws IllegalArgumentException on bad data - * @return a parsed protobuf object - */ - private static ProtocolHashedVersion parseFromUnsafe(String startVersion, String base64Hash) - throws IllegalArgumentException { - return ProtocolHashedVersion.newBuilder() - .setVersion(Long.parseLong(startVersion)) - .setHistoryHash(Base64Util.decode(base64Hash)).build(); - } - - /** - * Reads a history request off the wire, sends it to the WS with a new - * callback for returning the response. 
- * @param request the history request - * @param responseCallback the callback to send the response back - */ - void processHistoryRequest(final IQ request, final PacketCallback responseCallback) { - Element items = null, historyDelta = null; - Element pubsubRequest = request.getElement().element("pubsub"); - if (pubsubRequest != null) { - items = pubsubRequest.element("items"); - if (items != null) { - historyDelta = items.element("delta-history"); - } - } - if (items == null || historyDelta == null - || historyDelta.attribute("start-version") == null - || historyDelta.attribute("start-version-hash") == null - || historyDelta.attribute("end-version") == null - || historyDelta.attribute("end-version-hash") == null - || historyDelta.attribute("wavelet-name") == null) { - responseCallback.error(FederationErrors.badRequest("Malformed history request")); - return; - } - - final ProtocolHashedVersion startVersion; - try { - startVersion = parseFromUnsafe(historyDelta.attributeValue("start-version"), - historyDelta.attributeValue("start-version-hash")); - } catch (IllegalArgumentException e) { - responseCallback.error(FederationErrors.badRequest("Invalid format of start version")); - return; - } - - final ProtocolHashedVersion endVersion; - try { - endVersion = parseFromUnsafe(historyDelta.attributeValue("end-version"), - historyDelta.attributeValue("end-version-hash")); - } catch (IllegalArgumentException e) { - responseCallback.error(FederationErrors.badRequest("Invalid format of end version")); - return; - } - - final long responseLengthLimit; - if (historyDelta.attribute("response-length-limit") != null) { - try { - responseLengthLimit = Long.parseLong(historyDelta.attributeValue("response-length-limit")); - } catch (NumberFormatException e) { - responseCallback.error(FederationErrors.badRequest("Invalid response length limit")); - return; - } - } else { - responseLengthLimit = 0; - } - - final WaveletName waveletName; - try { - waveletName = - 
XmppUtil.waveletNameCodec.uriToWaveletName(historyDelta.attributeValue("wavelet-name")); - } catch (EncodingException e) { - responseCallback.error(FederationErrors.badRequest( - "Malformed wavelet name: " + historyDelta.attributeValue("wavelet-name"))); - return; - } - - // Construct a new response listener inline. - WaveletFederationProvider.HistoryResponseListener listener = - new WaveletFederationProvider.HistoryResponseListener() { - @Override - public void onFailure(FederationError error) { - responseCallback.error(error); - } - - @Override - public void onSuccess(List<ByteString> appliedDeltaSet, - ProtocolHashedVersion lastCommittedVersion, long versionTruncatedAt) { - IQ response = IQ.createResultIQ(request); - - Element pubsub = response.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB); - Element items = pubsub.addElement("items"); - - // Add each delta to the outgoing response. - for (ByteString appliedDelta : appliedDeltaSet) { - items.addElement("item").addElement("applied-delta", - XmppNamespace.NAMESPACE_WAVE_SERVER).addCDATA( - Base64Util.encode(appliedDelta.toByteArray())); - } - - // Set the LCV history-hash, if provided. - // TODO(thorogood): We don't set the hashed version, which is wrong, - // but it's not part of the current spec (Feb 2010). - if (lastCommittedVersion != null && lastCommittedVersion.hasVersion()) { - String version = String.valueOf(lastCommittedVersion.getVersion()); - items.addElement("item").addElement("commit-notice", - XmppNamespace.NAMESPACE_WAVE_SERVER).addAttribute("version", version); - } - - // Set the version truncated at, if provided. - if (versionTruncatedAt > 0) { - String version = String.valueOf(versionTruncatedAt); - items.addElement("item").addElement("history-truncated", - XmppNamespace.NAMESPACE_WAVE_SERVER).addAttribute("version", version); - } - - // Send the message to the source. - responseCallback.run(response); - } - }; - - // Hand off a history request to the waveletProvider. 
- // TODO(thorogood,arb): Note that the following remote domain is going to be - // the Wave component JID (e.g. wave.foo.com), and *not* the actual remote domain. - String remoteDomain = request.getFrom().getDomain(); - waveletProvider.requestHistory(waveletName, remoteDomain, startVersion, - endVersion, responseLengthLimit, listener); - } - - /** - * Handles a submit request from a foreign wave remote. Sends it to the wave - * server, sets up a callback to send the response. - * @param request the submit request - * @param responseCallback the callback to send the response back - */ - void processSubmitRequest(final IQ request, final PacketCallback responseCallback) { - Element item = null, submitRequest = null, deltaElement = null; - Element pubsubRequest = request.getElement().element("pubsub"); - // TODO: check for correct elements. - Element publish = pubsubRequest.element("publish"); - if (publish != null) { - item = publish.element("item"); - if (item != null) { - submitRequest = item.element("submit-request"); - if (submitRequest != null) { - deltaElement = submitRequest.element("delta"); - } - } - } - if (publish == null || item == null || submitRequest == null - || deltaElement == null - || deltaElement.attribute("wavelet-name") == null - || deltaElement.getText() == null) { - responseCallback.error(FederationErrors.badRequest("Malformed submit request")); - return; - } - - final WaveletName waveletName; - try { - waveletName = - XmppUtil.waveletNameCodec.uriToWaveletName(deltaElement.attributeValue("wavelet-name")); - } catch (EncodingException e) { - responseCallback.error(FederationErrors.badRequest( - "Malformed wavelet name: " + deltaElement.attributeValue("wavelet-name"))); - return; - } - - final ProtocolSignedDelta delta; - try { - delta = ProtocolSignedDelta.parseFrom(Base64Util.decode(deltaElement.getText())); - } catch (InvalidProtocolBufferException e) { - responseCallback.error(FederationErrors.badRequest( - "Malformed delta, not a valid 
protocol buffer")); - return; - } - - // Construct a submit result listener inline. - WaveletFederationProvider.SubmitResultListener listener = - new WaveletFederationProvider.SubmitResultListener() { - @Override - public void onFailure(FederationError error) { - responseCallback.error(error); - } - - @Override - public void onSuccess(int operations, ProtocolHashedVersion version, long timestamp) { - IQ response = IQ.createResultIQ(request); - - Element pubsub = response.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB); - Element submitResponse = pubsub.addElement("publish").addElement("item") - .addElement("submit-response", XmppNamespace.NAMESPACE_WAVE_SERVER); - - submitResponse.addAttribute("application-timestamp", String.valueOf(timestamp)); - submitResponse.addAttribute("operations-applied", String.valueOf(operations)); - - Element hashedVersion = submitResponse.addElement("hashed-version"); - hashedVersion.addAttribute("history-hash", Base64Util.encode(version.getHistoryHash())); - hashedVersion.addAttribute("version", String.valueOf(version.getVersion())); - - responseCallback.run(response); - } - }; - - // Hand off the submit request to the wavelet provider. - waveletProvider.submitRequest(waveletName, delta, listener); - } - - /** - * Reads a get signer request off the wire, sends it to the WS with a new - * callback for returning the response. - * @param request the get signer request - * @param responseCallback the callback to send the response back - */ - void processGetSignerRequest(final IQ request, final PacketCallback responseCallback) { - Element items = request.getChildElement().element("items"); - Element signerRequest = items != null ? 
items.element("signer-request") : null; - - if (items == null || signerRequest == null - || signerRequest.attributeValue("wavelet-name") == null - || signerRequest.attributeValue("signer-id") == null - || signerRequest.attributeValue("version") == null - || signerRequest.attributeValue("history-hash") == null) { - manager.sendErrorResponse(request, FederationErrors.badRequest("Malformed signer request")); - return; - } - - final ByteString signerId; - try { - signerId = Base64Util.decode(signerRequest.attributeValue("signer-id")); - } catch (IllegalArgumentException e) { - responseCallback.error(FederationErrors.badRequest("Malformed signer ID")); - return; - } - - final ProtocolHashedVersion deltaEndVersion; - try { - deltaEndVersion = parseFromUnsafe(signerRequest.attributeValue("version"), - signerRequest.attributeValue("history-hash")); - } catch (IllegalArgumentException e) { - responseCallback.error(FederationErrors.badRequest("Invalid hashed version")); - return; - } - - final WaveletName waveletName; - try { - waveletName = - XmppUtil.waveletNameCodec.uriToWaveletName(signerRequest.attributeValue("wavelet-name")); - } catch (EncodingException e) { - responseCallback.error(FederationErrors.badRequest("Malformed wavelet name")); - return; - } - - WaveletFederationProvider.DeltaSignerInfoResponseListener listener = - new WaveletFederationProvider.DeltaSignerInfoResponseListener() { - @Override - public void onFailure(FederationError error) { - responseCallback.error(error); - } - - @Override - public void onSuccess(ProtocolSignerInfo signerInfo) { - IQ response = IQ.createResultIQ(request); - - Element pubsub = response.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB); - Element items = pubsub.addElement("items"); - XmppUtil.protocolSignerInfoToXml(signerInfo, items); - - responseCallback.run(response); - } - - }; - - waveletProvider.getDeltaSignerInfo(signerId, waveletName, deltaEndVersion, listener); - } - - /** - * Reads a post signer request off 
the wire, sends it to the WS with a new - * callback for returning the response. - * @param request the post signer request - * @param responseCallback the callback to send the response back - */ - void processPostSignerRequest(final IQ request, final PacketCallback responseCallback) { - Element item = null, signatureElement = null; - Element pubsubRequest = request.getElement().element("pubsub"); - Element publish = pubsubRequest.element("publish"); - if (publish != null) { - item = publish.element("item"); - if (item != null) { - signatureElement = item.element("signature"); - } - } - - if (publish == null || item == null || signatureElement == null - || signatureElement.attribute("domain") == null - || signatureElement.attribute("algorithm") == null - || signatureElement.element("certificate") == null) { - responseCallback.error(FederationErrors.badRequest("Malformed post signer request")); - return; - } - - ProtocolSignerInfo signer; - try { - signer = XmppUtil.xmlToProtocolSignerInfo(signatureElement); - } catch (UnknownSignerType e) { - responseCallback.error(FederationErrors.badRequest( - "Could not understand signer algorithm: " + e)); - return; - } - - WaveletFederationProvider.PostSignerInfoResponseListener listener = - new WaveletFederationProvider.PostSignerInfoResponseListener() { - @Override - public void onFailure(FederationError error) { - responseCallback.error(error); - } - - @Override - public void onSuccess() { - IQ response = IQ.createResultIQ(request); - - Element pubsub = response.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB); - Element item = pubsub.addElement("publish").addElement("item"); - - item.addAttribute("node", "signer"); - item.addElement("signature-response", XmppNamespace.NAMESPACE_WAVE_SERVER); - - responseCallback.run(response); - } - }; - - // TODO(thorogood,arb): This field is a Bad Idea; it could be faked and not - // be a provider we host on this instance. Instead, we should infer from the - // "To:" JID. 
- String targetDomain = signatureElement.attributeValue("domain"); - - // The first argument is the domain we intend to send this information to. - waveletProvider.postSignerInfo(targetDomain, signer, listener); - } - - @Override - public WaveletFederationListener listenerForDomain(String domain) { - try { - // TODO(thorogood): Kick off disco here instead of inside - // XmppFederationHostForDomain. - return listeners.get(domain); - } catch (ExecutionException ex) { - throw new RuntimeException(ex); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomain.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomain.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomain.java deleted file mode 100644 index d809b33..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationHostForDomain.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.wave.federation.xmpp; - -import com.google.inject.Inject; -import com.google.protobuf.ByteString; -import com.typesafe.config.Config; -import org.dom4j.Element; -import org.waveprotocol.wave.federation.FederationErrorProto.FederationError; -import org.waveprotocol.wave.federation.FederationErrors; -import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion; -import org.waveprotocol.wave.federation.WaveletFederationListener; -import org.waveprotocol.wave.model.id.URIEncoderDecoder.EncodingException; -import org.waveprotocol.wave.model.id.WaveletName; -import org.xmpp.packet.Message; -import org.xmpp.packet.Packet; - -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * An instance of this class is created on demand for outgoing - * messages to another wave Federation Remote. The wave server asks - * the XmppFederationHost to create these. - */ -class XmppFederationHostForDomain implements WaveletFederationListener { - - private static final Logger LOG = - Logger.getLogger(XmppFederationHostForDomain.class.getCanonicalName()); - - // Timeout for outstanding listener updates sent over XMPP. 
- private static final int XMPP_LISTENER_TIMEOUT = 30; - - private final String remoteDomain; - private final XmppManager manager; - private final String jid; - private final XmppDisco disco; - - @Inject - public XmppFederationHostForDomain(final String domain, XmppManager manager, - XmppDisco disco, Config config) { - this.remoteDomain = domain; - this.manager = manager; - this.jid = config.getString("federation.xmpp_jid"); - this.disco = disco; - } - - @Override - public void waveletCommitUpdate(WaveletName waveletName, ProtocolHashedVersion committedVersion, - WaveletUpdateCallback callback) { - waveletUpdate(waveletName, null, committedVersion, callback); - } - - @Override - public void waveletDeltaUpdate(WaveletName waveletName, List<ByteString> appliedDeltas, - WaveletUpdateCallback callback) { - waveletUpdate(waveletName, appliedDeltas, null, callback); - } - - /** - * Sends a wavelet update message on behalf of the wave server. This - * method just triggers a disco lookup (which may be cached) and - * sets up a callback to call the real method that does the work. - * This method may contain applied deltas, a commit notice, or both. 
- * - * @param waveletName the wavelet name - * @param deltaList the deltas to include in the message, or null - * @param committedVersion last committed version to include, or null - * @param callback callback to invoke on delivery success/failure - */ - public void waveletUpdate(final WaveletName waveletName, final List<ByteString> deltaList, - final ProtocolHashedVersion committedVersion, final WaveletUpdateCallback callback) { - if ((deltaList == null || deltaList.isEmpty()) && committedVersion == null) { - throw new IllegalArgumentException("Must send at least one delta, or a last committed " + - "version notice, for the target wavelet: " + waveletName); - } - - disco.discoverRemoteJid(remoteDomain, new SuccessFailCallback<String, String>() { - @Override - public void onSuccess(String remoteJid) { - internalWaveletUpdate(waveletName, deltaList, committedVersion, callback, remoteJid); - } - - @Override - public void onFailure(String errorMessage) { - if (LOG.isLoggable(Level.FINE)) { - LOG.fine("Disco failed for remote domain " + remoteDomain + ", update not sent"); - } - callback.onFailure(FederationErrors.newFederationError( - FederationError.Code.RESOURCE_CONSTRAINT, errorMessage)); - } - }); - } - - /** - * Sends a wavelet update message on behalf of the wave server once disco is - * complete. This method may contain applied deltas, a commit notice, or both. 
- * - * @param waveletName the wavelet name - * @param deltaList the deltas to include in the message, or null - * @param committedVersion last committed version to include, or null - * @param callback callback to invoke on delivery success/failure - * @param remoteJid the remote JID to send the update to - */ - private void internalWaveletUpdate(final WaveletName waveletName, - final List<ByteString> deltaList, final ProtocolHashedVersion committedVersion, - final WaveletUpdateCallback callback, String remoteJid) { - Message message = new Message(); - message.setType(Message.Type.normal); - message.setFrom(jid); - message.setTo(remoteJid); - message.setID(XmppUtil.generateUniqueId()); - message.addChildElement("request", XmppNamespace.NAMESPACE_XMPP_RECEIPTS); - - final String encodedWaveletName; - try { - encodedWaveletName = XmppUtil.waveletNameCodec.waveletNameToURI(waveletName); - } catch (EncodingException e) { - callback.onFailure(FederationErrors.badRequest("Bad wavelet name " + waveletName)); - return; - } - - Element itemElement = message.addChildElement("event", XmppNamespace.NAMESPACE_PUBSUB_EVENT) - .addElement("items").addElement("item"); - if (deltaList != null) { - for (ByteString delta : deltaList) { - Element waveletUpdate = - itemElement.addElement("wavelet-update", XmppNamespace.NAMESPACE_WAVE_SERVER) - .addAttribute("wavelet-name", encodedWaveletName); - waveletUpdate.addElement("applied-delta").addCDATA(Base64Util.encode(delta.toByteArray())); - } - } - if (committedVersion != null) { - Element waveletUpdate = - itemElement.addElement("wavelet-update", XmppNamespace.NAMESPACE_WAVE_SERVER) - .addAttribute("wavelet-name", encodedWaveletName); - waveletUpdate.addElement("commit-notice").addAttribute("version", - Long.toString(committedVersion.getVersion())).addAttribute("history-hash", - Base64Util.encode(committedVersion.getHistoryHash())); - } - - // Send the generated message through to the foreign XMPP server. 
- manager.send(message, new PacketCallback() { - @Override - public void error(FederationError error) { - callback.onFailure(error); - } - - @Override - public void run(Packet packet) { - callback.onSuccess(); - } - }, XMPP_LISTENER_TIMEOUT); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationModule.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationModule.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationModule.java deleted file mode 100644 index 0be6e94..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationModule.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.wave.federation.xmpp; - -import com.google.inject.AbstractModule; -import com.google.inject.Singleton; - -import org.waveprotocol.wave.federation.FederationHostBridge; -import org.waveprotocol.wave.federation.FederationRemoteBridge; -import org.waveprotocol.wave.federation.FederationTransport; -import org.waveprotocol.wave.federation.WaveletFederationListener; -import org.waveprotocol.wave.federation.WaveletFederationProvider; - -/** - * Module for setting up an XMPP federation subsystem - * - * @author [email protected] (Tad Glines) - */ -public class XmppFederationModule extends AbstractModule { - - @Override - protected void configure() { - // Request history and submit deltas to the outside world *from* our local - // Wave Server. - bind(WaveletFederationProvider.class).annotatedWith(FederationRemoteBridge.class).to( - XmppFederationRemote.class).in(Singleton.class); - - // Serve updates to the outside world about local waves. - bind(WaveletFederationListener.Factory.class).annotatedWith(FederationHostBridge.class).to( - XmppFederationHost.class).in(Singleton.class); - - bind(XmppDisco.class).in(Singleton.class); - bind(XmppFederationRemote.class).in(Singleton.class); - bind(XmppFederationHost.class).in(Singleton.class); - - bind(XmppManager.class).in(Singleton.class); - bind(IncomingPacketHandler.class).to(XmppManager.class); - bind(ComponentPacketTransport.class).in(Singleton.class); - bind(OutgoingPacketTransport.class).to(ComponentPacketTransport.class); - - bind(FederationTransport.class).to(XmppFederationTransport.class).in(Singleton.class); - } - -}
