Author: solomax
Date: Wed Jun 20 08:23:00 2012
New Revision: 1351985
URL: http://svn.apache.org/viewvc?rev=1351985&view=rev
Log:
OPENMEETINGS-295 RTMPT screen sharing client is fixed.
Added:
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClient.java
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClientConnector.java
Modified:
incubator/openmeetings/trunk/singlewebapp/build.xml
Added:
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClient.java
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClient.java?rev=1351985&view=auto
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClient.java
(added)
+++
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClient.java
Wed Jun 20 08:23:00 2012
@@ -0,0 +1,130 @@
+/*
+ * RED5 Open Source Flash Server - http://code.google.com/p/red5/
+ *
+ * Copyright 2006-2012 by respective authors (see below). All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.red5.client.net.rtmpt;
+
+import java.util.Map;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
+import org.red5.client.net.rtmp.BaseRTMPClientHandler;
+import org.red5.io.object.Deserializer;
+import org.red5.io.object.Serializer;
+import org.red5.server.net.protocol.ProtocolState;
+import org.red5.server.net.rtmp.RTMPConnection;
+import org.red5.server.net.rtmp.RTMPMinaConnection;
+import org.red5.server.net.rtmp.codec.RTMP;
+import org.red5.server.net.rtmp.codec.RTMPProtocolDecoder;
+import org.red5.server.net.rtmp.codec.RTMPProtocolEncoder;
+import org.red5.server.net.rtmp.message.Constants;
+import org.red5.server.net.rtmpt.codec.RTMPTCodecFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RTMPT client object.
+ * <p>
+ * The HTTP polling itself is done by an {@link RTMPTClientConnector} thread
+ * created in {@link #startConnector(String, int)}; that thread hands every
+ * decoded message back to {@link #messageReceived(Object, IoSession)}.
+ *
+ * @author Anton Lebedevich
+ */
+public class RTMPTClient extends BaseRTMPClientHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(RTMPTClient.class);
+
+    // guarded by this
+    private RTMPTClientConnector connector;
+
+    /** Supplies the RTMP decoder/encoder handed out by the getters below. */
+    private final RTMPTCodecFactory codecFactory;
+
+    /**
+     * Creates the client and initialises its codec factory with a fresh
+     * serializer/deserializer pair.
+     */
+    public RTMPTClient() {
+        codecFactory = new RTMPTCodecFactory();
+        codecFactory.setDeserializer(new Deserializer());
+        codecFactory.setSerializer(new Serializer());
+        codecFactory.init();
+    }
+
+    /**
+     * Builds the default connection parameters and makes sure a "tcUrl" entry
+     * is present, falling back to an <code>rtmpt://server:port/application</code> URL.
+     *
+     * @param server
+     *            server host
+     * @param port
+     *            server port
+     * @param application
+     *            application name
+     * @return connection parameters, always containing "tcUrl"
+     */
+    @Override
+    public Map<String, Object> makeDefaultConnectionParams(String server, int port, String application) {
+        Map<String, Object> params = super.makeDefaultConnectionParams(server, port, application);
+        if (!params.containsKey("tcUrl")) {
+            params.put("tcUrl", "rtmpt://" + server + ':' + port + '/' + application);
+        }
+        return params;
+    }
+
+    /**
+     * Creates and starts the connector thread for the given endpoint.
+     *
+     * @param server
+     *            server host
+     * @param port
+     *            server port
+     */
+    protected synchronized void startConnector(String server, int port) {
+        connector = new RTMPTClientConnector(server, port, this);
+        log.debug("Created connector {}", connector);
+        connector.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void messageReceived(Object in, IoSession session) throws Exception {
+        RTMPConnection conn = (RTMPTClientConnection) session.getAttribute(RTMPConnection.RTMP_CONNECTION_KEY);
+        RTMP state = (RTMP) session.getAttribute(ProtocolState.SESSION_KEY);
+        if (in instanceof IoBuffer) {
+            // raw buffers are handshake data, everything else is a decoded message
+            rawBufferRecieved(conn, state, (IoBuffer) in);
+        } else {
+            super.messageReceived(in, session);
+        }
+    }
+
+    /**
+     * Handle raw buffer receipt: skips the first byte of the received data,
+     * writes the following {@link Constants#HANDSHAKE_SIZE} bytes back to the
+     * connection and then reports the connection as opened.
+     *
+     * @param conn
+     *            RTMP connection
+     * @param state
+     *            Protocol state (not used here; the state is re-read from conn)
+     * @param _in
+     *            IoBuffer with input raw data
+     */
+    private void rawBufferRecieved(RTMPConnection conn, ProtocolState state, IoBuffer _in) {
+        log.debug("Handshake 3d phase - size: {}", _in.remaining());
+        IoBuffer out = IoBuffer.allocate(Constants.HANDSHAKE_SIZE);
+        IoBuffer in = _in;
+        if (!in.isAutoExpand()) {
+            // Work on an auto-expanding copy so the limit below can be set
+            // even when the received buffer is smaller than expected.
+            in = IoBuffer.allocate(_in.position() + Constants.HANDSHAKE_SIZE);
+            in.setAutoExpand(true);
+            // Copy the RECEIVED bytes; previously the new buffer was put into
+            // itself, so the data in _in never reached the copy.
+            in.put(_in);
+            // make the copied bytes readable from the start
+            in.flip();
+        }
+        in.skip(1);
+        in.limit(in.position() + Constants.HANDSHAKE_SIZE);
+        out.put(in);
+        out.flip();
+        conn.writeRaw(out);
+        connectionOpened(conn, conn.getState());
+    }
+
+    /**
+     * Asks the connector thread (if one was started) to stop and interrupts
+     * it, then performs the superclass disconnect.
+     */
+    @Override
+    public synchronized void disconnect() {
+        if (connector != null) {
+            connector.setStopRequested(true);
+            connector.interrupt();
+        }
+        super.disconnect();
+    }
+
+    /**
+     * @return the RTMP decoder provided by the codec factory
+     */
+    public RTMPProtocolDecoder getDecoder() {
+        return codecFactory.getRTMPDecoder();
+    }
+
+    /**
+     * @return the RTMP encoder provided by the codec factory
+     */
+    public RTMPProtocolEncoder getEncoder() {
+        return codecFactory.getRTMPEncoder();
+    }
+
+}
Added:
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClientConnector.java
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClientConnector.java?rev=1351985&view=auto
==============================================================================
---
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClientConnector.java
(added)
+++
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClientConnector.java
Wed Jun 20 08:23:00 2012
@@ -0,0 +1,215 @@
+/*
+ * RED5 Open Source Flash Server - http://code.google.com/p/red5/
+ *
+ * Copyright 2006-2012 by respective authors (see below). All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.red5.client.net.rtmpt;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.HttpVersion;
+import org.apache.http.ParseException;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.CoreProtocolPNames;
+import org.apache.http.util.EntityUtils;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IoSession;
+import org.red5.client.net.rtmp.RTMPClientConnManager;
+import org.red5.server.net.protocol.ProtocolState;
+import org.red5.server.net.rtmp.RTMPConnection;
+import org.red5.server.net.rtmp.codec.RTMP;
+import org.red5.server.net.rtmp.message.Constants;
+import org.red5.server.util.HttpConnectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client connector for RTMPT.
+ * <p>
+ * Runs as its own thread: opens a session with an "open" POST, then keeps
+ * issuing "send" (pending data available) or "idle" (nothing to send) POSTs,
+ * decodes each response and passes the resulting messages to the owning
+ * {@link RTMPTClient}. When the loop ends a "close" POST is sent.
+ *
+ * @author Anton Lebedevich ([email protected])
+ * @author Paul Gregoire ([email protected])
+ */
+class RTMPTClientConnector extends Thread {
+
+    private static final Logger log = LoggerFactory.getLogger(RTMPTClientConnector.class);
+
+    private static final String CONTENT_TYPE = "application/x-fcs";
+
+    /** Single zero byte used as the body of open/idle/close requests. */
+    private static final ByteArrayEntity ZERO_REQUEST_ENTITY = new ByteArrayEntity(new byte[] { 0 });
+
+    /**
+     * Size to split messages queue by, borrowed from
+     * RTMPTServlet.RESPONSE_TARGET_SIZE
+     */
+    private static final int SEND_TARGET_SIZE = 32768;
+
+    /** Pause between polls when a response carried no messages, in milliseconds. */
+    private static final long POLL_DELAY_MS = 250;
+
+    private final DefaultHttpClient httpClient = HttpConnectionUtil.getClient();
+
+    private final HttpHost targetHost;
+
+    private final RTMPTClient client;
+
+    private final RTMPClientConnManager connManager;
+
+    /** Session id returned by the server in response to the "open" request. */
+    private int clientId;
+
+    /** Sequence number appended to every request URL built by makeUrl. */
+    private long messageCount = 1;
+
+    private volatile boolean stopRequested = false;
+
+    /**
+     * Creates a connector for the given HTTP endpoint.
+     *
+     * @param server
+     *            server host
+     * @param port
+     *            server port
+     * @param client
+     *            client that receives decoded messages and lifecycle callbacks
+     */
+    public RTMPTClientConnector(String server, int port, RTMPTClient client) {
+        targetHost = new HttpHost(server, port, "http");
+        httpClient.getParams().setParameter(CoreProtocolPNames.PROTOCOL_VERSION, HttpVersion.HTTP_1_1);
+        this.client = client;
+        this.connManager = RTMPClientConnManager.getInstance();
+    }
+
+    /**
+     * Polling loop; see the class description. Any failure is reported to the
+     * client through handleException and the in-flight request is aborted.
+     */
+    @Override
+    public void run() {
+        HttpPost post = null;
+        try {
+            RTMPTClientConnection connection = openConnection();
+            while (!connection.isClosing() && !stopRequested) {
+                IoBuffer toSend = connection.getPendingMessages(SEND_TARGET_SIZE);
+                int limit = toSend != null ? toSend.limit() : 0;
+                if (limit > 0) {
+                    post = makePost("send");
+                    post.setEntity(new InputStreamEntity(toSend.asInputStream(), limit));
+                    post.addHeader("Content-Type", CONTENT_TYPE);
+                } else {
+                    post = makePost("idle");
+                    post.setEntity(ZERO_REQUEST_ENTITY);
+                }
+                // execute
+                HttpResponse response = httpClient.execute(targetHost, post);
+                // check for error
+                checkResponseCode(response);
+                // handle data
+                byte[] received = EntityUtils.toByteArray(response.getEntity());
+                IoBuffer data = IoBuffer.wrap(received);
+                if (data.limit() > 0) {
+                    data.skip(1); // XXX: polling interval lies in this byte
+                }
+                List<?> messages = connection.decode(data);
+                if (messages == null || messages.isEmpty()) {
+                    try {
+                        // XXX handle polling delay
+                        Thread.sleep(POLL_DELAY_MS);
+                    } catch (InterruptedException e) {
+                        // an interrupt without a stop request is ignored and
+                        // polling simply resumes
+                        if (stopRequested) {
+                            post.abort();
+                            break;
+                        }
+                    }
+                    continue;
+                }
+                IoSession session = new DummySession();
+                session.setAttribute(RTMPConnection.RTMP_CONNECTION_KEY, connection);
+                session.setAttribute(ProtocolState.SESSION_KEY, connection.getState());
+                for (Object message : messages) {
+                    try {
+                        client.messageReceived(message, session);
+                    } catch (Exception e) {
+                        // one bad message must not stop the polling loop
+                        log.error("Could not process message.", e);
+                    }
+                }
+            }
+            finalizeConnection();
+            client.connectionClosed(connection, connection.getState());
+        } catch (Throwable e) {
+            log.debug("RTMPT handling exception", e);
+            client.handleException(e);
+            if (post != null) {
+                post.abort();
+            }
+        }
+    }
+
+    /**
+     * Sends the "open" request, stores the returned client id, creates the
+     * connection object and queues the first handshake packet on it.
+     *
+     * @return the newly created connection, never <code>null</code>
+     * @throws IOException
+     *             if the request fails, or the response carries no body or no
+     *             parsable client id
+     */
+    private RTMPTClientConnection openConnection() throws IOException {
+        HttpPost openPost = new HttpPost("/open/1");
+        setCommonHeaders(openPost);
+        openPost.setEntity(ZERO_REQUEST_ENTITY);
+        // execute
+        HttpResponse response = httpClient.execute(targetHost, openPost);
+        checkResponseCode(response);
+        // get the response entity
+        HttpEntity entity = response.getEntity();
+        if (entity == null) {
+            // fail here with a clear message instead of returning null and
+            // letting run() hit a NullPointerException
+            throw new IOException("RTMPT open request returned no response body");
+        }
+        String responseStr = EntityUtils.toString(entity);
+        // the id is followed by a line terminator; trim() copes with LF, CRLF
+        // or a missing terminator alike
+        String idStr = responseStr == null ? "" : responseStr.trim();
+        try {
+            clientId = Integer.parseInt(idStr);
+        } catch (NumberFormatException e) {
+            throw new IOException("RTMPT open request returned an invalid client id: '" + idStr + "'", e);
+        }
+        log.debug("Got client id {}", clientId);
+        // create a new connection
+        RTMPTClientConnection connection = (RTMPTClientConnection) connManager.createConnection(RTMPTClientConnection.class);
+        // client state
+        RTMP state = new RTMP();
+        connection.setState(state);
+        connection.setHandler(client);
+        connection.setDecoder(client.getDecoder());
+        connection.setEncoder(client.getEncoder());
+        log.debug("Handshake 1st phase");
+        // one 0x03 byte followed by HANDSHAKE_SIZE bytes of 0x01
+        IoBuffer handshake = IoBuffer.allocate(Constants.HANDSHAKE_SIZE + 1);
+        handshake.put((byte) 0x03);
+        handshake.fill((byte) 0x01, Constants.HANDSHAKE_SIZE);
+        handshake.flip();
+        connection.writeRaw(handshake);
+        return connection;
+    }
+
+    /**
+     * Sends the "close" request and discards the response body.
+     *
+     * @throws IOException
+     *             if the request fails
+     */
+    private void finalizeConnection() throws IOException {
+        log.debug("Sending close post");
+        HttpPost closePost = new HttpPost(makeUrl("close"));
+        closePost.setEntity(ZERO_REQUEST_ENTITY);
+        HttpResponse response = httpClient.execute(targetHost, closePost);
+        EntityUtils.consume(response.getEntity());
+    }
+
+    /**
+     * Creates a POST for the given command with the common headers applied.
+     *
+     * @param command
+     *            RTMPT command name, e.g. "send" or "idle"
+     * @return prepared request without an entity
+     */
+    private HttpPost makePost(String command) {
+        HttpPost post = new HttpPost(makeUrl(command));
+        setCommonHeaders(post);
+        return post;
+    }
+
+    /**
+     * Builds <code>/command/clientId/sequence</code> and advances this
+     * connector's own request counter.
+     *
+     * @param command
+     *            RTMPT command name
+     * @return request path
+     */
+    private String makeUrl(String command) {
+        return String.format("/%s/%s/%s", command, clientId, messageCount++);
+    }
+
+    /**
+     * Adds the headers shared by the open/send/idle requests.
+     *
+     * @param post
+     *            request to decorate
+     */
+    private void setCommonHeaders(HttpPost post) {
+        post.addHeader("Connection", "Keep-Alive");
+        post.addHeader("Cache-Control", "no-cache");
+    }
+
+    /**
+     * Verifies that the response status is 200.
+     *
+     * @param response
+     *            response to check
+     * @throws RuntimeException
+     *             if the status is not 200; the message contains the status
+     *             line and the response body, if any
+     * @throws ParseException
+     *             if the error body cannot be converted to a string
+     * @throws IOException
+     *             if the error body cannot be read
+     */
+    private void checkResponseCode(HttpResponse response) throws ParseException, IOException {
+        int code = response.getStatusLine().getStatusCode();
+        if (code != HttpStatus.SC_OK) {
+            // an error response may have no body; do not let that hide the status
+            HttpEntity entity = response.getEntity();
+            String body = entity != null ? EntityUtils.toString(entity) : "";
+            throw new RuntimeException("Bad HTTP status returned, line: " + response.getStatusLine() + "; body: " + body);
+        }
+    }
+
+    /**
+     * Sets the flag checked by the polling loop; the loop exits once it sees
+     * <code>true</code>.
+     *
+     * @param stopRequested
+     *            <code>true</code> to ask the connector to stop
+     */
+    public void setStopRequested(boolean stopRequested) {
+        this.stopRequested = stopRequested;
+    }
+}
Modified: incubator/openmeetings/trunk/singlewebapp/build.xml
URL:
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/build.xml?rev=1351985&r1=1351984&r2=1351985&view=diff
==============================================================================
--- incubator/openmeetings/trunk/singlewebapp/build.xml (original)
+++ incubator/openmeetings/trunk/singlewebapp/build.xml Wed Jun 20 08:23:00 2012
@@ -34,7 +34,8 @@
<property name="mainlibs.lib.dir" value="${project.lib.dir}/mainlibs" />
<property name="om.lib.dir" value="${project.lib.dir}/om" />
<property name="anakia.lib.dir" value="${project.lib.dir}/anakia" />
- <property name="screensharing.resources"
value="${basedir}/WebContent/screensharing" />
+ <property name="webcontent.src.dir" value="${basedir}/WebContent" />
+ <property name="screensharing.resources"
value="${webcontent.src.dir}/screensharing" />
<property name="junit.lib.dir" value="${project.lib.dir}/junit" />
<property name="rat.lib.dir" value="${project.lib.dir}/rat" />
<property name="dtd-generator.lib.dir"
value="${project.lib.dir}/dtd-generator" />
@@ -58,7 +59,6 @@
<!-- LPS Properties -->
<property name="out.dir.swf" value="${basedir}/${dist.webapps.dir}/" />
- <property name="webcontent.src.dir" value="${basedir}/WebContent" />
<property name="laszlo.as3.src.dir" value="${webcontent.src.dir}/swf10"
/>
<property name="laszlo.src.dir" value="${webcontent.src.dir}/src" />
@@ -213,10 +213,10 @@
<!-- add language files from .war version -->
<copy todir="${dist.webapps.dir}/languages">
- <fileset dir="WebContent/languages" />
+ <fileset dir="${webcontent.src.dir}/languages" />
</copy>
<copy todir="${dist.webapps.dir}/conf">
- <fileset dir="WebContent/conf" />
+ <fileset dir="${webcontent.src.dir}/conf" />
</copy>
<tstamp />
</target>
@@ -555,7 +555,7 @@
</classpath>
</taskdef>
- <xslt in="WebContent/languages/errorvalues.xml"
out="${docs.src}/ErrorsTable.xml"
style="${docs.src}/stylesheets/errortable.xsl" />
+ <xslt in="${webcontent.src.dir}/languages/errorvalues.xml"
out="${docs.src}/ErrorsTable.xml"
style="${docs.src}/stylesheets/errortable.xsl" />
<anakia basedir="${docs.src}" destdir="${docs.dest}/"
extension=".html" style="./site.vsl" projectFile="stylesheets/project.xml"
excludes="**/stylesheets/** empty.xml" includes="**/*.xml"
lastModifiedCheck="true" templatePath="xdocs/stylesheets"
velocityPropertiesFile="${build.base.dir}/velocity.properties" />
<copy todir="${docs.dest}/images" filtering="no">
<fileset dir="${docs.src}/images">
@@ -777,7 +777,9 @@
<svn refid="svn.settings">
<checkout
url="http://red5.googlecode.com/svn/java/client/trunk/"
revision="${red5.client.revision}" destPath="${red5.client.dir}" />
</svn>
- <copy file="${red5.lib}/red5.jar"
todir="${red5.client.dir}/lib" />
+ <copy file="${red5.lib}/red5.jar"
todir="${red5.client.dir}/lib" overwrite="true" force="true" />
+ <copy
file="${webcontent.src.dir}/red5/client_rtmpt_patch/RTMPTClient.java"
todir="${red5.client.dir}/src/org/red5/client/net/rtmpt" overwrite="true"
force="true" />
+ <copy
file="${webcontent.src.dir}/red5/client_rtmpt_patch/RTMPTClientConnector.java"
todir="${red5.client.dir}/src/org/red5/client/net/rtmpt" overwrite="true"
force="true" />
<subant target="dist">
<fileset dir="${red5.client.dir}" includes="build.xml"
/>
</subant>