Author: sebawagner
Date: Sun Nov 18 08:46:33 2012
New Revision: 1410852

URL: http://svn.apache.org/viewvc?rev=1410852&view=rev
Log:
OPENMEETINGS-460 move "pingRunning" into RestClient, implement first version 
of scheduler to sync sessions to master.

Added:
    
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/IRestClientObserver.java
Removed:
    
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/SlaveObserver.java
Modified:
    
incubator/openmeetings/trunk/singlewebapp/WebContent/WEB-INF/openmeetings-applicationContext.xml
    
incubator/openmeetings/trunk/singlewebapp/WebContent/src/modules/admin/servers/serverAdminValueForm.lzx
    
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebService.java
    
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebServiceFacade.java
    
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/RestClient.java
    
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/ClientListHashMapStore.java
    
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/RoomClient.java
    
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/SlaveClientDto.java
    
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/data/basic/dao/ServerDao.java
    
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/persistence/beans/basic/Server.java
    
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/quartz/scheduler/ClusterSlaveJob.java

Modified: 
incubator/openmeetings/trunk/singlewebapp/WebContent/WEB-INF/openmeetings-applicationContext.xml
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/WebContent/WEB-INF/openmeetings-applicationContext.xml?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/WebContent/WEB-INF/openmeetings-applicationContext.xml
 (original)
+++ 
incubator/openmeetings/trunk/singlewebapp/WebContent/WEB-INF/openmeetings-applicationContext.xml
 Sun Nov 18 08:46:33 2012
@@ -90,14 +90,11 @@
        <bean id="openmeetings.FlvRecorderConverter"
                
class="org.apache.openmeetings.data.flvrecord.converter.FlvRecorderConverter" />
                
-       <bean id="openmeetings.SlaveObserver"
-               class="org.apache.openmeetings.cluster.sync.SlaveObserver" />
-               
        <!-- 
        #####################################################################
        Comment this in to enable the OpenMeetings instance to act as Master
        #####################################################################
-               
+        -->
                
        <bean id="openmeetings.ClusterSlaveJob" 
class="org.apache.openmeetings.quartz.scheduler.ClusterSlaveJob" />
        <bean id="clusterSlaveJob"
@@ -128,7 +125,7 @@
                        </list>
                </property>
        </bean>
-        -->
+       
                
        <bean id="openmeetings.SessionClearJob" 
class="org.apache.openmeetings.quartz.scheduler.SessionClearJob" />
        <bean id="sessionClearJob"

Modified: 
incubator/openmeetings/trunk/singlewebapp/WebContent/src/modules/admin/servers/serverAdminValueForm.lzx
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/WebContent/src/modules/admin/servers/serverAdminValueForm.lzx?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/WebContent/src/modules/admin/servers/serverAdminValueForm.lzx
 (original)
+++ 
incubator/openmeetings/trunk/singlewebapp/WebContent/src/modules/admin/servers/serverAdminValueForm.lzx
 Sun Nov 18 08:46:33 2012
@@ -38,7 +38,6 @@
                        this.serverName.setAttribute('text', '');
                        this.ip.setAttribute('text', '');
                        this.lastPing.setAttribute('text','');
-                       this.pingRunning.setAttribute('text','');
                        this.comment.setAttribute('text','');
                        this.port.setAttribute('text','');
                        this.user.setAttribute('text','');
@@ -68,7 +67,6 @@
                        this.ip.setAttribute('text', server.address);
                        this.lastPing.setAttribute('text', server.lastPing);
                        this.comment.setAttribute('text', server.comment);
-                       this.pingRunning.setAttribute('text', 
server.pingRunning);
                        this.comment.setAttribute('text', server.comment);
                        this.port.setAttribute('text', server.port);
                        this.user.setAttribute('text', server.user);
@@ -145,11 +143,8 @@
         <labelText labelid="1517" width="200" y="360" resize="false" x="2"/>
                <labelText name="lastPing" y="360" x="120" width="270" />
                
-        <labelText labelid="1524" width="200" y="380" resize="false" x="2"/>
-               <labelText name="pingRunning" y="380" x="120" width="270" />
-           
-           <labelText labelid="270" width="200" y="400" resize="false" x="2"/>
-               <customScrollEdittext name="comment" y="400" x="120" 
width="270" 
+           <labelText labelid="270" width="200" y="380" resize="false" x="2"/>
+               <customScrollEdittext name="comment" y="380" x="120" 
width="270" 
                    height="100" text="" />    
                
        </class>

Modified: 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebService.java
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebService.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebService.java
 (original)
+++ 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebService.java
 Sun Nov 18 08:46:33 2012
@@ -122,20 +122,22 @@ public class ServerWebService {
         *            - webapp name of the OpenMeetings instance
         * @param protocol
         *            - protocol to access the OpenMeetings instance
+        * @param active
+        *            - if the server currently participates in the cluster or 
not
         * @param comment
         *            - comment for the server
         * @return the id of saved server
         */
        public long saveServer(String SID, long id, String name, String address,
                        int port, String user, String pass, String webapp, 
String protocol,
-                       String comment) throws AxisFault {
+                       Boolean active, String comment) throws AxisFault {
                log.debug("saveServerCount enter");
                Long users_id = sessionManagement.checkSession(SID);
                Long user_level = userManagement.getUserLevelByID(users_id);
 
                if (authLevelManagement.checkWebServiceLevel(user_level)) {
                        return serversDao.saveServer(id, name, address, port, 
user, pass,
-                                       webapp, protocol, comment, users_id)
+                                       webapp, protocol, active, comment, 
users_id)
                                        .getId();
                } else {
                        log.warn("Insuffisient permissions");

Modified: 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebServiceFacade.java
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebServiceFacade.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebServiceFacade.java
 (original)
+++ 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebServiceFacade.java
 Sun Nov 18 08:46:33 2012
@@ -72,13 +72,13 @@ public class ServerWebServiceFacade {
 
        /**
         * Proxy method please see
-        * {@link ServerWebService#saveServer(String, long, String, String, 
int, String, String, String, String, String)}
+        * {@link ServerWebService#saveServer(String, long, String, String, 
int, String, String, String, String, Boolean, String)}
         */
        public long saveServer(String SID, long id, String name, String address,
                        int port, String user, String pass, String webapp, 
String protocol,
-                       String comment) throws AxisFault {
+                       Boolean active, String comment) throws AxisFault {
                return getServerServiceProxy().saveServer(SID, id, name, 
address, port,
-                               user, pass, webapp, protocol, comment);
+                               user, pass, webapp, protocol, active, comment);
        }
 
        /**

Added: 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/IRestClientObserver.java
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/IRestClientObserver.java?rev=1410852&view=auto
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/IRestClientObserver.java
 (added)
+++ 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/IRestClientObserver.java
 Sun Nov 18 08:46:33 2012
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.cluster.sync;
+
+import java.util.List;
+
+import org.apache.openmeetings.conference.room.SlaveClientDto;
+import org.apache.openmeetings.persistence.beans.basic.Server;
+
+/**
+ * Defines the events the {@link RestClient} will broadcast when he performs 
calls
+ * to the {@link Server}
+ * 
+ * @author sebawagner
+ * 
+ */
+public interface IRestClientObserver {
+
+       /**
+        * performed by the RestClient, whenever a ping is completed and the
+        * response is parsed to a session object ready to be injected in the 
local
+        * session store
+        * 
+        * @param server
+        * @param slaveClients
+        */
+       public void pingComplete(Server server, List<SlaveClientDto> 
slaveClients);
+
+}

Modified: 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/RestClient.java
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/RestClient.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/RestClient.java
 (original)
+++ 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/RestClient.java
 Sun Nov 18 08:46:33 2012
@@ -34,17 +34,30 @@ import org.apache.axis2.client.Options;
 import org.apache.axis2.client.ServiceClient;
 import org.apache.openmeetings.OpenmeetingsVariables;
 import org.apache.openmeetings.conference.room.RoomClient;
+import org.apache.openmeetings.conference.room.SlaveClientDto;
+import org.apache.openmeetings.persistence.beans.basic.Server;
 import org.red5.logging.Red5LoggerFactory;
 import org.slf4j.Logger;
 
+/**
+ * 
+ * Performs call to the WebService Gateway to load the server load from the
+ * slave's of the cluster
+ * 
+ * @author sebawagner
+ * 
+ */
 public class RestClient {
 
        private static final Logger log = Red5LoggerFactory.getLogger(
                        RestClient.class, OpenmeetingsVariables.webAppRootKey);
 
-       private boolean loginSuccess = false;
-       private String sessionId;
-
+       /**
+        * The observerInstance will be notified whenever a ping was completed
+        */
+       private IRestClientObserver observerInstance;
+       
+       private Server server;
        private final String host;
        private final int port;
        private final String protocol;
@@ -52,6 +65,22 @@ public class RestClient {
        private final String user;
        private final String pass;
 
+       private boolean loginSuccess = false;
+       private String sessionId;
+       
+       private boolean pingRunning = false;
+       
+       /**
+        * returns true as long as the RestClient performs a ping and parses 
the result
+        * 
+        * @return
+        */
+       public boolean getPingRunning() {
+               return pingRunning;
+       }
+
+       private static String NAMESPACE_PREFIX = 
"http://services.axis.openmeetings.apache.org";
+
        private String getUserServiceEndPoint() {
                return protocol + "://" + host + ":" + port + "/" + webapp
                                + "/services/UserService";
@@ -73,9 +102,44 @@ public class RestClient {
                }
        }
 
-       public RestClient(String host, int port, String protocol, String webapp,
+       /**
+        * The observerInstance will be notified whenever a ping was completed
+        * 
+        * @param observerInstance
+        * @param host
+        * @param port
+        * @param protocol
+        * @param webapp
+        * @param user
+        * @param pass
+        */
+       public RestClient(IRestClientObserver observerInstance, Server server) {
+               this.observerInstance = observerInstance;
+               this.server = server;
+               this.host = server.getAddress();
+               this.port = server.getPort();
+               this.protocol = server.getProtocol();
+               this.webapp = server.getWebapp();
+               this.user = server.getUser();
+               this.pass = server.getPass();
+       }
+
+       /**
+        * for simple testing this method provides a version of the constructor
+        * without need for a server entity.<br/>
+        * <br/>
+        * There is no spring or JPA enhanced class in use here. This Object is
+        * stored in session/memory
+        * 
+        * @param host
+        * @param port
+        * @param protocol
+        * @param webapp
+        * @param user
+        * @param pass
+        */
+       private RestClient(String host, int port, String protocol, String 
webapp,
                        String user, String pass) {
-               super();
                this.host = host;
                this.port = port;
                this.protocol = protocol;
@@ -85,14 +149,44 @@ public class RestClient {
        }
 
        /**
+        * compare if the details here and the one stored are still the same
+        * 
+        * @param server2
+        * @return
+        */
+       public boolean hasServerDetailsChanged(Server server2) {
+
+               if (!host.equals(server2.getAddress())) {
+                       return true;
+               }
+               if (port != server2.getPort()) {
+                       return true;
+               }
+               if (!user.equals(server2.getUser())) {
+                       return true;
+               }
+               if (!pass.equals(server2.getPass())) {
+                       return true;
+               }
+               if (!webapp.equals(server2.getWebapp())) {
+                       return true;
+               }
+               if (!protocol.equals(server2.getProtocol())) {
+                       return true;
+               }
+
+               return false;
+       }
+
+       /**
         * Login the user via REST
         * 
         * @throws Exception
         */
        public void loginUser() throws Exception {
+               
                Options options = new Options();
                options.setTo(new EndpointReference(getUserServiceEndPoint()));
-
                options.setProperty(Constants.Configuration.ENABLE_REST,
                                Constants.VALUE_TRUE);
 
@@ -103,16 +197,13 @@ public class RestClient {
                OMElement getSessionResult = sender
                                .sendReceive(getPayloadMethodGetSession());
                sessionId = getSessionIdFromResult(getSessionResult);
-               log.debug("sessionId:: " + sessionId);
 
                OMElement loginUserResult = sender
                                .sendReceive(getPayloadMethodLoginUser());
 
-               log.debug("loginUserResult:: " + loginUserResult);
-
                loginSuccess = loginSuccessFromResult(loginUserResult);
 
-               log.debug("loginSuccess:: " + loginSuccess);
+               ping();
 
        }
 
@@ -122,27 +213,44 @@ public class RestClient {
         * 
         * @throws Exception
         */
-       public void ping() throws Exception {
-               if (!loginSuccess) {
-                       loginUser();
-               } else {
-                       Options options = new Options();
-                       options.setTo(new 
EndpointReference(getServerServiceEndPoint()));
-
-                       options.setProperty(Constants.Configuration.ENABLE_REST,
-                                       Constants.VALUE_TRUE);
-
-                       ServiceClient sender = new ServiceClient();
-                       sender.engageModule(new 
QName(Constants.MODULE_ADDRESSING)
-                                       .getLocalPart());
-                       sender.setOptions(options);
-
-                       OMElement pingResult = sender
-                                       
.sendReceive(getPayloadMethodPingTemp());
-
-                       log.debug("pingResult:: " + pingResult);
+       public void ping() {
+               try {
+                       //flag this flow as active
+                       pingRunning = true;
+                       
+                       if (!loginSuccess) {
+                               loginUser();
+                       } else {
+                               
+                               Options options = new Options();
+                               options.setTo(new 
EndpointReference(getServerServiceEndPoint()));
+                               
options.setProperty(Constants.Configuration.ENABLE_REST,
+                                               Constants.VALUE_TRUE);
+
+                               ServiceClient sender = new ServiceClient();
+                               sender.engageModule(new 
QName(Constants.MODULE_ADDRESSING)
+                                               .getLocalPart());
+                               sender.setOptions(options);
+
+                               OMElement pingResult = sender
+                                               
.sendReceive(getPayloadMethodPingTemp());
+
+                               List<SlaveClientDto> slaveClients = 
pingFromResult(pingResult);
+
+                               if (this.observerInstance != null) {
+                                       
this.observerInstance.pingComplete(server, slaveClients);
+                               }
+                               
+                               //flag this flow as complete
+                               pingRunning = false;
 
-                       pingFromResult(pingResult);
+                       }
+                       // Catches all errors to make sure the observer is 
notified that the
+                       // ping was performed (even when performed badly)
+               } catch (Exception ex) {
+                       //flag this flow as complete
+                       pingRunning = false;
+                       log.error("[ping failed]", ex);
                }
        }
 
@@ -151,10 +259,9 @@ public class RestClient {
         * 
         * @return
         */
-       private OMElement getPayloadMethodGetSession() {
+       private OMElement getPayloadMethodGetSession() throws Exception {
                OMFactory fac = OMAbstractFactory.getOMFactory();
-               OMNamespace omNs = fac.createOMNamespace(
-                               "http://services.axis.openmeetings.apache.org", 
"pre");
+               OMNamespace omNs = fac.createOMNamespace(NAMESPACE_PREFIX, 
"pre");
                OMElement method = fac.createOMElement("getSession", omNs);
                return method;
        }
@@ -185,10 +292,9 @@ public class RestClient {
         * 
         * @return
         */
-       private OMElement getPayloadMethodLoginUser() {
+       private OMElement getPayloadMethodLoginUser() throws Exception {
                OMFactory fac = OMAbstractFactory.getOMFactory();
-               OMNamespace omNs = fac.createOMNamespace(
-                               "http://services.axis.openmeetings.apache.org", 
"pre");
+               OMNamespace omNs = fac.createOMNamespace(NAMESPACE_PREFIX, 
"pre");
                OMElement method = fac.createOMElement("loginUser", omNs);
 
                OMElement sid = fac.createOMElement("SID", omNs);
@@ -215,8 +321,7 @@ public class RestClient {
         */
        private boolean loginSuccessFromResult(OMElement result) throws 
Exception {
 
-               QName loginResult = new QName(
-                               "http://services.axis.openmeetings.apache.org", 
"return");
+               QName loginResult = new QName(NAMESPACE_PREFIX, "return");
 
                @SuppressWarnings("unchecked")
                Iterator<OMElement> elements = 
result.getChildrenWithName(loginResult);
@@ -226,7 +331,7 @@ public class RestClient {
                                return true;
                        } else {
                                throw new Exception("Could not login user at, 
error code is: "
-                                                               + 
resultElement.getText());
+                                               + resultElement.getText());
                        }
                } else {
                        throw new Exception("Could not parse login result");
@@ -239,10 +344,9 @@ public class RestClient {
         * 
         * @return
         */
-       private OMElement getPayloadMethodPingTemp() {
+       private OMElement getPayloadMethodPingTemp() throws Exception {
                OMFactory fac = OMAbstractFactory.getOMFactory();
-               OMNamespace omNs = fac.createOMNamespace(
-                               "http://services.axis.openmeetings.apache.org", 
"pre");
+               OMNamespace omNs = fac.createOMNamespace(NAMESPACE_PREFIX, 
"pre");
                OMElement method = fac.createOMElement("ping", omNs);
 
                OMElement sid = fac.createOMElement("SID", omNs);
@@ -261,24 +365,40 @@ public class RestClient {
         * @return list of {@link RoomClient}s
         * @throws Exception
         */
-       private List<RoomClient> pingFromResult(OMElement result) throws 
Exception {
+       private List<SlaveClientDto> pingFromResult(OMElement result) throws 
Exception {
 
-               QName pingResult = new QName(
-                               "http://services.axis.openmeetings.apache.org", 
"return");
+               QName pingResult = new QName(NAMESPACE_PREFIX, "return");
                String nameSpaceForSlaveDto = 
"http://room.conference.openmeetings.apache.org/xsd";
 
                @SuppressWarnings("unchecked")
                Iterator<OMElement> elements = 
result.getChildrenWithName(pingResult);
-               List<RoomClient> clients = new ArrayList<RoomClient>();
+               List<SlaveClientDto> clients = new ArrayList<SlaveClientDto>();
                while (elements.hasNext()) {
                        OMElement resultElement = elements.next();
-                       clients.add(new RoomClient( //
-                               resultElement.getFirstChildWithName(new 
QName(nameSpaceForSlaveDto, "streamid")).getText(), //
-                               resultElement.getFirstChildWithName(new 
QName(nameSpaceForSlaveDto, "publicSID")).getText(), //
-                               
Long.valueOf(resultElement.getFirstChildWithName(new 
QName(nameSpaceForSlaveDto, "roomId")).getText()).longValue(), //
-                               
Long.valueOf(resultElement.getFirstChildWithName(new 
QName(nameSpaceForSlaveDto, "userId")).getText()).longValue(), //
-                               resultElement.getFirstChildWithName(new 
QName(nameSpaceForSlaveDto, "firstName")).getText(), //
-                               resultElement.getFirstChildWithName(new 
QName(nameSpaceForSlaveDto, "lastName")).getText()) //
+
+                       Long roomId = null;
+                       String roomIdAsXmlString = 
resultElement.getFirstChildWithName(
+                                       new QName(nameSpaceForSlaveDto, 
"roomId")).getText();
+                       if (roomIdAsXmlString != null && 
roomIdAsXmlString.length() > 0) {
+                               roomId = 
Long.valueOf(roomIdAsXmlString).longValue();
+                       }
+
+                       Long userId = null;
+                       String userIdAsXmlString = 
resultElement.getFirstChildWithName(
+                                       new QName(nameSpaceForSlaveDto, 
"userId")).getText();
+                       if (userIdAsXmlString != null && 
userIdAsXmlString.length() > 0) {
+                               userId = 
Long.valueOf(userIdAsXmlString).longValue();
+                       }
+
+                       clients.add(new SlaveClientDto( //
+                                       resultElement.getFirstChildWithName(new 
QName(nameSpaceForSlaveDto, "streamid")).getText(), //
+                                       resultElement.getFirstChildWithName(new 
QName(nameSpaceForSlaveDto, "publicSID")).getText(), //
+                                       roomId, //
+                                       userId, //
+                                       resultElement.getFirstChildWithName(new 
QName(nameSpaceForSlaveDto, "firstName")).getText(), //
+                                       resultElement.getFirstChildWithName(new 
QName(nameSpaceForSlaveDto, "lastName")).getText(), //
+                                       
Boolean.valueOf(resultElement.getFirstChildWithName(new 
QName(nameSpaceForSlaveDto,"isAVClient")).getText()).booleanValue() //
+                       ) //
                        );
                }
                return clients;

Modified: 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/ClientListHashMapStore.java
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/ClientListHashMapStore.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/ClientListHashMapStore.java
 (original)
+++ 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/ClientListHashMapStore.java
 Sun Nov 18 08:46:33 2012
@@ -515,29 +515,34 @@ public class ClientListHashMapStore impl
                for (Iterator<Entry<String, ClientSession>> iter = clientList
                                .entrySet().iterator(); iter.hasNext();) {
                        Entry<String, ClientSession> entry = iter.next();
-                       if (entry.getValue().getServer().equals(server)) {
+                       if (entry.getValue().getServer() != null
+                                       && 
entry.getValue().getServer().equals(server)) {
                                iter.remove();
                        }
                }
 
-               System.err.println("Session 2 Length: " + clientList.size());
+               log.debug("Session 2 Length: " + clientList.size());
 
                for (SlaveClientDto slaveClientDto : clients) {
                        String uniqueKey = 
ClientSessionUtil.getClientSessionKey(null,
                                        slaveClientDto.getStreamid());
-                       clientList.put(uniqueKey, new ClientSession(server, new 
RoomClient(
-                                       slaveClientDto.getStreamid(),
-                                       slaveClientDto.getPublicSID(), 
slaveClientDto.getRoomId(),
-                                       slaveClientDto.getUserId(), 
slaveClientDto.getFirstName(),
-                                       slaveClientDto.getLastName())));
+                       clientList.put(
+                                       uniqueKey,
+                                       new ClientSession(server, new 
RoomClient(slaveClientDto
+                                                       .getStreamid(), 
slaveClientDto.getPublicSID(),
+                                                       
slaveClientDto.getRoomId(), slaveClientDto
+                                                                       
.getUserId(),
+                                                       
slaveClientDto.getFirstName(), slaveClientDto
+                                                                       
.getLastName(), slaveClientDto
+                                                                       
.getIsAVClient())));
 
                }
-               
-               System.err.println("Session 3 Length: " + clientList.size());
+
+               log.debug("Session 3 Length: " + clientList.size());
 
                for (ClientSession cSession : clientList.values()) {
-                       System.err.println("cSession: " + cSession.getServer()
-                                       + " cSession RCL " + 
cSession.getRoomClient());
+                       log.warn("cSession: " + cSession.getServer() + " 
cSession RCL "
+                                       + cSession.getRoomClient());
                }
        }
 

Modified: 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/RoomClient.java
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/RoomClient.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/RoomClient.java
 (original)
+++ 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/RoomClient.java
 Sun Nov 18 08:46:33 2012
@@ -183,7 +183,7 @@ public class RoomClient implements Seria
     }
     
        public RoomClient(String streamid, String publicSID, Long room_id,
-                       Long user_id, String firstname, String lastname) {
+                       Long user_id, String firstname, String lastname, boolean isAVClient) {
                super();
                this.streamid = streamid;
                this.publicSID = publicSID;
@@ -191,6 +191,7 @@ public class RoomClient implements Seria
                this.user_id = user_id;
                this.firstname = firstname;
                this.lastname = lastname;
+               this.isAVClient = isAVClient;
        }
 
        public void setUserObject(Long user_id, String username, String firstname, String lastname) {

Modified: 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/SlaveClientDto.java
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/SlaveClientDto.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/SlaveClientDto.java
 (original)
+++ 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/SlaveClientDto.java
 Sun Nov 18 08:46:33 2012
@@ -34,18 +34,8 @@ public class SlaveClientDto {
        private String lastName;
        private Long userId;
        private Long roomId;
+       private boolean isAVClient = false;
        
-       public SlaveClientDto(String streamid, String publicSID, String firstName,
-                       String lastName, Long userId, Long roomId) {
-               super();
-               this.streamid = streamid;
-               this.publicSID = publicSID;
-               this.firstName = firstName;
-               this.lastName = lastName;
-               this.userId = userId;
-               this.roomId = roomId;
-       }
-
        public SlaveClientDto(RoomClient roomClient) {
                this.streamid = roomClient.getStreamid();
                this.publicSID = roomClient.getPublicSID();
@@ -53,6 +43,18 @@ public class SlaveClientDto {
                this.lastName = roomClient.getLastname();
                this.userId = roomClient.getUser_id();
                this.roomId = roomClient.getRoom_id();
+               this.isAVClient = roomClient.getIsAVClient();
+       }
+
+       public SlaveClientDto(String streamid, String publicSID, Long roomId2,
+                       Long userId2, String firstName, String lastName, boolean isAVClient) {
+               this.streamid = streamid;
+               this.publicSID = publicSID;
+               this.firstName = firstName;
+               this.lastName = lastName;
+               this.userId = userId2;
+               this.roomId = roomId2;
+               this.isAVClient = isAVClient;
        }
 
        public String getStreamid() {
@@ -103,4 +105,12 @@ public class SlaveClientDto {
                this.publicSID = publicSID;
        }
 
+       public boolean getIsAVClient() {
+               return isAVClient;
+       }
+
+       public void setIsAVClient(boolean isAVClient) {
+               this.isAVClient = isAVClient;
+       }
+
 }

Modified: 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/data/basic/dao/ServerDao.java
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/data/basic/dao/ServerDao.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/data/basic/dao/ServerDao.java
 (original)
+++ 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/data/basic/dao/ServerDao.java
 Sun Nov 18 08:46:33 2012
@@ -41,18 +41,18 @@ import org.springframework.transaction.a
  * 
  * CRUD for {@link Server}
  * 
- * @author solomax, swagner
+ * @author solomax, sebawagner
  * 
  */
 @Transactional
 public class ServerDao implements IDataProviderDao<Server> {
        private static final Logger log = Red5LoggerFactory.getLogger(
                        ServerDao.class, OpenmeetingsVariables.webAppRootKey);
-       public final static String[] searchFields = {"name", "address", "comment"};
+       public final static String[] searchFields = { "name", "address", "comment" };
 
        @PersistenceContext
        private EntityManager em;
-       
+
        @Autowired
        private UsersDao usersDao;
 
@@ -74,22 +74,33 @@ public class ServerDao implements IDataP
         * @see org.apache.openmeetings.data.OmDAO#get(int, int)
         */
        public List<Server> get(int start, int max) {
-               log.debug("getServerList enter");
                TypedQuery<Server> q = em.createNamedQuery("getAllServers",
                                Server.class);
                q.setFirstResult(start);
                q.setMaxResults(max);
-
                return q.getResultList();
        }
-       
+
        public List<Server> get(String search, int start, int count, String order) {
-               TypedQuery<Server> q = em.createQuery(DaoHelper.getSearchQuery("Server", "s", search, true, false, order, searchFields), Server.class);
+               TypedQuery<Server> q = em.createQuery(DaoHelper.getSearchQuery(
+                               "Server", "s", search, true, false, order, searchFields),
+                               Server.class);
                q.setFirstResult(start);
                q.setMaxResults(count);
                return q.getResultList();
        }
-       
+
+       /**
+        * get the list of all servers in the cluster that are ready to receive a
+        * ping (active = true)
+        * 
+        * @return
+        */
+       public List<Server> getSlavesForPing() {
+               return em.createNamedQuery("getSlavesForPing", Server.class)
+                               .getResultList();
+       }
+
        /*
         * (non-Javadoc)
         * 
@@ -103,10 +114,11 @@ public class ServerDao implements IDataP
        }
 
        public long count(String search) {
-               TypedQuery<Long> q = em.createQuery(DaoHelper.getSearchQuery("Server", "s", search, true, true, null, searchFields), Long.class);
+               TypedQuery<Long> q = em.createQuery(DaoHelper.getSearchQuery("Server",
+                               "s", search, true, true, null, searchFields), Long.class);
                return q.getSingleResult();
        }
-       
+
        /*
         * (non-Javadoc)
         * 
@@ -115,7 +127,8 @@ public class ServerDao implements IDataP
        public Server get(long id) {
                Server result = null;
                log.debug("getServer enter, id = " + id);
-               TypedQuery<Server> q = em.createNamedQuery("getServerById", Server.class);
+               TypedQuery<Server> q = em.createNamedQuery("getServerById",
+                               Server.class);
                q.setParameter("id", id);
                try {
                        result = q.getSingleResult();
@@ -133,38 +146,43 @@ public class ServerDao implements IDataP
         */
        public Server getServerByAddress(String address) {
                log.debug("getServer enter, address = " + address);
-               TypedQuery<Server> q = em.createNamedQuery("getServerByAddress", Server.class);
+               TypedQuery<Server> q = em.createNamedQuery("getServerByAddress",
+                               Server.class);
                q.setParameter("address", address);
                List<Server> list = q.getResultList();
                return list.size() > 0 ? list.get(0) : null;
        }
 
        /**
-        * This method is necessary to automatically assign user to the server with minimum load.
+        * This method is necessary to automatically assign user to the server with
+        * minimum load.
         * 
-        * First of all we are trying to find servers referenced by 0 users.
-        * If all servers are referenced by at least 1 user we are searching the first server referenced by minimum users.
+        * First of all we are trying to find servers referenced by 0 users. If all
+        * servers are referenced by at least 1 user we are searching the first
+        * server referenced by minimum users.
         * 
-        * @return Server object referenced by the minimum user accounts. 
+        * @return Server object referenced by the minimum user accounts.
         */
        public Server getServerWithMinimumUsers() {
                Server result = null;
                log.debug("getServerWithMinimumUsers enter");
-               TypedQuery<Server> q = em.createNamedQuery("getServersWithNoUsers", Server.class);
+               TypedQuery<Server> q = em.createNamedQuery("getServersWithNoUsers",
+                               Server.class);
                List<Server> l = q.getResultList();
                if (l.isEmpty()) {
-                       TypedQuery<Object> q1 = em.createNamedQuery("getServerWithMinimumUsers", Object.class);
+                       TypedQuery<Object> q1 = em.createNamedQuery(
+                                       "getServerWithMinimumUsers", Object.class);
                        List<Object> r = q1.getResultList();
                        if (!r.isEmpty()) {
                                // get server id from first line
-                               result = get((Long)((Object[])r.get(0))[0]);
+                               result = get((Long) ((Object[]) r.get(0))[0]);
                        }
                } else {
                        result = l.get(0);
                }
                return result;
        }
-       
+
        /**
         * @deprecated user standard mechanism of
         *             {@link IDataProviderDao#update(org.apache.openmeetings.persistence.beans.OmEntity, long)}
@@ -174,10 +192,9 @@ public class ServerDao implements IDataP
         * @return
         */
        @Deprecated
-       public Server saveServer(long id, String name, String address,
- int port,
+       public Server saveServer(long id, String name, String address, int port,
                        String user, String pass, String webapp, String protocol,
-                       String comment, long userId) {
+                       Boolean active, String comment, long userId) {
                Server s = get(id);
                if (s == null) {
                        s = new Server();
@@ -194,6 +211,7 @@ public class ServerDao implements IDataP
                s.setPass(pass);
                s.setWebapp(webapp);
                s.setProtocol(protocol);
+               s.setActive(active);
                s.setComment(comment);
 
                return em.merge(s);
@@ -243,6 +261,11 @@ public class ServerDao implements IDataP
                return entity;
        }
 
+       public Server update(Server entity) {
+               em.merge(entity);
+               return entity;
+       }
+
        /*
         * (non-Javadoc)
         * 
@@ -258,7 +281,7 @@ public class ServerDao implements IDataP
                        em.merge(entity);
                }
        }
-       
+
        /**
         * get {@link Server} by name
         * 
@@ -271,5 +294,5 @@ public class ServerDao implements IDataP
                q.setParameter("name", name);
                return q.getResultList();
        }
-       
+
 }

Modified: 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/persistence/beans/basic/Server.java
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/persistence/beans/basic/Server.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/persistence/beans/basic/Server.java
 (original)
+++ 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/persistence/beans/basic/Server.java
 Sun Nov 18 08:46:33 2012
@@ -48,7 +48,9 @@ import org.simpleframework.xml.Root;
                @NamedQuery(name = "getServerByName", query = "SELECT s FROM Server s WHERE s.deleted = false AND s.name LIKE :name"),
                @NamedQuery(name = "getServerByAddress", query = "SELECT s FROM Server s WHERE s.deleted = false AND s.address LIKE :address"),
                @NamedQuery(name = "getServersWithNoUsers", query = "SELECT s FROM Server s WHERE s.deleted = false AND s.id NOT IN (SELECT u.server.id FROM Users u where u.server.id IS NOT NULL)"),
-               @NamedQuery(name = "getServerWithMinimumUsers", query = "SELECT s.id, COUNT(u) AS cnt FROM Users u JOIN u.server s WHERE s.deleted = false GROUP BY s.id ORDER BY cnt") })
+               @NamedQuery(name = "getServerWithMinimumUsers", query = "SELECT s.id, COUNT(u) AS cnt FROM Users u JOIN u.server s WHERE s.deleted = false GROUP BY s.id ORDER BY cnt"),
+               @NamedQuery(name = "getSlavesForPing", query = "SELECT s FROM Server s WHERE s.deleted = false AND s.active = true"), //
+})
 @Table(name = "server")
 @Root
 public class Server implements Serializable, IDataProviderEntity {
@@ -92,10 +94,6 @@ public class Server implements Serializa
        @Element(data = true, required = false)
        private Calendar lastPing;
        
-       @Column(name = "ping_running", nullable = true)
-       @Element(data = true, required = false)
-       private boolean pingRunning;
-
        @Column(name = "port", nullable = true)
        @Element(data = true, required = false)
        private int port;
@@ -204,14 +202,6 @@ public class Server implements Serializa
                this.lastPing = lastPing;
        }
 
-       public boolean isPingRunning() {
-               return pingRunning;
-       }
-
-       public void setPingRunning(boolean pingRunning) {
-               this.pingRunning = pingRunning;
-       }
-
        public int getPort() {
                return port;
        }
@@ -268,7 +258,7 @@ public class Server implements Serializa
        public String toString() {
                return "Server [id=" + id + ", name=" + name + ", address=" + address
                                + ", port=" + port + ", user=" + user + ", pass=" + pass
-                               + ", protocol=" + protocol + ", pingRunning=" + pingRunning
+                               + ", protocol=" + protocol 
                                + ", active=" + active + ", webapp=" + webapp + ", deleted="
                                + deleted + "]";
        }

Modified: 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/quartz/scheduler/ClusterSlaveJob.java
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/quartz/scheduler/ClusterSlaveJob.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/quartz/scheduler/ClusterSlaveJob.java
 (original)
+++ 
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/quartz/scheduler/ClusterSlaveJob.java
 Sun Nov 18 08:46:33 2012
@@ -18,28 +18,111 @@
  */
 package org.apache.openmeetings.quartz.scheduler;
 
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.openmeetings.OpenmeetingsVariables;
+import org.apache.openmeetings.cluster.sync.IRestClientObserver;
 import org.apache.openmeetings.cluster.sync.RestClient;
+import org.apache.openmeetings.conference.room.ISharedSessionStore;
+import org.apache.openmeetings.conference.room.SlaveClientDto;
+import org.apache.openmeetings.data.basic.dao.ServerDao;
+import org.apache.openmeetings.persistence.beans.basic.Server;
 import org.red5.logging.Red5LoggerFactory;
 import org.slf4j.Logger;
 import org.springframework.beans.factory.annotation.Autowired;
 
-public class ClusterSlaveJob {
-       private static Logger log = Red5LoggerFactory.getLogger(ClusterSlaveJob.class, OpenmeetingsVariables.webAppRootKey);
+public class ClusterSlaveJob implements IRestClientObserver {
+
+       private static Logger log = Red5LoggerFactory.getLogger(
+                       ClusterSlaveJob.class, OpenmeetingsVariables.webAppRootKey);
 
        @Autowired
-       private RestClient restClient;
+       private ServerDao serverDao;
+       @Autowired
+       private ISharedSessionStore clientListManager;
+
+
+       /**
+        * We store the list of RestClients in the memory, so that we can simply
+        * call ping to get the load, without the need to get a new session Hash and
+        * to login again. <br/>
+        * There can be only one RestClient per server, so we use the primary key of
+        * the server to store the RestClient.
+        */
+       private static Map<Long, RestClient> restClientsSessionStore = new HashMap<Long, RestClient>();
+
+       /**
+        * Synchronized, cause nobody should manipulate the object while another
+        * process requests it, the scheduler could run several times and request
+        * the same object, add or remove it.<br/>
+        * If there is no object yet, create one.
+        * 
+        * @param server
+        */
+       private synchronized RestClient getRestClient(Server server) {
+               RestClient restClient = restClientsSessionStore.get(server.getId());
+
+               // check if any values of the server have been changed,
+               // if yes, we need a new RestClient to make sure it will relogin to the
+               // changed server details
+               if (restClient != null && restClient.hasServerDetailsChanged(server)) {
+                       log.debug("Server details changed, get new rest client");
+                       restClient = null;
+               }
+
+               if (restClient == null) {
+                       restClient = new RestClient(this, server);
+                       restClientsSessionStore.put(server.getId(), restClient);
+               }
+               return restClient;
+       }
 
        public void doIt() {
                log.debug("ClusterSlaveJob.execute");
                try {
 
-                       restClient.ping();
+                       log.debug("ClusterSlaveJob. SERVERS :: "
+                                       + serverDao.getSlavesForPing().size());
+
+                       for (Server server : serverDao.getSlavesForPing()) {
+
+                               RestClient rClient = getRestClient(server);
+                               
+                               //If the ping is still running, we don't ping the client in this session
+                               if (rClient.getPingRunning()) {
+                                       log.warn("The ping for the server " + server
+                                                       + " takes longer then the ping interval!");
+                                       continue;
+                               }
+
+                               rClient.ping();
+
+                       }
 
                } catch (Exception e) {
-                       log.error(
-                                       "Unexpected exception while processing slave master ping.",
-                                       e);
+                       log.error("Unexpected exception while doRoundTrip cluster.", e);
                }
        }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * org.apache.openmeetings.cluster.sync.IRestClientObserverEvent#pingComplete
+        * (org.apache.openmeetings.persistence.beans.basic.Server, java.util.List)
+        */
+       public void pingComplete(Server server, List<SlaveClientDto> slaveClients) {
+
+               log.debug("-- pingComplete -- For server: " + server);
+
+               clientListManager.syncSlaveClientSession(server, slaveClients);
+
+               server.setLastPing(Calendar.getInstance());
+               serverDao.update(server);
+
+       }
+
 }


Reply via email to