Hi
I have a case where I want to use an akka stream with backpressure and insert messages to the stream from multiple threads. Here is the code to explain better:

    import akka.actor.ActorSystem
    import akka.stream.scaladsl._
    import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}

    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}

    object PublishToSourceQueueFromManyThreads {
      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("QuickStart")
        implicit val materializer = ActorMaterializer()

        // build the processing pipeline with queue as entry point
        val queue = Source.queue[Int](bufferSize = 2, OverflowStrategy.backpressure)
          .groupedWithin(2, 2.seconds)
          .mapAsyncUnordered(2) { elem =>
            Future {
              println(s"${Thread.currentThread().getName} simulating delay, $elem")
              Thread.sleep(1000L)
              elem
            }(scala.concurrent.ExecutionContext.global)
          }
          .to(Sink.ignore)
          .run()

        // here we start few threads that push events to the queue in parallel
        new Thread(() => {
          while (true) {
            val offerResult: Future[QueueOfferResult] = queue.offer(1)
            Await.ready(offerResult, 10.seconds)
            println(s"${Thread.currentThread().getName} Emitted 1 $offerResult")
          }
        }).start()

        new Thread(() => {
          while (true) {
            val offerResult: Future[QueueOfferResult] = queue.offer(2)
            Await.ready(offerResult, 10.seconds)
            println(s"${Thread.currentThread().getName} Emitted 2 $offerResult")
          }
        }).start()

        println("done")
      }
    }

What I get when I execute the code above is:

    done
    Thread-1 Emitted 2 Future(Success(Enqueued))
    Thread-0 Emitted 1 Future(Success(Enqueued))
    scala-execution-context-global-18 simulating delay, Vector(1, 2)
    Thread-1 Emitted 2 Future(Success(Enqueued))
    Thread-0 Emitted 1 Future(Success(Enqueued))
    Thread-1 Emitted 2 Future(Success(Enqueued))
    scala-execution-context-global-19 simulating delay, Vector(2, 1)
    Thread-0 Emitted 1 Future(Success(Enqueued))
    Thread-1 Emitted 2 Future(Success(Enqueued))
    Thread-0 Emitted 1 Future(Success(Enqueued))
    Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request))
    Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request))
    Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request))
    Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request))
    Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request))

I checked with the source code of Queue that this is the expected behaviour:
https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala#L21-L22

    * - fails when stream is completed or you cannot call offer in this moment because of implementation rules
    * (like for backpressure mode and full buffer you need to wait for last offer call Future completion)

What I did for now is I wrapped the SourceQueue, used synchronized block and I'm Awaiting result before returning from offerBlocking.

    import akka.actor.ActorSystem
    import akka.stream.scaladsl._
    import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}

    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future, TimeoutException}

    class SyncQueue[T](q: SourceQueue[T]) {
      /**
        * @throws TimeoutException if it couldn't get the value within `maxWait` time
        */
      def offerBlocking(elem: T, maxWait: Duration = 10.seconds): Future[QueueOfferResult] =
        synchronized {
          val result = q.offer(elem)
          Await.ready(result, maxWait)
          result
        }
    }

    object PublishToSourceQueueFromManyThreads {
      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("QuickStart")
        implicit val materializer = ActorMaterializer()

        // build the queue processing pipeline
        val queue = Source.queue[Int](bufferSize = 2, OverflowStrategy.backpressure)
          .groupedWithin(2, 2.seconds)
          .mapAsyncUnordered(2) { elem =>
            Future {
              println(s"${Thread.currentThread().getName} simulating delay, $elem")
              Thread.sleep(1000L)
              elem
            }(scala.concurrent.ExecutionContext.global)
          }
          .to(Sink.ignore)
          .run()

        val queue2 = new SyncQueue(queue)

        // here we start few threads that would push events to the queue
        new Thread(() => {
          while (true) {
            val offerResult: Future[QueueOfferResult] = queue2.offerBlocking(1)
            // Await.ready(offerResult, 10.seconds)
            println(s"${Thread.currentThread().getName} Emitted 1 $offerResult")
          }
        }).start()

        new Thread(() => {
          while (true) {
            val offerResult: Future[QueueOfferResult] = queue2.offerBlocking(2)
            // Await.ready(offerResult, 10.seconds)
            println(s"${Thread.currentThread().getName} Emitted 2 $offerResult")
          }
        }).start()

        println("done")
      }
    }

The question is: what is the recommended way to do that?
It would be a good example to have on the integration page http://doc.akka.io/docs/akka/2.4.17/scala/stream/stream-integrations.html#Source_queue
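
For comparison with the blocking SyncQueue wrapper above, the same "one offer at a time" rule could also be enforced without Await, by chaining each offer onto the Future of the previous one. The following is only a sketch under assumptions, not a recommended or official Akka pattern: `SerializedOffers` and `offerFn` are hypothetical names, the helper uses only the Scala standard library, and it has not been tested against Akka itself.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper (not part of Akka): serializes calls to an asynchronous
// offer function so that each call starts only after the previous one completed.
final class SerializedOffers[T, R](offerFn: T => Future[R])(implicit ec: ExecutionContext) {

  // Future of the most recently scheduled offer; guarded by `this`.
  private var last: Future[Any] = Future.successful(())

  def offer(elem: T): Future[R] = synchronized {
    // Wait for the previous offer to finish (success or failure),
    // then issue the next one. The lock is held only while linking
    // the chain, never while an offer is in flight.
    val next = last.recover { case _ => () }.flatMap(_ => offerFn(elem))
    last = next
    next
  }
}
```

With the queue from the code above, this would presumably be wired up as `new SerializedOffers[Int, QueueOfferResult](queue.offer)` with an implicit ExecutionContext in scope. Note that the chain itself is unbounded: producers still have to wait for the returned Future before offering again, otherwise the backpressure is lost.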