Hello Federico

 

Below is my code.


The class WebSocketApp is the HTTP app. Its method metrics() returns the 
required Flow. This flow forwards all incoming messages to a newly created 
actor of type WebSocketCommandSubscriber. The flow gets the messages to 
send back to the WebSocket client from a newly created actor of type 
WebSocketDataPublisher. The WebSocketDataPublisher actor receives the 
events to send from the event stream. If the WebSocket stream is not 
ready to receive data (keyword: backpressure), incoming messages from the 
event stream are lost and never sent to the WebSocket client. These two 
actors are created once for each connection.

 

With this solution it is NOT possible to implement request/response 
handling, because the WebSocketDataPublisher is listening on the 
event stream. I have solved this problem too and I will publish the code to my 
own question <https://groups.google.com/forum/#!topic/akka-user/CkbaYINPbkU> 
later.

 

Have fun!

Flavio


PS: A starting point for understanding streams could be: 
https://gist.github.com/staltz/868e7e9bc2a7b8c1f754

 

 

/**
 * HTTP application that exposes a single WebSocket endpoint at {@code /metrics}.
 *
 * <p>The WebSocket handler flow is built from two actor-backed stages: a
 * {@link WebSocketCommandSubscriber} sink that consumes what the client sends, and a
 * {@link WebSocketDataPublisher} source that produces what is sent back to the client.
 */
public class WebSocketApp extends HttpApp {

  /** Shared JSON serializer used to render outgoing messages. */
  private static final Gson gson = new Gson();

  /**
   * Builds the route: {@code GET /metrics} is upgraded to a WebSocket handled by
   * {@link #metrics()}.
   */
  @Override
  public Route createRoute() {
    return get(
        path("metrics").route(handleWebSocketMessages(metrics()))
        );
  }

  /**
   * Creates the WebSocket handler flow.
   *
   * <p>Incoming messages go to the subscriber actor; outgoing messages are taken from the
   * publisher actor and serialized to JSON text frames. Sink and source are independent, so
   * an incoming message does not by itself produce a reply.
   *
   * @return flow wiring the subscriber sink to the publisher source
   */
  private Flow<Message, Message, ?> metrics() {
    Sink<Message, ActorRef> metricsSink =
        Sink.actorSubscriber(WebSocketCommandSubscriber.props());

    // The publisher class defined alongside this one is WebSocketDataPublisher
    // (no "Actor" suffix), so that is the Props factory that must be used here.
    Source<Message, ActorRef> metricsSource =
        Source.actorPublisher(WebSocketDataPublisher.props())
            .map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));

    return Flow.fromSinkAndSource(metricsSink, metricsSource);
  }
}

 


public class WebSocketCommandSubscriber extends AbstractActorSubscriber {

 

    public static Props props() {

        return Props.create(WebSocketCommandSubscriber.class);

    }

   

    public WebSocketCommandSubscriber() {

        receive(ReceiveBuilder.

            match(ActorSubscriberMessage.OnNext.class, on -> on.element() 
instanceof Message,

                    onNext -> {

                        Message message = (Message)onNext.element();

                        handleIncomingMessage(message);

           

          // TODO: how do we handle OnComplete? (do we have to / why 
exception)

//        })

//        .match(akka.stream.actor.ActorSubscriberMessage.OnComplete., (x) 
-> {

//            context().system().stop(self());

        }).match(Object.class, (x) -> {

            System.out.println("WebSocketCommandSubscriber: Unkown 
incomming message: " + x.getClass().getName() + ": "  + x);

            unhandled(x);

        }).build());

    }

   

    private void handleIncomingMessage(Message message) {

        WebSocketMessage<?> wsCommand = new 
Gson().fromJson(message.asTextMessage().getStrictText(), 
WebSocketMessage.class);

        switch (wsCommand.type) {

        case "DoX":

            
getConnectionsActor().tell(XyActor.doX(createRemoteAddress(wsCommand)), 
ActorRef.noSender());

            break;

        case "DoY":

            
getConnectionsActor().tell(XyActor.doY(createRemoteAddress(wsCommand)), 
ActorRef.noSender());

            break;

        }

    }

    // TODO: we should refactor our messaging concept - it does not work 
that nice as expected

    private RemoteAddress createRemoteAddress(WebSocketMessage<?> 
wsCommand) {

        @SuppressWarnings("rawtypes")

        LinkedTreeMap map = (LinkedTreeMap) wsCommand.data;

        return new RemoteAddress(map.get("host").toString(), 
Double.valueOf(map.get("port").toString()).intValue(), 
map.get("actorSystemName").toString());

    }

    private NodeName createNodeName(WebSocketMessage<?> wsCommand) {

        @SuppressWarnings("rawtypes")

        LinkedTreeMap map = (LinkedTreeMap) wsCommand.data;

        return new NodeName(map.get("nodeName").toString());

    }

   

    private ActorSelection getConnectionsActor() {

        return context().system().actorSelection(Connections.ACTOR_PATH);

    }

    private ActorSelection getConnectionActor(WebSocketMessage<?> 
wsCommand) {

        return 
context().system().actorSelection(NodeConnection.getActorPath(createNodeName(wsCommand)));

    }

 

    @Override

    public RequestStrategy requestStrategy() {

        return new MaxInFlightRequestStrategy(10) {

            @Override

            public int inFlightInternally() {

                // we do not hold any messages yet, but will eventually be

                // required, e.g. for request/response message handling

                return 0;

            }

        };

    }

}

 

 

public class WebSocketDataPublisher extends 
AbstractActorPublisher<WebSocketMessage<?>> {

 

    // TODO: the WebSocket client should be able to configure its 
interessted events, by sending a corresponding message

    List<Class<?>> interesstedEvents = Arrays.asList(

            MeasurementDataMessage.class,

            NodeConnectionEvents.class);

 

    public static Props props() {

        return Props.create(WebSocketDataPublisher.class);

    }

 

    @Override

    public void preStart() throws Exception {

        for (Class<?> eventClass : interesstedEvents) {

            // unsubscribing performed automatically by the event stream on 
actor destroy

            getContext().system().eventStream().subscribe(self(), 
eventClass);

        }

    }

 

    public WebSocketDataPublisher() {

        UnitPFBuilder<Object> builder = ReceiveBuilder.match(Cancel.class, 
cancel -> context().stop(self()));

        for (Class<?> clazz : interesstedEvents) {

            builder = builder.match(clazz, message -> {

                handleMessage(message);

            });

        }

        receive(builder.build());

    }

 

    private void handleMessage(Object message) {

        // while the stream is not ready to receive data - incoming 
messages are lost

        if (isActive() && totalDemand() > 0) {

            WebSocketMessage<?> webSocketMessage = 
WebSocketMessage.create(message.getClass().getSimpleName(), message);

//            System.out.println("send message to WS: " + message);

            onNext(webSocketMessage);

        } else {

//            System.out.println("LOST message to WS: " + message);

        }

    }

}

 


/**
 * Immutable envelope for data exchanged over the WebSocket: a {@code type} tag plus an
 * optional payload.
 *
 * @param <T> type of the payload
 */
public class WebSocketMessage<T> {

    /** Tag naming the kind of message. */
    public final String type;

    /** Payload of the message; {@code null} when created without one. */
    public final T data;

    /** Instances are obtained through the {@code create} factories. */
    private WebSocketMessage(String type, T data) {
        this.type = type;
        this.data = data;
    }

    /**
     * Creates a message carrying a payload.
     *
     * @param type tag naming the kind of message
     * @param data payload to carry
     * @return the new message
     */
    public static <T> WebSocketMessage<T> create(String type, T data) {
        return new WebSocketMessage<T>(type, data);
    }

    /**
     * Creates a message without payload.
     *
     * @param type tag naming the kind of message
     * @return the new message, with {@code data} set to {@code null}
     */
    public static WebSocketMessage<Void> create(String type) {
        return WebSocketMessage.<Void>create(type, null);
    }
}

 

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to