This is an automated email from the ASF dual-hosted git repository. solomax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openmeetings.git
The following commit(s) were added to refs/heads/master by this push: new 6f6f03f [OPENMEETINGS-1649] test record/play seems to work 6f6f03f is described below commit 6f6f03ff24db05c4044093f1592e96622bfc84b0 Author: Maxim Solodovnik <solomax...@gmail.com> AuthorDate: Fri Oct 5 14:13:16 2018 +0700 [OPENMEETINGS-1649] test record/play seems to work --- .../openmeetings/core/remote/KTestStream.java | 48 +++--- .../openmeetings/core/remote/KurentoHandler.java | 191 +++++++++++---------- .../apache/openmeetings/web/room/raw-settings.js | 18 +- .../apache/openmeetings/web/room/raw-video-util.js | 6 + 4 files changed, 139 insertions(+), 124 deletions(-) diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java index ab034ed..4e23dd5 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java @@ -25,6 +25,7 @@ import static org.apache.openmeetings.util.OmFileHelper.getStreamsDir; import java.io.File; import java.io.IOException; +import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -40,6 +41,7 @@ import org.kurento.client.IceCandidate; import org.kurento.client.IceCandidateFoundEvent; import org.kurento.client.MediaPipeline; import org.kurento.client.MediaProfileSpecType; +import org.kurento.client.MediaSessionStartedEvent; import org.kurento.client.MediaType; import org.kurento.client.PlayerEndpoint; import org.kurento.client.RecorderEndpoint; @@ -68,8 +70,7 @@ public class KTestStream implements IKStream { webRtcEndpoint.connect(webRtcEndpoint); MediaProfileSpecType profile = getProfile(msg); - //FIXME TODO generated file uid - initRecPath(_c.getUid()); + initRecPath(); recorder = new RecorderEndpoint.Builder(pipeline, recPath) .stopOnEndOfStream() .withMediaProfile(profile).build(); @@ -110,11 +111,9 @@ public class KTestStream implements IKStream { break; } - // 3. SDP negotiation String sdpOffer = msg.getString("sdpOffer"); String sdpAnswer = webRtcEndpoint.processOffer(sdpOffer); - // 4. Gather ICE candidates addIceListener(_c); WebSocketHelper.sendClient(_c, newTestKurentoMsg() @@ -136,38 +135,37 @@ public class KTestStream implements IKStream { } public void play(final IWsClient _c, JSONObject msg, MediaPipeline pipeline) { - // 1. Media logic this.pipeline = pipeline; webRtcEndpoint = new WebRtcEndpoint.Builder(pipeline).build(); player = new PlayerEndpoint.Builder(pipeline, recPath).build(); player.connect(webRtcEndpoint); - - // Player listeners - player.addErrorListener(new EventListener<ErrorEvent>() { + webRtcEndpoint.addMediaSessionStartedListener(new EventListener<MediaSessionStartedEvent>() { @Override - public void onEvent(ErrorEvent event) { - log.info("ErrorEvent for player with uid '{}': {}", _c.getUid(), event.getDescription()); - sendPlayEnd(_c); - } - }); - player.addEndOfStreamListener(new EventListener<EndOfStreamEvent>() { - @Override - public void onEvent(EndOfStreamEvent event) { - log.info("EndOfStreamEvent for player with uid '{}'", _c.getUid()); - sendPlayEnd(_c); + public void onEvent(MediaSessionStartedEvent event) { + log.info("Media session started {}", event); + player.addErrorListener(new EventListener<ErrorEvent>() { + @Override + public void onEvent(ErrorEvent event) { + log.info("ErrorEvent for player with uid '{}': {}", _c.getUid(), event.getDescription()); + sendPlayEnd(_c); + } + }); + player.addEndOfStreamListener(new EventListener<EndOfStreamEvent>() { + @Override + public void onEvent(EndOfStreamEvent event) { + log.info("EndOfStreamEvent for player with uid '{}'", _c.getUid()); + sendPlayEnd(_c); + } + }); + player.play(); } }); - // 3. SDP negotiation String sdpOffer = msg.getString("sdpOffer"); String sdpAnswer = webRtcEndpoint.processOffer(sdpOffer); - // 4. Gather ICE candidates addIceListener(_c); - // 5. Play recorded stream - player.play(); - WebSocketHelper.sendClient(_c, newTestKurentoMsg() .put("id", "playResponse") .put("sdpAnswer", sdpAnswer)); @@ -213,9 +211,9 @@ public class KTestStream implements IKStream { } } - private void initRecPath(String uid) { + private void initRecPath() { try { - File f = new File(getStreamsDir(), String.format("%s%s.webm", TEST_SETUP_PREFIX, uid)); + File f = new File(getStreamsDir(), String.format("%s%s.webm", TEST_SETUP_PREFIX, UUID.randomUUID())); recPath = String.format("file://%s", f.getCanonicalPath()); log.info("Configured to record to {}", recPath); } catch (IOException e) { diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java index 6e52c7b..12fac99 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java @@ -55,6 +55,7 @@ import org.kurento.client.ObjectCreatedEvent; import org.kurento.client.PlayerEndpoint; import org.kurento.client.RecorderEndpoint; import org.kurento.client.Tag; +import org.kurento.client.Transaction; import org.kurento.client.WebRtcEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,10 +66,10 @@ import com.github.openjson.JSONObject; public class KurentoHandler { private final static Logger log = LoggerFactory.getLogger(KurentoHandler.class); - private final static String MODE_TEST = "test"; - private final static String TAG_KUID = "kuid"; - private final static String TAG_MODE = "mode"; - private final static String TAG_ROOM = "roomId"; + final static String MODE_TEST = "test"; + final static String TAG_KUID = "kuid"; + final static String TAG_MODE = "mode"; + final static String TAG_ROOM = "roomId"; private final static String HMAC_SHA1_ALGORITHM = "HmacSHA1"; public final static String KURENTO_TYPE = "kurento"; private long checkTimeout = 200; //ms @@ -80,7 +81,6 @@ public class KurentoHandler { private int turnTtl = 60; //minutes private KurentoClient client; private String kuid; - private ScheduledExecutorService scheduler; private final Map<Long, KRoom> rooms = new ConcurrentHashMap<>(); final Map<String, KStream> usersByUid = new ConcurrentHashMap<>(); final Map<String, KTestStream> testsByUid = new ConcurrentHashMap<>(); @@ -92,74 +92,8 @@ public class KurentoHandler { try { // TODO check connection, reconnect, listeners etc. client = KurentoClient.create(kurentoWsUrl); - client.getServerManager().addObjectCreatedListener(new EventListener<ObjectCreatedEvent>() { - @Override - public void onEvent(ObjectCreatedEvent evt) { - log.debug("Kurento::ObjectCreated -> {}", evt.getObject()); - if (evt.getObject() instanceof MediaPipeline) { - // room created - final String roid = evt.getObject().getId(); - scheduler.schedule(() -> { - if (client == null) { - return; - } - // still alive - MediaPipeline pipe = client.getById(roid, MediaPipeline.class); - try { - Map<String, String> tags = tagsAsMap(pipe); - if (validTestPipeline(tags)) { - return; - } - if (kuid.equals(tags.get(TAG_KUID))) { - KRoom r = rooms.get(Long.valueOf(tags.get(TAG_ROOM))); - if (r.getPipelineId().equals(pipe.getId())) { - return; - } else if (r != null) { - rooms.remove(r.getRoomId()); - r.close(); - } - } - } catch(Exception e) { - //no-op, connect will be dropped - } - log.warn("Invalid MediaPipeline {} detected, will be dropped", pipe.getId()); - pipe.release(); - }, checkTimeout, MILLISECONDS); - } else if (evt.getObject() instanceof Endpoint) { - // endpoint created - Endpoint _point = (Endpoint)evt.getObject(); - final String eoid = _point.getId(); - Class<? extends Endpoint> _clazz = null; - if (_point instanceof WebRtcEndpoint) { - _clazz = WebRtcEndpoint.class; - } else if (_point instanceof RecorderEndpoint) { - _clazz = RecorderEndpoint.class; - } else if (_point instanceof PlayerEndpoint) { - _clazz = PlayerEndpoint.class; - } - final Class<? extends Endpoint> clazz = _clazz; - scheduler.schedule(() -> { - if (client == null || clazz == null) { - return; - } - // still alive - Endpoint point = client.getById(eoid, clazz); - if (validTestPipeline(point.getMediaPipeline())) { - return; - } - Map<String, String> tags = tagsAsMap(point); - KStream stream = getByUid(tags.get("suid")); - if (stream != null && stream.contains(tags.get("uid"))) { - return; - } - log.warn("Invalid Endpoint {} detected, will be dropped", point.getId()); - point.release(); - }, checkTimeout, MILLISECONDS); - } - } - }); kuid = UUID.randomUUID().toString(); //FIXME TODO regenerate on re-connect - scheduler = Executors.newScheduledThreadPool(10); + client.getServerManager().addObjectCreatedListener(new KWatchDog(client, kuid, checkTimeout)); } catch (Exception e) { log.error("Fail to create Kurento client", e); } @@ -180,7 +114,7 @@ public class KurentoHandler { } } - private static Map<String, String> tagsAsMap(MediaObject pipe) { + static Map<String, String> tagsAsMap(MediaObject pipe) { Map<String, String> map = new HashMap<>(); for (Tag t : pipe.getTags()) { map.put(t.getKey(), t.getValue()); @@ -188,19 +122,13 @@ public class KurentoHandler { return map; } - private boolean validTestPipeline(MediaPipeline pipeline) { - return validTestPipeline(tagsAsMap(pipeline)); - } - - private boolean validTestPipeline(Map<String, String> tags) { - return kuid.equals(tags.get(TAG_KUID)) && MODE_TEST.equals(tags.get(TAG_MODE)) && MODE_TEST.equals(tags.get(TAG_ROOM)); - } - private MediaPipeline createTestPipeline() { - MediaPipeline pipe = client.createMediaPipeline(); - pipe.addTag(TAG_KUID, kuid); - pipe.addTag(TAG_MODE, MODE_TEST); - pipe.addTag(TAG_ROOM, MODE_TEST); + Transaction t = client.beginTransaction(); + MediaPipeline pipe = client.createMediaPipeline(t); + pipe.addTag(t, TAG_KUID, kuid); + pipe.addTag(t, TAG_MODE, MODE_TEST); + pipe.addTag(t, TAG_ROOM, MODE_TEST); + t.commit(); return pipe; } @@ -216,7 +144,7 @@ public class KurentoHandler { case "wannaRecord": WebSocketHelper.sendClient(_c, newTestKurentoMsg() .put("id", "canRecord") - .put("configuration", new JSONObject().put("iceServers", getTurnServers(true))) + .put("iceServers", getTurnServers(true)) ); break; case "record": @@ -239,7 +167,7 @@ public class KurentoHandler { case "wannaPlay": WebSocketHelper.sendClient(_c, newTestKurentoMsg() .put("id", "canPlay") - .put("configuration", new JSONObject().put("iceServers", getTurnServers(true))) + .put("iceServers", getTurnServers(true)) ); break; case "play": @@ -464,7 +392,7 @@ public class KurentoHandler { Mac mac = Mac.getInstance(HMAC_SHA1_ALGORITHM); mac.init(new SecretKeySpec(turnSecret.getBytes(), HMAC_SHA1_ALGORITHM)); StringBuilder user = new StringBuilder() - .append((test ? 30 : turnTtl) + System.currentTimeMillis() / 1000L); + .append((test ? 60 : turnTtl * 60) + System.currentTimeMillis() / 1000L); if (!Strings.isEmpty(turnUser)) { user.append(':').append(turnUser); } @@ -510,4 +438,91 @@ public class KurentoHandler { public void setTurnTtl(int turnTtl) { this.turnTtl = turnTtl; } + + private class KWatchDog implements EventListener<ObjectCreatedEvent> { + private ScheduledExecutorService scheduler; + private final String kuid; + private final long checkTimeout; + private final KurentoClient client; + + public KWatchDog(final KurentoClient client, final String kuid, final long checkTimeout) { + this.client = client; + this.kuid = kuid; + this.checkTimeout = checkTimeout; + scheduler = Executors.newScheduledThreadPool(10); + } + + @Override + public void onEvent(ObjectCreatedEvent evt) { + log.debug("Kurento::ObjectCreated -> {}", evt.getObject()); + if (evt.getObject() instanceof MediaPipeline) { + // room created + final String roid = evt.getObject().getId(); + scheduler.schedule(() -> { + if (client == null) { + return; + } + // still alive + MediaPipeline pipe = client.getById(roid, MediaPipeline.class); + Map<String, String> tags = tagsAsMap(pipe); + try { + if (validTestPipeline(tags)) { + return; + } + if (kuid.equals(tags.get(TAG_KUID))) { + KRoom r = rooms.get(Long.valueOf(tags.get(TAG_ROOM))); + if (r.getPipelineId().equals(pipe.getId())) { + return; + } else if (r != null) { + rooms.remove(r.getRoomId()); + r.close(); + } + } + } catch(Exception e) { + //no-op, connect will be dropped + } + log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags); + pipe.release(); + }, checkTimeout, MILLISECONDS); + } else if (evt.getObject() instanceof Endpoint) { + // endpoint created + Endpoint _point = (Endpoint)evt.getObject(); + final String eoid = _point.getId(); + Class<? extends Endpoint> _clazz = null; + if (_point instanceof WebRtcEndpoint) { + _clazz = WebRtcEndpoint.class; + } else if (_point instanceof RecorderEndpoint) { + _clazz = RecorderEndpoint.class; + } else if (_point instanceof PlayerEndpoint) { + _clazz = PlayerEndpoint.class; + } + final Class<? extends Endpoint> clazz = _clazz; + scheduler.schedule(() -> { + if (client == null || clazz == null) { + return; + } + // still alive + Endpoint point = client.getById(eoid, clazz); + if (validTestPipeline(point.getMediaPipeline())) { + return; + } + Map<String, String> tags = tagsAsMap(point); + KStream stream = getByUid(tags.get("suid")); + if (stream != null && stream.contains(tags.get("uid"))) { + return; + } + log.warn("Invalid Endpoint {} detected, will be dropped", point.getId()); + point.release(); + }, checkTimeout, MILLISECONDS); + } + } + + private boolean validTestPipeline(MediaPipeline pipeline) { + return validTestPipeline(tagsAsMap(pipeline)); + } + + private boolean validTestPipeline(Map<String, String> tags) { + return kuid.equals(tags.get(TAG_KUID)) && MODE_TEST.equals(tags.get(TAG_MODE)) && MODE_TEST.equals(tags.get(TAG_ROOM)); + } + } } diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-settings.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-settings.js index 4292a20..89d0111 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-settings.js +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-settings.js @@ -271,13 +271,10 @@ var VideoSettings = (function() { _clear(); const cnts = _constraints(); if (cnts.video !== false || cnts.audio !== false) { - const options = { + const options = VideoUtil.addIceServers({ localVideo: vid[0] , mediaConstraints: cnts - }; - if (msg && msg.configuration) { - options.configuration = msg.configuration; - } + }, msg); rtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerSendonly( options , function(error) { @@ -298,7 +295,9 @@ var VideoSettings = (function() { }); }); } - _updateRec(); + if (!msg) { + _updateRec(); + } } function _allowRec(allow) { @@ -422,14 +421,11 @@ var VideoSettings = (function() { break; case 'canPlay': { - const options = { + const options = VideoUtil.addIceServers({ remoteVideo: vid[0] , mediaConstraints: {audio: true, video: true} , onicecandidate: _onIceCandidate - }; - if (m && m.configuration) { - options.configuration = m.configuration; - } + }, m); _clear(); rtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerRecvonly( options diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-util.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-util.js index 44742b6..18b740f 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-util.js +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-util.js @@ -135,5 +135,11 @@ var VideoUtil = (function() { self.arrange = _arrange; self.cleanStream = _cleanStream; self.cleanPeer = _cleanPeer; + self.addIceServers = function(opts, m) { + if (m && m.iceServers && m.iceServers.length > 0) { + opts.configuration = {iceServers: m.iceServers}; + } + return opts; + }; return self; })();