Hi

I have a use case where I want to use an Akka Stream with backpressure and
push messages into the stream from multiple threads.

Here is some code that illustrates the problem:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

/**
 * Demo: two plain JVM threads offer elements to one shared `Source.queue`
 * (buffer size 2, backpressure overflow strategy) without any coordination
 * between them.
 *
 * Each thread waits only for its own offer to complete, so nothing stops the
 * other thread from calling `offer` while an offer is still pending.
 */
object PublishToSourceQueueFromManyThreads {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("QuickStart")
    implicit val materializer = ActorMaterializer()

    // Deliberately slow downstream: group up to 2 elements (or whatever arrived
    // within 2 seconds) and spend one second on each group, at most 2 groups at a time.
    val slowBatches = Flow[Int]
      .groupedWithin(2, 2.seconds)
      .mapAsyncUnordered(2) { batch =>
        Future {
          println(s"${Thread.currentThread().getName} simulating delay, $batch")
          Thread.sleep(1000L)
          batch
        }(scala.concurrent.ExecutionContext.global)
      }

    // The materialized value is the queue handle that the producer threads push into.
    val queue = Source
      .queue[Int](bufferSize = 2, OverflowStrategy.backpressure)
      .via(slowBatches)
      .to(Sink.ignore)
      .run()

    // First producer: offers the value 1 forever, waiting for each offer to resolve.
    val firstProducer = new Thread(() => {
      val self = Thread.currentThread().getName
      while (true) {
        val pending: Future[QueueOfferResult] = queue.offer(1)
        Await.ready(pending, 10.seconds)
        println(s"$self Emitted 1 $pending")
      }
    })
    firstProducer.start()

    // Second producer: same loop with the value 2, racing against the first one.
    val secondProducer = new Thread(() => {
      val self = Thread.currentThread().getName
      while (true) {
        val pending: Future[QueueOfferResult] = queue.offer(2)
        Await.ready(pending, 10.seconds)
        println(s"$self Emitted 2 $pending")
      }
    })
    secondProducer.start()

    // Printed as soon as both threads are started; the producers keep running.
    println("done")

  }
}

What I get when I execute the code above is

done
Thread-1 Emitted 2 Future(Success(Enqueued))
Thread-0 Emitted 1 Future(Success(Enqueued))
scala-execution-context-global-18 simulating delay, Vector(1, 2)
Thread-1 Emitted 2 Future(Success(Enqueued))
Thread-0 Emitted 1 Future(Success(Enqueued))
Thread-1 Emitted 2 Future(Success(Enqueued))
scala-execution-context-global-19 simulating delay, Vector(2, 1)
Thread-0 Emitted 1 Future(Success(Enqueued))
Thread-1 Emitted 2 Future(Success(Enqueued))
Thread-0 Emitted 1 Future(Success(Enqueued))
Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to 
wait for previous offer to be resolved to send another request))
Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to 
wait for previous offer to be resolved to send another request))
Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to 
wait for previous offer to be resolved to send another request))
Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to 
wait for previous offer to be resolved to send another request))
Thread-0 Emitted 1 Future(Failure(java.lang.IllegalStateException: You have to 
wait for previous offer to be resolved to send another request))

I checked the source code of Queue, and this is the expected behaviour:
https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala#L21-L22

   * - fails when stream is completed or you cannot call offer in this moment because of implementation rules
   * (like for backpressure mode and full buffer you need to wait for last offer call Future completion)

What I have done for now is wrap the SourceQueue in a class that uses a
synchronized block and awaits the result before returning from offerBlocking:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, TimeoutException}
/**
 * Wrapper that lets several threads share one [[SourceQueue]] by serialising
 * their `offer` calls: at most one offer issued through this wrapper is
 * outstanding at any time.
 *
 * Every call blocks the calling thread, so this is meant for plain producer
 * threads, not for code running on a dispatcher or execution context.
 *
 * @param q the queue that offers are forwarded to
 * @tparam T type of the elements offered to the queue
 */
class SyncQueue[T](q: SourceQueue[T]) {

  // The most recent offer forwarded to `q`. Only read and written inside the
  // `synchronized` block below. It may still be incomplete when a new caller
  // arrives if the previous caller left with a TimeoutException.
  private var inFlight: Future[Any] = Future.successful(())

  /**
   * Offers `elem` to the wrapped queue and blocks until that offer is resolved.
   *
   * If an earlier call timed out and its offer is still pending, this call
   * first waits (up to `maxWait`) for that offer, so that `offer` is never
   * invoked while a previous one is unresolved. In that situation the call can
   * block for up to twice `maxWait` in total.
   *
   * @param elem    element to offer
   * @param maxWait maximum time to wait for each of the two steps above
   * @return the already completed future returned by the wrapped queue's `offer`;
   *         it may be completed with a failure, which is not rethrown here
   * @throws TimeoutException if the previous offer or this offer is not
   *                          resolved within `maxWait`
   */
  def offerBlocking(elem: T, maxWait: Duration = 10.seconds): Future[QueueOfferResult] =
    synchronized {
      // Returns immediately unless a previous caller timed out with its offer pending.
      Await.ready(inFlight, maxWait)
      val result = q.offer(elem)
      // Record the offer before waiting: if the wait below times out, the next
      // caller must still be able to see that this offer is outstanding.
      inFlight = result
      Await.ready(result, maxWait)
      result
    }
}
/**
 * Demo: the same two-producer setup, but every offer goes through a shared
 * `SyncQueue`, whose `offerBlocking` is called in place of the queue's own
 * `offer`.
 */
object PublishToSourceQueueFromManyThreads {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("QuickStart")
    implicit val materializer = ActorMaterializer()

    // Deliberately slow downstream: group up to 2 elements (or whatever arrived
    // within 2 seconds) and spend one second on each group, at most 2 groups at a time.
    val slowBatches = Flow[Int]
      .groupedWithin(2, 2.seconds)
      .mapAsyncUnordered(2) { batch =>
        Future {
          println(s"${Thread.currentThread().getName} simulating delay, $batch")
          Thread.sleep(1000L)
          batch
        }(scala.concurrent.ExecutionContext.global)
      }

    // The materialized value is the raw queue handle.
    val queue = Source
      .queue[Int](bufferSize = 2, OverflowStrategy.backpressure)
      .via(slowBatches)
      .to(Sink.ignore)
      .run()

    // Single wrapper instance shared by both producer threads.
    val queue2 = new SyncQueue(queue)

    // First producer: offers the value 1 forever through the wrapper. There is no
    // Await in the loop; the waiting is left to offerBlocking.
    val firstProducer = new Thread(() => {
      val self = Thread.currentThread().getName
      while (true) {
        val resolved: Future[QueueOfferResult] = queue2.offerBlocking(1)
        println(s"$self Emitted 1 $resolved")
      }
    })
    firstProducer.start()

    // Second producer: same loop with the value 2.
    val secondProducer = new Thread(() => {
      val self = Thread.currentThread().getName
      while (true) {
        val resolved: Future[QueueOfferResult] = queue2.offerBlocking(2)
        println(s"$self Emitted 2 $resolved")
      }
    })
    secondProducer.start()

    // Printed as soon as both threads are started; the producers keep running.
    println("done")
  }
}

The question is: what is the recommended way to do this? It would be a good
example to have on the stream integrations page:
http://doc.akka.io/docs/akka/2.4.17/scala/stream/stream-integrations.html#Source_queue

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to