Hi Jason,

What exactly is it that does not work? This is a bit too much code to digest;
can you reduce it to a minimal reproducer of your actual problem?

-Endre

On Mon, Mar 30, 2015 at 6:34 PM, Jason Martens <m...@jasonmartens.com> wrote:

> Hello Akka users,
>
>    I've been trying out akka-streams, and I have a sample project
> partially working but can't get this particular flow graph to work
> correctly. I'm writing a wrapper around a slow backend service that will
> cache objects in another cache backend. When a request comes in, I want the
> flow to attempt to pull from the cache backend, and if that fails pull from
> the slow backend and also load the object into the cache. One requirement
> is that the slow backend can only return entire objects, but that requests
> are normally only made for a part of the object. To implement this, I have
> been trying to build a graph that looks something like this:
>
> slowBackendSource[FullObject] ~> FlexiRoute[FullObject] ~> LoadIntoCacheSink
>                                                         ~> FlexiRoute[PartialObject] ~> FlexiMerge[PartialObject]
>
> CacheBackendSource[PartialObject] ~> FlexiMerge[PartialObject] ~> Output[PartialObject]
>
> The idea is that I could attach both Sources and the Output sink at
> runtime before the flow is materialized, since I need to parameterize the
> Source for the correct PartialObject.
>
> Is this the best way of doing this? Should I use a custom ActorPublisher
> instead that has a side-effect of loading objects into the cache? The
> current code I'm trying to get working is below, with some type errors I
> can't figure out how to resolve marked in the comments.
>
> Thanks in advance for your help. I'm very excited about the 1.0, whenever
> it's ready!
>
> Jason
>
>
>
> import akka.stream.FanInShape
> import akka.stream.FanOutShape
> import akka.stream.scaladsl._
> import akka.stream._
>
> import scala.collection.immutable
>
>
> class ObjectSplitShape[O, P](_init: FanOutShape.Init[(O, P)] =
> FanOutShape.Name[(O, P)]("ObjectSplit"))
>   extends FanOutShape[(O, P)](_init) {
>   val outObject = newOutlet[O]("outObject")
>   val outPart = newOutlet[P]("outPart")
>   protected override def construct(i: FanOutShape.Init[(O, P)]) = new
> ObjectSplitShape(i)
> }
> class ObjectSplit[O, P] extends FlexiRoute[(O, P), ObjectSplitShape[O, P]](
>   new ObjectSplitShape, OperationAttributes.name("ObjectSplit")) {
>   import FlexiRoute._
>
>   override def createRouteLogic(p: PortT) = new RouteLogic[(O, P)] {
>     override def initialState =
>       State[Any](DemandFrom(p.outPart)) {
>         (ctx, _, element) =>
>           // `object` is a reserved word in Scala, so the full object is bound to `fullObject`
>           val (fullObject, objectPart) = element
>           ctx.emit(p.outObject)(fullObject)
>           ctx.emit(p.outPart)(objectPart)
>           SameState
>       }
>   }
> }
>
>
> class ObjectPartMergeShape[P](_init: FanInShape.Init[P] =
> FanInShape.Name("ObjectPartMerge"))
>   extends FanInShape[P](_init) {
>   val slowSource = newInlet[P]("slowSource")
>   val cacheSource = newInlet[P]("cacheSource")
>
>   protected override def construct(i: FanInShape.Init[P]) = new
> ObjectPartMergeShape(i)
> }
> class ObjectPartMerge[P] extends FlexiMerge[P, ObjectPartMergeShape[P]](
>   new ObjectPartMergeShape, OperationAttributes.name("ReadCacheFirst")) {
>   import akka.stream.scaladsl.FlexiMerge._
>
>   override def createMergeLogic(p: PortT) = new MergeLogic[P] {
>     override def initialState =
>       State[P](ReadPreferred(p.cacheSource, p.slowSource)) {
>         (ctx, input, element) =>
>           ctx.emit(element)
>           SameState
>       }
>     override def initialCompletionHandling = eagerClose
>   }
> }
>
>
> case class ObjectShape[O, P](slowIn: Inlet[(O, P)], cacheIn: Inlet[P],
> responseOut: Outlet[P]) extends Shape {
>   override val inlets: immutable.Seq[Inlet[_]] = slowIn :: cacheIn :: Nil
>   override val outlets: immutable.Seq[Outlet[_]] = responseOut :: Nil
>
>   override def deepCopy() = ObjectShape(
>     new Inlet[(O, P)](slowIn.toString),
>     new Inlet[P](cacheIn.toString),
>     new Outlet[P](responseOut.toString))
>
>   override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets:
> immutable.Seq[Outlet[_]]) = {
>     assert(inlets.size == this.inlets.size)
>     assert(outlets.size == this.outlets.size)
>
> //    Type mismatch, expected: Inlet[(NotInferedO, NotInferedP)], actual: Inlet[_] for inlets(0)
>     ObjectShape(inlets(0), inlets(1), outlets(0))
>   }
> }
>
>
> object ObjectGraph {
>   def apply[O, P](slowSource: Inlet[(O, P)], cacheSource: Inlet[P],
> cacheBackend: CacheBackend):
>   Graph[ObjectShape[O, P], Nothing] = {
>     FlowGraph.partial() { implicit b =>
>     import FlowGraph.Implicits._
>
>
>       val objectSplitRoute = b.add(new ObjectSplit())
>       val objectPartMerge = b.add(new ObjectPartMerge())
>       val cacheSink = b.add(Sink.foreach[Object](fullObject =>
> cacheBackend.loadIntoCache(fullObject)))
>
>       // Compile error "Cannot resolve symbol ~>". Should this be a Source type instead of an Inlet?
>       slowSource ~> objectSplitRoute.in
>       objectSplitRoute.outPart ~> objectPartMerge.slowSource
>       objectSplitRoute.outObject ~> cacheSink.inlet
>
>       ObjectShape(slowIn = slowSource, cacheIn = cacheSource, responseOut
> = objectPartMerge.out)
>     }
>   }
> }
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
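For reference, the lookup order described in the quoted message (try the cache first, fall back to the slow backend, and fill the cache as a side effect) can be sketched without any stream graph. This is only a minimal illustration using plain `scala.concurrent.Future`, not a reduced version of the code above and not akka-streams API. `FullObject`, `PartialObject`, `CacheAside`, and `slowFetch` are hypothetical names, and the in-memory `TrieMap` merely stands in for the real cache backend.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-ins for the types mentioned in the quoted message.
final case class FullObject(id: String, parts: Map[Int, String])
final case class PartialObject(id: String, index: Int, data: String)

// Cache-first read. On a miss, the whole object is fetched from the slow
// backend, every part is stored in the cache, and only the requested part
// is returned. `slowFetch` is an assumed function, not a library call.
class CacheAside(slowFetch: String => Future[FullObject])(implicit ec: ExecutionContext) {
  // In-memory map standing in for the real cache backend.
  private val cache = TrieMap.empty[(String, Int), PartialObject]

  def get(id: String, index: Int): Future[Option[PartialObject]] =
    cache.get((id, index)) match {
      case hit @ Some(_) =>
        Future.successful(hit)
      case None =>
        slowFetch(id).map { full =>
          full.parts.foreach { case (i, data) =>
            cache.put((id, i), PartialObject(id, i, data))
          }
          // None if the backend object has no part with this index.
          cache.get((id, index))
        }
    }
}
```

A reproducer along these lines, with one source, one sink, and the single stage that fails to compile, is usually much easier to comment on than the full graph.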



-- 
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam
