This is an automated email from the ASF dual-hosted git repository. solomax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openmeetings.git
The following commit(s) were added to refs/heads/master by this push: new 693bf74 [OPENMEETINGS-2492] pipeline is created per KStream 693bf74 is described below commit 693bf7496a9c6e1dbcc5ba32e736dc09b1c9f321 Author: Maxim Solodovnik <solomax...@gmail.com> AuthorDate: Fri Oct 23 14:39:12 2020 +0700 [OPENMEETINGS-2492] pipeline is created per KStream --- .../org/apache/openmeetings/core/remote/KRoom.java | 30 +------- .../apache/openmeetings/core/remote/KStream.java | 69 ++++++++++++++--- .../openmeetings/core/remote/KurentoHandler.java | 88 +++++++++++++--------- .../openmeetings/core/remote/StreamProcessor.java | 25 +++--- .../apache/openmeetings/core/sip/SipManager.java | 2 +- .../core/remote/TestRecordingFlowMocked.java | 4 +- .../apache/openmeetings/web/app/ClientManager.java | 8 +- .../apache/openmeetings/web/app/TimerService.java | 2 +- .../apache/openmeetings/web/room/RoomPanel.java | 6 +- .../org/apache/openmeetings/web/room/raw-video.js | 33 ++++---- .../openmeetings/web/app/TestApplication.java | 2 +- 11 files changed, 154 insertions(+), 115 deletions(-) diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java index c7a8709..8215b37 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java @@ -25,7 +25,6 @@ import static java.util.UUID.randomUUID; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE; import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; -import java.util.Collection; import java.util.Date; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -46,8 +45,6 @@ import org.apache.openmeetings.db.manager.IClientManager; import org.apache.openmeetings.db.util.FormatHelper; import org.apache.openmeetings.db.util.ws.RoomMessage; import org.apache.openmeetings.db.util.ws.TextRoomMessage; -import org.kurento.client.Continuation; -import org.kurento.client.MediaPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +64,6 @@ public class KRoom { private final StreamProcessor processor; private final RecordingChunkDao chunkDao; private final IApplication app; - private final MediaPipeline pipeline; private final Long roomId; private final Room.Type type; private final AtomicBoolean recordingStarted = new AtomicBoolean(false); @@ -76,13 +72,12 @@ public class KRoom { private JSONObject recordingUser = new JSONObject(); private JSONObject sharingUser = new JSONObject(); - public KRoom(KurentoHandler handler, Room r, MediaPipeline pipeline) { + public KRoom(KurentoHandler handler, Room r) { this.processor = handler.getStreamProcessor(); this.chunkDao = handler.getChunkDao(); this.app = handler.getApp(); this.roomId = r.getId(); this.type = r.getType(); - this.pipeline = pipeline; log.info("ROOM {} has been created", roomId); } @@ -98,25 +93,17 @@ public class KRoom { return recordingId; } - public MediaPipeline getPipeline() { - return pipeline; - } - public RecordingChunkDao getChunkDao() { return chunkDao; } - public KStream join(final StreamDesc sd) { + public KStream join(final StreamDesc sd, KurentoHandler kHandler) { log.info("ROOM {}: join client {}, stream: {}", roomId, sd.getClient(), sd.getUid()); - final KStream stream = new KStream(sd, this); + final KStream stream = new KStream(sd, this, kHandler); processor.addStream(stream); return stream; } - public Collection<KStream> getParticipants() { - return processor.getByRoom(this.getRoomId()); - } - public void onStopBroadcast(KStream stream) { processor.release(stream, true); WebSocketHelper.sendAll(newKurentoMsg() @@ -269,17 +256,6 @@ public class KRoom { processor.getByRoom(this.getRoomId()).forEach( stream -> stream.release(processor) ); - pipeline.release(new Continuation<Void>() { - @Override - public void onSuccess(Void result) throws Exception { - log.trace("ROOM {}: Released Pipeline", KRoom.this.roomId); - } - - @Override - public void onError(Throwable cause) throws Exception { - log.warn("PARTICIPANT {}: Could not release Pipeline", KRoom.this.roomId); - } - }); log.debug("Room {} closed", this.roomId); } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java index 754ee15..930d4ea 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java @@ -49,6 +49,7 @@ import org.apache.openmeetings.db.util.ws.TextRoomMessage; import org.kurento.client.Continuation; import org.kurento.client.IceCandidate; import org.kurento.client.MediaFlowState; +import org.kurento.client.MediaPipeline; import org.kurento.client.MediaProfileSpecType; import org.kurento.client.MediaType; import org.kurento.client.RecorderEndpoint; @@ -62,38 +63,49 @@ import com.github.openjson.JSONObject; public class KStream extends AbstractStream { private static final Logger log = LoggerFactory.getLogger(KStream.class); + private final KurentoHandler kHandler; private final KRoom room; private final Date connectedSince; private final StreamType streamType; private MediaProfileSpecType profile; + private MediaPipeline pipeline; private RecorderEndpoint recorder; private WebRtcEndpoint outgoingMedia = null; private final ConcurrentMap<String, WebRtcEndpoint> listeners = new ConcurrentHashMap<>(); private Optional<CompletableFuture<Object>> flowoutFuture = Optional.empty(); private Long chunkId; private Type type; + private boolean hasAudio; + private boolean hasVideo; + private boolean hasScreen; - public KStream(final StreamDesc sd, KRoom room) { + public KStream(final StreamDesc sd, KRoom room, KurentoHandler kHandler) { super(sd.getSid(), sd.getUid()); this.room = room; streamType = sd.getType(); this.connectedSince = new Date(); + this.kHandler = kHandler; //TODO Min/MaxVideoSendBandwidth //TODO Min/Max Audio/Video RecvBandwidth } - public KStream startBroadcast(final StreamProcessor processor, final StreamDesc sd, final String sdpOffer) { + public void startBroadcast( + final StreamProcessor processor + , final StreamDesc sd + , final String sdpOffer + , Runnable then) + { if (outgoingMedia != null) { release(processor, false); } - final boolean hasAudio = sd.hasActivity(Activity.AUDIO); - final boolean hasVideo = sd.hasActivity(Activity.VIDEO); - final boolean hasScreen = sd.hasActivity(Activity.SCREEN); + hasAudio = sd.hasActivity(Activity.AUDIO); + hasVideo = sd.hasActivity(Activity.VIDEO); + hasScreen = sd.hasActivity(Activity.SCREEN); if ((sdpOffer.indexOf("m=audio") > -1 && !hasAudio) || (sdpOffer.indexOf("m=video") > -1 && !hasVideo && StreamType.SCREEN != streamType)) { log.warn("Broadcast started without enough rights"); - return this; + return; } if (StreamType.SCREEN == streamType) { type = Type.SCREEN; @@ -119,6 +131,25 @@ public class KStream extends AbstractStream { profile = MediaProfileSpecType.WEBM_VIDEO_ONLY; break; } + pipeline = kHandler.createPipiline(room.getRoomId(), sd.getUid(), new Continuation<Void>() { + @Override + public void onSuccess(Void result) throws Exception { + internalStartBroadcast(processor, sd, sdpOffer); + then.run(); + } + + @Override + public void onError(Throwable cause) throws Exception { + log.warn("Unable to create pipeline {}", KStream.this.uid, cause); + } + }); + } + + private void internalStartBroadcast( + final StreamProcessor processor + , final StreamDesc sd + , final String sdpOffer) + { outgoingMedia = createEndpoint(processor, sd.getSid(), sd.getUid()); outgoingMedia.addMediaSessionTerminatedListener(evt -> log.warn("Media stream terminated {}", sd)); outgoingMedia.addMediaFlowOutStateChangeListener(evt -> { @@ -151,10 +182,9 @@ public class KStream extends AbstractStream { if (hasAudio || hasVideo || hasScreen) { WebSocketHelper.sendRoomOthers(room.getRoomId(), c.getUid(), newKurentoMsg() .put("id", "newStream") - .put(PARAM_ICE, processor.getHandler().getTurnServers(c)) + .put(PARAM_ICE, kHandler.getTurnServers(c)) .put("stream", sd.toJson())); } - return this; } public void addListener(final StreamProcessor processor, String sid, String uid, String sdpOffer) { @@ -172,7 +202,7 @@ public class KStream extends AbstractStream { log.debug("gather candidates"); endpoint.gatherCandidates(); // this one might throw Exception log.trace("USER {}: SdpAnswer is {}", this.uid, sdpAnswer); - processor.getHandler().sendClient(sid, newKurentoMsg() + kHandler.sendClient(sid, newKurentoMsg() .put("id", "videoResponse") .put("uid", this.uid) .put("sdpAnswer", sdpAnswer)); @@ -215,11 +245,11 @@ public class KStream extends AbstractStream { } private WebRtcEndpoint createEndpoint(final StreamProcessor processor, String sid, String uid) { - WebRtcEndpoint endpoint = createWebRtcEndpoint(room.getPipeline()); + WebRtcEndpoint endpoint = createWebRtcEndpoint(pipeline); endpoint.addTag("outUid", this.uid); endpoint.addTag("uid", uid); - endpoint.addIceCandidateFoundListener(evt -> processor.getHandler().sendClient(sid + endpoint.addIceCandidateFoundListener(evt -> kHandler.sendClient(sid , newKurentoMsg() .put("id", "iceCandidate") .put("uid", KStream.this.uid) @@ -235,7 +265,7 @@ public class KStream extends AbstractStream { return; } final String chunkUid = "rec_" + room.getRecordingId() + "_" + randomUUID(); - recorder = createRecorderEndpoint(room.getPipeline(), getRecUri(getRecordingChunk(room.getRoomId(), chunkUid)), profile); + recorder = createRecorderEndpoint(pipeline, getRecUri(getRecordingChunk(room.getRoomId(), chunkUid)), profile); recorder.addTag("outUid", uid); recorder.addTag("uid", uid); @@ -335,6 +365,17 @@ public class KStream extends AbstractStream { log.warn("PARTICIPANT {}: Could not release", KStream.this.uid, cause); } }); + pipeline.release(new Continuation<Void>() { + @Override + public void onSuccess(Void result) throws Exception { + log.trace("PARTICIPANT {}: Released Pipeline", KStream.this.uid); + } + + @Override + public void onError(Throwable cause) throws Exception { + log.warn("PARTICIPANT {}: Could not release Pipeline", KStream.this.uid, cause); + } + }); releaseRecorder(false); outgoingMedia = null; } @@ -423,6 +464,10 @@ public class KStream extends AbstractStream { return room; } + MediaPipeline getPipeline() { + return pipeline; + } + public StreamType getStreamType() { return streamType; } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java index 5f1214e..a43f62c 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java @@ -56,6 +56,7 @@ import org.apache.openmeetings.db.manager.IClientManager; import org.apache.openmeetings.db.util.ws.RoomMessage; import org.apache.openmeetings.db.util.ws.TextRoomMessage; import org.apache.wicket.util.string.Strings; +import org.kurento.client.Continuation; import org.kurento.client.Endpoint; import org.kurento.client.EventListener; import org.kurento.client.KurentoClient; @@ -87,6 +88,7 @@ public class KurentoHandler { public static final String TAG_KUID = "kuid"; public static final String TAG_MODE = "mode"; public static final String TAG_ROOM = "roomId"; + public static final String TAG_STREAM_UID = "streamUid"; private static final String HMAC_SHA1_ALGORITHM = "HmacSHA1"; private final ScheduledExecutorService kmsRecheckScheduler = Executors.newScheduledThreadPool(1); public static final String KURENTO_TYPE = "kurento"; @@ -296,19 +298,22 @@ public class KurentoHandler { streamProcessor.remove((Client)c); } + MediaPipeline createPipiline(Long roomId, String uid, Continuation<Void> continuation) { + Transaction t = beginTransaction(); + MediaPipeline pipe = client.createMediaPipeline(t); + pipe.addTag(t, TAG_KUID, kuid); + pipe.addTag(t, TAG_ROOM, String.valueOf(roomId)); + pipe.addTag(t, TAG_STREAM_UID, uid); + t.commit(continuation); + return pipe; + } + KRoom getRoom(Long roomId) { - log.debug("Searching for room {}", roomId); KRoom room = rooms.computeIfAbsent(roomId, k -> { log.debug("Room {} does not exist. Will create now!", roomId); Room r = roomDao.get(roomId); - Transaction t = beginTransaction(); - MediaPipeline pipe = client.createMediaPipeline(t); - pipe.addTag(t, TAG_KUID, kuid); - pipe.addTag(t, TAG_ROOM, String.valueOf(roomId)); - t.commit(); - return new KRoom(this, r, pipe); + return new KRoom(this, r); }); - log.debug("Room {} found!", roomId); return room; } @@ -440,24 +445,30 @@ public class KurentoHandler { // still alive MediaPipeline pipe = client.getById(roid, MediaPipeline.class); Map<String, String> tags = tagsAsMap(pipe); - final String inKuid = tags.get(TAG_KUID); - if (ignoredKuids.contains(inKuid)) { - return; - } - if (validTestPipeline(tags)) { - return; - } - if (kuid.equals(inKuid)) { - KRoom r = rooms.get(Long.valueOf(tags.get(TAG_ROOM))); - if (r.getPipeline().getId().equals(pipe.getId())) { + try { + final String inKuid = tags.get(TAG_KUID); + if (inKuid != null && ignoredKuids.contains(inKuid)) { return; - } else if (r != null) { - rooms.remove(r.getRoomId()); - r.close(); } + if (validTestPipeline(tags)) { + return; + } + if (kuid.equals(inKuid)) { + KStream stream = streamProcessor.getByUid(tags.get(TAG_STREAM_UID)); + if (stream != null) { + if (stream.getRoom().getRoomId().equals(Long.valueOf(tags.get(TAG_ROOM))) + && stream.getPipeline().getId().equals(pipe.getId())) + { + return; + } else { + stream.release(streamProcessor); + } + } + } + } catch (Throwable e) { + log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags); + pipe.release(); } - log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags); - pipe.release(); }, objCheckTimeout, MILLISECONDS); } else if (evt.getObject() instanceof Endpoint) { // endpoint created @@ -478,22 +489,25 @@ public class KurentoHandler { } // still alive Endpoint point = client.getById(eoid, fClazz); - Map<String, String> pipeTags = tagsAsMap(point.getMediaPipeline()); - final String inKuid = pipeTags.get(TAG_KUID); - if (ignoredKuids.contains(inKuid)) { - return; - } - if (validTestPipeline(pipeTags)) { - return; - } Map<String, String> tags = tagsAsMap(point); - KStream stream = streamProcessor.getByUid(tags.get("outUid")); - log.debug("New Endpoint {} detected, tags: {}, kStream: {}", point.getId(), tags, stream); - if (stream != null && stream.contains(tags.get("uid"))) { - return; + try { + Map<String, String> pipeTags = tagsAsMap(point.getMediaPipeline()); + final String inKuid = pipeTags.get(TAG_KUID); + if (ignoredKuids.contains(inKuid)) { + return; + } + if (validTestPipeline(pipeTags)) { + return; + } + KStream stream = streamProcessor.getByUid(tags.get("outUid")); + log.debug("Kurento::ObjectCreated -> New Endpoint {} detected, tags: {}, kStream: {}", point.getId(), tags, stream); + if (stream != null && stream.contains(tags.get("uid"))) { + return; + } + } catch (Throwable e) { + log.warn("Kurento::ObjectCreated -> Invalid Endpoint {} detected, will be dropped, tags: {}", point.getId(), tags); + point.release(); } - log.warn("Invalid Endpoint {} detected, will be dropped, tags: {}", point.getId(), tags); - point.release(); }, objCheckTimeout, MILLISECONDS); } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java index 27fbe41..5e4635a 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java @@ -32,7 +32,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.openmeetings.core.converter.IRecordingConverter; import org.apache.openmeetings.core.converter.InterviewConverter; @@ -182,12 +182,13 @@ public class StreamProcessor implements IStreamProcessor { try { if (sender == null) { KRoom room = kHandler.getRoom(c.getRoomId()); - sender = room.join(sd); - } - startBroadcast(sender, sd, msg.getString("sdpOffer")); - if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) { - startRecording(c); + sender = room.join(sd, kHandler); } + startBroadcast(sender, sd, msg.getString("sdpOffer"), () -> { + if (StreamType.SCREEN == sd.getType() && sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) { + startRecording(c); + } + }); } catch (KurentoServerException e) { sender.release(this); WebSocketHelper.sendClient(c, newStoppedMsg(sd)); @@ -203,10 +204,11 @@ public class StreamProcessor implements IStreamProcessor { * @param stream Stream to start * @param sd StreamDesc to start * @param sdpOffer the sdpOffer + * @param then steps need to be done after broadcast is started * @return the current KStream */ - KStream startBroadcast(KStream stream, StreamDesc sd, String sdpOffer) { - return stream.startBroadcast(this, sd, sdpOffer); + void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, Runnable then) { + stream.startBroadcast(this, sd, sdpOffer, then); } private static boolean isBroadcasting(final Client c) { @@ -501,7 +503,7 @@ public class StreamProcessor implements IStreamProcessor { } } if (c.getRoomId() != null) { - getByRoom(c.getRoomId()).stream().forEach(stream -> stream.remove(c)); // listeners of existing streams should be cleaned-up + getByRoom(c.getRoomId()).forEach(stream -> stream.remove(c)); // listeners of existing streams should be cleaned-up checkStreams(c.getRoomId()); } } @@ -514,10 +516,9 @@ public class StreamProcessor implements IStreamProcessor { return streamByUid.values(); } - Collection<KStream> getByRoom(Long roomId) { + Stream<KStream> getByRoom(Long roomId) { return streamByUid.values().stream() - .filter(stream -> stream.getRoom() != null && stream.getRoom().getRoomId().equals(roomId)) - .collect(Collectors.toList()); + .filter(stream -> stream.getRoom() != null && stream.getRoom().getRoomId().equals(roomId)); } Client getBySid(String sid) { diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java index 4e0f991..715718a 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java @@ -280,7 +280,7 @@ public class SipManager implements ISipManager, SipListenerExt { ConfbridgeListAction da = new ConfbridgeListAction(confno); ResponseEvents r = execEvent(da); if (r != null) { - log.debug("SipManager::countUsers size == {}", r.getEvents().size()); + log.trace("SipManager::countUsers size == {}", r.getEvents().size()); // "- 1" here means: ListComplete event return r.getEvents().size() - 1; } diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java index cf24bc3..c5d5b1e 100644 --- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java +++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java @@ -91,7 +91,7 @@ class TestRecordingFlowMocked extends BaseMockedTest { doReturn(c.getRoom()).when(roomDao).get(ROOM_ID); // Mock out the methods that do webRTC - doReturn(null).when(streamProcessor).startBroadcast(any(), any(), any()); + doReturn(null).when(streamProcessor).startBroadcast(any(), any(), any(), any()); } @@ -175,7 +175,7 @@ class TestRecordingFlowMocked extends BaseMockedTest { assertTrue(streamProcessor.isSharing(ROOM_ID)); //verify startBroadcast has been invoked - verify(streamProcessor).startBroadcast(any(), any(), any()); + verify(streamProcessor).startBroadcast(any(), any(), any(), any()); // Assert that there is still just 1 stream and has only the activities to Record assigned assertEquals(1, c.getStreams().size()); diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java index 4dfedcc..e4f38f0 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java @@ -313,7 +313,7 @@ public class ClientManager implements IClientManager { .map(id -> onlineRooms.getOrDefault(id, Set.of())) .stream() .flatMap(Set::stream) - .map(uid -> get(uid)) + .map(this::get) .filter(Objects::nonNull); } @@ -322,10 +322,8 @@ public class ClientManager implements IClientManager { .map(id -> onlineRooms.getOrDefault(id, Set.of())) .stream() .flatMap(Set::stream) - .map(uid -> get(uid)) - .filter(c -> c != null && c.sameUserId(userId)) - .findAny() - .isPresent(); + .map(this::get) + .anyMatch(c -> c != null && c.sameUserId(userId)); } private List<Client> getByKeys(Long userId, String sessionId) { diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java index 799b5a1..2a2800b 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java @@ -82,7 +82,7 @@ public class TimerService { sipCheckMap.put( roomId , new CompletableFuture<>().completeAsync(() -> { - log.warn("Sip room check {}", roomId); + log.trace("Sip room check {}", roomId); Optional<Client> sipClient = cm.streamByRoom(roomId).filter(Client::isSip).findAny(); cm.streamByRoom(roomId).filter(Predicate.not(Client::isSip)).findAny().ifPresentOrElse(c -> { updateSipLastName(sipClient, c.getRoom()); diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java index 4a4ae17..e0f9780 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java @@ -619,8 +619,7 @@ public class RoomPanel extends BasePanel { handler.appendJavaScript("if (typeof(WbArea) === 'object') {WbArea.setRecStarted(true);}"); } else if (streamProcessor.recordingAllowed(getClient())) { boolean hasStreams = cm.streamByRoom(r.getId()) - .filter(cl -> !cl.getStreams().isEmpty()) - .findAny().isPresent(); + .anyMatch(cl -> !cl.getStreams().isEmpty()); handler.appendJavaScript(String.format("if (typeof(WbArea) === 'object') {WbArea.setRecStarted(false);WbArea.setRecEnabled(%s);}", hasStreams)); } } @@ -636,8 +635,7 @@ public class RoomPanel extends BasePanel { public static boolean hasRight(ClientManager cm, long userId, long roomId, Right r) { return cm.streamByRoom(roomId) - .filter(c -> c.sameUserId(userId) && c.hasRight(r)) - .findAny().isPresent(); + .anyMatch(c -> c.sameUserId(userId) && c.hasRight(r)); } @Override diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js index db083a4..215b114 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js @@ -6,6 +6,10 @@ var Video = (function() { , lm, level, userSpeaks = false, muteOthers , hasVideo, isSharing, isRecording; + function __getVideo(_state) { + const vid = self.video(_state); + return vid && vid.length > 0 ? vid[0] : null; + } function _resizeDlgArea(_w, _h) { if (Room.getOptions().interview) { VideoUtil.setPos(v, VideoUtil.getPos()); @@ -153,7 +157,7 @@ var Video = (function() { , onicecandidate: self.onIceCandidate }; if (!isSharing) { - state.options.localVideo = state.video[0]; + state.options.localVideo = __getVideo(state); } const data = state.data; data.rtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerSendonly( @@ -203,7 +207,7 @@ var Video = (function() { function _createResvPeer(msg, state) { __createVideo(state); const options = VideoUtil.addIceServers({ - remoteVideo : state.video[0] + remoteVideo : __getVideo(state) , onicecandidate : self.onIceCandidate }, msg); const data = state.data; @@ -418,8 +422,9 @@ var Video = (function() { } state.video = $(hasVideo ? '<video>' : '<audio>').attr('id', 'vid' + _id) .attr('playsinline', 'playsinline') - .width(vc.width()).height(vc.height()) - .prop('autoplay', true).prop('controls', false); + //.attr('autoplay', 'autoplay') + .prop('controls', false) + .width(vc.width()).height(vc.height()); if (state.data) { state.video.data(state.data); } @@ -568,20 +573,22 @@ var Video = (function() { return; } state.data.rtcPeer.processAnswer(answer, function (error) { - if (true === this.cleaned || this.peerConnection.signalingState === 'stable') { + if (true === this.cleaned) { return; } - if (error) { - return OmUtil.error(error); - } - if (state.video && state.video.paused) { - state.video.play().catch(function (err) { + const video = __getVideo(state); + if (this.peerConnection.signalingState === 'stable' && video && video.paused) { + video.play().catch(function (err) { if ('NotAllowedError' === err.name) { VideoUtil.askPermission(function () { - state.video.play(); + video.play(); }); } }); + return; + } + if (error) { + return OmUtil.error(error); } }); } @@ -625,8 +632,8 @@ var Video = (function() { }); }; self.reattachStream = _reattachStream; - self.video = function() { - const state = states.length > 0 ? states[0] : null; + self.video = function(_state) { + const state = _state || (states.length > 0 ? states[0] : null); if (!state || state.disposed) { return null; } diff --git a/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java b/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java index 415d78c..985d45b 100644 --- a/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java +++ b/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java @@ -33,7 +33,7 @@ import org.apache.openmeetings.AbstractJUnitDefaults; import org.apache.openmeetings.db.dao.label.LabelDao; import org.junit.jupiter.api.Test; -public class TestApplication extends AbstractJUnitDefaults { +class TestApplication extends AbstractJUnitDefaults { @Test void testMissing() { assertEquals("[Missing]", app.getOmString("909", Locale.ENGLISH));