GitHub user gaeljw added a comment to the discussion: [Pekko Streams] Best
practice for streaming from a resource that can take several seconds/minutes to
start offering an element
Thanks for your message @raboof .
Here's a (not so) minified version of the code I was using. However, _as is
often the case_, now that I'm sharing this with you, I was able to make the
async variant work. I'll need to review the more complex real code to see
whether I missed something and async does work, or whether something else is
preventing async from working.
---
**Sync version**
```scala
/** Demo entry point for the synchronous variant.
  *
  * Runs `CustomSource.source()` through a shared KillSwitch, triggers the
  * switch after 10 seconds, waits for the stream to finish and then shuts the
  * actor system down.
  */
object PekkoMain {
  private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    logger.info("Hello, Pekko!")
    // Implicits carry explicit types so inference cannot silently change them.
    implicit val system: ActorSystem = ActorSystem("PekkoSystem")
    implicit val ec: scala.concurrent.ExecutionContext = system.dispatcher

    val ks = KillSwitches.shared("kill-switch")

    // KillSwitch is not able to stop the flow
    val source: Source[String, _] = CustomSource.source()
    // KillSwitch does stop the flow
    // val source: Source[String, _] =
    //   Source.repeat("Repeat").initialDelay(15.seconds).delay(1.second).take(20)

    val done: Future[Done] = source
      .via(ks.flow)
      .watchTermination() { (_, f) =>
        // Log when the stream terminates, whether it succeeded or failed.
        f.onComplete { _ =>
          logger.info("onComplete")
        }
        NotUsed
      }
      .runForeach(item => logger.info(item))

    // Fire the KillSwitch after 10 seconds via the scheduler, so no dispatcher
    // thread is parked in Thread.sleep while waiting.
    system.scheduler.scheduleOnce(10.seconds) {
      logger.info("Trigger KillSwitch")
      ks.shutdown()
    }

    // Always terminate the actor system, even when the stream fails or the
    // wait times out; otherwise its threads are left running.
    try {
      Await.result(done, 60.seconds)
      ()
    } finally {
      Await.result(system.terminate(), 60.seconds)
      ()
    }
  }
}
/** Synchronous variant of the custom source.
  *
  * The first element is produced only after fifteen one-second sleeps executed
  * directly inside `onPull`; the following 49 elements are produced
  * immediately, after which the source completes.
  */
object CustomSource {
  private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)

  /** Builds the source; it materializes a future holding the number of items
    * emitted, or failing with an `InterruptedException` if downstream finishes
    * first.
    */
  def source(): Source[String, Future[Int]] = {
    val stage = new ResultSource()
    Source.fromGraph(stage)
  }

  private class ResultSource()
      extends GraphStageWithMaterializedValue[SourceShape[String], Future[Int]] {
    val out: Outlet[String] = Outlet(s"$toString.out")
    val shape: SourceShape[String] = SourceShape(out)

    override def createLogicAndMaterializedValue(
        inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
      val emittedCount = Promise[Int]()

      val stageLogic = new GraphStageLogic(shape) with OutHandler {
        private var counter: Int = 0
        private var cancelled = false

        // Also invoked explicitly from onDownstreamFinish below.
        private def release(): Unit =
          logger.info("Releasing resources")

        override def postStop(): Unit = release()

        // Logs and throws once the cancellation flag has been set.
        private def abortIfCancelled(): Unit =
          if (cancelled) {
            logger.info("nextCursor -> cancelled")
            throw new RuntimeException("Cancelled")
          }

        // Produces the next item, or None once 50 items have been emitted.
        // The very first call sleeps on the calling thread, re-checking the
        // cancellation flag before each one-second sleep. In the sync run's
        // log no "onDownstreamFinish" line appears while this loop is sleeping.
        private def nextCursor(): Option[String] = {
          abortIfCancelled()
          counter match {
            case 0 =>
              logger.info("nextCursor -> Sleeping for 15 seconds")
              for (_ <- 1 to 15) {
                abortIfCancelled()
                logger.info("nextCursor -> Sleeping")
                Thread.sleep(1000)
              }
              Some(s"Item $counter")
            case n if n < 50 =>
              logger.info("nextCursor -> Immediate result")
              Some(s"Item $counter")
            case _ =>
              logger.info("nextCursor -> No more results")
              None
          }
        }

        override def onPull(): Unit =
          nextCursor() match {
            case Some(item) =>
              counter += 1
              push(out, item)
            case None =>
              emittedCount.success(counter)
              complete(out)
          }

        @nowarn
        override def onDownstreamFinish() = {
          logger.info("onDownstreamFinish")
          emittedCount.tryFailure(new InterruptedException("Downstream finished"))
          release()
          cancelled = true
          super.onDownstreamFinish(): @nowarn
        }

        setHandler(out, this)
      }

      (stageLogic, emittedCount.future)
    }
  }
}
```
Gives this output:
```
2024-12-06 18:46:48,386 INFO main PekkoMain$ - Hello, Pekko!
2024-12-06 18:46:49,333 INFO PekkoSystem-pekko.actor.default-dispatcher-4
o.a.p.e.s.Slf4jLogger - Slf4jLogger started
2024-12-06 18:46:49,807 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping for 15 seconds
2024-12-06 18:46:49,807 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:50,807 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:51,807 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:52,808 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:53,808 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:54,808 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:55,809 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:56,809 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:57,809 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:58,809 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:59,803 INFO PekkoSystem-pekko.actor.default-dispatcher-4
PekkoMain$ - Trigger KillSwitch
2024-12-06 18:46:59,810 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:47:00,810 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:47:01,810 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:47:02,826 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:47:03,827 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:47:04,828 INFO PekkoSystem-pekko.actor.default-dispatcher-5
PekkoMain$ - Item 0
2024-12-06 18:47:04,828 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,828 INFO PekkoSystem-pekko.actor.default-dispatcher-5
PekkoMain$ - Item 1
2024-12-06 18:47:04,828 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,829 INFO PekkoSystem-pekko.actor.default-dispatcher-5
PekkoMain$ - Item 2
2024-12-06 18:47:04,829 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,829 INFO PekkoSystem-pekko.actor.default-dispatcher-5
PekkoMain$ - Item 3
2024-12-06 18:47:04,829 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,829 INFO PekkoSystem-pekko.actor.default-dispatcher-5
PekkoMain$ - Item 4
2024-12-06 18:47:04,829 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,829 INFO PekkoSystem-pekko.actor.default-dispatcher-5
PekkoMain$ - Item 5
2024-12-06 18:47:04,829 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Immediate result
...
2024-12-06 18:47:04,838 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,838 INFO PekkoSystem-pekko.actor.default-dispatcher-5
PekkoMain$ - Item 48
2024-12-06 18:47:04,838 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,838 INFO PekkoSystem-pekko.actor.default-dispatcher-5
PekkoMain$ - Item 49
2024-12-06 18:47:04,838 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - nextCursor -> No more results
2024-12-06 18:47:04,839 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - Releasing resources
2024-12-06 18:47:04,843 INFO PekkoSystem-pekko.actor.default-dispatcher-4
PekkoMain$ - onComplete
2024-12-06 18:47:04,874 INFO PekkoSystem-pekko.actor.default-dispatcher-4
o.a.p.a.CoordinatedShutdown - Running CoordinatedShutdown with reason
[ActorSystemTerminateReason]
```
---
**Async version**
```scala
// Entry point of the async variant. Only the lines that differ from the sync
// version are shown; the rest of main is elided by the author.
object PekkoMain {
private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
// Same as for sync with:
// Dedicated fixed pool of 4 threads, wrapped as an ExecutionContext and handed
// to the source below.
// NOTE(review): nothing in this snippet shuts the pool down; confirm the full
// program calls blockingEc.shutdown() once the stream is done, since an
// executor that is never shut down may keep the JVM alive.
val blockingEc =
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
// Source whose element fetching runs on blockingEc.
val source: Source[String, _] = CustomSource.asyncSource(blockingEc)
}
}
/** Asynchronous variant of the custom source.
  *
  * Each pull starts the (possibly slow) fetch on a caller-supplied execution
  * context and hands the outcome back to the stage through an async callback,
  * so the stage itself never blocks while waiting for the first element.
  */
object CustomSource {
  private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)

  /** Builds the asynchronous source.
    *
    * @param blockingEc execution context on which each fetch is run
    * @return a source of items; it materializes a future holding the number
    *         of items emitted, failing with the fetch error if a fetch fails
    *         or with an `InterruptedException` if downstream finishes first
    */
  def asyncSource(blockingEc: ExecutionContext): Source[String, Future[Int]] =
    Source.fromGraph(new AsyncResultSource(blockingEc))

  private class AsyncResultSource(blockingEc: ExecutionContext)
      extends GraphStageWithMaterializedValue[SourceShape[String], Future[Int]] {
    val out: Outlet[String] = Outlet(s"$toString.out")
    val shape: SourceShape[String] = SourceShape(out)

    override def createLogicAndMaterializedValue(
        inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
      val result = Promise[Int]()

      val logic = new GraphStageLogic(shape) with OutHandler {
        private var counter: Int = 0
        private var cancel = false

        // Receives the outcome of a fetch and turns it into a stage signal.
        private val cb: AsyncCallback[Try[Option[String]]] =
          getAsyncCallback[Try[Option[String]]] {
            case Success(Some(item)) =>
              counter += 1
              push(out, item)
            case Success(None) =>
              result.success(counter)
              complete(out)
            case Failure(ex) =>
              // Complete the materialized future too; otherwise a failed
              // fetch would leave it pending forever.
              result.tryFailure(ex)
              failStage(ex)
          }

        override def postStop(): Unit = release()

        // Called from onDownstreamFinish and again from postStop, so on
        // cancellation it runs twice (the log after this snippet shows two
        // "Releasing resources" lines). Real cleanup must be idempotent.
        private def release(): Unit = {
          logger.info("Releasing resources")
        }

        // Starts fetching the next item on blockingEc and routes the outcome
        // back into the stage through the async callback.
        private def nextCursor(): Unit = {
          if (cancel) {
            logger.info("nextCursor -> cancelled")
            throw new RuntimeException("Cancelled")
          }
          // Snapshot the counter here, inside the stage callback: the Future
          // body runs on a blockingEc thread and must not read the stage's
          // mutable state.
          val index = counter
          val fetched = Future {
            blocking {
              if (index == 0) {
                logger.info("nextCursor -> Sleeping for 15 seconds")
                logger.info("nextCursor -> Sleeping")
                Thread.sleep(15000)
                Some(s"Item $index")
              } else if (index < 50) {
                logger.info("nextCursor -> Immediate result")
                Some(s"Item $index")
              } else {
                logger.info("nextCursor -> No more results")
                None
              }
            }
          }(blockingEc)
          // Invoking the callback is cheap, so it runs directly on the thread
          // that completes the future.
          fetched.onComplete(cb.invoke)(ExecutionContext.parasitic)
        }

        override def onPull(): Unit = nextCursor()

        @nowarn
        override def onDownstreamFinish() = {
          logger.info("onDownstreamFinish")
          result.tryFailure(new InterruptedException("Downstream finished"))
          release()
          cancel = true
          super.onDownstreamFinish(): @nowarn
        }

        setHandler(out, this)
      }

      logic -> result.future
    }
  }
}
```
Gives the following output:
```
2024-12-06 18:51:13,325 INFO main PekkoMain$ - Hello, Pekko!
2024-12-06 18:51:14,372 INFO PekkoSystem-pekko.actor.default-dispatcher-4
o.a.p.e.s.Slf4jLogger - Slf4jLogger started
2024-12-06 18:51:14,828 INFO pool-1-thread-1 CustomSource$ - nextCursor ->
Sleeping for 15 seconds
2024-12-06 18:51:14,828 INFO pool-1-thread-1 CustomSource$ - nextCursor ->
Sleeping
2024-12-06 18:51:24,826 INFO PekkoSystem-pekko.actor.default-dispatcher-4
PekkoMain$ - Trigger KillSwitch
2024-12-06 18:51:24,845 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - onDownstreamFinish
2024-12-06 18:51:24,846 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - Releasing resources
2024-12-06 18:51:24,850 INFO PekkoSystem-pekko.actor.default-dispatcher-5
CustomSource$ - Releasing resources
2024-12-06 18:51:24,850 INFO PekkoSystem-pekko.actor.default-dispatcher-4
PekkoMain$ - onComplete
2024-12-06 18:51:24,884 INFO PekkoSystem-pekko.actor.default-dispatcher-4
o.a.p.a.CoordinatedShutdown - Running CoordinatedShutdown with reason
[ActorSystemTerminateReason]
```
GitHub link:
https://github.com/apache/pekko/discussions/1572#discussioncomment-11487721
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]