This is an automated email from the ASF dual-hosted git repository.

solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git


The following commit(s) were added to refs/heads/master by this push:
     new a73330b  OPENMEETINGS-2315 Only StreamProcessor to hold reference of KStream (#71)
a73330b is described below

commit a73330b84ad31ec2bab0d40168446c4e5b449eaa
Author: Sebastian Wagner <sebawag...@apache.org>
AuthorDate: Fri May 1 13:18:16 2020 +1200

    OPENMEETINGS-2315 Only StreamProcessor to hold reference of KStream (#71)
    
    * OPENMEETINGS-2315 Only StreamProcessor to hold reference of KStream
    
    * OPENMEETINGS-2315 Remove uncommented code to add stream that won't be needed anymore
    
    * OPENMEETINGS-2315 Fix stopBroadcast to release processor. And move to StreamProcessor.
    
    * OPENMEETINGS-2315 Rename KRoom.streamProcessor to KRoom.processor.
    
    * OPENMEETINGS-2315 Fix review comments on method modifiers and formatting.
    
    * OPENMEETINGS-2315 Fix circular reference.
    
    * OPENMEETINGS-2315 Fix review comments
    
    * OPENMEETINGS-2315 fix some linebreak
    
    * OPENMEETINGS-2315 fix some linebreak2
    
    * OPENMEETINGS-2315 change list init.
---
 .../openmeetings/core/remote/IStreamProcessor.java |  2 +-
 .../org/apache/openmeetings/core/remote/KRoom.java | 62 +++++++++-------------
 .../apache/openmeetings/core/remote/KStream.java   |  9 ++--
 .../openmeetings/core/remote/KTestStream.java      |  2 +-
 .../openmeetings/core/remote/KurentoHandler.java   |  6 +--
 .../openmeetings/core/remote/StreamProcessor.java  | 30 +++++++----
 .../core/remote/TestStreamProcessor.java           |  2 +-
 .../web/admin/connection/ConnectionsPanel.java     |  8 ---
 8 files changed, 55 insertions(+), 66 deletions(-)

diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
index d6575ed..f142285 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
@@ -19,6 +19,6 @@
 package org.apache.openmeetings.core.remote;
 
 public interface IStreamProcessor {
-       void release(AbstractStream stream);
+       void release(AbstractStream stream, boolean releaseStream);
        void destroy();
 }
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index 47abfdd..f248a61 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -27,9 +27,7 @@ import static 
org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
 
 import java.util.Collection;
 import java.util.Date;
-import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.openmeetings.core.util.WebSocketHelper;
@@ -53,10 +51,18 @@ import org.slf4j.LoggerFactory;
 
 import com.github.openjson.JSONObject;
 
+/**
+ * Bean object dynamically created representing a conference room on the 
MediaServer
+ *
+ */
 public class KRoom {
+
        private static final Logger log = LoggerFactory.getLogger(KRoom.class);
 
-       private final Map<String, KStream> streams = new ConcurrentHashMap<>();
+       /**
+        * Not injected by annotation but by constructor.
+        */
+       private final StreamProcessor processor;
        private final MediaPipeline pipeline;
        private final Long roomId;
        private final Room.Type type;
@@ -67,7 +73,8 @@ public class KRoom {
        private JSONObject recordingUser = new JSONObject();
        private JSONObject sharingUser = new JSONObject();
 
-       public KRoom(Room r, MediaPipeline pipeline, RecordingChunkDao 
chunkDao) {
+       public KRoom(StreamProcessor processor, Room r, MediaPipeline pipeline, 
RecordingChunkDao chunkDao) {
+               this.processor = processor;
                this.roomId = r.getId();
                this.type = r.getType();
                this.pipeline = pipeline;
@@ -98,17 +105,16 @@ public class KRoom {
        public KStream join(final StreamDesc sd) {
                log.info("ROOM {}: join client {}, stream: {}", roomId, 
sd.getClient(), sd.getUid());
                final KStream stream = new KStream(sd, this);
-               streams.put(stream.getUid(), stream);
+               processor.addStream(stream);
                return stream;
        }
 
        public Collection<KStream> getParticipants() {
-               return streams.values();
+               return processor.getByRoom(this.getRoomId());
        }
 
-       public void onStopBroadcast(KStream stream, final StreamProcessor 
processor) {
-               streams.remove(stream.getUid());
-               stream.release(processor);
+       public void onStopBroadcast(KStream stream) {
+               processor.release(stream, true);
                WebSocketHelper.sendAll(newKurentoMsg()
                                .put("id", "broadcastStopped")
                                .put("uid", stream.getUid())
@@ -118,21 +124,6 @@ public class KRoom {
                //FIXME TODO permission can be removed, some listener might be 
required
        }
 
-       public void leave(final StreamProcessor processor, final Client c) {
-               for (Map.Entry<String, KStream> e : streams.entrySet()) {
-                       e.getValue().remove(c);
-               }
-               for (StreamDesc sd : c.getStreams()) {
-                       if (StreamType.SCREEN == sd.getType()) {
-
-                       }
-                       KStream stream = streams.remove(sd.getUid());
-                       if (stream != null) {
-                               stream.release(processor);
-                       }
-               }
-       }
-
        public boolean isRecording() {
                return recordingStarted.get();
        }
@@ -141,7 +132,7 @@ public class KRoom {
                return new JSONObject(recordingUser.toString());
        }
 
-       public void startRecording(StreamProcessor processor, Client c) {
+       public void startRecording(Client c) {
                if (recordingStarted.compareAndSet(false, true)) {
                        log.debug("##REC:: recording in room {} is starting 
::", roomId);
                        Room r = c.getRoom();
@@ -180,9 +171,9 @@ public class KRoom {
                        rec = processor.getRecordingDao().update(rec);
                        // Receive recordingId
                        recordingId = rec.getId();
-                       for (final KStream stream : streams.values()) {
-                               stream.startRecord(processor);
-                       }
+                       processor.getByRoom(this.getRoomId()).forEach(
+                                       stream -> stream.startRecord(processor)
+                       );
 
                        // Send notification to all users that the recording 
has been started
                        WebSocketHelper.sendRoom(new RoomMessage(roomId, u, 
RoomMessage.Type.RECORDING_TOGGLED));
@@ -190,12 +181,10 @@ public class KRoom {
                }
        }
 
-       public void stopRecording(final StreamProcessor processor, Client c) {
+       public void stopRecording(Client c) {
                if (recordingStarted.compareAndSet(true, false)) {
                        log.debug("##REC:: recording in room {} is stopping {} 
::", roomId, recordingId);
-                       for (final KStream stream : streams.values()) {
-                               stream.stopRecord();
-                       }
+                       
processor.getByRoom(this.getRoomId()).forEach(KStream::stopRecord);
                        Recording rec = 
processor.getRecordingDao().get(recordingId);
                        rec.setRecordEnd(new Date());
                        rec = processor.getRecordingDao().update(rec);
@@ -270,11 +259,10 @@ public class KRoom {
                }
        }
 
-       public void close(final StreamProcessor processor) {
-               for (final KStream stream : streams.values()) {
-                       stream.release(processor);
-               }
-               streams.clear();
+       public void close() {
+               processor.getByRoom(this.getRoomId()).forEach(
+                               stream -> stream.release(processor)
+               );
                pipeline.release(new Continuation<Void>() {
                        @Override
                        public void onSuccess(Void result) throws Exception {
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index 397d25f..fd4fa65 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -130,7 +130,7 @@ public class KStream extends AbstractStream {
                                                if (StreamType.SCREEN == 
streamType) {
                                                        
processor.doStopSharing(sid, uid);
                                                }
-                                               stopBroadcast(processor);
+                                               stopBroadcast();
                                                return null;
                                        }, delayedExecutor(getFlowoutTimeout(), 
TimeUnit.SECONDS)));
                                        break;
@@ -144,7 +144,6 @@ public class KStream extends AbstractStream {
                        }
                });
                outgoingMedia.addMediaFlowInStateChangeListener(evt -> 
log.warn("Media FlowIn :: {}", evt));
-               processor.addStream(this);
                addListener(processor, sd.getSid(), sd.getUid(), sdpOffer);
                if (room.isRecording()) {
                        startRecord(processor);
@@ -282,8 +281,8 @@ public class KStream extends AbstractStream {
                }
        }
 
-       public void stopBroadcast(final StreamProcessor processor) {
-               room.onStopBroadcast(this, processor);
+       public void stopBroadcast() {
+               room.onStopBroadcast(this);
        }
 
        public void pauseSharing() {
@@ -321,7 +320,7 @@ public class KStream extends AbstractStream {
                }
                releaseRecorder();
                if (remove) {
-                       processor.release(this);
+                       processor.release(this, false);
                }
        }
 
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
index 7da1a1e..0839693 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
@@ -223,6 +223,6 @@ public class KTestStream extends AbstractStream {
                }
                releasePlayer();
                releaseRecorder();
-               processor.release(this);
+               processor.release(this, true);
        }
 }
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index ade553e..e7d60405 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -142,7 +142,7 @@ public class KurentoHandler {
                        kuid = randomUUID().toString(); // will be changed to 
prevent double events
                        client.destroy();
                        for (Entry<Long, KRoom> e : rooms.entrySet()) {
-                               e.getValue().close(streamProcessor);
+                               e.getValue().close();
                        }
                        testProcessor.destroy();
                        streamProcessor.destroy();
@@ -238,7 +238,7 @@ public class KurentoHandler {
                        pipe.addTag(t, TAG_KUID, kuid);
                        pipe.addTag(t, TAG_ROOM, String.valueOf(roomId));
                        t.commit();
-                       room = new KRoom(r, pipe, chunkDao);
+                       room = new KRoom(streamProcessor, r, pipe, chunkDao);
                        rooms.put(roomId, room);
                }
                log.debug("Room {} found!", roomId);
@@ -426,7 +426,7 @@ public class KurentoHandler {
                                                        return;
                                                } else if (r != null) {
                                                        
rooms.remove(r.getRoomId());
-                                                       
r.close(streamProcessor);
+                                                       r.close();
                                                }
                                        }
                                        log.warn("Invalid MediaPipeline {} 
detected, will be dropped, tags: {}", pipe.getId(), tags);
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
index 32cda74..caceb36 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
@@ -65,6 +65,9 @@ import com.github.openjson.JSONObject;
 @Component
 public class StreamProcessor implements IStreamProcessor {
        private static final Logger log = 
LoggerFactory.getLogger(StreamProcessor.class);
+       /**
+        * Holds a reference to the current streams available on the server 
instance
+        */
        private final Map<String, KStream> streamByUid = new 
ConcurrentHashMap<>();
 
        @Autowired
@@ -214,7 +217,7 @@ public class StreamProcessor implements IStreamProcessor {
                        .forEach(lsd -> {
                                KStream s = getByUid(lsd.getUid());
                                if (s != null) {
-                                       s.stopBroadcast(this);
+                                       s.stopBroadcast();
                                }
                                c.removeStream(lsd.getUid());
                                closed.add(lsd.getUid());
@@ -284,7 +287,7 @@ public class StreamProcessor implements IStreamProcessor {
                                        KStream stream = 
streamByUid.get(sd.getUid());
                                        if (stream != null) {
                                                KRoom room = 
kHandler.getRoom(c.getRoomId());
-                                               room.onStopBroadcast(stream, 
this);
+                                               room.onStopBroadcast(stream);
                                        }
                                });
                }
@@ -305,7 +308,7 @@ public class StreamProcessor implements IStreamProcessor {
                                room.stopSharing();
                                if (Room.Type.INTERVIEW != room.getType() && 
room.isRecording()) {
                                        log.info("No more screen streams in the 
non-interview room, stopping recording");
-                                       room.stopRecording(this, null);
+                                       room.stopRecording(null);
                                }
                        }
                }
@@ -315,7 +318,7 @@ public class StreamProcessor implements IStreamProcessor {
                                        .collect(Collectors.toList());
                        if (streams.isEmpty()) {
                                log.info("No more streams in the room, stopping 
recording");
-                               room.stopRecording(this, null);
+                               room.stopRecording(null);
                        }
                }
        }
@@ -394,7 +397,7 @@ public class StreamProcessor implements IStreamProcessor {
                KStream sender = getByUid(uid);
                StreamDesc sd = doStopSharing(c.getSid(), uid);
                if (sender != null && sd != null) {
-                       sender.stopBroadcast(this);
+                       sender.stopBroadcast();
                } else {
                        log.warn("Could not stop broadcast - could be a KStream 
leak and lead to ghost KStream, client: {}, uid: {} ", c, uid);
                }
@@ -447,14 +450,14 @@ public class StreamProcessor implements IStreamProcessor {
                if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
                        return;
                }
-               kHandler.getRoom(c.getRoomId()).startRecording(this, c);
+               kHandler.getRoom(c.getRoomId()).startRecording(c);
        }
 
        public void stopRecording(Client c) {
                if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
                        return;
                }
-               kHandler.getRoom(c.getRoomId()).stopRecording(this, c);
+               kHandler.getRoom(c.getRoomId()).stopRecording(c);
 
                // In case this user wasn't shareing his screen we also need to 
close that one
                c.getScreenStream().ifPresent(sd -> {
@@ -494,8 +497,6 @@ public class StreamProcessor implements IStreamProcessor {
                        }
                }
                if (c.getRoomId() != null) {
-                       KRoom room = kHandler.getRoom(c.getRoomId());
-                       room.leave(this, c);
                        checkStreams(c.getRoomId());
                }
        }
@@ -508,6 +509,12 @@ public class StreamProcessor implements IStreamProcessor {
                return streamByUid.values();
        }
 
+       Collection<KStream> getByRoom(Long roomId) {
+               return streamByUid.values().stream()
+                               .filter(stream -> stream.getRoom() != null && 
stream.getRoom().getRoomId().equals(roomId))
+                               .collect(Collectors.toList());
+       }
+
        Client getBySid(String sid) {
                return cm.getBySid(sid);
        }
@@ -533,8 +540,11 @@ public class StreamProcessor implements IStreamProcessor {
        }
 
        @Override
-       public void release(AbstractStream stream) {
+       public void release(AbstractStream stream, boolean releaseStream) {
                final String uid = stream.getUid();
+               if (releaseStream) {
+                       stream.release(this);
+               }
                Client c = cm.getBySid(stream.getSid());
                if (c != null) {
                        StreamDesc sd = c.getStream(uid);
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
index e3a171d..f3a3dad 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
@@ -114,7 +114,7 @@ public class TestStreamProcessor implements 
IStreamProcessor {
        }
 
        @Override
-       public void release(AbstractStream stream) {
+       public void release(AbstractStream stream, boolean releaseStream) {
                streamByUid.remove(stream.getUid());
        }
 
diff --git 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java
 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java
index 84cc14c..270f809 100644
--- 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java
+++ 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java
@@ -90,14 +90,6 @@ public class ConnectionsPanel extends AdminBasePanel {
                                l.addAll(streams);
                                log.info("Retrieve all Streams, StreamProcessor 
has {} of streams", streams.size());
 
-                               List<KStreamDto> missing = kHandler.getRooms()
-                                               .stream()
-                                               .flatMap(room -> 
room.getParticipants().stream())
-                                               .filter(stream -> 
!streamProcessor.hasStream(stream.getUid()))
-                                               .map(kStream -> new 
KStreamDto("KRoom", kStream))
-                                               .collect(Collectors.toList());
-                               l.addAll(missing);
-                               log.warn("Following streams were in KRoom but 
not in StreamProcessor: {}", missing);
                                return l;
                        }
 

Reply via email to