This is an automated email from the ASF dual-hosted git repository. solomax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openmeetings.git
The following commit(s) were added to refs/heads/master by this push: new 51d4710 [OPENMEETINGS-2493] recording is fixed, code is further improved 51d4710 is described below commit 51d471006506f7a7423eb5e8aa18c34c15e59132 Author: Maxim Solodovnik <solomax...@gmail.com> AuthorDate: Sat Oct 24 11:00:04 2020 +0700 [OPENMEETINGS-2493] recording is fixed, code is further improved --- .../openmeetings/core/remote/AbstractStream.java | 6 +- .../org/apache/openmeetings/core/remote/KRoom.java | 15 +-- .../apache/openmeetings/core/remote/KStream.java | 58 ++++++------ .../openmeetings/core/remote/KTestStream.java | 101 +++++++++++++-------- .../openmeetings/core/remote/KurentoHandler.java | 24 +++-- .../openmeetings/core/remote/StreamProcessor.java | 14 ++- .../core/remote/TestStreamProcessor.java | 24 +---- .../core/remote/TestRecordingFlowMocked.java | 4 +- .../core/remote/TestRoomFlowMocked.java | 2 - .../apache/openmeetings/web/app/TimerService.java | 2 +- .../apache/openmeetings/web/room/RoomPanel.html | 8 -- .../org/apache/openmeetings/web/room/raw-sharer.js | 10 +- .../openmeetings/web/room/raw-video-manager.js | 8 +- .../org/apache/openmeetings/web/room/raw-video.js | 17 +++- .../web/user/record/RecordingsPanel.java | 11 +-- 15 files changed, 158 insertions(+), 146 deletions(-) diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java index c1376d2..6de2a46 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java @@ -41,11 +41,11 @@ public abstract class AbstractStream { return uid; } - public void release(IStreamProcessor processor) { - release(processor, true); + public void release() { + release(true); } - public abstract void release(IStreamProcessor processor, boolean remove); + public abstract void release(boolean remove); public static WebRtcEndpoint createWebRtcEndpoint(MediaPipeline pipeline) { return new WebRtcEndpoint.Builder(pipeline).build(); diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java index 8215b37..433a74e 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java @@ -24,6 +24,7 @@ package org.apache.openmeetings.core.remote; import static java.util.UUID.randomUUID; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE; import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; +import static org.apache.openmeetings.db.util.ApplicationHelper.ensureApplication; import java.util.Date; import java.util.Optional; @@ -55,7 +56,6 @@ import com.github.openjson.JSONObject; * */ public class KRoom { - private static final Logger log = LoggerFactory.getLogger(KRoom.class); /** @@ -63,7 +63,6 @@ public class KRoom { */ private final StreamProcessor processor; private final RecordingChunkDao chunkDao; - private final IApplication app; private final Long roomId; private final Room.Type type; private final AtomicBoolean recordingStarted = new AtomicBoolean(false); @@ -75,7 +74,6 @@ public class KRoom { public KRoom(KurentoHandler handler, Room r) { this.processor = handler.getStreamProcessor(); this.chunkDao = handler.getChunkDao(); - this.app = handler.getApp(); this.roomId = r.getId(); this.type = r.getType(); log.info("ROOM {} has been created", roomId); @@ -125,6 +123,8 @@ public class KRoom { public void startRecording(Client c) { if (recordingStarted.compareAndSet(false, true)) { + IApplication app = ensureApplication(c.getUser().getLanguageId()); + log.debug("##REC:: recording in room {} is starting ::", roomId); Room r = c.getRoom(); boolean interview = Room.Type.INTERVIEW == r.getType(); @@ -164,9 +164,7 @@ public class KRoom { rec = processor.getRecordingDao().update(rec); // Receive recordingId recordingId = rec.getId(); - processor.getByRoom(this.getRoomId()).forEach( - stream -> stream.startRecord(processor) - ); + processor.getByRoom(this.getRoomId()).forEach(KStream::startRecord); // Send notification to all users that the recording has been started WebSocketHelper.sendRoom(new RoomMessage(roomId, u, RoomMessage.Type.RECORDING_TOGGLED)); @@ -224,7 +222,6 @@ public class KRoom { if (sharingStarted.compareAndSet(false, true)) { sharingUser.put("sid", c.getSid()); sd = c.addStream(StreamType.SCREEN, a); - sd.setWidth(msg.getInt("width")).setHeight(msg.getInt("height")); cm.update(c); log.debug("Stream.UID {}: sharing has been started, activity: {}", sd.getUid(), a); h.sendClient(sd.getSid(), newKurentoMsg() @@ -253,9 +250,7 @@ public class KRoom { } public void close() { - processor.getByRoom(this.getRoomId()).forEach( - stream -> stream.release(processor) - ); + processor.getByRoom(this.getRoomId()).forEach(KStream::release); log.debug("Room {} closed", this.roomId); } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java index 930d4ea..c79fd40 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java @@ -25,12 +25,15 @@ import static java.util.UUID.randomUUID; import static java.util.concurrent.CompletableFuture.delayedExecutor; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE; +import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_ROOM; +import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_STREAM_UID; import static org.apache.openmeetings.core.remote.KurentoHandler.getFlowoutTimeout; import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; import static org.apache.openmeetings.util.OmFileHelper.getRecUri; import static org.apache.openmeetings.util.OmFileHelper.getRecordingChunk; import java.util.Date; +import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -53,6 +56,7 @@ import org.kurento.client.MediaPipeline; import org.kurento.client.MediaProfileSpecType; import org.kurento.client.MediaType; import org.kurento.client.RecorderEndpoint; +import org.kurento.client.RtpEndpoint; import org.kurento.client.WebRtcEndpoint; import org.kurento.jsonrpc.JsonUtils; import org.slf4j.Logger; @@ -89,14 +93,9 @@ public class KStream extends AbstractStream { //TODO Min/Max Audio/Video RecvBandwidth } - public void startBroadcast( - final StreamProcessor processor - , final StreamDesc sd - , final String sdpOffer - , Runnable then) - { + public void startBroadcast(final StreamDesc sd, final String sdpOffer, Runnable then) { if (outgoingMedia != null) { - release(processor, false); + release(false); } hasAudio = sd.hasActivity(Activity.AUDIO); hasVideo = sd.hasActivity(Activity.VIDEO); @@ -131,10 +130,10 @@ public class KStream extends AbstractStream { profile = MediaProfileSpecType.WEBM_VIDEO_ONLY; break; } - pipeline = kHandler.createPipiline(room.getRoomId(), sd.getUid(), new Continuation<Void>() { + pipeline = kHandler.createPipiline(Map.of(TAG_ROOM, String.valueOf(room.getRoomId()), TAG_STREAM_UID, sd.getUid()), new Continuation<Void>() { @Override public void onSuccess(Void result) throws Exception { - internalStartBroadcast(processor, sd, sdpOffer); + internalStartBroadcast(sd, sdpOffer); then.run(); } @@ -145,12 +144,8 @@ public class KStream extends AbstractStream { }); } - private void internalStartBroadcast( - final StreamProcessor processor - , final StreamDesc sd - , final String sdpOffer) - { - outgoingMedia = createEndpoint(processor, sd.getSid(), sd.getUid()); + private void internalStartBroadcast(final StreamDesc sd, final String sdpOffer) { + outgoingMedia = createEndpoint(sd.getSid(), sd.getUid()); outgoingMedia.addMediaSessionTerminatedListener(evt -> log.warn("Media stream terminated {}", sd)); outgoingMedia.addMediaFlowOutStateChangeListener(evt -> { log.info("Media Flow STATE :: {}, type {}, evt {}", evt.getState(), evt.getType(), evt.getMediaType()); @@ -159,7 +154,7 @@ public class KStream extends AbstractStream { flowoutFuture = Optional.of(new CompletableFuture<>().completeAsync(() -> { log.warn("KStream will be dropped {}", sd); if (StreamType.SCREEN == streamType) { - processor.doStopSharing(sid, uid); + kHandler.getStreamProcessor().doStopSharing(sid, uid); } stopBroadcast(); return null; @@ -173,9 +168,9 @@ public class KStream extends AbstractStream { } }); outgoingMedia.addMediaFlowInStateChangeListener(evt -> log.warn("Media FlowIn :: {}", evt)); - addListener(processor, sd.getSid(), sd.getUid(), sdpOffer); + addListener(sd.getSid(), sd.getUid(), sdpOffer); if (room.isRecording()) { - startRecord(processor); + startRecord(); } Client c = sd.getClient(); WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid())); @@ -187,7 +182,14 @@ public class KStream extends AbstractStream { } } - public void addListener(final StreamProcessor processor, String sid, String uid, String sdpOffer) { + private RtpEndpoint createRtpEndpoint(MediaPipeline pipeline) { + RtpEndpoint endpoint = new RtpEndpoint.Builder(pipeline).build(); + endpoint.addTag("outUid", this.uid); + endpoint.addTag("uid", uid); + return endpoint; + } + + public void addListener(String sid, String uid, String sdpOffer) { final boolean self = uid.equals(this.uid); log.info("USER {}: have started {} in room {}", uid, self ? "broadcasting" : "receiving", room.getRoomId()); log.trace("USER {}: SdpOffer is {}", uid, sdpOffer); @@ -196,7 +198,7 @@ public class KStream extends AbstractStream { return; } - final WebRtcEndpoint endpoint = getEndpointForUser(processor, sid, uid); + final WebRtcEndpoint endpoint = getEndpointForUser(sid, uid); final String sdpAnswer = endpoint.processOffer(sdpOffer); log.debug("gather candidates"); @@ -208,7 +210,7 @@ public class KStream extends AbstractStream { .put("sdpAnswer", sdpAnswer)); } - private WebRtcEndpoint getEndpointForUser(final StreamProcessor processor, String sid, String uid) { + private WebRtcEndpoint getEndpointForUser(String sid, String uid) { if (uid.equals(this.uid)) { log.debug("PARTICIPANT {}: configuring loopback", this.uid); return outgoingMedia; @@ -221,11 +223,11 @@ public class KStream extends AbstractStream { listener.release(); } log.debug("PARTICIPANT {}: creating new endpoint for {}", uid, this.uid); - listener = createEndpoint(processor, sid, uid); + listener = createEndpoint(sid, uid); listeners.put(uid, listener); log.debug("PARTICIPANT {}: obtained endpoint for {}", uid, this.uid); - Client cur = processor.getBySid(this.sid); + Client cur = kHandler.getStreamProcessor().getBySid(this.sid); if (cur == null) { log.warn("Client for endpoint dooesn't exists"); } else { @@ -244,7 +246,7 @@ public class KStream extends AbstractStream { return listener; } - private WebRtcEndpoint createEndpoint(final StreamProcessor processor, String sid, String uid) { + private WebRtcEndpoint createEndpoint(String sid, String uid) { WebRtcEndpoint endpoint = createWebRtcEndpoint(pipeline); endpoint.addTag("outUid", this.uid); endpoint.addTag("uid", uid); @@ -258,10 +260,10 @@ public class KStream extends AbstractStream { return endpoint; } - public void startRecord(StreamProcessor processor) { + public void startRecord() { log.debug("startRecord outMedia OK ? {}", outgoingMedia != null); if (outgoingMedia == null) { - release(processor, true); + release(true); return; } final String chunkUid = "rec_" + room.getRecordingId() + "_" + randomUUID(); @@ -351,7 +353,7 @@ public class KStream extends AbstractStream { } @Override - public void release(IStreamProcessor processor, boolean remove) { + public void release(boolean remove) { if (outgoingMedia != null) { releaseListeners(); outgoingMedia.release(new Continuation<Void>() { @@ -380,7 +382,7 @@ public class KStream extends AbstractStream { outgoingMedia = null; } if (remove) { - processor.release(this, false); + kHandler.getStreamProcessor().release(this, false); } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java index 0839693..6949528 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java @@ -19,7 +19,10 @@ package org.apache.openmeetings.core.remote; import static java.util.UUID.randomUUID; +import static org.apache.openmeetings.core.remote.KurentoHandler.MODE_TEST; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE; +import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_MODE; +import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_ROOM; import static org.apache.openmeetings.core.remote.KurentoHandler.sendError; import static org.apache.openmeetings.core.remote.TestStreamProcessor.newTestKurentoMsg; import static org.apache.openmeetings.util.OmFileHelper.EXTENSION_WEBM; @@ -27,6 +30,7 @@ import static org.apache.openmeetings.util.OmFileHelper.TEST_SETUP_PREFIX; import static org.apache.openmeetings.util.OmFileHelper.getStreamsDir; import java.io.File; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -50,6 +54,8 @@ import com.github.openjson.JSONObject; public class KTestStream extends AbstractStream { private static final Logger log = LoggerFactory.getLogger(KTestStream.class); + private static final Map<String, String> TAGS = Map.of(TAG_MODE, MODE_TEST, TAG_ROOM, MODE_TEST); + private final KurentoHandler kHandler; private MediaPipeline pipeline; private WebRtcEndpoint webRtcEndpoint; private PlayerEndpoint player; @@ -59,9 +65,13 @@ public class KTestStream extends AbstractStream { private ScheduledFuture<?> recHandle; private int recTime; - public KTestStream(IWsClient c, JSONObject msg, MediaPipeline pipeline) { + public KTestStream(IWsClient c, JSONObject msg, KurentoHandler kHandler) { super(null, c.getUid()); - this.pipeline = pipeline; + this.kHandler = kHandler; + createPipeline(() -> startTestRecording(c, msg)); + } + + private void startTestRecording(IWsClient c, JSONObject msg) { webRtcEndpoint = createWebRtcEndpoint(pipeline); webRtcEndpoint.connect(webRtcEndpoint); @@ -122,34 +132,31 @@ public class KTestStream extends AbstractStream { }); } - public void play(final IWsClient inClient, JSONObject msg, MediaPipeline inPipeline) { - this.pipeline = inPipeline; - webRtcEndpoint = createWebRtcEndpoint(pipeline); - player = createPlayerEndpoint(pipeline, recPath); - player.connect(webRtcEndpoint); - webRtcEndpoint.addMediaSessionStartedListener(evt -> { - log.info("Media session started {}", evt); - player.addErrorListener(event -> { - log.info("ErrorEvent for player with uid '{}': {}", inClient.getUid(), event.getDescription()); - sendPlayEnd(inClient); - }); - player.addEndOfStreamListener(event -> { - log.info("EndOfStreamEvent for player with uid '{}'", inClient.getUid()); - sendPlayEnd(inClient); - }); - player.play(); - }); + public void play(final IWsClient inClient, JSONObject msg) { + createPipeline(() -> { + webRtcEndpoint = createWebRtcEndpoint(pipeline); + player = createPlayerEndpoint(pipeline, recPath); + player.connect(webRtcEndpoint); + webRtcEndpoint.addMediaSessionStartedListener(evt -> { + log.info("Media session started {}", evt); + player.addErrorListener(event -> { + log.info("ErrorEvent for player with uid '{}': {}", inClient.getUid(), event.getDescription()); + sendPlayEnd(inClient); + }); + player.addEndOfStreamListener(event -> { + log.info("EndOfStreamEvent for player with uid '{}'", inClient.getUid()); + sendPlayEnd(inClient); + }); + player.play(); + }); + addIceListener(inClient); - String sdpOffer = msg.getString("sdpOffer"); - String sdpAnswer = webRtcEndpoint.processOffer(sdpOffer); - - addIceListener(inClient); - - WebSocketHelper.sendClient(inClient, newTestKurentoMsg() - .put("id", "playResponse") - .put("sdpAnswer", sdpAnswer)); + WebSocketHelper.sendClient(inClient, newTestKurentoMsg() + .put("id", "playResponse") + .put("sdpAnswer", webRtcEndpoint.processOffer(msg.getString("sdpOffer")))); - webRtcEndpoint.gatherCandidates(); + webRtcEndpoint.gatherCandidates(); + }); } public void addCandidate(IceCandidate cand) { @@ -158,6 +165,21 @@ public class KTestStream extends AbstractStream { } } + private void createPipeline(Runnable action) { + release(false); + this.pipeline = kHandler.createPipiline(TAGS, new Continuation<Void>() { + @Override + public void onSuccess(Void result) throws Exception { + action.run(); + } + + @Override + public void onError(Throwable cause) throws Exception { + log.warn("Unable to create pipeline for test stream", cause); + } + }); + } + private void addIceListener(IWsClient inClient) { webRtcEndpoint.addIceCandidateFoundListener(evt -> { IceCandidate cand = evt.getCandidate(); @@ -192,6 +214,13 @@ public class KTestStream extends AbstractStream { recPath = OmFileHelper.getRecUri(f); } + private void releaseEndpoint() { + if (webRtcEndpoint != null) { + webRtcEndpoint.release(); + webRtcEndpoint = null; + } + } + private void releasePipeline() { if (pipeline != null) { pipeline.release(); @@ -200,29 +229,29 @@ public class KTestStream extends AbstractStream { } private void releaseRecorder() { - releasePipeline(); + releaseEndpoint(); if (recorder != null) { recorder.release(); recorder = null; } + releasePipeline(); } private void releasePlayer() { - releasePipeline(); + releaseEndpoint(); if (player != null) { player.release(); player = null; } + releasePipeline(); } @Override - public void release(IStreamProcessor processor, boolean remove) { - if (webRtcEndpoint != null) { - webRtcEndpoint.release(); - webRtcEndpoint = null; - } + public void release(boolean remove) { releasePlayer(); releaseRecorder(); - processor.release(this, true); + if (remove) { + kHandler.getTestProcessor().release(this, true); + } } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java index 21ff59e..7023063 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java @@ -41,7 +41,7 @@ import javax.annotation.PreDestroy; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; -import org.apache.openmeetings.IApplication; +import org.apache.openmeetings.core.sip.SipManager; import org.apache.openmeetings.core.util.WebSocketHelper; import org.apache.openmeetings.db.dao.record.RecordingChunkDao; import org.apache.openmeetings.db.dao.room.RoomDao; @@ -65,6 +65,7 @@ import org.kurento.client.MediaPipeline; import org.kurento.client.ObjectCreatedEvent; import org.kurento.client.PlayerEndpoint; import org.kurento.client.RecorderEndpoint; +import org.kurento.client.RtpEndpoint; import org.kurento.client.Tag; import org.kurento.client.Transaction; import org.kurento.client.WebRtcEndpoint; @@ -122,8 +123,6 @@ public class KurentoHandler { @Autowired private IClientManager cm; @Autowired - private IApplication app; - @Autowired private RoomDao roomDao; @Autowired private RecordingChunkDao chunkDao; @@ -131,6 +130,8 @@ public class KurentoHandler { private TestStreamProcessor testProcessor; @Autowired private StreamProcessor streamProcessor; + @Autowired + private SipManager sipManager; boolean isConnected() { boolean connctd = connected.get() && client != null && !client.isClosed(); @@ -298,12 +299,11 @@ public class KurentoHandler { streamProcessor.remove((Client)c); } - MediaPipeline createPipiline(Long roomId, String uid, Continuation<Void> continuation) { + MediaPipeline createPipiline(Map<String, String> tags, Continuation<Void> continuation) { Transaction t = beginTransaction(); MediaPipeline pipe = client.createMediaPipeline(t); pipe.addTag(t, TAG_KUID, kuid); - pipe.addTag(t, TAG_ROOM, String.valueOf(roomId)); - pipe.addTag(t, TAG_STREAM_UID, uid); + tags.forEach((key, value) -> pipe.addTag(t, key, value)); t.commit(continuation); return pipe; } @@ -396,14 +396,18 @@ public class KurentoHandler { return kuid; } - IApplication getApp() { - return app; + public TestStreamProcessor getTestProcessor() { + return testProcessor; } StreamProcessor getStreamProcessor() { return streamProcessor; } + SipManager getSipManager() { + return sipManager; + } + RecordingChunkDao getChunkDao() { return chunkDao; } @@ -460,7 +464,7 @@ public class KurentoHandler { { return; } else { - stream.release(streamProcessor); + stream.release(); } } } @@ -480,6 +484,8 @@ public class KurentoHandler { clazz = RecorderEndpoint.class; } else if (curPoint instanceof PlayerEndpoint) { clazz = PlayerEndpoint.class; + } else if (curPoint instanceof RtpEndpoint) { + clazz = RtpEndpoint.class; } final Class<? extends Endpoint> fClazz = clazz; scheduler.schedule(() -> { diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java index 05e0a9b..4855d86 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java @@ -134,7 +134,7 @@ public class StreamProcessor implements IStreamProcessor { if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !sd.hasActivity(Activity.SCREEN)) { break; } - sender.addListener(this, c.getSid(), c.getUid(), msg.getString("sdpOffer")); + sender.addListener(c.getSid(), c.getUid(), msg.getString("sdpOffer")); } break; case "wannaShare": @@ -184,13 +184,17 @@ public class StreamProcessor implements IStreamProcessor { KRoom room = kHandler.getRoom(c.getRoomId()); sender = room.join(sd, kHandler); } + if (msg.has("width")) { + sd.setWidth(msg.getInt("width")).setHeight(msg.getInt("height")); + cm.update(c); + } startBroadcast(sender, sd, msg.getString("sdpOffer"), () -> { if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) { startRecording(c); } }); } catch (KurentoServerException e) { - sender.release(this); + sender.release(); WebSocketHelper.sendClient(c, newStoppedMsg(sd)); sendError(c, "Failed to start broadcast: " + e.getMessage()); log.error("Failed to start broadcast", e); @@ -208,7 +212,7 @@ public class StreamProcessor implements IStreamProcessor { * @return the current KStream */ void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, Runnable then) { - stream.startBroadcast(this, sd, sdpOffer, then); + stream.startBroadcast(sd, sdpOffer, then); } private static boolean isBroadcasting(final Client c) { @@ -494,7 +498,7 @@ public class StreamProcessor implements IStreamProcessor { for (StreamDesc sd : c.getStreams()) { AbstractStream s = getByUid(sd.getUid()); if (s != null) { - s.release(this); + s.release(); WebSocketHelper.sendRoomOthers(c.getRoomId(), c.getUid(), newStoppedMsg(sd)); } } @@ -545,7 +549,7 @@ public class StreamProcessor implements IStreamProcessor { public void release(AbstractStream stream, boolean releaseStream) { final String uid = stream.getUid(); if (releaseStream) { - stream.release(this); + stream.release(); } Client c = cm.getBySid(stream.getSid()); if (c != null) { diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java index c93c92b..499d772 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java @@ -22,9 +22,7 @@ package org.apache.openmeetings.core.remote; import static org.apache.openmeetings.core.remote.KurentoHandler.MODE_TEST; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE; -import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_KUID; import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_MODE; -import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_ROOM; import java.util.Map; import java.util.Map.Entry; @@ -33,8 +31,6 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.openmeetings.core.util.WebSocketHelper; import org.apache.openmeetings.db.entity.basic.IWsClient; import org.kurento.client.IceCandidate; -import org.kurento.client.MediaPipeline; -import org.kurento.client.Transaction; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -58,9 +54,9 @@ class TestStreamProcessor implements IStreamProcessor { break; case "record": if (user != null) { - user.release(this); + user.release(); } - user = new KTestStream(c, msg, createTestPipeline()); + user = new KTestStream(c, msg, kHandler); streamByUid.put(c.getUid(), user); break; case "iceCandidate": @@ -79,7 +75,7 @@ class TestStreamProcessor implements IStreamProcessor { break; case "play": if (user != null) { - user.play(c, msg, createTestPipeline()); + user.play(c, msg); } break; default: @@ -92,16 +88,6 @@ class TestStreamProcessor implements IStreamProcessor { return uid == null ? null : streamByUid.get(uid); } - private MediaPipeline createTestPipeline() { - Transaction t = kHandler.beginTransaction(); - MediaPipeline pipe = kHandler.getClient().createMediaPipeline(t); - pipe.addTag(t, TAG_KUID, kHandler.getKuid()); - pipe.addTag(t, TAG_MODE, MODE_TEST); - pipe.addTag(t, TAG_ROOM, MODE_TEST); - t.commit(); - return pipe; - } - static JSONObject newTestKurentoMsg() { return KurentoHandler.newKurentoMsg().put(TAG_MODE, MODE_TEST); } @@ -109,7 +95,7 @@ class TestStreamProcessor implements IStreamProcessor { void remove(IWsClient c) { AbstractStream s = getByUid(c.getUid()); if (s != null) { - s.release(this); + s.release(); } } @@ -121,7 +107,7 @@ class TestStreamProcessor implements IStreamProcessor { @Override public void destroy() { for (Entry<String, KTestStream> e : streamByUid.entrySet()) { - e.getValue().release(this); + e.getValue().release(); streamByUid.remove(e.getKey()); } streamByUid.clear(); diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java index 428ecf8..377d48b 100644 --- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java +++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java @@ -148,8 +148,6 @@ class TestRecordingFlowMocked extends BaseMockedTest { private void testStartRecordWhenSharingWasNot() throws Exception { JSONObject msg = new JSONObject(MSG_BASE.toString()) .put("id", "wannaRecord") - .put("width", 640) - .put("height", 480) .put("shareType", "shareType") .put("fps", "fps") ; @@ -172,6 +170,8 @@ class TestRecordingFlowMocked extends BaseMockedTest { .put("type", "kurento") .put("uid", streamDescUID) .put("sdpOffer", "SDP-OFFER") + .put("width", 640) + .put("height", 480) ; handler.onMessage(c, msgBroadcastStarted); diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java index 8c00ab3..d120f9b 100644 --- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java +++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java @@ -146,8 +146,6 @@ class TestRoomFlowMocked extends BaseMockedTest { runWrapped(() -> { JSONObject msg = new JSONObject(MSG_BASE.toString()) .put("id", "wannaRecord") - .put("width", 640) - .put("height", 480) .put("shareType", "shareType") .put("fps", "fps") ; diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java index 2a2800b..a8bd24b 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java @@ -123,7 +123,7 @@ public class TimerService { public void scheduleSipCheck(Room r) { // sip allowed and configured - if (sipManager.getSipUser(r) != null && !sipCheckMap.containsKey(r.getId())) { + if (sipManager.getSipUser(r) != null && r.isSipEnabled() && !sipCheckMap.containsKey(r.getId())) { doSipCheck(r.getId()); } } diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.html b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.html index 17920c5..315953b 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.html +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.html @@ -117,14 +117,6 @@ </select> </div> <div class="row-no-gutters"> - <label class="col-7"><wicket:message key="740"/></label> - <input class="width col-4" type="number" value="800"/> - </div> - <div class="row-no-gutters"> - <label class="col-7"><wicket:message key="741"/></label> - <input class="height col-4" type="number" value="600"/> - </div> - <div class="row-no-gutters"> <label class="col-7"><wicket:message key="1089"/></label> <select name="fps" class="fps col-4 custom-select"> <option value="2">2 FPS</option> diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-sharer.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-sharer.js index 3561a7a..a801caf 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-sharer.js +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-sharer.js @@ -4,7 +4,7 @@ var SHARE_STARTED = 'started'; var SHARE_STOPPED = 'stopped'; var Sharer = (function() { const self = {}; - let sharer, type, fps, sbtn, rbtn, width, height + let sharer, type, fps, sbtn, rbtn , shareState = SHARE_STOPPED, recState = SHARE_STOPPED; /** @@ -43,8 +43,6 @@ var Sharer = (function() { id: 'wannaShare' , shareType: type.val() , fps: fps.val() - , width: width.val() - , height: height.val() }); } else { VideoManager.sendMessage({ @@ -53,8 +51,6 @@ var Sharer = (function() { }); } }); - width = sharer.find('.width'); - height = sharer.find('.height'); rbtn = sharer.find('.record-start-stop').off(); if (Room.getOptions().allowRecording) { rbtn.show().click(function() { @@ -64,8 +60,6 @@ var Sharer = (function() { id: 'wannaRecord' , shareType: type.val() , fps: fps.val() - , width: width.val() - , height: height.val() }); } else { VideoManager.sendMessage({ @@ -96,8 +90,6 @@ var Sharer = (function() { , typeDis = _typeDisabled(); _disable(type, dis); _disable(fps, dis || typeDis); - _disable(width, dis); - _disable(height, dis); btn.find('span').text(btn.data(dis ? 'stop' : 'start')); if (dis) { btn.addClass('stop'); diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js index ad4e40f..491d5d0 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-manager.js @@ -6,7 +6,9 @@ var VideoManager = (function() { function _onVideoResponse(m) { const w = $('#' + VideoUtil.getVid(m.uid)) , v = w.data(); - v.processSdpAnswer(m.sdpAnswer); + if (v) { + v.processSdpAnswer(m.sdpAnswer); + } } function _onBroadcast(msg) { const sd = msg.stream @@ -61,7 +63,9 @@ var VideoManager = (function() { { const w = $('#' + VideoUtil.getVid(m.uid)) , v = w.data(); - v.processIceCandidate(m.candidate); + if (v) { + v.processIceCandidate(m.candidate); + } } break; case 'newStream': diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js index 215b114..01a480b 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js @@ -181,11 +181,18 @@ var Video = (function() { return OmUtil.error('Sender sdp offer error ' + genErr); } OmUtil.log('Invoking Sender SDP offer callback function'); - VideoManager.sendMessage({ - id : 'broadcastStarted' - , uid: sd.uid - , sdpOffer: offerSdp - }); + const bmsg = { + id : 'broadcastStarted' + , uid: sd.uid + , sdpOffer: offerSdp + }, vtracks = state.stream.getVideoTracks(); + if (vtracks && vtracks.length > 0) { + const vts = vtracks[0].getSettings(); + bmsg.width = vts.width; + bmsg.height = vts.height; + bmsg.fps = vts.frameRate; + } + VideoManager.sendMessage(bmsg); if (isSharing) { Sharer.setShareState(SHARE_STARTED); } diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/user/record/RecordingsPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/user/record/RecordingsPanel.java index aec9bb7..1cd806e 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/user/record/RecordingsPanel.java +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/user/record/RecordingsPanel.java @@ -23,7 +23,6 @@ import static org.apache.openmeetings.util.OmFileHelper.getRecordingChunk; import static org.apache.openmeetings.web.app.WebSession.getUserId; import java.util.List; -import java.util.stream.Collectors; import org.apache.openmeetings.core.converter.IRecordingConverter; import org.apache.openmeetings.core.converter.InterviewConverter; @@ -34,7 +33,6 @@ import org.apache.openmeetings.db.dto.record.RecordingContainerData; import org.apache.openmeetings.db.entity.file.BaseFileItem; import org.apache.openmeetings.db.entity.record.Recording; import org.apache.openmeetings.db.entity.record.Recording.Status; -import org.apache.openmeetings.db.entity.record.RecordingChunk; import org.apache.openmeetings.web.common.InvitationDialog; import org.apache.openmeetings.web.common.NameDialog; import org.apache.openmeetings.web.common.UserBasePanel; @@ -113,12 +111,11 @@ public class RecordingsPanel extends UserBasePanel { Recording r = (Recording)getLastSelected(); isInterview = r.isInterview(); - if (r.getOwnerId() != null && r.getOwnerId().equals(getUserId()) && r.getStatus() != Status.RECORDING && r.getStatus() != Status.CONVERTING) { - List<RecordingChunk> chunks = chunkDao.getByRecording(r.getId()) + if (r.getRoomId() != null && r.getOwnerId() != null && r.getOwnerId().equals(getUserId()) && r.getStatus() != Status.RECORDING && r.getStatus() != Status.CONVERTING) { + // will enable re-conversion if at least some of the chunks are OK + enabled = chunkDao.getByRecording(r.getId()) .stream() - .filter(chunk -> r.getRoomId() == null || !getRecordingChunk(r.getRoomId(), chunk.getStreamName()).exists()) - .collect(Collectors.toList()); - enabled = !chunks.isEmpty(); + .anyMatch(chunk -> getRecordingChunk(r.getRoomId(), chunk.getStreamName()).exists()); } } setEnabled(enabled);