Author: sebawagner
Date: Sun Nov 18 08:46:33 2012
New Revision: 1410852
URL: http://svn.apache.org/viewvc?rev=1410852&view=rev
Log:
OPENMEETINGS-460 remove "pingRunning" into RestClient, implement first version
of scheduler to sync sessions to master.
Added:
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/IRestClientObserver.java
Removed:
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/SlaveObserver.java
Modified:
incubator/openmeetings/trunk/singlewebapp/WebContent/WEB-INF/openmeetings-applicationContext.xml
incubator/openmeetings/trunk/singlewebapp/WebContent/src/modules/admin/servers/serverAdminValueForm.lzx
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebService.java
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebServiceFacade.java
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/RestClient.java
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/ClientListHashMapStore.java
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/RoomClient.java
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/SlaveClientDto.java
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/data/basic/dao/ServerDao.java
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/persistence/beans/basic/Server.java
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/quartz/scheduler/ClusterSlaveJob.java
Modified:
incubator/openmeetings/trunk/singlewebapp/WebContent/WEB-INF/openmeetings-applicationContext.xml
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/WebContent/WEB-INF/openmeetings-applicationContext.xml?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/WebContent/WEB-INF/openmeetings-applicationContext.xml
(original)
+++
incubator/openmeetings/trunk/singlewebapp/WebContent/WEB-INF/openmeetings-applicationContext.xml
Sun Nov 18 08:46:33 2012
@@ -90,14 +90,11 @@
<bean id="openmeetings.FlvRecorderConverter"
class="org.apache.openmeetings.data.flvrecord.converter.FlvRecorderConverter" />
- <bean id="openmeetings.SlaveObserver"
- class="org.apache.openmeetings.cluster.sync.SlaveObserver" />
-
<!--
#####################################################################
Comment this in to enable the OpenMeetings instance to act as Master
#####################################################################
-
+ -->
<bean id="openmeetings.ClusterSlaveJob"
class="org.apache.openmeetings.quartz.scheduler.ClusterSlaveJob" />
<bean id="clusterSlaveJob"
@@ -128,7 +125,7 @@
</list>
</property>
</bean>
- -->
+
<bean id="openmeetings.SessionClearJob"
class="org.apache.openmeetings.quartz.scheduler.SessionClearJob" />
<bean id="sessionClearJob"
Modified:
incubator/openmeetings/trunk/singlewebapp/WebContent/src/modules/admin/servers/serverAdminValueForm.lzx
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/WebContent/src/modules/admin/servers/serverAdminValueForm.lzx?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/WebContent/src/modules/admin/servers/serverAdminValueForm.lzx
(original)
+++
incubator/openmeetings/trunk/singlewebapp/WebContent/src/modules/admin/servers/serverAdminValueForm.lzx
Sun Nov 18 08:46:33 2012
@@ -38,7 +38,6 @@
this.serverName.setAttribute('text', '');
this.ip.setAttribute('text', '');
this.lastPing.setAttribute('text','');
- this.pingRunning.setAttribute('text','');
this.comment.setAttribute('text','');
this.port.setAttribute('text','');
this.user.setAttribute('text','');
@@ -68,7 +67,6 @@
this.ip.setAttribute('text', server.address);
this.lastPing.setAttribute('text', server.lastPing);
this.comment.setAttribute('text', server.comment);
- this.pingRunning.setAttribute('text',
server.pingRunning);
this.comment.setAttribute('text', server.comment);
this.port.setAttribute('text', server.port);
this.user.setAttribute('text', server.user);
@@ -145,11 +143,8 @@
<labelText labelid="1517" width="200" y="360" resize="false" x="2"/>
<labelText name="lastPing" y="360" x="120" width="270" />
- <labelText labelid="1524" width="200" y="380" resize="false" x="2"/>
- <labelText name="pingRunning" y="380" x="120" width="270" />
-
- <labelText labelid="270" width="200" y="400" resize="false" x="2"/>
- <customScrollEdittext name="comment" y="400" x="120"
width="270"
+ <labelText labelid="270" width="200" y="380" resize="false" x="2"/>
+ <customScrollEdittext name="comment" y="380" x="120"
width="270"
height="100" text="" />
</class>
Modified:
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebService.java
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebService.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebService.java
(original)
+++
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebService.java
Sun Nov 18 08:46:33 2012
@@ -122,20 +122,22 @@ public class ServerWebService {
* - webapp name of the OpenMeetings instance
* @param protocol
* - protocol to access the OpenMeetings instance
+ * @param active
+ * - if the server currently participates in the cluster or
not
* @param comment
* - comment for the server
* @return the id of saved server
*/
public long saveServer(String SID, long id, String name, String address,
int port, String user, String pass, String webapp,
String protocol,
- String comment) throws AxisFault {
+ Boolean active, String comment) throws AxisFault {
log.debug("saveServerCount enter");
Long users_id = sessionManagement.checkSession(SID);
Long user_level = userManagement.getUserLevelByID(users_id);
if (authLevelManagement.checkWebServiceLevel(user_level)) {
return serversDao.saveServer(id, name, address, port,
user, pass,
- webapp, protocol, comment, users_id)
+ webapp, protocol, active, comment,
users_id)
.getId();
} else {
log.warn("Insuffisient permissions");
Modified:
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebServiceFacade.java
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebServiceFacade.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebServiceFacade.java
(original)
+++
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/axis/services/ServerWebServiceFacade.java
Sun Nov 18 08:46:33 2012
@@ -72,13 +72,13 @@ public class ServerWebServiceFacade {
/**
* Proxy method please see
- * {@link ServerWebService#saveServer(String, long, String, String,
int, String, String, String, String, String)}
+ * {@link ServerWebService#saveServer(String, long, String, String,
int, String, String, String, String, Boolean, String)}
*/
public long saveServer(String SID, long id, String name, String address,
int port, String user, String pass, String webapp,
String protocol,
- String comment) throws AxisFault {
+ Boolean active, String comment) throws AxisFault {
return getServerServiceProxy().saveServer(SID, id, name,
address, port,
- user, pass, webapp, protocol, comment);
+ user, pass, webapp, protocol, active, comment);
}
/**
Added:
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/IRestClientObserver.java
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/IRestClientObserver.java?rev=1410852&view=auto
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/IRestClientObserver.java
(added)
+++
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/IRestClientObserver.java
Sun Nov 18 08:46:33 2012
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License") + you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.cluster.sync;
+
+import java.util.List;
+
+import org.apache.openmeetings.conference.room.SlaveClientDto;
+import org.apache.openmeetings.persistence.beans.basic.Server;
+
+/**
+ * Defines the events the {@link RestClient} will broadcast when he performs
calls
+ * to the {@link Server}
+ *
+ * @author sebawagner
+ *
+ */
+public interface IRestClientObserver {
+
+ /**
+ * performed by the RestClient, whenever a ping is completed and the
+ * response is parsed to a session object ready to be injected in the
local
+ * session store
+ *
+ * @param server
+ * @param slaveClients
+ */
+ public void pingComplete(Server server, List<SlaveClientDto>
slaveClients);
+
+}
Modified:
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/RestClient.java
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/RestClient.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/RestClient.java
(original)
+++
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/cluster/sync/RestClient.java
Sun Nov 18 08:46:33 2012
@@ -34,17 +34,30 @@ import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
import org.apache.openmeetings.OpenmeetingsVariables;
import org.apache.openmeetings.conference.room.RoomClient;
+import org.apache.openmeetings.conference.room.SlaveClientDto;
+import org.apache.openmeetings.persistence.beans.basic.Server;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
+/**
+ *
+ * Performs call to the WebService Gateway to load the server load from the
+ * slave's of the cluster
+ *
+ * @author sebawagner
+ *
+ */
public class RestClient {
private static final Logger log = Red5LoggerFactory.getLogger(
RestClient.class, OpenmeetingsVariables.webAppRootKey);
- private boolean loginSuccess = false;
- private String sessionId;
-
+ /**
+ * The observerInstance will be notified whenever a ping was completed
+ */
+ private IRestClientObserver observerInstance;
+
+ private Server server;
private final String host;
private final int port;
private final String protocol;
@@ -52,6 +65,22 @@ public class RestClient {
private final String user;
private final String pass;
+ private boolean loginSuccess = false;
+ private String sessionId;
+
+ private boolean pingRunning = false;
+
+ /**
+ * returns true as long as the RestClient performs a ping and parses
the result
+ *
+ * @return
+ */
+ public boolean getPingRunning() {
+ return pingRunning;
+ }
+
+ private static String NAMESPACE_PREFIX =
"http://services.axis.openmeetings.apache.org";
+
private String getUserServiceEndPoint() {
return protocol + "://" + host + ":" + port + "/" + webapp
+ "/services/UserService";
@@ -73,9 +102,44 @@ public class RestClient {
}
}
- public RestClient(String host, int port, String protocol, String webapp,
+ /**
+ * The observerInstance will be notified whenever a ping was completed
+ *
+ * @param observerInstance
+ * @param host
+ * @param port
+ * @param protocol
+ * @param webapp
+ * @param user
+ * @param pass
+ */
+ public RestClient(IRestClientObserver observerInstance, Server server) {
+ this.observerInstance = observerInstance;
+ this.server = server;
+ this.host = server.getAddress();
+ this.port = server.getPort();
+ this.protocol = server.getProtocol();
+ this.webapp = server.getWebapp();
+ this.user = server.getUser();
+ this.pass = server.getPass();
+ }
+
+ /**
+ * for simple testing this method provides a version of the constructor
+ * without need for a server entity.<br/>
+ * <br/>
+ * There is no spring or JPA enhanced class in use here. This Object is
+ * stored in session/memory
+ *
+ * @param host
+ * @param port
+ * @param protocol
+ * @param webapp
+ * @param user
+ * @param pass
+ */
+ private RestClient(String host, int port, String protocol, String
webapp,
String user, String pass) {
- super();
this.host = host;
this.port = port;
this.protocol = protocol;
@@ -85,14 +149,44 @@ public class RestClient {
}
/**
+ * compare if the details here and the one stored are still the same
+ *
+ * @param server2
+ * @return
+ */
+ public boolean hasServerDetailsChanged(Server server2) {
+
+ if (!host.equals(server2.getAddress())) {
+ return true;
+ }
+ if (port != server2.getPort()) {
+ return true;
+ }
+ if (!user.equals(server2.getUser())) {
+ return true;
+ }
+ if (!pass.equals(server2.getPass())) {
+ return true;
+ }
+ if (!webapp.equals(server2.getWebapp())) {
+ return true;
+ }
+ if (!protocol.equals(server2.getProtocol())) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* Login the user via REST
*
* @throws Exception
*/
public void loginUser() throws Exception {
+
Options options = new Options();
options.setTo(new EndpointReference(getUserServiceEndPoint()));
-
options.setProperty(Constants.Configuration.ENABLE_REST,
Constants.VALUE_TRUE);
@@ -103,16 +197,13 @@ public class RestClient {
OMElement getSessionResult = sender
.sendReceive(getPayloadMethodGetSession());
sessionId = getSessionIdFromResult(getSessionResult);
- log.debug("sessionId:: " + sessionId);
OMElement loginUserResult = sender
.sendReceive(getPayloadMethodLoginUser());
- log.debug("loginUserResult:: " + loginUserResult);
-
loginSuccess = loginSuccessFromResult(loginUserResult);
- log.debug("loginSuccess:: " + loginSuccess);
+ ping();
}
@@ -122,27 +213,44 @@ public class RestClient {
*
* @throws Exception
*/
- public void ping() throws Exception {
- if (!loginSuccess) {
- loginUser();
- } else {
- Options options = new Options();
- options.setTo(new
EndpointReference(getServerServiceEndPoint()));
-
- options.setProperty(Constants.Configuration.ENABLE_REST,
- Constants.VALUE_TRUE);
-
- ServiceClient sender = new ServiceClient();
- sender.engageModule(new
QName(Constants.MODULE_ADDRESSING)
- .getLocalPart());
- sender.setOptions(options);
-
- OMElement pingResult = sender
-
.sendReceive(getPayloadMethodPingTemp());
-
- log.debug("pingResult:: " + pingResult);
+ public void ping() {
+ try {
+ //flag this flow as active
+ pingRunning = true;
+
+ if (!loginSuccess) {
+ loginUser();
+ } else {
+
+ Options options = new Options();
+ options.setTo(new
EndpointReference(getServerServiceEndPoint()));
+
options.setProperty(Constants.Configuration.ENABLE_REST,
+ Constants.VALUE_TRUE);
+
+ ServiceClient sender = new ServiceClient();
+ sender.engageModule(new
QName(Constants.MODULE_ADDRESSING)
+ .getLocalPart());
+ sender.setOptions(options);
+
+ OMElement pingResult = sender
+
.sendReceive(getPayloadMethodPingTemp());
+
+ List<SlaveClientDto> slaveClients =
pingFromResult(pingResult);
+
+ if (this.observerInstance != null) {
+
this.observerInstance.pingComplete(server, slaveClients);
+ }
+
+ //flag this flow as complete
+ pingRunning = false;
- pingFromResult(pingResult);
+ }
+ // Catches all errors to make sure the observer is
notified that the
+ // ping was performed (even when performed badly)
+ } catch (Exception ex) {
+ //flag this flow as complete
+ pingRunning = false;
+ log.error("[ping failed]", ex);
}
}
@@ -151,10 +259,9 @@ public class RestClient {
*
* @return
*/
- private OMElement getPayloadMethodGetSession() {
+ private OMElement getPayloadMethodGetSession() throws Exception {
OMFactory fac = OMAbstractFactory.getOMFactory();
- OMNamespace omNs = fac.createOMNamespace(
- "http://services.axis.openmeetings.apache.org",
"pre");
+ OMNamespace omNs = fac.createOMNamespace(NAMESPACE_PREFIX,
"pre");
OMElement method = fac.createOMElement("getSession", omNs);
return method;
}
@@ -185,10 +292,9 @@ public class RestClient {
*
* @return
*/
- private OMElement getPayloadMethodLoginUser() {
+ private OMElement getPayloadMethodLoginUser() throws Exception {
OMFactory fac = OMAbstractFactory.getOMFactory();
- OMNamespace omNs = fac.createOMNamespace(
- "http://services.axis.openmeetings.apache.org",
"pre");
+ OMNamespace omNs = fac.createOMNamespace(NAMESPACE_PREFIX,
"pre");
OMElement method = fac.createOMElement("loginUser", omNs);
OMElement sid = fac.createOMElement("SID", omNs);
@@ -215,8 +321,7 @@ public class RestClient {
*/
private boolean loginSuccessFromResult(OMElement result) throws
Exception {
- QName loginResult = new QName(
- "http://services.axis.openmeetings.apache.org",
"return");
+ QName loginResult = new QName(NAMESPACE_PREFIX, "return");
@SuppressWarnings("unchecked")
Iterator<OMElement> elements =
result.getChildrenWithName(loginResult);
@@ -226,7 +331,7 @@ public class RestClient {
return true;
} else {
throw new Exception("Could not login user at,
error code is: "
- +
resultElement.getText());
+ + resultElement.getText());
}
} else {
throw new Exception("Could not parse login result");
@@ -239,10 +344,9 @@ public class RestClient {
*
* @return
*/
- private OMElement getPayloadMethodPingTemp() {
+ private OMElement getPayloadMethodPingTemp() throws Exception {
OMFactory fac = OMAbstractFactory.getOMFactory();
- OMNamespace omNs = fac.createOMNamespace(
- "http://services.axis.openmeetings.apache.org",
"pre");
+ OMNamespace omNs = fac.createOMNamespace(NAMESPACE_PREFIX,
"pre");
OMElement method = fac.createOMElement("ping", omNs);
OMElement sid = fac.createOMElement("SID", omNs);
@@ -261,24 +365,40 @@ public class RestClient {
* @return list of {@link RoomClient}s
* @throws Exception
*/
- private List<RoomClient> pingFromResult(OMElement result) throws
Exception {
+ private List<SlaveClientDto> pingFromResult(OMElement result) throws
Exception {
- QName pingResult = new QName(
- "http://services.axis.openmeetings.apache.org",
"return");
+ QName pingResult = new QName(NAMESPACE_PREFIX, "return");
String nameSpaceForSlaveDto =
"http://room.conference.openmeetings.apache.org/xsd";
@SuppressWarnings("unchecked")
Iterator<OMElement> elements =
result.getChildrenWithName(pingResult);
- List<RoomClient> clients = new ArrayList<RoomClient>();
+ List<SlaveClientDto> clients = new ArrayList<SlaveClientDto>();
while (elements.hasNext()) {
OMElement resultElement = elements.next();
- clients.add(new RoomClient( //
- resultElement.getFirstChildWithName(new
QName(nameSpaceForSlaveDto, "streamid")).getText(), //
- resultElement.getFirstChildWithName(new
QName(nameSpaceForSlaveDto, "publicSID")).getText(), //
-
Long.valueOf(resultElement.getFirstChildWithName(new
QName(nameSpaceForSlaveDto, "roomId")).getText()).longValue(), //
-
Long.valueOf(resultElement.getFirstChildWithName(new
QName(nameSpaceForSlaveDto, "userId")).getText()).longValue(), //
- resultElement.getFirstChildWithName(new
QName(nameSpaceForSlaveDto, "firstName")).getText(), //
- resultElement.getFirstChildWithName(new
QName(nameSpaceForSlaveDto, "lastName")).getText()) //
+
+ Long roomId = null;
+ String roomIdAsXmlString =
resultElement.getFirstChildWithName(
+ new QName(nameSpaceForSlaveDto,
"roomId")).getText();
+ if (roomIdAsXmlString != null &&
roomIdAsXmlString.length() > 0) {
+ roomId =
Long.valueOf(roomIdAsXmlString).longValue();
+ }
+
+ Long userId = null;
+ String userIdAsXmlString =
resultElement.getFirstChildWithName(
+ new QName(nameSpaceForSlaveDto,
"userId")).getText();
+ if (userIdAsXmlString != null &&
userIdAsXmlString.length() > 0) {
+ userId =
Long.valueOf(userIdAsXmlString).longValue();
+ }
+
+ clients.add(new SlaveClientDto( //
+ resultElement.getFirstChildWithName(new
QName(nameSpaceForSlaveDto, "streamid")).getText(), //
+ resultElement.getFirstChildWithName(new
QName(nameSpaceForSlaveDto, "publicSID")).getText(), //
+ roomId, //
+ userId, //
+ resultElement.getFirstChildWithName(new
QName(nameSpaceForSlaveDto, "firstName")).getText(), //
+ resultElement.getFirstChildWithName(new
QName(nameSpaceForSlaveDto, "lastName")).getText(), //
+
Boolean.valueOf(resultElement.getFirstChildWithName(new
QName(nameSpaceForSlaveDto,"isAVClient")).getText()).booleanValue() //
+ ) //
);
}
return clients;
Modified:
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/ClientListHashMapStore.java
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/ClientListHashMapStore.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/ClientListHashMapStore.java
(original)
+++
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/ClientListHashMapStore.java
Sun Nov 18 08:46:33 2012
@@ -515,29 +515,34 @@ public class ClientListHashMapStore impl
for (Iterator<Entry<String, ClientSession>> iter = clientList
.entrySet().iterator(); iter.hasNext();) {
Entry<String, ClientSession> entry = iter.next();
- if (entry.getValue().getServer().equals(server)) {
+ if (entry.getValue().getServer() != null
+ &&
entry.getValue().getServer().equals(server)) {
iter.remove();
}
}
- System.err.println("Session 2 Length: " + clientList.size());
+ log.debug("Session 2 Length: " + clientList.size());
for (SlaveClientDto slaveClientDto : clients) {
String uniqueKey =
ClientSessionUtil.getClientSessionKey(null,
slaveClientDto.getStreamid());
- clientList.put(uniqueKey, new ClientSession(server, new
RoomClient(
- slaveClientDto.getStreamid(),
- slaveClientDto.getPublicSID(),
slaveClientDto.getRoomId(),
- slaveClientDto.getUserId(),
slaveClientDto.getFirstName(),
- slaveClientDto.getLastName())));
+ clientList.put(
+ uniqueKey,
+ new ClientSession(server, new
RoomClient(slaveClientDto
+ .getStreamid(),
slaveClientDto.getPublicSID(),
+
slaveClientDto.getRoomId(), slaveClientDto
+
.getUserId(),
+
slaveClientDto.getFirstName(), slaveClientDto
+
.getLastName(), slaveClientDto
+
.getIsAVClient())));
}
-
- System.err.println("Session 3 Length: " + clientList.size());
+
+ log.debug("Session 3 Length: " + clientList.size());
for (ClientSession cSession : clientList.values()) {
- System.err.println("cSession: " + cSession.getServer()
- + " cSession RCL " +
cSession.getRoomClient());
+ log.warn("cSession: " + cSession.getServer() + "
cSession RCL "
+ + cSession.getRoomClient());
}
}
Modified:
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/RoomClient.java
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/RoomClient.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/RoomClient.java
(original)
+++
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/RoomClient.java
Sun Nov 18 08:46:33 2012
@@ -183,7 +183,7 @@ public class RoomClient implements Seria
}
public RoomClient(String streamid, String publicSID, Long room_id,
- Long user_id, String firstname, String lastname) {
+ Long user_id, String firstname, String lastname,
boolean isAVClient) {
super();
this.streamid = streamid;
this.publicSID = publicSID;
@@ -191,6 +191,7 @@ public class RoomClient implements Seria
this.user_id = user_id;
this.firstname = firstname;
this.lastname = lastname;
+ this.isAVClient = isAVClient;
}
public void setUserObject(Long user_id, String username, String
firstname, String lastname) {
Modified:
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/SlaveClientDto.java
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/SlaveClientDto.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/SlaveClientDto.java
(original)
+++
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/conference/room/SlaveClientDto.java
Sun Nov 18 08:46:33 2012
@@ -34,18 +34,8 @@ public class SlaveClientDto {
private String lastName;
private Long userId;
private Long roomId;
+ private boolean isAVClient = false;
- public SlaveClientDto(String streamid, String publicSID, String
firstName,
- String lastName, Long userId, Long roomId) {
- super();
- this.streamid = streamid;
- this.publicSID = publicSID;
- this.firstName = firstName;
- this.lastName = lastName;
- this.userId = userId;
- this.roomId = roomId;
- }
-
public SlaveClientDto(RoomClient roomClient) {
this.streamid = roomClient.getStreamid();
this.publicSID = roomClient.getPublicSID();
@@ -53,6 +43,18 @@ public class SlaveClientDto {
this.lastName = roomClient.getLastname();
this.userId = roomClient.getUser_id();
this.roomId = roomClient.getRoom_id();
+ this.isAVClient = roomClient.getIsAVClient();
+ }
+
+ public SlaveClientDto(String streamid, String publicSID, Long roomId2,
+ Long userId2, String firstName, String lastName,
boolean isAVClient) {
+ this.streamid = streamid;
+ this.publicSID = publicSID;
+ this.firstName = firstName;
+ this.lastName = lastName;
+ this.userId = userId2;
+ this.roomId = roomId2;
+ this.isAVClient = isAVClient;
}
public String getStreamid() {
@@ -103,4 +105,12 @@ public class SlaveClientDto {
this.publicSID = publicSID;
}
+ public boolean getIsAVClient() {
+ return isAVClient;
+ }
+
+ public void setIsAVClient(boolean isAVClient) {
+ this.isAVClient = isAVClient;
+ }
+
}
Modified:
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/data/basic/dao/ServerDao.java
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/data/basic/dao/ServerDao.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/data/basic/dao/ServerDao.java
(original)
+++
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/data/basic/dao/ServerDao.java
Sun Nov 18 08:46:33 2012
@@ -41,18 +41,18 @@ import org.springframework.transaction.a
*
* CRUD for {@link Server}
*
- * @author solomax, swagner
+ * @author solomax, sebawagner
*
*/
@Transactional
public class ServerDao implements IDataProviderDao<Server> {
private static final Logger log = Red5LoggerFactory.getLogger(
ServerDao.class, OpenmeetingsVariables.webAppRootKey);
- public final static String[] searchFields = {"name", "address",
"comment"};
+ public final static String[] searchFields = { "name", "address",
"comment" };
@PersistenceContext
private EntityManager em;
-
+
@Autowired
private UsersDao usersDao;
@@ -74,22 +74,33 @@ public class ServerDao implements IDataP
* @see org.apache.openmeetings.data.OmDAO#get(int, int)
*/
public List<Server> get(int start, int max) {
- log.debug("getServerList enter");
TypedQuery<Server> q = em.createNamedQuery("getAllServers",
Server.class);
q.setFirstResult(start);
q.setMaxResults(max);
-
return q.getResultList();
}
-
+
public List<Server> get(String search, int start, int count, String
order) {
- TypedQuery<Server> q =
em.createQuery(DaoHelper.getSearchQuery("Server", "s", search, true, false,
order, searchFields), Server.class);
+ TypedQuery<Server> q = em.createQuery(DaoHelper.getSearchQuery(
+ "Server", "s", search, true, false, order,
searchFields),
+ Server.class);
q.setFirstResult(start);
q.setMaxResults(count);
return q.getResultList();
}
-
+
+ /**
+ * get the list of all servers in the cluster that are ready to receive
a
+ * ping (active = true)
+ *
+ * @return
+ */
+ public List<Server> getSlavesForPing() {
+ return em.createNamedQuery("getSlavesForPing", Server.class)
+ .getResultList();
+ }
+
/*
* (non-Javadoc)
*
@@ -103,10 +114,11 @@ public class ServerDao implements IDataP
}
public long count(String search) {
- TypedQuery<Long> q =
em.createQuery(DaoHelper.getSearchQuery("Server", "s", search, true, true,
null, searchFields), Long.class);
+ TypedQuery<Long> q =
em.createQuery(DaoHelper.getSearchQuery("Server",
+ "s", search, true, true, null, searchFields),
Long.class);
return q.getSingleResult();
}
-
+
/*
* (non-Javadoc)
*
@@ -115,7 +127,8 @@ public class ServerDao implements IDataP
public Server get(long id) {
Server result = null;
log.debug("getServer enter, id = " + id);
- TypedQuery<Server> q = em.createNamedQuery("getServerById",
Server.class);
+ TypedQuery<Server> q = em.createNamedQuery("getServerById",
+ Server.class);
q.setParameter("id", id);
try {
result = q.getSingleResult();
@@ -133,38 +146,43 @@ public class ServerDao implements IDataP
*/
public Server getServerByAddress(String address) {
log.debug("getServer enter, address = " + address);
- TypedQuery<Server> q =
em.createNamedQuery("getServerByAddress", Server.class);
+ TypedQuery<Server> q = em.createNamedQuery("getServerByAddress",
+ Server.class);
q.setParameter("address", address);
List<Server> list = q.getResultList();
return list.size() > 0 ? list.get(0) : null;
}
/**
- * This method is necessary to automatically assign user to the server
with minimum load.
+ * This method is necessary to automatically assign user to the server
with
+ * minimum load.
*
- * First of all we are trying to find servers referenced by 0 users.
- * If all servers are referenced by at least 1 user we are searching
the first server referenced by minimum users.
+ * First of all we are trying to find servers referenced by 0 users. If
all
+ * servers are referenced by at least 1 user we are searching the first
+ * server referenced by minimum users.
*
- * @return Server object referenced by the minimum user accounts.
+ * @return Server object referenced by the minimum user accounts.
*/
public Server getServerWithMinimumUsers() {
Server result = null;
log.debug("getServerWithMinimumUsers enter");
- TypedQuery<Server> q =
em.createNamedQuery("getServersWithNoUsers", Server.class);
+ TypedQuery<Server> q =
em.createNamedQuery("getServersWithNoUsers",
+ Server.class);
List<Server> l = q.getResultList();
if (l.isEmpty()) {
- TypedQuery<Object> q1 =
em.createNamedQuery("getServerWithMinimumUsers", Object.class);
+ TypedQuery<Object> q1 = em.createNamedQuery(
+ "getServerWithMinimumUsers",
Object.class);
List<Object> r = q1.getResultList();
if (!r.isEmpty()) {
// get server id from first line
- result = get((Long)((Object[])r.get(0))[0]);
+ result = get((Long) ((Object[]) r.get(0))[0]);
}
} else {
result = l.get(0);
}
return result;
}
-
+
/**
* @deprecated user standard mechanism of
* {@link
IDataProviderDao#update(org.apache.openmeetings.persistence.beans.OmEntity,
long)}
@@ -174,10 +192,9 @@ public class ServerDao implements IDataP
* @return
*/
@Deprecated
- public Server saveServer(long id, String name, String address,
- int port,
+ public Server saveServer(long id, String name, String address, int port,
String user, String pass, String webapp, String
protocol,
- String comment, long userId) {
+ Boolean active, String comment, long userId) {
Server s = get(id);
if (s == null) {
s = new Server();
@@ -194,6 +211,7 @@ public class ServerDao implements IDataP
s.setPass(pass);
s.setWebapp(webapp);
s.setProtocol(protocol);
+ s.setActive(active);
s.setComment(comment);
return em.merge(s);
@@ -243,6 +261,11 @@ public class ServerDao implements IDataP
return entity;
}
+ public Server update(Server entity) {
+ em.merge(entity);
+ return entity;
+ }
+
/*
* (non-Javadoc)
*
@@ -258,7 +281,7 @@ public class ServerDao implements IDataP
em.merge(entity);
}
}
-
+
/**
* get {@link Server} by name
*
@@ -271,5 +294,5 @@ public class ServerDao implements IDataP
q.setParameter("name", name);
return q.getResultList();
}
-
+
}
Modified:
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/persistence/beans/basic/Server.java
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/persistence/beans/basic/Server.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/persistence/beans/basic/Server.java
(original)
+++
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/persistence/beans/basic/Server.java
Sun Nov 18 08:46:33 2012
@@ -48,7 +48,9 @@ import org.simpleframework.xml.Root;
@NamedQuery(name = "getServerByName", query = "SELECT s FROM
Server s WHERE s.deleted = false AND s.name LIKE :name"),
@NamedQuery(name = "getServerByAddress", query = "SELECT s FROM
Server s WHERE s.deleted = false AND s.address LIKE :address"),
@NamedQuery(name = "getServersWithNoUsers", query = "SELECT s
FROM Server s WHERE s.deleted = false AND s.id NOT IN (SELECT u.server.id FROM
Users u where u.server.id IS NOT NULL)"),
- @NamedQuery(name = "getServerWithMinimumUsers", query = "SELECT
s.id, COUNT(u) AS cnt FROM Users u JOIN u.server s WHERE s.deleted = false
GROUP BY s.id ORDER BY cnt") })
+ @NamedQuery(name = "getServerWithMinimumUsers", query = "SELECT
s.id, COUNT(u) AS cnt FROM Users u JOIN u.server s WHERE s.deleted = false
GROUP BY s.id ORDER BY cnt"),
+ @NamedQuery(name = "getSlavesForPing", query = "SELECT s FROM
Server s WHERE s.deleted = false AND s.active = true"), //
+})
@Table(name = "server")
@Root
public class Server implements Serializable, IDataProviderEntity {
@@ -92,10 +94,6 @@ public class Server implements Serializa
@Element(data = true, required = false)
private Calendar lastPing;
- @Column(name = "ping_running", nullable = true)
- @Element(data = true, required = false)
- private boolean pingRunning;
-
@Column(name = "port", nullable = true)
@Element(data = true, required = false)
private int port;
@@ -204,14 +202,6 @@ public class Server implements Serializa
this.lastPing = lastPing;
}
- public boolean isPingRunning() {
- return pingRunning;
- }
-
- public void setPingRunning(boolean pingRunning) {
- this.pingRunning = pingRunning;
- }
-
public int getPort() {
return port;
}
@@ -268,7 +258,7 @@ public class Server implements Serializa
public String toString() {
return "Server [id=" + id + ", name=" + name + ", address=" +
address
+ ", port=" + port + ", user=" + user + ",
pass=" + pass
- + ", protocol=" + protocol + ", pingRunning=" +
pingRunning
+ + ", protocol=" + protocol
+ ", active=" + active + ", webapp=" + webapp +
", deleted="
+ deleted + "]";
}
Modified:
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/quartz/scheduler/ClusterSlaveJob.java
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/quartz/scheduler/ClusterSlaveJob.java?rev=1410852&r1=1410851&r2=1410852&view=diff
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/quartz/scheduler/ClusterSlaveJob.java
(original)
+++
incubator/openmeetings/trunk/singlewebapp/src/org/apache/openmeetings/quartz/scheduler/ClusterSlaveJob.java
Sun Nov 18 08:46:33 2012
@@ -18,28 +18,111 @@
*/
package org.apache.openmeetings.quartz.scheduler;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.openmeetings.OpenmeetingsVariables;
+import org.apache.openmeetings.cluster.sync.IRestClientObserver;
import org.apache.openmeetings.cluster.sync.RestClient;
+import org.apache.openmeetings.conference.room.ISharedSessionStore;
+import org.apache.openmeetings.conference.room.SlaveClientDto;
+import org.apache.openmeetings.data.basic.dao.ServerDao;
+import org.apache.openmeetings.persistence.beans.basic.Server;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
-public class ClusterSlaveJob {
- private static Logger log =
Red5LoggerFactory.getLogger(ClusterSlaveJob.class,
OpenmeetingsVariables.webAppRootKey);
+public class ClusterSlaveJob implements IRestClientObserver {
+
+ private static Logger log = Red5LoggerFactory.getLogger(
+ ClusterSlaveJob.class,
OpenmeetingsVariables.webAppRootKey);
@Autowired
- private RestClient restClient;
+ private ServerDao serverDao;
+ @Autowired
+ private ISharedSessionStore clientListManager;
+
+
+ /**
+ * We store the list of RestClients in the memory, so that we can simply
+ * call ping to get the load, without the need to get a new session
Hash and
+ * to login again. <br/>
+ * There can be only one RestClient per server, so we use the primary
key of
+ * the server to store the RestClient.
+ */
+ private static Map<Long, RestClient> restClientsSessionStore = new
HashMap<Long, RestClient>();
+
+ /**
+ * Synchronized, cause nobody should manipulate the object while another
+ * process requests it, the scheduler could run several times and
request
+ * the same object, add or remove it.<br/>
+ * If there is no object yet, create one.
+ *
+ * @param server
+ */
+ private synchronized RestClient getRestClient(Server server) {
+ RestClient restClient =
restClientsSessionStore.get(server.getId());
+
+ // check if any values of the server have been changed,
+ // if yes, we need a new RestClient to make sure it will
relogin to the
+ // changed server details
+ if (restClient != null &&
restClient.hasServerDetailsChanged(server)) {
+ log.debug("Server details changed, get new rest
client");
+ restClient = null;
+ }
+
+ if (restClient == null) {
+ restClient = new RestClient(this, server);
+ restClientsSessionStore.put(server.getId(), restClient);
+ }
+ return restClient;
+ }
public void doIt() {
log.debug("ClusterSlaveJob.execute");
try {
- restClient.ping();
+ log.debug("ClusterSlaveJob. SERVERS :: "
+ + serverDao.getSlavesForPing().size());
+
+ for (Server server : serverDao.getSlavesForPing()) {
+
+ RestClient rClient = getRestClient(server);
+
+ //If the ping is still running, we don't ping
the client in this session
+ if (rClient.getPingRunning()) {
+ log.warn("The ping for the server " +
server
+ + " takes longer then
the ping interval!");
+ continue;
+ }
+
+ rClient.ping();
+
+ }
} catch (Exception e) {
- log.error(
- "Unexpected exception while processing
slave master ping.",
- e);
+ log.error("Unexpected exception while doRoundTrip
cluster.", e);
}
}
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ *
org.apache.openmeetings.cluster.sync.IRestClientObserverEvent#pingComplete
+ * (org.apache.openmeetings.persistence.beans.basic.Server,
java.util.List)
+ */
+ public void pingComplete(Server server, List<SlaveClientDto>
slaveClients) {
+
+ log.debug("-- pingComplete -- For server: " + server);
+
+ clientListManager.syncSlaveClientSession(server, slaveClients);
+
+ server.setLastPing(Calendar.getInstance());
+ serverDao.update(server);
+
+ }
+
}