[akka-user] Error handling with Source.queue BroadcastHub

2017-10-21 Thread subopt1
Hi,

I'm using a Source.queue with BroadcastHub to implement a pattern where a 
web request can add an item to the queue, attach to the graph and get a 
result. The problem I'm stuck on is that I'm not sure how to handle errors 
without failing the graph. Example:

Without the supervision strategy I get the following output:

flow handling 1
Result 1 Success(Some(error))
Result 2 Success(None)

The graph stops after the exception and "2" is never processed.

With the supervision strategy, the graph continues and processes "2", but 
"error" is never seen:

flow handling 1
Graph failed java.lang.Exception.. resuming
Result 1 Failure(java.util.concurrent.TimeoutException: No elements passed in the last 1 second.)
flow handling 2
Result 2 Success(Some(2))

What I'd like to see is:

Result 1 Success(Some(error))
Result 2 Success(Some(2))

Is there a way to recover the graph on a failure and still see the value 
returned from the "recover" function? Also, why does the graph still fail 
when there is a recover? Thanks
 

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source, SourceQueueWithComplete}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object QueueGraph extends App {

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val sourceQueue: Source[String, SourceQueueWithComplete[String]] =
    Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)

  val decider: Supervision.Decider = {
    case e: Exception =>
      println(s"Graph failed ${e}.. resuming")
      Supervision.Resume
  }

  val (queue: SourceQueueWithComplete[String], source: Source[String, NotUsed]) =
    sourceQueue
      .via(Flow[String].map { num =>
        println(s"flow handling ${num}")
        if (num == "1") {
          throw new Exception()
        } else {
          num
        }
      })
      .recover { case e: Exception => "error" }
      // If the supervisor is removed, the recover catches the exception and
      // returns "error", but the graph fails and no longer processes anything.
      // With the supervisor, "error" is never emitted.
      //.withAttributes(ActorAttributes.supervisionStrategy(decider))
      .toMat(BroadcastHub.sink(bufferSize = 1))(Keep.both)
      .run()

  // find error
  val f = source
    .filter(_ == "error")
    .idleTimeout(1.second)
    .toMat(Sink.headOption)(Keep.right)
    .run()

  queue.offer("1")

  println(s"Result 1 ${Try(Await.result(f, Duration.Inf))}")

  queue.offer("2")

  val f2 = source
    .filter(_ == "2")
    .idleTimeout(1.second)
    .toMat(Sink.headOption)(Keep.right)
    .run()

  println(s"Result 2 ${Try(Await.result(f2, Duration.Inf))}")
}
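On the two questions, as far as I understand the semantics: `recover` does not keep the stream alive. It emits one final element for the matched exception and then completes the stream, which is why "2" is never processed once "error" has gone through the hub. With the `Resume` decider in place, the exception is handled at the `map` itself and the failing element is simply dropped, so `recover` never sees an exception and "error" is never emitted. If every request needs an answer, one option is to catch failures per element and turn them into values, so the stream itself never fails. A minimal sketch, with a hypothetical `process` function standing in for the throwing `map` body and a plain `Sink.seq` in place of the queue and hub:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object ErrorsAsElements {
  // Hypothetical per-element work; fails for "1" like the map in the question.
  def process(num: String): String =
    if (num == "1") throw new Exception("boom") else num

  // The exception is caught per element and mapped to a value, so the
  // stream never fails and later elements are still processed.
  val safeFlow: Flow[String, String, NotUsed] =
    Flow[String].map { num =>
      Try(process(num)) match {
        case Success(value) => value
        case Failure(_)     => "error"
      }
    }

  def run(): immutable.Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("errors-as-elements")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(List("1", "2")).via(safeFlow).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(", ")) // error, 2
}
```

In the original graph the same `safeFlow` would replace the `map`/`recover` pair, leaving the queue and BroadcastHub untouched; a richer type such as `Either[Throwable, String]` would keep the actual exception instead of a marker string.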

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] How to get materialized values from a custom Sink

2017-02-08 Thread subopt1
Hi, 

This might be a silly question, but I could not find an answer in the docs. 
I have a custom Sink (a GraphStage[SinkShape[T]]) which publishes to an 
event bus and runs for a predetermined duration. This works; however, there 
is no materialized value other than NotUsed, since SinkShape only has an 
Inlet. I want to get a Future[T] back so I can monitor the stream and react 
to success/failure. Is the best way to accomplish this to turn my Sink into 
a Flow of FlowShape[T, T] and then attach a Sink.ignore/fold to materialize 
a value?

Thanks,
Andrew
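A Flow should not be necessary. One option (a sketch, not the only way) is to extend `GraphStageWithMaterializedValue` instead of `GraphStage`, complete a `Promise` from inside the stage logic and return its future as the materialized value. Here it is a `Future[Done]`, but it could carry any value. `publish` is a hypothetical stand-in for the event-bus call. Another commonly used option is to put `Flow[T].watchTermination()` in front of the existing sink, which materializes a `Future[Done]` without changing the stage.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}

// Sink stage whose materialized value completes when the stream finishes or fails.
final class PublishingSink[T](publish: T => Unit)
    extends GraphStageWithMaterializedValue[SinkShape[T], Future[Done]] {

  val in: Inlet[T] = Inlet[T]("PublishingSink.in")
  override val shape: SinkShape[T] = SinkShape(in)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
    val done = Promise[Done]()

    val logic = new GraphStageLogic(shape) with InHandler {
      override def preStart(): Unit = pull(in) // a sink must request the first element

      override def onPush(): Unit = {
        publish(grab(in))
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        done.trySuccess(Done)
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        done.tryFailure(ex)
        failStage(ex)
      }

      // Covers any other way the stage can stop; a no-op if already completed.
      override def postStop(): Unit =
        done.tryFailure(new IllegalStateException("PublishingSink stopped before completion"))

      setHandler(in, this)
    }

    (logic, done.future)
  }
}

object PublishingSinkDemo {
  def run(): List[Int] = {
    implicit val system: ActorSystem = ActorSystem("publishing-sink")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val published = scala.collection.mutable.ListBuffer.empty[Int]
    try {
      val sink: Sink[Int, Future[Done]] =
        Sink.fromGraph(new PublishingSink[Int](x => published.synchronized { published += x; () }))
      Await.result(Source(1 to 3).runWith(sink), 5.seconds) // waits for Done
      published.synchronized(published.toList)
    } finally system.terminate()
  }
}
```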



[akka-user] Re: akka streams graph is not fully drained when Future[Done] completes

2017-01-20 Thread subopt1
To clarify, this is not a bug in Akka. The issue is that there are in-flight 
futures that must be accounted for. To summarize: you need to keep track of 
your in-flight operations (futures) and override onUpstreamFinish in the 
InHandler, for example:

override def onUpstreamFinish(): Unit = {
  if (inflight == 0) {
    complete(out)
  } else {
    // there are still items in progress; wait and complete the out port
    // in the future callback
  }
}


Then, in my getAsyncCallback:

if (inflight == 0) {
  if (isClosed(in)) {
    complete(out)
  }
}
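Put together, the two fragments above amount to a stage roughly like the following. This is a simplified sketch: it keeps at most one future in flight (it only pulls upstream when downstream pulls), `f` is any hypothetical `A => Future[B]`, and a failed future fails the stage. For this shape of problem the built-in `mapAsync(parallelism)(f)` already waits for outstanding futures before completing, so it is usually worth trying first.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

// Maps each element through a Future and completes only after the in-flight result is pushed.
final class AsyncMapStage[A, B](f: A => Future[B])(implicit ec: ExecutionContext)
    extends GraphStage[FlowShape[A, B]] {

  val in: Inlet[A] = Inlet[A]("AsyncMapStage.in")
  val out: Outlet[B] = Outlet[B]("AsyncMapStage.out")
  override val shape: FlowShape[A, B] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var inflight = 0

      // Runs inside the stage, so it may safely update `inflight` and push.
      private val onResult = getAsyncCallback[Try[B]] {
        case Success(b) =>
          inflight -= 1
          push(out, b) // allowed: we only pull upstream from onPull
          if (isClosed(in)) completeStage()
        case Failure(ex) =>
          failStage(ex)
      }

      override def onPush(): Unit = {
        inflight += 1
        f(grab(in)).onComplete(result => onResult.invoke(result))
      }

      // The default would complete immediately and drop the pending result.
      override def onUpstreamFinish(): Unit =
        if (inflight == 0) completeStage()

      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}

object AsyncMapStageDemo {
  def run(): immutable.Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("async-map-stage")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    try {
      val doubled = Flow.fromGraph(new AsyncMapStage[Int, Int](i => Future(i * 2)))
      Await.result(Source(1 to 10).via(doubled).runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }
}
```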


Andrew



[akka-user] Chunked Response handling with Akka Http?

2017-01-16 Thread subopt1


I'd like to use akka-http with chunked responses, and to know whether I can 
do the same thing in akka-http as with the Play Framework. The following is 
from the Play Framework documentation:

def index = Action {
  val source = Source.apply(List("kiki", "foo", "bar"))
  Ok.chunked(source)
}

We can inspect the HTTP response sent by the server:

HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Transfer-Encoding: chunked

4
kiki
3
foo
3
bar
0

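With akka-http I was able to get close with:

import akka.NotUsed
import akka.http.scaladsl.model.ContentTypes
import akka.http.scaladsl.model.HttpEntity.{Chunk, ChunkStreamPart, Chunked, LastChunk}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source

val hi = Chunk("Hi" + "\n")
val there = Chunk("there" + "\n")
val last = LastChunk("boom" + "\n")

val source: Source[ChunkStreamPart, NotUsed] =
  Source(scala.collection.immutable.Iterable(hi, there, last))

val rootPath: Route =
  path("") {
    get {
      // `ref` is an actor defined elsewhere in my code (not shown)
      for (i <- 1 to 100) {
        ref ! hi
      }
      complete(Chunked(ContentTypes.`text/plain(UTF-8)`, source))
    }
  }

def routes: Route = rootPath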

curl returns:

< HTTP/1.1 200 OK
< Server: akka
< Date: Mon, 16 Jan 2017 14:03:10 GMT
< Transfer-Encoding: chunked
< Content-Type: text/plain; charset=UTF-8
< 
Hi
there

There are two perhaps minor issues: the last element ("boom") does not show 
up, and the output doesn't include the chunk sizes as the Play Framework 
example did.
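On the chunk sizes: the size lines belong to HTTP/1.1 chunked transfer coding (RFC 7230, section 4.1), not to the body, and curl decodes them away by default; `curl --raw` should show them. On the missing "boom": as far as I know, the string passed to `LastChunk` is a chunk extension rather than data, so it never appears in the body. The sketch below is plain Scala with no Akka and only illustrates the wire format shown in the Play example; `encodeChunked` is a hypothetical helper, not an akka-http API.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object ChunkedWire {
  // Encodes parts as HTTP/1.1 chunked transfer coding: each chunk is
  // "<size in hex>\r\n<data>\r\n", and a zero-size chunk ends the body.
  def encodeChunked(parts: Seq[String]): String = {
    val sb = new StringBuilder
    parts.filter(_.nonEmpty).foreach { part => // an empty chunk would end the body early
      val size = part.getBytes(UTF_8).length   // the size counts bytes, not characters
      sb.append(Integer.toHexString(size)).append("\r\n").append(part).append("\r\n")
    }
    sb.append("0\r\n\r\n").toString
  }

  def main(args: Array[String]): Unit =
    print(encodeChunked(Seq("kiki", "foo", "bar")))
}
```

Running it prints the same 4/kiki, 3/foo, 3/bar, 0 sequence as the Play response (with CRLF line endings).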

What I'd really like to know is how to do this dynamically. I'm interacting 
with an embedded device (via serial): I make an async request and it 
responds one or more times over a period of up to about 20 seconds. I'd like 
each response to be emitted as a chunk. Is this possible?
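For the dynamic case, one approach (a sketch, not tested against real hardware) is to build the response entity from a `Source.queue` and hand the queue to whatever talks to the device when the entity's stream is materialized. Each offered string then goes out as its own chunk, and completing the queue ends the response. `DeviceClient` and its `request` method are hypothetical stand-ins for the serial code, and the 20-second `takeWithin` only mirrors the timing mentioned above.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}
import akka.util.ByteString

import scala.concurrent.duration._

// Hypothetical device API: offers each response it receives to the queue
// and calls queue.complete() when the device is done.
trait DeviceClient {
  def request(cmd: String, queue: SourceQueueWithComplete[String]): Unit
}

object DeviceRoutes {
  def deviceEntity(device: DeviceClient, cmd: String): HttpEntity.Chunked = {
    val responses: Source[ByteString, NotUsed] =
      Source.queue[String](16, OverflowStrategy.backpressure)
        .mapMaterializedValue { queue =>
          device.request(cmd, queue) // runs when the response stream starts
          NotUsed
        }
        .takeWithin(20.seconds) // stop waiting for the device after ~20 s
        .map(line => ByteString(line + "\n"))

    // Every non-empty ByteString becomes one chunk on the wire.
    HttpEntity.Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, responses)
  }

  def route(device: DeviceClient): Route =
    path("device" / Segment) { cmd =>
      get {
        complete(deviceEntity(device, cmd))
      }
    }
}
```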

Thanks,
Andrew



Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread subopt1
I'm not exactly sure what you were looking for; I'm relatively new to Akka 
Streams. This is the minimal amount of code that reproduces the problem. It 
involves:

a custom source
a flow with an AsyncCallback
parallelizing

The only dependencies are Akka and Joda-Time.

On Friday, December 16, 2016 at 3:31:37 PM UTC-7, √ wrote:
>
> Is this really a *minimized* reproducer?
>

Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread subopt1
Here's the code to reproduce. The issue only seems to occur when I combine my 
custom Source, a callback in a Flow, and parallelizing. If this code is run 
with .via(parallelizeFlow(parallelize = 1, asyncFlow)) it drops the last 
minute; when run with .via(asyncFlow) it does not.

So with parallelize it prints all but the last minute:

seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52, 
2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56, 
2016/12/14/23/57, 2016/12/14/23/58)

and without it prints the expected value, with all minutes flowing through 
the graph:

seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52, 
2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56, 
2016/12/14/23/57, 2016/12/14/23/58, 2016/12/14/23/59)





package hack.streams

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.stage._
import org.joda.time.{DateTime, Duration, Interval}

import scala.collection.immutable.Seq
import scala.concurrent.{Await, Future}

object AsyncIssue {
  // StreamsMaterializer (not shown) presumably provides the implicit
  // materializer and ExecutionContext used below.
  import StreamsMaterializer._

  def minuteSource(interval: Interval) = new GraphStage[SourceShape[DateTime]]() {
    val out = Outlet[DateTime]("keys")
    val shape = SourceShape(out)

    def zeroToMinute(date: DateTime) =
      date.withMillisOfSecond(0).withSecondOfMinute(0)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with StageLogging {
        var isDone: Boolean = false
        var current: DateTime = new DateTime(0)

        override def preStart() = {
          current = zeroToMinute(interval.getStart)
        }

        setHandler(out, new OutHandler {
          override def onPull(): Unit = {
            if (!isDone) {
              push(out, current)
              current = current.plusMinutes(1)

              if (current.isEqual(zeroToMinute(interval.getEnd))) {
                isDone = true
              }
            } else {
              complete(out)
            }
          }
        })
      }
  }

  def futureCallbackFlow = new GraphStage[FlowShape[DateTime, String]]() {
    val in = Inlet[DateTime]("minute")
    val out = Outlet[String]("string")

    val formatter =
      org.joda.time.format.DateTimeFormat.forPattern("/MM/dd/HH/mm")

    val shape = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with StageLogging {
        val pushCallback = getAsyncCallback[String] { seq =>
          push(out, seq)
        }

        setHandler(in, new InHandler {
          override def onPush(): Unit = {
            val minute = grab(in)

            val fMin: Future[DateTime] = Future { minute }

            fMin.foreach { min =>
              pushCallback.invoke(formatter.print(min))
            }
          }
          // No onUpstreamFinish override: the default completes the stage
          // immediately, even while a future is still in flight.
        })

        setHandler(out, new OutHandler {
          override def onPull(): Unit = {
            pull(in)
          }
        })
      }
  }

  def parallelizeFlow[In, Out](parallelize: Int, flow: Flow[In, Out, NotUsed]): Flow[In, Out, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val dispatcher = builder.add(Balance[In](parallelize))
      val merger = builder.add(Merge[Out](parallelize))

      for (i <- 0 to parallelize - 1) {
        dispatcher.out(i) ~> flow.async ~> merger.in(i)
      }

      FlowShape(dispatcher.in, merger.out)
    })

  def run() = {
    import StreamsMaterializer._

    val asyncFlow = Flow[DateTime].via(futureCallbackFlow)

    val source: Source[DateTime, NotUsed] = Source(11 to 20).via(Flow[Int].map { num =>
      val formatter =
        org.joda.time.format.DateTimeFormat.forPattern("/MM/dd/HH/mm")
      formatter.parseDateTime(s"2016/12/14/14/$num")
    })

    val mat: Future[Seq[String]] =
      Source.fromGraph(minuteSource(new Interval(
        Duration.standardMinutes(10),
        new DateTime().dayOfMonth().roundFloorCopy().minusDays(1))))
        .via(parallelizeFlow(parallelize = 1, asyncFlow))
        // .via(asyncFlow)
        .runWith(Sink.seq)

    val seq: Seq[String] = Await.result(mat, scala.concurrent.duration.Duration.Inf)
    println(s"seq is $seq")
  }
}




On Friday, December 16, 2016 at 8:03:45 AM UTC-7, √ wrote:
>
> Please submit a minimized reproducer so readers have a chance of running 
> the code.
>

[akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread subopt1
Hi,

I'm seeing an issue where the graph completes while there is still data in 
one of the flows. The last element emitted by the source enters a custom 
GraphStageLogic flow, where it is passed to a function that returns a Future. 
That Future has a callback which invokes an AsyncCallback (created with 
getAsyncCallback) that calls push(out). For the last element, the Future 
callback fires (pushCallback.invoke(xml)), but the AsyncCallback body never 
runs and the graph stops.

For more context, this is what I have going on inside the GraphStageLogic:

val s3ListBucket: Source[ByteString, NotUsed] =
  s3Client.listBucket(bucket, Some(currentPrefix), maxKeys, nextMarker)

val bucketListingXml: Future[String] = s3ListBucket
  .map(_.utf8String)
  .runWith(Sink.seq)(materializer)
  .map(_.mkString)(materializer.executionContext)

bucketListingXml.foreach { xml =>
  println(s"This gets called. prefix $currentPrefix")
  pushCallback.invoke(xml)
}(materializer.executionContext)


And the callback:

val pushCallback = getAsyncCallback[String] { xml =>
  log.info(s"This is never called for last element in graph!")
  push(out, xml)
}


I don't see any errors and this issue consistently occurs on the last 
element. Thanks

Andrew



[akka-user] Optimizing akka streams with parallelize

2016-12-15 Thread subopt1
Hi,

I've been looking for some guidance on optimizing Akka Streams for 
throughput, but have not found much information so far. For example, if I 
have a non-blocking flow, would parallelizing by the number of CPU cores make 
sense? So far, over several runs, processing time decreases as I increase 
the parallelism until it reaches the number of cores, and then it doesn't 
seem to get any faster. To clarify, by parallelize I mean doing the 
following n times:

dispatcher.out(i) ~> flow.async ~> merger.in(i)


Any ideas? Thanks!

Andrew
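That observation is roughly what one would expect for CPU-bound, non-blocking work: once every core is busy, extra parallel branches mostly add scheduling overhead. When the parallel part is a single function, a shorter alternative to the Balance/Merge graph is `mapAsync`/`mapAsyncUnordered` with an explicit parallelism bound. A sketch, assuming a hypothetical CPU-bound `work` function and using the core count only as a starting point for tuning; `mapAsync` keeps input order, while `mapAsyncUnordered` emits results as they finish (as Merge does).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object ParallelWork {
  // Hypothetical CPU-bound, non-blocking work.
  def work(n: Int): Long = (1L to 10000L).foldLeft(n.toLong)((acc, i) => acc ^ (i * n))

  val cores: Int = Runtime.getRuntime.availableProcessors()

  // At most `parallelism` futures run at once; results are emitted as they complete.
  def parallelWork(parallelism: Int)(implicit ec: ExecutionContext): Flow[Int, Long, NotUsed] =
    Flow[Int].mapAsyncUnordered(parallelism)(n => Future(work(n)))

  def run(parallelism: Int = cores): Set[Long] = {
    implicit val system: ActorSystem = ActorSystem("parallel-work")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    try Await.result(
      Source(1 to 1000).via(parallelWork(parallelism)).runWith(Sink.seq), 30.seconds).toSet
    finally system.terminate()
  }
}
```

Timing `run(1)`, `run(2)`, ... up to a bit beyond `cores` on the real workload is a cheap way to find where the curve flattens.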



[akka-user] Varying result when running akka flow in parallel

2016-12-14 Thread subopt1
Hi,

I'm working with a flow that downloads data, parses JSON and adds ids to a 
set (dedupe). It works fine; however, when I modify the flow to run in 
parallel, I get different results.

Here's my graph:

val graph: RunnableGraph[Future[HashSet[Long]]] =
  Source.fromGraph(new MinuteSource(firstMinuteYesterday, firstMinuteYesterday.plusDays(1)))
    .via(dsl(parallelize = 4))
    .toMat(Sink.fold(new HashSet[Long]())((accSet, set) => {
      accSet ++ set
    }))(Keep.right)

val deduped: Set[Long] = Await.result(graph.run(), Duration.Inf)

println(s"seq size is ${deduped.size} in ${new Duration(start, new DateTime()).toString}")


The dsl looks like:

def dsl(parallelize: Int) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val dispatcher = builder.add(Balance[DateTime](parallelize))
  val merger = builder.add(Merge[Set[Long]](parallelize))

  for (i <- 0 to parallelize - 1) {
    dispatcher.out(i) ~> consumptionFlow.async ~> merger.in(i)
  }

  FlowShape(dispatcher.in, merger.out)
})


Here are the results for different parallelize values:


// parallelize 1 -> seq size is 48560 in 175
// parallelize 2 -> seq size is 48531 in 117
// parallelize 4 -> seq size is 48481 in 107


The resulting set size varies with the parallelize value. Interestingly, 
the set size for each value is consistent across runs. Does this make sense 
to anyone? Thanks!
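One way to narrow this down: the fold is a set union, which is commutative and associative, so the different interleaving that Merge produces at higher parallelism should not by itself change the final size. The small check below (plain Scala, with made-up id batches) illustrates that property. If it holds, a smaller set at higher parallelism suggests that some batches never reach the sink, so counting the elements entering and leaving each branch would be a reasonable next step.

```scala
import scala.collection.immutable.HashSet
import scala.util.Random

object UnionOrder {
  // Folds batches of ids into one set, as the Sink.fold in the graph above does.
  def dedupe(batches: Seq[Set[Long]]): HashSet[Long] =
    batches.foldLeft(HashSet.empty[Long])((acc, set) => acc ++ set)

  def main(args: Array[String]): Unit = {
    val batches = Seq(Set(1L, 2L), Set(2L, 3L), Set(3L, 4L, 5L), Set(1L))
    val inOrder = dedupe(batches)
    val shuffled = dedupe(Random.shuffle(batches)) // simulates a different merge order
    println(s"${inOrder.size} ${shuffled.size}") // 5 5
  }
}
```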

Andrew



Re: [akka-user] How to deal with Futures in Sources/Flows

2016-12-09 Thread subopt1
I thought I needed to use a GraphStageLogic because I need to do the 
following:

request url for page of data and push
if response has next token, grab another page and push
continue until no next token
complete the stream

I wasn't sure how to accomplish that logic in a Source.

I figured out a solution using getAsyncCallback:

val pushCallback = getAsyncCallback[Seq[String]] { seq =>
  push(out, seq)
}


Then, in my Future callback, I call pushCallback.invoke(seq).


It seems to work, but I'm not sure that materializing a source inside a 
GraphStageLogic is the best approach.


If you have some example you can point me at I'm all ears!
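For the paging loop described above (fetch a page, continue while there is a next token, then complete), `Source.unfoldAsync` may fit without any custom GraphStage: it only requests the next page when downstream demands it, and completes when the function returns `None`. A sketch, assuming a hypothetical `Page` type and `fetchPage` function standing in for the real API, with an in-memory fake for the demo:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical page of results from a paginated API.
final case class Page(items: immutable.Seq[String], nextToken: Option[String])

object Paging {
  // State: Some(None) = fetch first page, Some(Some(t)) = fetch page t, None = done.
  def pages(fetchPage: Option[String] => Future[Page])(
      implicit ec: ExecutionContext): Source[immutable.Seq[String], NotUsed] =
    Source.unfoldAsync[Option[Option[String]], immutable.Seq[String]](Some(None)) {
      case None =>
        Future.successful(None) // previous page had no next token: complete the stream
      case Some(token) =>
        fetchPage(token).map { page =>
          val next: Option[Option[String]] = page.nextToken.map(t => Some(t))
          Some((next, page.items))
        }
    }

  def run(): immutable.Seq[immutable.Seq[String]] = {
    implicit val system: ActorSystem = ActorSystem("paging")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    // In-memory stand-in for the remote API: three pages linked by tokens.
    val fake = Map[Option[String], Page](
      None -> Page(List("a", "b"), Some("t1")),
      Some("t1") -> Page(List("c"), Some("t2")),
      Some("t2") -> Page(List("d"), None))
    try Await.result(pages(t => Future(fake(t))).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```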


On Friday, December 9, 2016 at 1:27:31 AM UTC-7, drewhk wrote:
>  You likely need to combine your stage with a flatMapConcat(). Remember 
> that you don't need to implement all your logic inside a GraphStage, you 
> can create Sources just by using existing combinators like mapAsync or 
> flatMapConcat. In fact, if you need some custom operation that is not 
> covered by the built-in combinators, it is usually a good idea to minimize 
> the custom part and combine it with the existing combinators to achieve 
> what you want.



[akka-user] Re: How to deal with Futures in Sources/Flows

2016-12-08 Thread subopt1
Well, that didn't work at all. onPull gets called continuously before any 
Futures have been resolved. At this point the only way I know to make it 
work is to Await the future, but then I would be blocking the stream. 
Any ideas?




[akka-user] How to deal with Futures in Sources/Flows

2016-12-08 Thread subopt1
Hi,

I'm creating a Source via a GraphStageLogic that makes calls to another API, 
which happens to return a Source. However, I'm unsure how to deal with 
Sources/Futures inside a GraphStageLogic. It seems that I want my shape to 
look like:

val shape: SourceShape[Seq[String]] = SourceShape(out)


but I get back a Source[ByteString, NotUsed], which I can convert to a 
Future[Seq[String]] via runWith(Sink.seq). So should I make my shape:

val shape: SourceShape[Future[Seq[String]]] = SourceShape(out)


But then I need to materialize the Source inside onPull(), which doesn't 
seem right. Is there a better way to handle this situation?

Thanks!
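One way to avoid materializing the returned Source inside onPull() is to let the stream flatten it with `flatMapConcat`, as drewhk suggests in his reply. A sketch, with `listBucket` as a hypothetical stand-in for the API that returns a `Source[ByteString, NotUsed]`; `fold` concatenates the chunks of one listing and emits a single element when that inner source completes.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object NestedSources {
  // Hypothetical stand-in for an API returning a Source, e.g. an S3 bucket listing.
  def listBucket(prefix: String): Source[ByteString, NotUsed] =
    Source(List(ByteString(s"<list prefix='$prefix'>"), ByteString("</list>")))

  // Each prefix becomes one String; the inner Source is run by the stream itself.
  val listings: Source[String, NotUsed] =
    Source(List("a/", "b/")).flatMapConcat { prefix =>
      listBucket(prefix)
        .fold(ByteString.empty)(_ ++ _) // concatenate the chunks of one listing
        .map(_.utf8String)
    }

  def run(): immutable.Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("nested-sources")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(listings.runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```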



Re: [akka-user] Akka Stream stalling with JsonFraming

2016-12-06 Thread subopt1
I figured it out. Submitting a pull request!

On Tuesday, December 6, 2016 at 12:29:28 PM UTC-7, Konrad Malawski wrote:
>
> Is the JSON well formed and "normal" or something weird or maybe huge 
> objects or something in there etc?
> Try to debug at which point it gets stuck.
>
> A minimized reproducer would help.
>
> -- 
> Konrad `ktoso` Malawski
> Akka  @ Lightbend 
>



[akka-user] Akka Stream stalling with JsonFraming

2016-12-06 Thread subopt1
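I'm working on an Akka Streams project that reads gzipped files from S3 and 
parses JSON. The issue I'm running into is that the stream stalls at about 
24523530 bytes and then times out after 1 minute 
(java.util.concurrent.TimeoutException: No elements passed in the last 1 
minute), but there is no error otherwise. If I remove 
JsonFraming.objectScanner from the flow, it does not exhibit this behavior. 
The stream looks like:

import akka.NotUsed
import akka.stream.scaladsl.{Compression, FileIO, JsonFraming, Source}
import akka.util.ByteString
import java.nio.file.Paths
import java.nio.file.StandardOpenOption.{CREATE, WRITE}

// s3FileNamesSource, Flows.downloadObjectsFlow, s3Client, bucket and an
// implicit Materializer are defined elsewhere in the project (not shown).
val s3FileSource: Source[ByteString, NotUsed] =
  s3FileNamesSource.via(Flows.downloadObjectsFlow(s3Client, bucket))

val result = s3FileSource
  .via(Compression.gunzip(100))                 // decompress, emitting chunks of at most 100 bytes
  .via(JsonFraming.objectScanner(Int.MaxValue)) // emit one ByteString per top-level JSON object
  .runWith(FileIO.toPath(Paths.get("stream.out"), Set(CREATE, WRITE)))

The JSON that it does write to the file looks just fine. I'm on Akka 2.4.14. 
Any ideas on what might be going on or how to troubleshoot? Thanks,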


Andrew
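Konrad's suggestion of a minimized reproducer could look roughly like the sketch below. It assumes the Akka 2.4/2.5-era API (ActorMaterializer) and replaces the S3 download and gunzip stages with an in-memory payload cut into small chunks; the object name `JsonFramingRepro`, the payload and the 5-second timeout are made up for illustration. `Await.result` turns a stall into a TimeoutException after 5 seconds instead of a 1-minute idle timeout, and `.log` records each object that got past objectScanner (at DEBUG level, so `akka.loglevel = DEBUG` is needed to see it).

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{JsonFraming, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical minimized reproducer: no S3 and no gunzip, just an in-memory
// payload cut into small chunks and fed through JsonFraming.objectScanner.
object JsonFramingRepro {
  implicit val system: ActorSystem = ActorSystem("json-framing-repro")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Frame `payload` after splitting it into `chunkSize`-character chunks, so
  // object boundaries land in the middle of chunks, as they can after gunzip.
  def frame(payload: String, chunkSize: Int): Seq[String] = {
    val chunks = payload.grouped(chunkSize).map(s => ByteString(s)).toList
    val done = Source(chunks)
      .via(JsonFraming.objectScanner(Int.MaxValue))
      .map(_.utf8String)
      .log("after-objectScanner") // logs each framed object at DEBUG level
      .runWith(Sink.seq)
    // If the stage stalls, this throws a TimeoutException after 5 seconds.
    Await.result(done, 5.seconds)
  }

  def main(args: Array[String]): Unit =
    try frame("""{"id":1}{"id":2,"s":"}"}{"id":3}""", 3).foreach(println)
    finally system.terminate()
}
```

Varying `chunkSize` or swapping in a slice of the real decompressed data is one way to bisect which input makes the real stream stop; that is a troubleshooting approach, not a known cause of the stall.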




[akka-user] Re: http proxy support

2016-11-02 Thread subopt1
http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0/scala/http/configuration.html

It's also in the 2.4.11 docs, but I can't find that link at the moment.

Just before the client section, under host-connection-pool, it says:

   # Please note that this section mirrors `akka.http.client` however is used only for pool-based APIs,
   # such as `Http().superPool` or `Http().singleRequest`.

If that feature is not yet in Akka, is there any way to get proxy support 
for use with Akka Streams? For example, could I use Netty or another HTTP 
implementation and create a Source from that? Thanks!
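A rough sketch of the "other HTTP implementation" idea, assuming the JDK's blocking HttpURLConnection (not Akka HTTP) is acceptable: each request is opened through a `java.net.Proxy`, and the blocking call is wrapped in a Future so that `mapAsync` can turn it into a Source. The object and method names (`ProxiedHttpSource`, `httpProxy`, `fetch`, `bodies`) and the timeouts are hypothetical, and this bypasses akka-http's configuration entirely.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

import java.net.{HttpURLConnection, InetSocketAddress, Proxy, URL}
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.io.{Codec, Source => IoSource}

// Hypothetical helper: names and defaults are illustrative, not an Akka API.
object ProxiedHttpSource {

  // Build an HTTP proxy descriptor from a host and port.
  def httpProxy(host: String, port: Int): Proxy =
    new Proxy(Proxy.Type.HTTP, new InetSocketAddress(host, port))

  // Fetch one URL through `proxy` with the JDK's blocking HttpURLConnection
  // and return the body as UTF-8 text. getInputStream throws an IOException
  // for 4xx/5xx responses.
  def fetch(url: String, proxy: Proxy): String = {
    val conn = new URL(url).openConnection(proxy).asInstanceOf[HttpURLConnection]
    conn.setConnectTimeout(10000)
    conn.setReadTimeout(10000)
    try {
      val in = conn.getInputStream
      try IoSource.fromInputStream(in)(Codec.UTF8).mkString
      finally in.close()
    } finally conn.disconnect()
  }

  // Turn the blocking client into a Source: mapAsync runs up to
  // `parallelism` requests concurrently and emits bodies in input order.
  def bodies(urls: List[String], proxy: Proxy, parallelism: Int = 4)(
      implicit ec: ExecutionContext): Source[String, NotUsed] =
    Source(urls).mapAsync(parallelism) { u =>
      // blocking() hints to the ExecutionContext that this call ties up a thread.
      Future(blocking(fetch(u, proxy)))
    }
}
```

With an implicit Materializer and ExecutionContext in scope, `ProxiedHttpSource.bodies(List("http://akka.io"), ProxiedHttpSource.httpProxy("127.0.0.1", 3128)).runWith(Sink.foreach(println))` would fetch through a proxy on port 3128 (a placeholder). Running blocking I/O on Akka's default dispatcher is tolerable for a sketch; a dedicated thread pool would be the safer choice for real traffic.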


On Wednesday, November 2, 2016 at 3:06:10 AM UTC-6, André wrote:
>
> Hi,
>
> this config was supported in Spray but wasn't yet ported over to akka-http.
>
> > Based on what I've read so far I need to enable the configuration and 
> then it should work with Http().singleRequest
> > however it's not connecting through the proxy.
>
> May I ask where you got that info from?
>
> Cheers
> André
>
> On Monday, October 31, 2016 at 11:26:44 PM UTC+1, sub...@gmail.com wrote:
>>
>> I'm interested in proxying HTTP requests with akka-http. Based on what 
>> I've read so far, I need to enable the configuration and then it should 
>> work with Http().singleRequest; however, it's not connecting through the 
>> proxy. I have the following config in application.conf:
>>
>> akka.http {
>>   host-connection-pool {
>>     client {
>>       proxy {
>>         http {
>>           host = 127.0.0.1
>>           port = 
>>         }
>>       }
>>     }
>>   }
>> }
>>
>> and the following code:
>>
>> val responseFuture: Future[HttpResponse] =
>>   Http().singleRequest(HttpRequest(uri = "http://akka.io"))
>>
>> val response: HttpResponse = Await.result(responseFuture, Duration.Inf)
>>
>> println(response.status.intValue())
>>
>> SBT has:
>>
>> libraryDependencies += "com.typesafe.akka" %% "akka-http-core" % "2.4.11"
>> libraryDependencies += "com.typesafe.akka" %% "akka-http-experimental" % "2.4.11"
>>
>>



[akka-user] http proxy support

2016-10-31 Thread subopt1
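I'm interested in proxying HTTP requests with akka-http. Based on what I've 
read so far, I need to enable the configuration and then it should work with 
Http().singleRequest; however, it's not connecting through the proxy. I have 
the following config in application.conf:

akka.http {
  host-connection-pool {
    client {
      proxy {
        http {
          host = 127.0.0.1
          port = 
        }
      }
    }
  }
}

and the following code:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer() // singleRequest needs an implicit Materializer

val responseFuture: Future[HttpResponse] =
  Http().singleRequest(HttpRequest(uri = "http://akka.io"))
val response: HttpResponse = Await.result(responseFuture, Duration.Inf)
println(response.status.intValue())

SBT has:

libraryDependencies += "com.typesafe.akka" %% "akka-http-core" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-http-experimental" % "2.4.11"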
