I am at my wits' end trying to figure something out in the graph below. 
Here is what I am trying to accomplish. My interval source contains N 
intervals. I then want to pipe those intervals into my dayFlow, which 
aggregates the number of tweet IDs per day; then, back in the main graph, I 
sum the number of tweet IDs across all the days in the input. (The reason 
I'm doing it this way is that I care about deduping only within a day 
boundary.)
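
To make the counting rule concrete, here is a minimal plain-Scala sketch (no 
Akka involved) with made-up IDs. The object, the sample data, and both 
helper names are hypothetical and exist only for illustration:

```scala
object PerDayDedupSketch {
  // Hypothetical data: each inner list holds the tweet ids seen in one day.
  val days: List[List[Long]] = List(List(1L, 2L, 2L), List(2L, 3L))

  // What I want: dedupe within each day, then add the per-day counts.
  def perDayTotal(days: List[List[Long]]): Long =
    days.map(_.toSet.size.toLong).sum

  // What I do NOT want: dedupe across the whole input.
  def globalTotal(days: List[List[Long]]): Long =
    days.flatten.toSet.size.toLong
}
```

For the sample data, perDayTotal gives 4 (2 distinct IDs in each day), while 
globalTotal gives 3, because ID 2 appears in both days.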

What's happening with the code below is that .map(_.size) only gets called 
AFTER all the intervals have been piped into the dayFlow. I am guessing 
this is because .fold waits for upstream to complete before emitting 
anything, so it emits a single value for the whole stream rather than one 
per interval. That would explain why this isn't working. I would like the 
fold to happen once for every Interval passed to the dayFlow. I have tried 
many things to get this working, but I feel that I am just missing 
something silly.
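
Stripped of everything S3-related, I think the behavior looks roughly like 
the sketch below. Everything in it is hypothetical (the DayIds alias, the 
flow names, the run helper, the sample data), and it assumes Akka 2.6 or 
later, where an implicit ActorSystem is enough to run a stream. 
foldOverEverything mirrors what my dayFlow does today; foldPerDay is one 
shape that I believe would emit a result per element, by folding inside a 
sub-source under flatMapConcat, though I have not tried it against the real 
flows:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PerElementFoldSketch {
  // Hypothetical stand-in for the tweet ids produced by one interval.
  type DayIds = List[Long]

  // fold sits on the outer stream, so it emits exactly once,
  // and only after the upstream has completed.
  val foldOverEverything: Flow[DayIds, Long, NotUsed] =
    Flow[DayIds]
      .mapConcat(ids => ids)
      .fold(Set.empty[Long])(_ + _)
      .map(_.size.toLong)

  // Each element gets its own finite sub-source, so the fold inside it
  // completes (and emits) once per element. flatMapConcat runs the
  // sub-sources one after another, not in parallel.
  val foldPerDay: Flow[DayIds, Long, NotUsed] =
    Flow[DayIds].flatMapConcat { ids =>
      Source(ids).fold(Set.empty[Long])(_ + _).map(_.size.toLong)
    }

  // Runs a flow over the given days and collects everything it emits.
  def run(flow: Flow[DayIds, Long, NotUsed], days: List[DayIds]): Seq[Long] = {
    implicit val system: ActorSystem = ActorSystem("sketch") // Akka 2.6+ assumed
    try Await.result(Source(days).via(flow).runWith(Sink.seq), 10.seconds)
    finally system.terminate()
  }
}
```

With List(List(1L, 2L, 2L), List(2L, 3L)) as input, I would expect 
foldOverEverything to emit a single 3 and foldPerDay to emit 2 and then 2.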

TL;DR: I am trying to pipeline my dayFlow WITHOUT parallelizing it. 


    val parallelMinuteFlow: Flow[DateTime, ImposterLog, NotUsed] = {
      Flow[DateTime]
        .via(combineS3Prefixes(prefixes))
        .via(flows.flattenSeq)
        .map { prefix =>
          s3Client.listBucket(context.s3Bucket, Some(prefix))
        }.flatMapConcat(identity)
        .via(consumptionStats.statsCounterFlow[String]("files", env)(_ => 1))
        .via(flows.downloadS3ObjectFlow(context.s3Bucket))
        .via(s3toImposterFlow(Staging2))
        .via(consumptionStats.statsCounterFlow[ImposterLog]("jsons", env)(_ => 1))
        .named("minuteFlow")
    }

    val dayFlow: Flow[Interval, Long, NotUsed] = {
      Flow[Interval]
        .via(Flow[Interval].map { (interval: Interval) =>
          info(s"Starting on interval ${ interval.toString }")
          val numberOfMinutes =
            new Duration(interval.getStart, interval.getEnd).getStandardMinutes.toInt
          (0 until numberOfMinutes).foldLeft(Seq.empty[DateTime])((acc, i) => {
            acc ++ Seq(interval.getStart.plusMinutes(i))
          })
        })
        .via(flows.flattenSeq)
        .via(flows.parallelizeFlow(parallelize = context.parallelize, parallelMinuteFlow))
        .fold(Set.empty[Long])((acc: Set[Long], log: ImposterLog) => {
          acc ++ log.data.map(tweet => tweet.id.split(":")(2).toLong).toSet
        })
        .map(_.size)
    }

    val graph: RunnableGraph[(UniqueKillSwitch, Future[Long])] = {
      intervalSource
        .via(dayFlow)
        .viaMat(KillSwitches.single)(Keep.right)
        .toMat(Sink.fold(0L)((acc, idsInDay) => {
          acc + idsInDay
        }))(Keep.both)
    }

