Thanks for the pointers Endre,  I’ll explore those ideas.

Frank

> On Jan 22, 2015, at 4:02 AM, Endre Varga <endre.va...@typesafe.com> wrote:
> 
> 
> 
> On Thu, Jan 22, 2015 at 5:07 AM, Frank Sauer <fsaue...@gmail.com 
> <mailto:fsaue...@gmail.com>> wrote:
> Update, in a simple test scenario like so 
> 
>   val ticks = Source(1 second, 1 second, () => "Hello")
> 
>   val flow = ticks.transform(() => new FilterFor[String](10 seconds)(x => 
> true)).to(Sink.foreach(println(_)))
> 
>   flow.run()
> 
> I'm seeing the following error, so this doesn't work at all and I'm not sure 
> it is because of threading:
> 
> java.lang.ArrayIndexOutOfBoundsException: -1
>       at 
> akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$currentOp(Interpreter.scala:175)
>       at 
> akka.stream.impl.fusing.OneBoundedInterpreter$State$class.push(Interpreter.scala:209)
>       at 
> akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.push(Interpreter.scala:278)
>       at 
> experiments.streams.time$FilterFor$$anonfun$1.apply$mcV$sp(time.scala:46)
>       at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>       at 
> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> I think I'm violating the one very important rule mentioned in the docs - 
> when the timer fires it calls a push on the context but there is also a pull 
> going on concurrently(?) - and this is indeed breaking in spectacular ways as 
> expected.... 
> 
> :)
>  
> 
> I have no idea how to implement this correctly. It looked pretty simple at 
> first, but alas... 
> 
> See my previous mail. The main problem here is mixing backpressured streams 
> (your data) and non-backpressured events (timer triggers) in a safe fashion. 
> Well, the main problem is not how to implement it, but how to expose an API 
> to users which is as safe as possible. We have groupedWithin, takeWithin and 
> dropWithin as timer based ops, but no customization for now.
> 
> -Endre
>  
> 
> On Wednesday, January 21, 2015 at 8:51:21 PM UTC-5, Frank Sauer wrote:
> Thanks, I came up with the following, but I have some questions:
> 
> /**
>    * Holds elements of type A for a given finite duration after a predicate p 
> first yields true and as long as subsequent
>    * elements matching that first element (e.g. are equal) still satisfy the 
> predicate. If a matching element arrives during
>    * the given FiniteDuration for which the predicate p does not hold, the 
> original element will NOT be pushed downstream.
>    * Only when the timer expires and no matching elements have been seen for 
> which p does not hold, will elem be pushed
>    * downstream.
>    *
>    * @param duration The polling interval during which p has to hold true
>    * @param p        The predicate that has to remain true during the duration
>    * @param system   implicit required to schedule timers
>    * @tparam A       type of the elements
>    */
>   class FilterFor[A](duration : FiniteDuration)(p: A => Boolean)(implicit 
> system: ActorSystem) extends PushStage[A,A] {
> 
>     var state : Map[A,Cancellable] = Map.empty
> 
>     override def onPush(elem: A, ctx: Context[A]): Directive = 
> state.get(elem) match {
> 
>       case Some(timer) if !p(elem) => // pending timer but condition no 
> longer holds => cancel timer
>          timer.cancel()
>          state = state - elem
>          ctx.pull()
> 
>        case None if p(elem) => // no pending timer and predicate true -> 
> start and cache new timer
>          val timer = system.scheduler.scheduleOnce(duration) {
>            // when timer fires, remove from state and push elem downstream
>            state = state - elem
>            ctx.push(elem); // is this safe?
>          }
>          state = state + (elem -> timer)
>          ctx.pull()
> 
>        case _ => ctx.pull() // otherwise simply wait for the next upstream 
> element
>     }
> 
>   }
> 
> My main concerns are these:
> 
> 1) Is it safe to invoke ctx.push from the thread on which the timer fires?
> 2) How do I react to upstream or downstream finish or cancel events - do I 
> have to?
> 3) Can I integrate this into the DSL without using transform, e.g. can I 
> somehow add a filterFor method on something via a pimp my library?
> 
> Any and all pointers would be very much appreciated,
> 
> Thanks,
> 
> Frank
> 
> On Friday, January 16, 2015 at 11:52:03 AM UTC-5, Akka Team wrote:
> Hi Frank!
> We do not have such operations off-the-shelf, however they are easily 
> implementable by using custom stream processing stages:
> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-customize.html
>  
> <http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-customize.html>
> 
> Be sure to refer to the cookbook for some inspiration on how to implement 
> your own stages:
> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html
>  
> <http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html>
> 
> Hope this helps, and feel free to ask for help in case you get stuck :-)
> 
> -- 
> Konrad
> 
> On Thu, Jan 15, 2015 at 3:57 AM, Frank Sauer <fsau...@gmail.com <>> wrote:
> I have two uses cases that I'm used to from using CEP systems like Esper and 
> I'm trying to figure out if I can implements them (easily) with Akka Streams:
> 
> 1) test if in a stream of events ALL new events satisfy some predicate during 
> some finite interval of time, which starts at the time the predicate yields 
> true the first time. This is useful to generate alerts on a stream of 
> measurements but only if some faulty condition persists for some given time. 
> 
> 2) test is some event does NOT occur after some other event within some 
> finite duration 
> 
> 
> My question is if these are supported by existing aka streams flow graph DSL 
> elements or if a custom transformer is required. If the latter, I'd 
> appreciate any pointers on how to approach writing it.
> 
> Thanks,
> 
> Frank
> 
> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/ <http://akka.io/docs/>
> >>>>>>>>>> Check the FAQ: 
> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html 
> >>>>>>>>>> <http://doc.akka.io/docs/akka/current/additional/faq.html>
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user 
> >>>>>>>>>> <https://groups.google.com/group/akka-user>
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+...@googlegroups.com <>.
> To post to this group, send email to akka...@googlegroups.com <>.
> Visit this group at http://groups.google.com/group/akka-user 
> <http://groups.google.com/group/akka-user>.
> For more options, visit https://groups.google.com/d/optout 
> <https://groups.google.com/d/optout>.
> 
> 
> 
> -- 
> Akka Team
> Typesafe - The software stack for applications that scale
> Blog: letitcrash.com <http://letitcrash.com/>
> Twitter: @akkateam
> 
> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/ <http://akka.io/docs/>
> >>>>>>>>>> Check the FAQ: 
> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html 
> >>>>>>>>>> <http://doc.akka.io/docs/akka/current/additional/faq.html>
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user 
> >>>>>>>>>> <https://groups.google.com/group/akka-user>
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+unsubscr...@googlegroups.com 
> <mailto:akka-user+unsubscr...@googlegroups.com>.
> To post to this group, send email to akka-user@googlegroups.com 
> <mailto:akka-user@googlegroups.com>.
> Visit this group at http://groups.google.com/group/akka-user 
> <http://groups.google.com/group/akka-user>.
> For more options, visit https://groups.google.com/d/optout 
> <https://groups.google.com/d/optout>.
> 
> 
> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/ <http://akka.io/docs/>
> >>>>>>>>>> Check the FAQ: 
> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html 
> >>>>>>>>>> <http://doc.akka.io/docs/akka/current/additional/faq.html>
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user 
> >>>>>>>>>> <https://groups.google.com/group/akka-user>
> --- 
> You received this message because you are subscribed to a topic in the Google 
> Groups "Akka User List" group.
> To unsubscribe from this topic, visit 
> https://groups.google.com/d/topic/akka-user/QAJou4yCW3k/unsubscribe 
> <https://groups.google.com/d/topic/akka-user/QAJou4yCW3k/unsubscribe>.
> To unsubscribe from this group and all its topics, send an email to 
> akka-user+unsubscr...@googlegroups.com 
> <mailto:akka-user+unsubscr...@googlegroups.com>.
> To post to this group, send email to akka-user@googlegroups.com 
> <mailto:akka-user@googlegroups.com>.
> Visit this group at http://groups.google.com/group/akka-user 
> <http://groups.google.com/group/akka-user>.
> For more options, visit https://groups.google.com/d/optout 
> <https://groups.google.com/d/optout>.

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to