Re: [akka-user] Ask a lot of Actors inside an Iteration in an Akka-Scheduler and wait for reply of everyone results in a stop of actor-system and webserver

2017-09-15 Thread 'Simon Jacobs' via Akka User List

Hey Johan,

thanks for your reply! 

> Calling .get on a CompletableFuture blocks the thread until the future is 
> completed, don't do that. 

That's the point. I need to call .get because otherwise the actor stops 
working. I also tried to collect the stages and then wait for the response:


CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()])).toCompletableFuture().get()


But this also results only in TimeoutExceptions (no matter what timeout is 
given). The actors simply do nothing if I do not call .get, and that is what 
I do not understand.
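
For reference, a collection of stages can be combined without ever calling .get. The following is a minimal sketch in plain Java (JDK only, no Akka types); the class name StageCollector and the method sequence are hypothetical and do not come from this thread. It shows one way to turn many stages into a single stage that completes with all results, and it is not a diagnosis of the timeouts described above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Akka or of the code in this thread.
final class StageCollector {
    private StageCollector() {}

    /**
     * Combines many stages into one stage that holds all results, in input order.
     * Nothing blocks here: the returned stage completes once every input has
     * completed, and it fails if any input fails.
     */
    static <T> CompletionStage<List<T>> sequence(List<? extends CompletionStage<T>> stages) {
        List<CompletableFuture<T>> futures = stages.stream()
                .map(CompletionStage::toCompletableFuture)
                .collect(Collectors.toList());
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                // Every input is already complete when this callback runs,
                // so join() returns immediately instead of waiting.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}
```

Inside an actor, the combined stage would typically be handed on (for example, sent back to the actor as a message once it completes) rather than waited for on the actor's thread.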

Best regards 
Simon


On Tuesday, 12 September 2017 15:33:35 UTC+2, Akka Team wrote:
>
> Calling .get on a CompletableFuture blocks the thread until the future is 
> completed, don't do that. User.find.findPagedList also looks suspiciously 
> like something blocking.
> See this section of the docs for more info: 
> http://doc.akka.io/docs/akka/current/java/dispatchers.html#problem-blocking-on-default-dispatcher
>
> --
> Johan
> Akka Team
>
>
> On Fri, Sep 8, 2017 at 10:00 AM, 'Simon Jacobs' via Akka User List <
> akka...@googlegroups.com > wrote:
>
>> Hi there,
>>
>> I have an Akka scheduler to reindex some data in the search engine and to 
>> resize the images of every user.
>>
>> It is called directly by
>>
>> system.scheduler().scheduleOnce(FiniteDuration.create(Long.valueOf(1), 
>> TimeUnit.SECONDS), reindexActor, "", system.dispatcher(), null);
>>
>> The actor (flattened into one method for better readability):
>>  
>> @Override
>> public void onReceive(Object msg) throws Exception
>> {
>>     long start = System.currentTimeMillis();
>>     logger.info("Start reindexing of all User");
>>
>>     if (indexing) {
>>         logger.info("Already reindexing. Skip");
>>         return;
>>     }
>>
>>     try {
>>         int page = 0;
>>         PagedList<User> users;
>>
>>         do {
>>             users = User.find.findPagedList(page, COUNT_OF_ROWS);
>>             List<User> active = users.getList().stream()
>>                 .filter(g -> g.isActive())
>>                 .collect(Collectors.toList());
>>
>>             List<CompletableFuture<ImageActor.Response>> stages =
>>                 new ArrayList<CompletableFuture<ImageActor.Response>>(COUNT_OF_ROWS);
>>             for (User user : active) {
>>                 ActorRef userActor = system.actorOf(Props.create(
>>                     DependencyInjector.class, injector, UpdateRedisActor.class));
>>                 userActor.tell(new UpdateRedisActor.Index(user.getId()), null);
>>
>>                 if (user.hasProfilePicture()) {
>>                     /**
>>                      * get the image as FilePart (imitate http upload)
>>                      */
>>                     File image = new File(user.getProfilePicture().getFileName());
>>                     FilePart<Object> filePart = new FilePart<Object>(
>>                         "", user.getFirstname(), "image/jpg", image);
>>
>>                     String randomFileName = UUID.randomUUID().toString();
>>
>>                     /**
>>                      * Create new actor
>>                      */
>>                     ActorRef imgActor = system.actorOf(Props.create(
>>                         DependencyInjector.class, injector, ImageActor.class));
>>
>>                     /**
>>                      * And ask it
>>                      */
>>                     CompletionStage<ImageActor.Response> stage = FutureConverters
>>                         .toJava(ask(imgActor,
>>                             new ImageActor.Request(filePart, randomFileName,
>>                                 imageResizeOptionFactory.getValues()),
>>                             ImageBroker.TEN_SECONDS * 20))
>>                         .thenApply((response) -> ((ImageActor.Response) response))
>>                         .exceptionally((e) -> {
>>                             Logger.error("Error while creating picture ["
>>                                 + randomFileName + "]", e);
>>                             return new ImageActor.Response(500);
>>                         });
>>
>>                     /**
>>                      * Collect stages
>>                      */
>>                     stages.add(stage.toCompletableFuture().thenApplyAsync(response -> {
>>                         user.updateAndSetProfileImage(response.fileName, response.filePath);
>>                         user.save();
>>                         Logger.info("User " + user.getId() + " saved with new image");
>>                         return response;
>>                     }));
>>                 }
>>             }
>>

[akka-user] Akka Stream Kafka - Avro Deserialization

2017-09-15 Thread Yaser Arshad

Hi,
Using Kafka Source in Akka Streams Kafka, I am trying to deserialize an 
Avro object using KafkaAvroDeserializer that is created using avro-tools 
library.

If I use confluent Kafka Consumer, I can deserialize it like this:

def consumerProperties() = {
  val props = new Properties
  props.put("bootstrap.servers", kafkaServer)
  props.put("group.id", UUID.randomUUID().toString)
  props.put("auto.commit.enable", "false")
  props.put("auto.offset.reset", "earliest")
  props.put("schema.registry.url", schemaUrl)
  props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
  props
}


def consume(): Unit = {
  val consumer = new KafkaConsumer[String, Person](consumerProperties)
  consumer.subscribe(Collections.singletonList(topic))

  while (true) {
    val records: ConsumerRecords[String, Person] = consumer.poll(100)
    records.forEach(i => println(i.value().getName))
  }
}



But I am unable to deserialize using the Kafka source in Akka Streams Kafka.

The consumer settings are of type ConsumerSettings[String, AnyRef], and I think 
I need to somehow change them to ConsumerSettings[String, Person].



val consumerSettings = ConsumerSettings(system, new StringDeserializer, new KafkaAvroDeserializer)
  .withBootstrapServers(kafkaServer)
  .withGroupId(UUID.randomUUID().toString)
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  .withProperty("schema.registry.url", schemaUrl)
  .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
  .withProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  .withProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")

val subscriptions = Subscriptions.topics(topic)
val output = Consumer.plainSource(consumerSettings, subscriptions)
output
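
One general way to get from an untyped value type (AnyRef/Object) to a concrete one is to wrap the untyped deserializer in a small typed adapter. The sketch below only illustrates that idea in plain Java. BytesDecoder is a hypothetical stand-in interface defined here so the example is self-contained; it is not Kafka's Deserializer interface, which also declares configure and close methods that a real wrapper would have to forward. TypedDecoder is likewise a hypothetical name.

```java
// Hypothetical stand-in for a deserializer interface; NOT the Kafka type.
interface BytesDecoder<T> {
    T decode(String topic, byte[] data);
}

// Hypothetical adapter: narrows the result of an untyped decoder to one class.
final class TypedDecoder<T> implements BytesDecoder<T> {
    private final BytesDecoder<Object> untyped;
    private final Class<T> type;

    TypedDecoder(BytesDecoder<Object> untyped, Class<T> type) {
        this.untyped = untyped;
        this.type = type;
    }

    @Override
    public T decode(String topic, byte[] data) {
        // Class.cast throws ClassCastException right here if the payload
        // turns out to be some other type, instead of failing later downstream.
        return type.cast(untyped.decode(topic, data));
    }
}
```

With an adapter of this shape around the Avro deserializer, the value type parameter would be the concrete record class rather than AnyRef. Whether this is the best fit for ConsumerSettings is an assumption that should be checked against the Akka Streams Kafka documentation.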



 


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Removing programmatically and dynamically Node from Cluster

2017-09-15 Thread Sebastian Oliveri
Justin, 

I wrote your implementation, but as a local actor on every node that lives 
in memory as long as the instance is up and running.

I thought about possible edge cases of modelling it as a local actor, but I 
cannot come up with a case that would break the scenario.

I am thinking about starting with 3 instances (always an odd number). If a 
network partition leaves the cluster with a size of two, a second network 
partition would not be resolved by the split brain resolver, and in that 
case one of those nodes should be downed using the HTTP management.

Do you know of any edge case that would break when using local actors as 
split brain resolvers?

Regards,
Sebastian



Re: [akka-user] Removing programmatically and dynamically Node from Cluster

2017-09-15 Thread Justin du coeur
On Fri, Sep 15, 2017 at 9:48 AM, Sebastian Oliveri 
wrote:

> I wrote your implementation but as a local actor in every node to live in
> memory as long as the instance is up and running.
>
> I thought about possible border cases regarding to model it as a local
> actor but I can not come up with a case that would break the scenario.
>
> I am thinking about starting with 3 instances (always odd numbers) and in
> case of a network partition that leaves the cluster with a size of two a
> second network partition would not be fixed by the split brain resolver and
> in this case one of those nodes should be Down using the HTTP management.
>
> Do you know any border case that would break when using local actors as
> split brain resolvers?
>

No, but keep in mind I'm not a deep expert here -- I understand the problem
well enough, but I'm not remotely as well-versed in the solutions as the
Akka Team themselves.

My system is actually using local actors in much the same way you describe
-- there's a cluster-singleton Coordinator, but a single local Manager per
node, and the Manager does the SBR work.  Indeed, I *think* it's strictly
necessary to have a single local actor per-node for any good solution.
 (Although, again, I'm not an expert.)

In the long run, I have some airy theories about using AWS to make the
architecture more robust (since AWS theoretically has a "god's-eye" view of
which nodes are up), but I haven't spent enough time on that yet to figure
out whether it really works as a solution.



[akka-user] Serialize a throwable for remote actors

2017-09-15 Thread Kevin Osborn
I am using clustered persistent actors, so my actors are going to be 
sending a response back to the sender. Normally, this is just going to be a 
CommandSuccess message, which is a simple ack object. Since this is remote, 
I want to use protobuf, and I am using ScalaPB. In this case it is easy: my 
CommandSuccess object is serialized back to the sender and the Java 
serialization warning goes away.

But what about failures? I have a simple failure (case class 
ObjectNotActiveException() extends Exception). Normally, I would do

sender ! Failure(ObjectNotActiveException())

But of course, this is not serialized. What is the recommended best 
practice for failures (not exceptions)? I could just create another 
ordinary message and handle both cases in the success case of my sender. 
But in my opinion, it is better to handle this in Future.onFailure.


What is the recommended best practice here? Just live with Java 
Serialization warnings in this case?
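
The "ordinary message" option and failure-style handling on the asking side are not mutually exclusive. The following is a minimal sketch in plain Java (JDK only, no Akka or ScalaPB types) of how a failure that travels as plain data could be turned back into a failed future by the asker. CommandReply, CommandFailure, CommandFailedException and Replies.failOnFailure are hypothetical names introduced for illustration; only CommandSuccess is taken from the message above, and this is not presented as the recommended Akka practice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical reply protocol: both outcomes are plain data, so both could
// be handled by the same (for example protobuf-based) serializer.
interface CommandReply {}

final class CommandSuccess implements CommandReply {}

final class CommandFailure implements CommandReply {
    final String reason;
    CommandFailure(String reason) { this.reason = reason; }
}

// Raised on the asking side only; the exception itself never crosses the wire.
final class CommandFailedException extends RuntimeException {
    CommandFailedException(String reason) { super(reason); }
}

final class Replies {
    private Replies() {}

    /** Maps a CommandFailure reply to a failed stage so callers can use failure callbacks. */
    static CompletionStage<CommandSuccess> failOnFailure(CompletionStage<CommandReply> reply) {
        return reply.thenCompose(r -> {
            CompletableFuture<CommandSuccess> result = new CompletableFuture<>();
            if (r instanceof CommandSuccess) {
                result.complete((CommandSuccess) r);
            } else if (r instanceof CommandFailure) {
                result.completeExceptionally(
                        new CommandFailedException(((CommandFailure) r).reason));
            } else {
                result.completeExceptionally(
                        new IllegalStateException("unexpected reply: " + r));
            }
            return result;
        });
    }
}
```

The design choice here is that the wire protocol stays free of Throwable instances, while code that consumes the reply can still react in its failure path.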



[akka-user] akka-http WebSockets with separate Sink and Source

2017-09-15 Thread Jakub Janeček
Hello,

I am trying to implement a simple WebSockets server using akka-http with 
separate Sink and Source. According to the documentation, I should be using 
the handleMessagesWithSinkSource method. However, if I do so, I am not able 
to receive any Message. I am able to send some back to the client, but 
nothing sent to the server is received. Could someone help me out and spot 
my mistake? I guess it will be something obvious.

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.IncomingConnection
import akka.http.scaladsl.model.HttpMethods.GET
import akka.http.scaladsl.model.ws.{Message, TextMessage, UpgradeToWebSocket}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.StrictLogging

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContextExecutor, Promise}
import scala.io.StdIn

object Main extends StrictLogging {

  def main(args: Array[String]): Unit = {
    for {
      config <- asResource(ConfigFactory.load())
      system <- asResource(ActorSystem("system", config), Duration("10s"))
      materializer <- asResource(ActorMaterializer()(system))
    } {
      implicit val s: ActorSystem = system
      implicit val m: ActorMaterializer = materializer
      implicit val ec: ExecutionContextExecutor = system.dispatcher

      val listenAddress = config.getString("listenAddress")
      val listenPort = config.getInt("listenPort")
      val server = Http().bind(listenAddress, listenPort)

      val requestHandler: HttpRequest => HttpResponse = {
        case req @ HttpRequest(GET, Uri.Path("/websocket"), _, _, _) =>
          req.header[UpgradeToWebSocket] match {
            case Some(upgrade) =>
              logger.debug(s"Upgrading: $req")
              val inHandler = Sink.foreach[Message](x => logger.debug(s"$x"))
              upgrade.handleMessagesWithSinkSource(inHandler, Source.single(TextMessage("hello")))

            case None => HttpResponse(400, entity = "Expected 'Upgrade: websocket' HTTP header")
          }

        case req: HttpRequest =>
          logger.debug(s"Unexpected: $req")
          req.discardEntityBytes() // important to drain incoming HTTP Entity stream
          HttpResponse(404, entity = "URL match not found")
      }

      val connectionHandler = Sink.foreach { connection: IncomingConnection =>
        logger.debug(s"Connection accepted from ${connection.remoteAddress}")
        connection.handleWithSyncHandler(requestHandler)
      }

      val binding = server.to(connectionHandler).run()
      binding.failed.foreach { ex =>
        logger.error(s"Server could not be bound to $listenAddress:$listenPort", ex)
      }

      StdIn.readLine()
    }
  }

}


Thank you. Regards,
Jakub Janecek



Re: [akka-user] akka-http WebSockets with separate Sink and Source

2017-09-15 Thread Konrad “ktoso” Malawski
You seem to be logging on debug level there - are you sure your logging
configuration is such that it will log/print log statements?

On September 16, 2017 at 5:18:19, Jakub Janeček (janecek.ja...@gmail.com)
wrote:

> Hello,
>
> I am trying to implement a simple WebSockets server using akka-http with
> separate Sink and Source. According to the documentation I should be using
> the handleMessagesWithSinkSource method however if I do so I am not able to
> receive any Message. I am able to send some back to the client but nothing
> sent to the server is received. Could someone help me out and spot my
> mistake? I guess it will be something obvious.



Re: [akka-user] akka-http WebSockets with separate Sink and Source

2017-09-15 Thread Jakub Janeček
Yes, I am sure. This is the only output from the console I get (I even 
tried with println just to be sure :) ):

[DEBUG] [09/16/2017 07:43:11.335] [main] [EventStream(akka://system)] logger log1-Logging$DefaultLogger started
[DEBUG] [09/16/2017 07:43:11.336] [main] [EventStream(akka://system)] Default Loggers started
[DEBUG] [09/16/2017 07:43:11.411] [main] [AkkaSSLConfig(akka://system)] Initializing AkkaSSLConfig extension...
[DEBUG] [09/16/2017 07:43:11.414] [main] [AkkaSSLConfig(akka://system)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@72c8e7b
[DEBUG] [09/16/2017 07:43:12.270] [system-akka.actor.default-dispatcher-2] [akka://system/system/IO-TCP/selectors/$a/0] Successfully bound to /0:0:0:0:0:0:0:0:8080
[DEBUG] [09/16/2017 07:43:14.289] [system-akka.actor.default-dispatcher-2] [akka://system/system/IO-TCP/selectors/$a/0] New connection accepted
2017-09-16 07:43:14.308 DEBUG [default-dispatcher-3] system.Main$: Connection accepted from /0:0:0:0:0:0:0:1:57557
2017-09-16 07:43:14.434 DEBUG [default-dispatcher-4] system.Main$: Upgrading: HttpRequest(HttpMethod(GET),ws://localhost:8080/websocket,List(UpgradeToWebSocket: , Host: localhost:8080, Connection: Upgrade, Pragma: no-cache, Cache-Control: no-cache, Upgrade: websocket, Origin: chrome-extension://pfdhoblngboilpfeibdedpjgfnlcodoo, Sec-WebSocket-Version: 13, User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.83 Safari/537.36 Vivaldi/1.93.955.14, DNT: 1, Accept-Encoding: gzip, deflate, br, Accept-Language: en-GB, en-US;q=0.8, en;q=0.6, Sec-WebSocket-Key: 3ug15ARDswT6DEfqhw1wdQ==, Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits, Timeout-Access: ),HttpEntity.Strict(none/none,ByteString()),HttpProtocol(HTTP/1.1))

I am using the Simple Web Socket Client Chrome extension. It is able to 
connect to the server and to receive the "hello" message from my Source, 
but anything I send to the server is not processed, and there is no logging 
from Akka at all. When I tried the other approach from the documentation 
(Sink and Source combined into a Flow), it worked well. However, I know for 
sure I will need the separate Sink and Source, so that's why I need to make 
this work. Should I share this on GitHub so that you can try it out?



Thank you. Regards,
Jakub Janecek


On Saturday, September 16, 2017 at 5:56:14 AM UTC+2, Konrad Malawski wrote:
>
> You seem to be logging on debug level there - are you sure your logging 
> configuration is such that it will log/print log statements? 
>
> On September 16, 2017 at 5:18:19, Jakub Janeček (janece...@gmail.com 
> ) wrote:
>
>> Hello, 
>>
>> I am trying to implement a simple WebSockets server using akka-http with 
>> separate Sink and Source. According to the documentation I should be using 
>> the handleMessagesWithSinkSource method however if I do so I am not able to 
>> receive any Message. I am able to send some back to the client but nothing 
>> sent to the server is received. Could someone help me out and spot my 
>> mistake? I guess it will be something obvious.
>>
>> import akka.actor.ActorSystem
>> import akka.http.scaladsl.Http
>> import akka.http.scaladsl.Http.IncomingConnection
>> import akka.http.scaladsl.model.HttpMethods.GET
>> import akka.http.scaladsl.model.ws.{Message, TextMessage, 
>> UpgradeToWebSocket}
>> import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
>> import akka.stream.ActorMaterializer
>> import akka.stream.scaladsl.{Sink, Source}
>> import com.typesafe.config.ConfigFactory
>> import com.typesafe.scalalogging.StrictLogging
>>
>> import scala.concurrent.duration.Duration
>> import scala.concurrent.{Await, ExecutionContextExecutor, Promise}
>>
>> object Main extends StrictLogging {
>>
>>   def main(args: Array[String]): Unit = {
>> for {
>>   config <- asResource(ConfigFactory.load())
>>   system <- asResource(ActorSystem("system", config), Duration("10s"))
>>   materializer <- asResource(ActorMaterializer()(system))
>> } {
>>   implicit val s: ActorSystem = system
>>   implicit val m: ActorMaterializer = materializer
>>   implicit val ec: ExecutionContextExecutor = system.dispatcher
>>
>>   val listenAddress = config.getString("listenAddress")
>>   val listenPort = config.getInt("listenPort")
>>   val server = Http().bind(listenAddress, listenPort)
>>
>>   val requestHandler: HttpRequest => HttpResponse = {
>> case req @ HttpRequest(GET, Uri.Path("/websocket"), _, _, _) =>
>>   req.header[UpgradeToWebSocket] match {
>> case Some(upgrade) =>
>>   logger.debug(s"Upgrading: $req")
>>   val inHandler = Sink.foreach[Message](x => 
>> logger.debug(s"$x"))