Re: [akka-user] akka-streams - How to define a Source from an arbitrary event stream?
I do it just via ActorPublisher, the scroll method is basically asynchronously loading elasticsearch records (classic cursor thingy). It's a combination of request demand and asynchronous source of events : def receive: Receive = { case Request(n) if totalDemand 0 n 0 isActive = def pushRecursively(n: Long = Math.min(n, totalDemand), scrollId: String = lastScrollId): Future[Unit] = { scroll(scrollId) flatMap { case (sid, recs) if recs.isEmpty = // empty hits means end of scanning/scrolling onComplete() context.stop(self) Future.successful(()) case (sid, recs) = lastScrollId = sid val contexts = recs.map { case (recId, rec) = EsResource :: Map.empty[String, String] :: recId :: rec :: HNil } onNext(contexts) if (n 1) pushRecursively(n-1, sid) else Future.successful(()) } } pushRecursively() onComplete { case Failure(ex) = onError(ex) context.stop(self) case s = } case Cancel = context.stop(self) } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka-streams - How to define a Source from an arbitrary event stream?
pushRecursively() onComplete { case Failure(ex) = onError(ex) context.stop(self) case s = } that's not safe. On Thu, Apr 9, 2015 at 1:16 PM, Jakub Liska liska.ja...@gmail.com wrote: I do it just via ActorPublisher, the scroll method is basically asynchronously loading elasticsearch records (classic cursor thingy). It's a combination of request demand and asynchronous source of events : def receive: Receive = { case Request(n) if totalDemand 0 n 0 isActive = def pushRecursively(n: Long = Math.min(n, totalDemand), scrollId: String = lastScrollId): Future[Unit] = { scroll(scrollId) flatMap { case (sid, recs) if recs.isEmpty = // empty hits means end of scanning/scrolling onComplete() context.stop(self) Future.successful(()) case (sid, recs) = lastScrollId = sid val contexts = recs.map { case (recId, rec) = EsResource :: Map.empty[String, String] :: recId :: rec :: HNil } onNext(contexts) if (n 1) pushRecursively(n-1, sid) else Future.successful(()) } } pushRecursively() onComplete { case Failure(ex) = onError(ex) context.stop(self) case s = } case Cancel = context.stop(self) } } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Cheers, √ -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka-streams - How to define a Source from an arbitrary event stream?
but this approach forces you to call the thing that produces items and block in the same dispatcher as the consumer, doesn't it? What's the best option here? Having an Iterator[Future[T]] that return promises of something that's being executed in a different exec. context? Thank you G On Friday, 20 February 2015 18:42:06 UTC, Luis Ángel Vicente Sánchez wrote: That's quite a nice little trick Endre, way better that writing an ActorPublisher if you don't need to communicate with the Producer. I did something similar to create an infinite stream from Amazon SQS (using an infinite Iterator[Unit] and mapAsyncUnordered) but this seems a much better approach. 2015-02-20 10:20 GMT+00:00 Endre Varga endre...@typesafe.com javascript:: Hi Simon, One trick I like to use is to define a Source in terms of a PushPullStage. Now this sounds strange, since a PushPullStage is supposed to be someting that transforms incoming element into outgoing elements, how can that be a Source? Well, the trick is this: def mySource = Source.empty.transform(...) Since the upstream Source of the stage is immediately completed one, you can call ctx.absorbTermination() and then only handle element emission from onPull. Of course you have to propagate the first pull upstream. For example: Source.empty.transform(() ⇒ { new PushPullStage[Nothing, T] { val iterator: Iterator[T] = myIterator // Upstream is guaranteed to be empty override def onPush(elem: Nothing, ctx: Context[T]): Directive = throw new UnsupportedOperationException(The IterableSource stage cannot be pushed) override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { if (iterator.hasNext) ctx.absorbTermination() else ctx.finish() } override def onPull(ctx: Context[T]): Directive = { if (!ctx.isFinishing) { ctx.pull() } else { val elem = iterator.next() if (iterator.hasNext) ctx.push(elem) else ctx.pushAndFinish(elem) } } } The above is a simplified version of the new upcoming iterator source in M4. I guess this pattern can be made proper by a simple DSL that handles the boilerplate. Currently the drawback of this approach is that you cannot send external async events to this kind of source, so it is a bit limited for now, but we will solve that in the future. -Endre On Thu, Feb 19, 2015 at 11:41 PM, Simon Schäfer ma...@antoras.de javascript: wrote: I struggle in nicely defining a Source that gets its elements from an arbitrary event stream. At the moment my code looks like this: def watchKey[A : Reads](key: SettingKey[A])(implicit ctx: ExecutionContext): Source[Out[A]] = { Source(new Publisher[Out[A]] { var requestedElems = 0L var cancellation: sbt.client.Subscription = _ val subs = new Subscription { def request(n: Long): Unit = { requestedElems = n } def cancel(): Unit = { cancellation.cancel() } } override def subscribe(s: Subscriber[_ : Out[A]]): Unit = { def sendElem(elem: Out[A]) = { requestedElems -= 1 s.onNext(elem) } s.onSubscribe(subs) cancellation = client.lazyWatch(key) { (key, res) ⇒ val elem = res map (key → _) if (requestedElems 0) sendElem(elem) else ??? // TODO handle case of no requested elems } } }) } I had to define my own (incorrect) Publisher+Subscription, which seems to me not being the right way to do this. The `lazyWatch` method takes a function that is called each time an event occurs. Furthermore a subscription needs to be canceled when no new events should be sent. What abstractions does akke-streams provide to make doing this sort of thing easier? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group,
Re: [akka-user] akka-streams - How to define a Source from an arbitrary event stream?
Hi Simon, One trick I like to use is to define a Source in terms of a PushPullStage. Now this sounds strange, since a PushPullStage is supposed to be someting that transforms incoming element into outgoing elements, how can that be a Source? Well, the trick is this: def mySource = Source.empty.transform(...) Since the upstream Source of the stage is immediately completed one, you can call ctx.absorbTermination() and then only handle element emission from onPull. Of course you have to propagate the first pull upstream. For example: Source.empty.transform(() ⇒ { new PushPullStage[Nothing, T] { val iterator: Iterator[T] = myIterator // Upstream is guaranteed to be empty override def onPush(elem: Nothing, ctx: Context[T]): Directive = throw new UnsupportedOperationException(The IterableSource stage cannot be pushed) override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { if (iterator.hasNext) ctx.absorbTermination() else ctx.finish() } override def onPull(ctx: Context[T]): Directive = { if (!ctx.isFinishing) { ctx.pull() } else { val elem = iterator.next() if (iterator.hasNext) ctx.push(elem) else ctx.pushAndFinish(elem) } } } The above is a simplified version of the new upcoming iterator source in M4. I guess this pattern can be made proper by a simple DSL that handles the boilerplate. Currently the drawback of this approach is that you cannot send external async events to this kind of source, so it is a bit limited for now, but we will solve that in the future. -Endre On Thu, Feb 19, 2015 at 11:41 PM, Simon Schäfer m...@antoras.de wrote: I struggle in nicely defining a Source that gets its elements from an arbitrary event stream. At the moment my code looks like this: def watchKey[A : Reads](key: SettingKey[A])(implicit ctx: ExecutionContext): Source[Out[A]] = { Source(new Publisher[Out[A]] { var requestedElems = 0L var cancellation: sbt.client.Subscription = _ val subs = new Subscription { def request(n: Long): Unit = { requestedElems = n } def cancel(): Unit = { cancellation.cancel() } } override def subscribe(s: Subscriber[_ : Out[A]]): Unit = { def sendElem(elem: Out[A]) = { requestedElems -= 1 s.onNext(elem) } s.onSubscribe(subs) cancellation = client.lazyWatch(key) { (key, res) ⇒ val elem = res map (key → _) if (requestedElems 0) sendElem(elem) else ??? // TODO handle case of no requested elems } } }) } I had to define my own (incorrect) Publisher+Subscription, which seems to me not being the right way to do this. The `lazyWatch` method takes a function that is called each time an event occurs. Furthermore a subscription needs to be canceled when no new events should be sent. What abstractions does akke-streams provide to make doing this sort of thing easier? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka-streams - How to define a Source from an arbitrary event stream?
That's quite a nice little trick Endre, way better that writing an ActorPublisher if you don't need to communicate with the Producer. I did something similar to create an infinite stream from Amazon SQS (using an infinite Iterator[Unit] and mapAsyncUnordered) but this seems a much better approach. 2015-02-20 10:20 GMT+00:00 Endre Varga endre.va...@typesafe.com: Hi Simon, One trick I like to use is to define a Source in terms of a PushPullStage. Now this sounds strange, since a PushPullStage is supposed to be someting that transforms incoming element into outgoing elements, how can that be a Source? Well, the trick is this: def mySource = Source.empty.transform(...) Since the upstream Source of the stage is immediately completed one, you can call ctx.absorbTermination() and then only handle element emission from onPull. Of course you have to propagate the first pull upstream. For example: Source.empty.transform(() ⇒ { new PushPullStage[Nothing, T] { val iterator: Iterator[T] = myIterator // Upstream is guaranteed to be empty override def onPush(elem: Nothing, ctx: Context[T]): Directive = throw new UnsupportedOperationException(The IterableSource stage cannot be pushed) override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { if (iterator.hasNext) ctx.absorbTermination() else ctx.finish() } override def onPull(ctx: Context[T]): Directive = { if (!ctx.isFinishing) { ctx.pull() } else { val elem = iterator.next() if (iterator.hasNext) ctx.push(elem) else ctx.pushAndFinish(elem) } } } The above is a simplified version of the new upcoming iterator source in M4. I guess this pattern can be made proper by a simple DSL that handles the boilerplate. Currently the drawback of this approach is that you cannot send external async events to this kind of source, so it is a bit limited for now, but we will solve that in the future. -Endre On Thu, Feb 19, 2015 at 11:41 PM, Simon Schäfer m...@antoras.de wrote: I struggle in nicely defining a Source that gets its elements from an arbitrary event stream. At the moment my code looks like this: def watchKey[A : Reads](key: SettingKey[A])(implicit ctx: ExecutionContext): Source[Out[A]] = { Source(new Publisher[Out[A]] { var requestedElems = 0L var cancellation: sbt.client.Subscription = _ val subs = new Subscription { def request(n: Long): Unit = { requestedElems = n } def cancel(): Unit = { cancellation.cancel() } } override def subscribe(s: Subscriber[_ : Out[A]]): Unit = { def sendElem(elem: Out[A]) = { requestedElems -= 1 s.onNext(elem) } s.onSubscribe(subs) cancellation = client.lazyWatch(key) { (key, res) ⇒ val elem = res map (key → _) if (requestedElems 0) sendElem(elem) else ??? // TODO handle case of no requested elems } } }) } I had to define my own (incorrect) Publisher+Subscription, which seems to me not being the right way to do this. The `lazyWatch` method takes a function that is called each time an event occurs. Furthermore a subscription needs to be canceled when no new events should be sent. What abstractions does akke-streams provide to make doing this sort of thing easier? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User
[akka-user] akka-streams - How to define a Source from an arbitrary event stream?
I struggle in nicely defining a Source that gets its elements from an arbitrary event stream. At the moment my code looks like this: def watchKey[A : Reads](key: SettingKey[A])(implicit ctx: ExecutionContext): Source[Out[A]] = { Source(new Publisher[Out[A]] { var requestedElems = 0L var cancellation: sbt.client.Subscription = _ val subs = new Subscription { def request(n: Long): Unit = { requestedElems = n } def cancel(): Unit = { cancellation.cancel() } } override def subscribe(s: Subscriber[_ : Out[A]]): Unit = { def sendElem(elem: Out[A]) = { requestedElems -= 1 s.onNext(elem) } s.onSubscribe(subs) cancellation = client.lazyWatch(key) { (key, res) ⇒ val elem = res map (key → _) if (requestedElems 0) sendElem(elem) else ??? // TODO handle case of no requested elems } } }) } I had to define my own (incorrect) Publisher+Subscription, which seems to me not being the right way to do this. The `lazyWatch` method takes a function that is called each time an event occurs. Furthermore a subscription needs to be canceled when no new events should be sent. What abstractions does akke-streams provide to make doing this sort of thing easier? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.