Message volume from upstream, i.e., the Riemann WebSocket endpoint, is 1 million
text messages, each under a few KB in size.

On Saturday, June 3, 2017 at 8:59:59 AM UTC-7, Kalyan Kadiyala wrote:
>
> Lib dependency: "com.typesafe.akka" %% "akka-http" % "10.0.7"
>
>
> Issue background: I am running a WebSocket performance test against a
> WebSocket proxy (a custom implementation using Akka HTTP) that connects as a
> client, i.e., the test covers inbound messages. The WebSocket endpoint happens
> to be the Riemann stream processor. Riemann gets events from a future in my
> client (Scala test) via Riemann TCP, and the events are read back over
> WebSocket. Riemann is mentioned for context only.
>
>
> Regarding API usage...
>
>
> Socket request:
> http.singleWebSocketRequest(
>   WebSocketRequest(s"ws://${proxyDependency.wsHost}:${proxyDependency.wsPort}/${uriPath}"),
>   clientFlow)
>
>
> Flow definition:
> Flow.fromSinkAndSource(defineSinkForIncoming(fn2HandleInBoundData), Source.maybe)
>   .viaMat(KillSwitches.single)(Keep.right)
>   .takeWhile(nothing => isSinkInUse.get)
>   .watchTermination()((uniqueKillSwitch: UniqueKillSwitch, futureDone: Future[Done]) => {
>     futureDone.map { result => {
>       logThatMessage("Flow is terminated (probably disconnected by the Server or via kill switch.)",
>         None, LogLevelIndicators.INFO, fallBackLogger, localizedLoggerFn)
>       if (!promise2Listen.isCompleted) {
>         promise2Listen success gBoxWebSocketProxyFlowInitStatus.FLOW_TERMINATION_DETECTED
>       } else {
>         logThatMessage("Promise to listen is already completed. Nothing to return here.",
>           None, LogLevelIndicators.INFO, fallBackLogger, localizedLoggerFn)
>       }
>     } } recover {
>       case ex: Throwable => {
>         logThatMessage("Listen channel is already open.", Option(ex),
>           LogLevelIndicators.ERROR, fallBackLogger, localizedLoggerFn)
>         promise2Listen success gBoxWebSocketProxyFlowInitStatus.UNTRACKED_EXCEPTION_DETECTED
>       }
>     }
>
> For context, the call to defineSinkForIncoming... returns a
> Sink[Message, Future[Done]].
>
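A commonly reported cause of a SubscriptionTimeoutException like the one in the stack trace in this post is a WebSocket message arriving as a streamed message (TextMessage.Streamed or BinaryMessage.Streamed) whose inner stream is never consumed. Whether that applies here depends on what defineSinkForIncoming does with each Message, which the post does not show. A minimal sketch of a sink that always consumes or drains every message, assuming akka-http 10.0.x with akka-stream; the names drainingSink and handle are hypothetical and not part of the original code:

```scala
import akka.Done
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Sink}
import scala.concurrent.Future

// Hypothetical stand-in for defineSinkForIncoming: every incoming message has
// its inner stream materialized, so no substream is left waiting for a subscriber.
def drainingSink(handle: String => Unit)(implicit mat: Materializer): Sink[Message, Future[Done]] = {
  import mat.executionContext // execution context used for the Future callbacks below
  Flow[Message]
    .mapAsync(parallelism = 1) {
      case tm: TextMessage =>
        // textStream works for both strict and streamed text messages;
        // concatenate the chunks, then pass the full text to the handler.
        tm.textStream.runFold("")(_ + _).map(handle)
      case bm: BinaryMessage =>
        // Binary data is not expected in this test, but it must still be drained.
        bm.dataStream.runWith(Sink.ignore).map(_ => ())
    }
    .toMat(Sink.ignore)(Keep.right)
}
```

With parallelism = 1 the handler sees messages one at a time and in arrival order; a higher value would process several messages concurrently while still emitting results in order.
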
> I've seen earlier posts on this topic that are now closed, so I was wondering
> whether anyone else has faced a similar issue.
>
> Here's the stack trace:
> [default-akka.actor.default-dispatcher-16]
> [akka://default/user/StreamSupervisor-5/flow-0-0-unknown-operation]
> Error in stage [Split]: OnError(akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in 5000 milliseconds) (of class akka.stream.actor.ActorSubscriberMessage$OnError)
> scala.MatchError: OnError(akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in 5000 milliseconds) (of class akka.stream.actor.ActorSubscriberMessage$OnError)
>     at akka.stream.impl.fusing.SubSource.failSubstream(StreamOfStreams.scala:706)
>     at akka.stream.stage.GraphStageLogic$SubSourceOutlet.fail(GraphStage.scala:1138)
>     at akka.stream.impl.fusing.Split$$anon$2$SubstreamHandler.onUpstreamFailure(StreamOfStreams.scala:552)
>     at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:733)
>     at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
>     at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
>     at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423)
>     at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
>     at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
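The "5000 milliseconds" in the error matches the default substream subscription timeout of the Akka Streams materializer. For reference, a config fragment showing where that setting lives in application.conf; the 30s value is only an illustrative assumption, and raising it would at best delay the error if substreams are never consumed at all:

```hocon
akka.stream.materializer {
  subscription-timeout {
    # What to do with a substream that nobody subscribed to in time:
    # cancel (the default), warn, or noop
    mode = cancel
    # The default is 5s, which is the 5000 milliseconds seen in the error
    timeout = 30s
  }
}
```

Switching mode to warn can help while diagnosing, since the timeout is then logged rather than failing the substream.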
