Hello Federico
Below my code. The class WebSocketApp is the Http App. The Method metrics returns the required Flow. This flow forwards all incoming messages to a newly created Actor of type WebSocketCommandSubscriber. The flow gets the messages to send back to the websocket client from newly created Actor of Type WebSocketDataPublisherActor. The Actor WebSocketDataPublisherActor receives the sending events from the Eventstream. If the websocket-stream is not ready to receive data (keyword backpressure) incoming messages from the Eventstream are lost and never send to the websocket client. For each connection this two Actors are created once. With this solution it is NOT possible to implement a request/response handling, because the WebSocketDataPublisherActor is listening on the Eventstream. I have solved this problem too and I will publish the code to my own question <https://groups.google.com/forum/#!topic/akka-user/CkbaYINPbkU> later. Have fun! Flavio Ps. A starting point to understand steams could be: https://gist.github.com/staltz/868e7e9bc2a7b8c1f754 public class WebSocketApp extends HttpApp { private static final Gson gson = new Gson(); @Override public Route createRoute() { return get( path("metrics").route(handleWebSocketMessages(metrics())) ); } private Flow<Message, Message, ?> metrics() { Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props()); Source<Message, ActorRef> metricsSource = Source.actorPublisher(WebSocketDataPublisherActor.props()) .map((measurementData) -> TextMessage.create(gson.toJson(measurementData))); return Flow.fromSinkAndSource(metricsSink, metricsSource); } } public class WebSocketCommandSubscriber extends AbstractActorSubscriber { public static Props props() { return Props.create(WebSocketCommandSubscriber.class); } public WebSocketCommandSubscriber() { receive(ReceiveBuilder. match(ActorSubscriberMessage.OnNext.class, on -> on.element() instanceof Message, onNext -> { Message message = (Message)onNext.element(); handleIncomingMessage(message); // TODO: how do we handle OnComplete? (do we have to / why exception) // }) // .match(akka.stream.actor.ActorSubscriberMessage.OnComplete., (x) -> { // context().system().stop(self()); }).match(Object.class, (x) -> { System.out.println("WebSocketCommandSubscriber: Unkown incomming message: " + x.getClass().getName() + ": " + x); unhandled(x); }).build()); } private void handleIncomingMessage(Message message) { WebSocketMessage<?> wsCommand = new Gson().fromJson(message.asTextMessage().getStrictText(), WebSocketMessage.class); switch (wsCommand.type) { case "DoX": getConnectionsActor().tell(XyActor.doX(createRemoteAddress(wsCommand)), ActorRef.noSender()); break; case "DoY": getConnectionsActor().tell(XyActor.doY(createRemoteAddress(wsCommand)), ActorRef.noSender()); break; } } // TODO: we should refactor our messaging concept - it does not work that nice as expected private RemoteAddress createRemoteAddress(WebSocketMessage<?> wsCommand) { @SuppressWarnings("rawtypes") LinkedTreeMap map = (LinkedTreeMap) wsCommand.data; return new RemoteAddress(map.get("host").toString(), Double.valueOf(map.get("port").toString()).intValue(), map.get("actorSystemName").toString()); } private NodeName createNodeName(WebSocketMessage<?> wsCommand) { @SuppressWarnings("rawtypes") LinkedTreeMap map = (LinkedTreeMap) wsCommand.data; return new NodeName(map.get("nodeName").toString()); } private ActorSelection getConnectionsActor() { return context().system().actorSelection(Connections.ACTOR_PATH); } private ActorSelection getConnectionActor(WebSocketMessage<?> wsCommand) { return context().system().actorSelection(NodeConnection.getActorPath(createNodeName(wsCommand))); } @Override public RequestStrategy requestStrategy() { return new MaxInFlightRequestStrategy(10) { @Override public int inFlightInternally() { // we do not hold any messages yet, but will eventually be // required, e.g. for request/response message handling return 0; } }; } } public class WebSocketDataPublisher extends AbstractActorPublisher<WebSocketMessage<?>> { // TODO: the WebSocket client should be able to configure its interessted events, by sending a corresponding message List<Class<?>> interesstedEvents = Arrays.asList( MeasurementDataMessage.class, NodeConnectionEvents.class); public static Props props() { return Props.create(WebSocketDataPublisher.class); } @Override public void preStart() throws Exception { for (Class<?> eventClass : interesstedEvents) { // unsubscribing performed automatically by the event stream on actor destroy getContext().system().eventStream().subscribe(self(), eventClass); } } public WebSocketDataPublisher() { UnitPFBuilder<Object> builder = ReceiveBuilder.match(Cancel.class, cancel -> context().stop(self())); for (Class<?> clazz : interesstedEvents) { builder = builder.match(clazz, message -> { handleMessage(message); }); } receive(builder.build()); } private void handleMessage(Object message) { // while the stream is not ready to receive data - incoming messages are lost if (isActive() && totalDemand() > 0) { WebSocketMessage<?> webSocketMessage = WebSocketMessage.create(message.getClass().getSimpleName(), message); // System.out.println("send message to WS: " + message); onNext(webSocketMessage); } else { // System.out.println("LOST message to WS: " + message); } } } public class WebSocketMessage<T> { public final String type; public final T data; public static WebSocketMessage<Void> create(String type) { return create(type, null); } public static <T> WebSocketMessage<T> create(String type, T data) { return new WebSocketMessage<>(type, data); } private WebSocketMessage(String type, T data) { this.type = type; this.data = data; } } -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.