Re: [akka-user] akka stream design for processing CPU+RAM intensive operations

2017-09-19 Thread Jakub Liska
On Tuesday, September 19, 2017 at 9:53:36 PM UTC+2, Patrik Nordwall wrote:
>
> You can try and compare Balance, Partition and PartitionHub.
>

Is the fundamental difference between GroupBy and Partition/PartitionHub 
that the latter give you more fine-grained control over the internal 
behaviour? 

Otherwise they should be similar regarding parallelism and CPU utilization, 
right? 

So far the best results I have benchmarked were with: 
.withAttributes(
  Attributes.asyncBoundary
    .and(Attributes.inputBuffer(16, 128))
    .and(ActorAttributes.dispatcher("akka.stream.log-processing-dispatcher"))
)

  stream {
log-processing-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
parallelism-min = 6
parallelism-factor = 1.0
parallelism-max = 8
  }
  throughput = 10
}
  }




[akka-user] akka stream design for processing CPU+RAM intensive operations

2017-09-18 Thread Jakub Liska
Hey,

I have an operation that heavily utilizes CPU and RAM, nothing else (no 
IO), and it is the main bottleneck. Using `stream.mapAsync` for it never 
pays off unless you batch operations into groups of something like 10 000, 
so I started using things like:

.map(operation).withAttributes(
  Attributes.asyncBoundary
    .and(Attributes.inputBuffer(16, 32))
    .and(ActorAttributes.dispatcher("pinned-dispatcher"))
)

That is usually at least 3x more effective than `mapAsyncUnordered(4)`, but 
it still utilizes only one CPU core.

Can you think of anything better, or a way to improve the above? 

*One way would probably be to split the flow into 4 substreams, which would 
lead to utilizing 4 cores, right? So is akka.stream.scaladsl.Balance the 
way to tackle this challenge?*
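
(For what it's worth, a minimal sketch of the Balance idea, untested and with hypothetical names; the `.async` is what actually spreads the workers over several cores:)

import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge}

// Fans a CPU-bound worker flow out over `workerCount` asynchronous copies and merges the results.
def balanced[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val balance = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
    val merge   = b.add(Merge[Out](workerCount))
    for (_ <- 1 to workerCount)
      balance ~> worker.async ~> merge // .async gives each worker its own actor (and CPU core)
    FlowShape(balance.in, merge.out)
  })

// e.g. source.via(balanced(Flow[Line].map(operation), 4)).runWith(sink)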



Re: [akka-user] Speeding up actor system boot time in testing environment

2017-08-27 Thread Jakub Liska

>
> Why do you need to start new systems?
>

For executing tasks in a forked JVM. In order to establish communication 
between the two processes it needs TCP, because retrieving results from a 
different process is hard without it.

Those tasks are rather long running, so an 800ms boot time doesn't matter at 
all. However, it matters a lot when testing these systems...



[akka-user] Speeding up actor system boot time in testing environment

2017-08-27 Thread Jakub Liska
Hey,

actor system boot time matters in distributed systems integration testing; 
otherwise spinning up a system 100 times in a test suite can take minutes. 
I currently cannot use multi-JVM testing, so I'm wondering whether there is 
a way to improve system boot time.

In my experience, forking a JVM with an actor system currently takes no 
less than 800ms.



Re: [akka-user] how to avoid or catch an akka.remote.transport.Transport$InvalidAssociationException

2017-08-24 Thread Jakub Liska
Hey, I think that the: 

ERROR: akka.remote.transport.Transport$InvalidAssociationException: The 
remote system terminated the association because it is shutting down.

should not be logged in case of a graceful remote system shutdown. Imagine 
a system that forks a JVM with a remote actor system for each task or job 
(the remote actor system serves only as a way of passing input and results); 
then anybody who uses your software gets thousands of these ERRORs.

Btw this is nice, David; unfortunately it doesn't solve the ERROR logging 
problem.

  override def preStart(): Unit =
    context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])

  override def unhandled(message: Any): Unit = message match {
    case DisassociatedEvent(local, remote, _) =>
      log.info(s"Forked executor system $remote disassociated from $local ...")
    case x => super.unhandled(x)
  }

Does anybody have an idea how to avoid these ERROR log messages?
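
(One mitigation that gets suggested for this, assuming classic remoting - whether it silences this particular ERROR may depend on the Akka version - is to turn down remote lifecycle event logging, or alternatively to filter the akka.remote loggers in the logging backend:)

akka {
  remote {
    # association/disassociation lifecycle events are normally logged;
    # "off" silences them, at the cost of hiding genuine association problems
    log-remote-lifecycle-events = off
  }
}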



Re: [akka-user] groupBy limits

2017-06-30 Thread Jakub Liska
100 000 seems to be the maximum; beyond that, no matter how much memory and 
processing power it has, it blows up with:

Substream Source has not been materialized in 5000 milliseconds.

which I guess is a consequence of GC choking.
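
(The 5000 milliseconds comes from the stream subscription timeout; it can be raised in configuration, although that is only a band-aid if GC pauses are the real problem. A sketch, with the Akka 2.4/2.5 defaults shown:)

akka.stream.materializer.subscription-timeout {
  # what to do when a substream is not materialized/subscribed in time: cancel | warn | noop
  mode = cancel
  # raise this if substreams legitimately take longer than 5s to get materialized
  timeout = 5s
}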



Re: [akka-user] groupBy limits

2017-06-30 Thread Jakub Liska
Have you guys done some benchmarking of groupBy capacity, i.e. how many 
distinct elements, and therefore substreams, can be handled with what 
resources?

Or a more general version of the benchmark, i.e. one showing performance as 
a function of the number of substreams running in parallel?

If not, do you have a rough guess? Is 100 000 a reasonable number for an 
m4.large instance? 



Re: [akka-user] Re: [Akka-stream] How to log element pass into a Graph ?

2017-02-21 Thread Jakub Liska
Ah, silly me, that's it Konrad, thank you !

On Tue, Feb 21, 2017 at 1:32 PM, Konrad Malawski <
konrad.malaw...@lightbend.com> wrote:

> What's your akka.actor.loglevel ?
>
> --
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 21 February 2017 at 22:12:16, Jakub Liska (liska.ja...@gmail.com)
> wrote:
>
> The code underneath is :
>
> https://github.com/akka/akka/blob/master/akka-stream/src/
> main/scala/akka/stream/impl/fusing/Ops.scala#L1306
>
> I use : debugLogging = true
>
> "org.slf4j" %  "slf4j-api"
> % "1.7.22",
> "org.slf4j" %  "slf4j-log4j12"
>  % "1.7.22"
> "com.typesafe.scala-logging"%% "scala-logging"% "3.5.0"
>
> and I have debug levels in log4j.properties :
> log4j.rootLogger=DEBUG, stdout
> log4j.logger.akka.stream=DEBUG, stdout
>
> ActorLogging and LazyLogging log properly in my entire application, but I
> cannot make akka-stream log anything 
>
>
>
> On Tue, Feb 21, 2017 at 1:01 PM, Konrad Malawski <
> konrad.malaw...@lightbend.com> wrote:
>
>> Why wouldn't stage.log work Jason?
>> It uses the exact same infra as actor logging.
>>
>> What do you mean by not working? Were you using the slf4j adapter
>> (dependency + settings)?
>>
>> --
>> Konrad `ktoso` Malawski
>> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>>
>> On 21 February 2017 at 21:43:00, Jakub Liska (liska.ja...@gmail.com)
>> wrote:
>>
>> s.log("") has actually never worked for me in past years. I'm using
>> streams mostly outside Actors and I have my slf4j + impl on classpath and
>> setup correctly
>>
>>
>



Re: [akka-user] Re: [Akka-stream] How to log element pass into a Graph ?

2017-02-21 Thread Jakub Liska
The code underneath is :

https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala#L1306

I use: debugLogging = true

"org.slf4j"                  %  "slf4j-api"      % "1.7.22",
"org.slf4j"                  %  "slf4j-log4j12"  % "1.7.22",
"com.typesafe.scala-logging" %% "scala-logging"  % "3.5.0"

and I have debug levels in log4j.properties:
log4j.rootLogger=DEBUG, stdout
log4j.logger.akka.stream=DEBUG, stdout

ActorLogging and LazyLogging log properly in my entire application, but I
cannot make akka-stream log anything 
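
(For anyone hitting the same wall: the follow-up in this thread suggests the missing piece was the Akka-side log level rather than log4j. A sketch of the Akka side of the setup, assuming the SLF4J adapter is used:)

// build.sbt - the Akka SLF4J adapter is needed on top of slf4j + the backend (assumption)
// "com.typesafe.akka" %% "akka-slf4j" % akkaVersion

// application.conf
akka {
  loggers  = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"   // stream debugLogging output is emitted at DEBUG, so Akka itself must allow DEBUG
}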



On Tue, Feb 21, 2017 at 1:01 PM, Konrad Malawski <
konrad.malaw...@lightbend.com> wrote:

> Why wouldn't stage.log work Jason?
> It uses the exact same infra as actor logging.
>
> What do you mean by not working? Were you using the slf4j adapter
> (dependency + settings)?
>
> --
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 21 February 2017 at 21:43:00, Jakub Liska (liska.ja...@gmail.com)
> wrote:
>
> s.log("") has actually never worked for me in past years. I'm using
> streams mostly outside Actors and I have my slf4j + impl on classpath and
> setup correctly
>
>



[akka-user] Re: [Akka-stream] How to log element pass into a Graph ?

2017-02-21 Thread Jakub Liska
s.log("") has actually never worked for me in past years. I'm using streams 
mostly outside Actors and I have my slf4j + impl on classpath and setup 
correctly  



Re: [akka-user] Implementation of Persistent Actor with initial state

2016-10-15 Thread Jakub Liska
Yes, that's what I meant. 

Perfect, that would work in a PersistentActor... What about PersistentFSM? 
I guess I will have to introduce an initial FSMState where I check whether 
the persistent state is empty or was recovered, and act on that.

Cool, thanks a lot, Patrik!
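
(A minimal sketch of Patrik's suggestion for the plain PersistentActor case; the event, state and rebuild step are hypothetical:)

import akka.persistence.{PersistentActor, RecoveryCompleted}

case class PartitionAdded(partition: String) // hypothetical domain event

class PartitionTracker extends PersistentActor {
  override def persistenceId: String = "partition-tracker"

  private var knownPartitions = Set.empty[String]

  override def receiveRecover: Receive = {
    case PartitionAdded(p) => knownPartitions += p
    case RecoveryCompleted =>
      // no events were replayed => very first start: rebuild initial state from the storages
      if (knownPartitions.isEmpty) rebuildFromStorages()
  }

  override def receiveCommand: Receive = {
    case p: PartitionAdded => persist(p)(e => knownPartitions += e.partition)
  }

  private def rebuildFromStorages(): Unit = ??? // introspect S3 / the other DB systems (hypothetical)
}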



On Saturday, October 15, 2016 at 8:14:22 AM UTC+2, Patrik Nordwall wrote:
>
> I'm not sure I understand. You want to read the old data if the persistent 
> actor doesn't have any events of its own?
>
> You can check the state when you get RecoveryCompleted. You can also count 
> the events/snapshot in receiveRecover and act on that when 
> RecoveryCompleted.
>
> /Patrik
>
> fre 14 okt. 2016 kl. 22:11 skrev Jakub Liska  >:
>
>> Hey,
>>
>> what is the best practice to do in this hypothetical scenario : 
>> 1) Say you have a time series pipeline that started at 2014 and created 
>> persistent state on S3 and other DB systems
>> 2) You can introspect these storages and know what partitions already 
>> exists in all of them
>> 3) The persistent actor's job would be scheduling tasks for newly added 
>> partitions (next minute/hour/day, etc.)
>> 3) Now at 2016 you deploy a persistent Actor that will hold state about 
>> completness (existing partitions) of these storages and keep up with their 
>> progress
>>
>> The way I see it, when this persistent Actor boots up, it will have 2 
>> choices :
>> a) either it starts for the very first time, it will have to replay all 
>> DomainEvents for all historical partitions from 2014 to itself to 
>> initialize it's state to the current view of the world in 2016 
>> b) or it restarts or crashes and its state is replayed from the journal 
>> implicitly
>>
>> Now I cannot find any reference of how this should be done. The only 
>> solution that comes to mind is to use persistence-query and obtaining 
>> Journal : 
>>
>>readJournal.currentPersistenceIds()
>>readJournal.eventsByPersistenceId("user-us-1337")
>>1. 
>>And if the result is empty, then it will reconstructs the history.
>>
>> Is this a way to go?
>>
>>
>



[akka-user] Implementation of Persistent Actor with initial state

2016-10-14 Thread Jakub Liska
Hey,

what is the best practice in this hypothetical scenario? 
1) Say you have a time series pipeline that started in 2014 and created 
persistent state on S3 and other DB systems
2) You can introspect these storages and know which partitions already 
exist in all of them
3) The persistent actor's job would be scheduling tasks for newly added 
partitions (next minute/hour/day, etc.)
4) Now, in 2016, you deploy a persistent Actor that will hold state about 
the completeness (existing partitions) of these storages and keep up with 
their progress

The way I see it, when this persistent Actor boots up, there are 2 cases: 
a) either it starts for the very first time, and it will have to replay all 
DomainEvents for all historical partitions from 2014 to itself to 
initialize its state to the current view of the world in 2016 
b) or it restarts or crashes, and its state is replayed from the journal 
implicitly

Now I cannot find any reference for how this should be done. The only 
solution that comes to mind is to use persistence-query and obtain the 
journal: 

   readJournal.currentPersistenceIds()
   readJournal.eventsByPersistenceId("user-us-1337")

and if the result is empty, reconstruct the history.

Is this a way to go?
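
(A sketch of what that check could look like with persistence-query, assuming the LevelDB read journal; the current* variant completes instead of staying live, which makes the emptiness check straightforward:)

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal

implicit val system = ActorSystem("query")
implicit val mat    = ActorMaterializer()
import system.dispatcher

val readJournal =
  PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

// count the events already journaled for this persistent actor
readJournal
  .currentEventsByPersistenceId("user-us-1337", 0L, Long.MaxValue)
  .runFold(0L)((count, _) => count + 1)
  .foreach { count =>
    if (count == 0L) {
      // nothing journaled yet => reconstruct the history from S3 / the other storages (hypothetical)
    }
  }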



[akka-user] Re: [akka-stream] How to implement Spark's cogroup/join operation

2016-09-29 Thread Jakub Liska
I'm trying to figure out why this is hanging/idling indefinitely : 

Source.fromIterator(() => Iterator.from(0).take(500).map(_ -> 1))
  .groupBy(Int.MaxValue, _._1)
  .mergeSubstreamsWithParallelism(256)
  .runWith(Sink.seq)

This is the only way I found to avoid instantiating a ridiculous number of 
substreams.



[akka-user] [akka-stream] How to implement Spark's cogroup/join operation

2016-09-27 Thread Jakub Liska
Hey,

I'm reimplementing a few Spark batch jobs as akka streams. 

I got stuck at the last one. It takes two PairRdd[Key,Value]s and cogroups 
them by Key, which returns an Rdd[Key,Seq[Value]], and then it processes the 
Seq[Value] for each of the unique Keys present in both original PairRdds, 
which is kind of a "batchy" operation. Moreover, there is high Key 
cardinality; around 50% of keys are unique.

So if I merged those two Sources and used groupBy, it would create as many 
SubFlows as there are distinct Keys, which could be up to 5 million.

So my questions are:

1) Is there another way to do that (see the sketch below)? Note that I 
cannot use reduce-like ops; I need the Seq[Value] physically present when 
the stream ends.
2) If not, is it OK to have something like 5M tiny SubFlows? 
3) What should the parallelism be for this kind of groupBy operation in 
mergeSubstreamsWithParallelism? 
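
(For question 1, a rough sketch of one alternative: merge the two sources and fold into a map instead of opening millions of substreams. Key/Value are hypothetical placeholders, and like Spark's cogroup result the whole map ends up in memory:)

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system = ActorSystem("cogroup")
implicit val mat    = ActorMaterializer()

type Key   = String // hypothetical
type Value = Int    // hypothetical

def cogroup(left: Source[(Key, Value), Any], right: Source[(Key, Value), Any]): Future[Map[Key, Vector[Value]]] =
  left.merge(right).runWith(
    Sink.fold(Map.empty[Key, Vector[Value]]) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, Vector.empty) :+ v)
    }
  )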



Re: [akka-user] How to use WebSockets with ActorPublisher?

2016-07-20 Thread Jakub Liska
There seems to be a third alternative to ActorPublisher:

 Source.actorRef[User.OutgoingMessage](10, OverflowStrategy.fail)
   .mapMaterializedValue(...)

The materialized actorRef can be passed to a different Actor that would feed 
it messages, and it would all behave like ActorPublisher, right?
On Wednesday, July 20, 2016 at 4:21:35 PM UTC+2, Konrad Malawski wrote:
>
> Glad you resolved it.
> Related hint is that one really really shouldn't be using ActorPublisher, 
> you should build stages from GraphStage instead,
> the reason is that a) ActorPublisher is not fusable b) it's really hard to 
> actually implement a *correct* Publisher (even with ActorPublisher's help).
>
> Happy hakking.,
>
> -- 
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 20 July 2016 at 16:19:58, Jakub Liska (liska...@gmail.com ) 
> wrote:
>
> RESOLVED : 
>
> I found a way to create Source and consequently ActorPublisher from 
> ActorRef, so that I can obtain the ActorRef upfront, so instead of :
> 
> val stateChangeSource: Source[PipelineState, ActorRef] = Source.
> actorPublisher[PipelineState](StateChangePublisher.props)
>
> I do :
>
> val stateChangePublisher: Publisher[PipelineState] = ActorPublisher[
> PipelineState](stateChangePublisherRef)
> val stateChangeSource = Source.fromPublisher(stateChangePublisher)
>
> And I can send messages to stateChangePublisherRef and push them to 
> Browser... 
>
> On Wednesday, July 20, 2016 at 4:07:13 PM UTC+2, Konrad Malawski wrote: 
>>
>> Could you provide a sample snippet that we could help out with?
>> Context helps to get quicker help.
>>
>> -- 
>> Konrad `ktoso` Malawski
>> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>>
>> On 20 July 2016 at 16:03:30, Jakub Liska (liska...@gmail.com) wrote:
>>
>> hey,  
>>
>> I hit a deadend with combination of Websockets and ActorPublisher because 
>> the TextMessage expects Source and one can obtain the underlying ActorRef 
>> from ActorPublisher only by materializing it :
>>  
>>
>> https://github.com/akka/akka/blob/29029be31d9198ed45c73efbc2d0212651882a94/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala#L30
>>
>> Theoretically one could look it up but then there is a question "when" it 
>> becomes materialized :-/
>>
>
>



Re: [akka-user] How to use WebSockets with ActorPublisher?

2016-07-20 Thread Jakub Liska
RESOLVED: 

I found a way to create a Source (and consequently an ActorPublisher) from 
an ActorRef, so that I can obtain the ActorRef upfront. So instead of:

val stateChangeSource: Source[PipelineState, ActorRef] =
  Source.actorPublisher[PipelineState](StateChangePublisher.props)

I do:

val stateChangePublisher: Publisher[PipelineState] =
  ActorPublisher[PipelineState](stateChangePublisherRef)
val stateChangeSource = Source.fromPublisher(stateChangePublisher)

And I can send messages to stateChangePublisherRef and push them to the 
browser... 

On Wednesday, July 20, 2016 at 4:07:13 PM UTC+2, Konrad Malawski wrote:
>
> Could you provide a sample snippet that we could help out with?
> Context helps to get quicker help.
>
> -- 
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 20 July 2016 at 16:03:30, Jakub Liska (liska...@gmail.com ) 
> wrote:
>
> hey,  
>
> I hit a deadend with combination of Websockets and ActorPublisher because 
> the TextMessage expects Source and one can obtain the underlying ActorRef 
> from ActorPublisher only by materializing it :
>  
>
> https://github.com/akka/akka/blob/29029be31d9198ed45c73efbc2d0212651882a94/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala#L30
>
> Theoretically one could look it up but then there is a question "when" it 
> becomes materialized :-/
>
>



[akka-user] How to use WebSockets with ActorPublisher?

2016-07-20 Thread Jakub Liska
hey, 

I hit a dead end with the combination of WebSockets and ActorPublisher, 
because TextMessage expects a Source and one can obtain the underlying 
ActorRef from ActorPublisher only by materializing it:
 
https://github.com/akka/akka/blob/29029be31d9198ed45c73efbc2d0212651882a94/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala#L30

Theoretically one could look it up, but then there is the question of "when" 
it becomes materialized :-/



Re: [akka-user] How to replace FlexiMerge and ReadAny in 2.0 GraphStage ?

2016-01-16 Thread Jakub Liska
There is a culprit though:

if the bypassed (ignored) Input has many elements and the consumed Input is 
empty, then the GraphStage pulls just one bypassed element... In other 
words, it completes too soon...

I think it is caused by the "doFinish = true" argument in passAlong:

passAlong(in1, out, doFinish = true, doFail = true)
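
If that is indeed the cause, the change would presumably be to not let the passed-along input finish the stage (untested sketch):

// keep the stage running when that input completes; only fail it on errors
passAlong(in1, out, doFinish = false, doFail = true)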


On Saturday, January 16, 2016 at 7:55:27 PM UTC+1, Jakub Liska wrote:
>
> You are right,
>
> this is the correct version : 
> https://gist.github.com/l15k4/6d01261b5e579a02f4fd#gistcomment-1671753
>
> Thanks a lot Ronald !
>
>
> On Saturday, January 16, 2016 at 7:35:33 PM UTC+1, rkuhn wrote:
>>
>> Hi Jakub,
>>
>> it is not buggy, it is tailored to the HTTP use-case ;-) I meant the link 
>> merely as an inspiration showing you how to use passAlong() and 
>> eagerTerminateInput / ignoreTerminateInput; I think your use-case calls for 
>> the latter.
>>
>> Regards,
>>
>> Roland
>>
>> 16 jan 2016 kl. 19:32 skrev Jakub Liska :
>>
>> Hey,
>>
>> I tried it, but it seems to be buggy, it always completes right after the 
>> first element, I'm testing the TerminationMerge you referred to except it 
>> is generic, not http related. 
>>
>> https://gist.github.com/l15k4/633b31525f0728392fe0
>>
>> On Saturday, January 16, 2016 at 5:07:33 PM UTC+1, rkuhn wrote:
>>>
>>> too many communication channels :-)
>>>
>>> see here 
>>> <https://github.com/akka/akka/blob/master/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala#L125>
>>>  for 
>>> an example of such a merge
>>>
>>> Regards,
>>>
>>> Roland
>>>
>>> 16 jan 2016 kl. 16:40 skrev Jakub Liska :
>>>
>>> Hey,
>>>
>>> I cannot figure out how should I upgrade this very simple 1.0 FlexiMerge 
>>> that just throws away elements of one of the input ports : 
>>>
>>> https://gist.github.com/l15k4/6d01261b5e579a02f4fd
>>>
>>> From what I see in GraphStage source code, one can only read from a 
>>> specific Input port, so that it seems like I should 
>>> implement the "ReadAny" behavior myself? 
>>>
>>> Thank you, Jakub
>>>
>>>
>>>
>>>
>>>
>>> *Dr. Roland Kuhn*
>>> *Akka Tech Lead*
>>> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
>>> twitter: @rolandkuhn
>>> <http://twitter.com/#!/rolandkuhn>
>>>
>>>
>>
>>
>>
>>
>> *Dr. Roland Kuhn*
>> *Akka Tech Lead*
>> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
>> twitter: @rolandkuhn
>> <http://twitter.com/#!/rolandkuhn>
>>
>>



Re: [akka-user] How to replace FlexiMerge and ReadAny in 2.0 GraphStage ?

2016-01-16 Thread Jakub Liska
You are right,

this is the correct version: 
https://gist.github.com/l15k4/6d01261b5e579a02f4fd#gistcomment-1671753

Thanks a lot, Roland!


On Saturday, January 16, 2016 at 7:35:33 PM UTC+1, rkuhn wrote:
>
> Hi Jakub,
>
> it is not buggy, it is tailored to the HTTP use-case ;-) I meant the link 
> merely as an inspiration showing you how to use passAlong() and 
> eagerTerminateInput / ignoreTerminateInput; I think your use-case calls for 
> the latter.
>
> Regards,
>
> Roland
>
> 16 jan 2016 kl. 19:32 skrev Jakub Liska 
> >:
>
> Hey,
>
> I tried it, but it seems to be buggy, it always completes right after the 
> first element, I'm testing the TerminationMerge you referred to except it 
> is generic, not http related. 
>
> https://gist.github.com/l15k4/633b31525f0728392fe0
>
> On Saturday, January 16, 2016 at 5:07:33 PM UTC+1, rkuhn wrote:
>>
>> too many communication channels :-)
>>
>> see here 
>> <https://github.com/akka/akka/blob/master/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala#L125>
>>  for 
>> an example of such a merge
>>
>> Regards,
>>
>> Roland
>>
>> 16 jan 2016 kl. 16:40 skrev Jakub Liska :
>>
>> Hey,
>>
>> I cannot figure out how should I upgrade this very simple 1.0 FlexiMerge 
>> that just throws away elements of one of the input ports : 
>>
>> https://gist.github.com/l15k4/6d01261b5e579a02f4fd
>>
>> From what I see in GraphStage source code, one can only read from a 
>> specific Input port, so that it seems like I should 
>> implement the "ReadAny" behavior myself? 
>>
>> Thank you, Jakub
>>
>>
>>
>>
>>
>> *Dr. Roland Kuhn*
>> *Akka Tech Lead*
>> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
>> twitter: @rolandkuhn
>> <http://twitter.com/#!/rolandkuhn>
>>
>>
>
>
>
>
> *Dr. Roland Kuhn*
> *Akka Tech Lead*
> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
> twitter: @rolandkuhn
> <http://twitter.com/#!/rolandkuhn>
>
>



Re: [akka-user] How to replace FlexiMerge and ReadAny in 2.0 GraphStage ?

2016-01-16 Thread Jakub Liska
Hey,

I tried it, but it seems to be buggy: it always completes right after the 
first element. I'm testing the TerminationMerge you referred to, except mine 
is generic, not HTTP related. 

https://gist.github.com/l15k4/633b31525f0728392fe0

On Saturday, January 16, 2016 at 5:07:33 PM UTC+1, rkuhn wrote:
>
> too many communication channels :-)
>
> see here 
> <https://github.com/akka/akka/blob/master/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala#L125>
>  for 
> an example of such a merge
>
> Regards,
>
> Roland
>
> 16 jan 2016 kl. 16:40 skrev Jakub Liska 
> >:
>
> Hey,
>
> I cannot figure out how should I upgrade this very simple 1.0 FlexiMerge 
> that just throws away elements of one of the input ports : 
>
> https://gist.github.com/l15k4/6d01261b5e579a02f4fd
>
> From what I see in GraphStage source code, one can only read from a 
> specific Input port, so that it seems like I should 
> implement the "ReadAny" behavior myself? 
>
> Thank you, Jakub
>
>
>
>
>
> *Dr. Roland Kuhn*
> *Akka Tech Lead*
> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
> twitter: @rolandkuhn
> <http://twitter.com/#!/rolandkuhn>
>
>



[akka-user] How to replace FlexiMerge and ReadAny in 2.0 GraphStage ?

2016-01-16 Thread Jakub Liska
Hey,

I cannot figure out how I should upgrade this very simple 1.0 FlexiMerge 
that just throws away the elements of one of the input ports: 

https://gist.github.com/l15k4/6d01261b5e579a02f4fd

From what I see in the GraphStage source code, one can only read from a 
specific input port, so it seems like I should implement the "ReadAny" 
behavior myself? 

Thank you, Jakub
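
(In case it helps anyone landing here later: a minimal GraphStage sketch of such a merge - pass in0 through, read and discard everything on in1. This is a from-scratch sketch of the use-case, not the exact code from the gist:)

import akka.stream.{Attributes, FanInShape2, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Passes elements from in0 to out and silently drains/discards everything arriving on in1.
class DropSecondInputMerge[A, B] extends GraphStage[FanInShape2[A, B, A]] {
  override val shape = new FanInShape2[A, B, A]("DropSecondInputMerge")
  private def in0: Inlet[A]  = shape.in0
  private def in1: Inlet[B]  = shape.in1
  private def out: Outlet[A] = shape.out

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      override def preStart(): Unit = pull(in1) // start draining the ignored input right away

      setHandler(in0, new InHandler {
        override def onPush(): Unit = push(out, grab(in0))
        // the default onUpstreamFinish completes the stage, i.e. "finish when the real input finishes"
      })
      setHandler(in1, new InHandler {
        override def onPush(): Unit = { grab(in1); pull(in1) } // discard and ask for the next one
        override def onUpstreamFinish(): Unit = ()             // ignore: in1 finishing must not end the stage
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in0)
      })
    }
}

It can then be added to a graph with b.add(new DropSecondInputMerge[A, B]), much like the old FlexiMerge.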



Re: [akka-user] way to get ActorContext from ActorSystem?

2015-10-29 Thread Jakub Liska
Hey Sam,

if you are extending the TestKitBase trait, then this is the way to go: 

TestActorRef[TestActor].underlyingActor.context



On Thursday, February 19, 2015 at 5:58:15 PM UTC+1, Sam Halliday wrote:
>
> On Thursday, 19 February 2015 16:54:47 UTC, Heiko Seeberger wrote:
>>
>> What about using an ActorRefFactory?
>>
>
> Yeah, I thought about that but I also need an ActorSystem (it's using 
> akka.io) and typically when you need both an ActorSystem and an 
> ActorRefFactory the implicit resolution gets a bit nasty on the caller side.
>
>  
>
>> --
>>
>> *Heiko Seeberger*
>> Home: heikoseeberger.de
>> Twitter: @hseeberger 
>> Public key: keybase.io/hseeberger
>>
>> On 19 Feb 2015, at 17:49, Sam Halliday  wrote:
>>
>> Hi all,
>>
>> I wrote a little convenience method that starts up a bunch of actors, and 
>> I intentionally take in an ActorContext because this is typically used with 
>> a supervisor strategy.
>>
>> However, when I'm testing it (and also writing examples for people to 
>> easily understand the API), it would be incredibly convenient if the 
>> ActorContext that spawns the actors was the Guardian.
>>
>> Is there any way to do this, or must I write a little dumb supervisor 
>> (such as below) to do this? I tried searching in TestKit but didn't come up 
>> with anything.
>>
>>class DumbSupervisorActor extends Actor {
>> override def preStart(): Unit = {
>>   startAllTheThings()
>> }
>> def receive = Actor.emptyBehavior
>>   }
>>
>> Best regards,
>> Sam
>>
>>
>>
>>



[akka-user] akka streams based Lambda architecture

2015-10-28 Thread Jakub Liska
Hey,

currently our lambda architecture is designed this way:

A tree-based hierarchy of View Materializer Actors, chosen mostly for Actor 
supervision. Each Materializer Actor triggers an akka-stream that builds the 
resulting View. This design works, but it leads to complex actors because 
each subsequent Materializer must be initialized from its parent 
Materializer - that's how the Actor hierarchy works. Overall, it is not easy 
to design a complex lambda architecture tree/flow this way.

I was thinking about rewriting this into one big Akka Stream where the 
original Materializer Actor hierarchy would become an akka-stream Graph. 
Materializer Actors would turn into Flows, and these Flows would spin up an 
akka-stream that builds a View.

Think of it this way :

   start ~> FtpToS3     ~> S3ToElasticsearch ~> ElasticSearchToS3 ~> SparkJobFoo ~> finish
         ~> S3ToS3      ~> S3ToFs            ~>
         ~> SparkJobBar ~>



Questions :

1) Is it OK to have an akka-stream Graph that has almost no throughput? In 
fact there would be just a single element coming through it. It would just 
collect statistics about performance and status. 

2) I'm not sure how to deal with wrapping self-contained akka-streams into 
akka-stream Materializer Flows... Would it be OK if it was done simply this 
way: 

S3ToElasticsearchFlow().mapAsync(1) { x => SomeSource().runWith(SomeSink()) }

Or is there a better way? 
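
(A slightly fuller sketch of that wrapping, assuming Akka Streams 2.4+; the trigger type and the inner source/sink are hypothetical stand-ins:)

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

implicit val system = ActorSystem("lambda")
implicit val mat    = ActorMaterializer()

final case class Trigger(partition: String) // hypothetical element flowing through the coordination graph

// Each former Materializer Actor becomes a Flow that runs its own self-contained inner stream
// and only lets the trigger element pass once that inner stream has completed.
val s3ToElasticsearch: Flow[Trigger, Done, NotUsed] =
  Flow[Trigger].mapAsync(parallelism = 1) { _ =>
    val innerSource: Source[String, NotUsed] = Source(List("a", "b", "c")) // stand-in for the real S3 source
    val innerSink: Sink[String, Future[Done]] = Sink.ignore                // stand-in for the real ES sink
    innerSource.runWith(innerSink)
  }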

I've never really thought of akka-streams as just a tool for building a 
Graph... I've always thought of it as a 
high-throughput-asynchronous-boundary-crossing tool. But now I can see use 
cases that have nothing to do with it.

Thanks !!



Re: [akka-user] Context of event stream elements

2015-07-21 Thread Jakub Liska

>
> I didn’t understand you point about “global” state, that would surely not 
> be useful for tracing anything because you’d get all data munged together, 
> or am I missing something?
>

Hi, you are right, it is impossible to have a connection between a 
particular element and the data in the context... it must be worked around 
on the API user's side rather than in the akka-stream implementation itself. 
Thanks!

On Sunday, June 21, 2015 at 9:23:13 AM UTC+2, rkuhn wrote:
>
> Hi Jakub,
>
> I see what you are saying, but that only makes sense for linear 1:1 
> transformations (and possibly filter()). If we were to add something like 
> this then it would need to work with all stream combinators including 
> custom stages etc., because otherwise it would just be a big pile of junk 
> ;-) So, how would you propose to manage the per-element context in expand() 
> or conflate()? BTW: I didn’t understand you point about “global” state, 
> that would surely not be useful for tracing anything because you’d get all 
> data munged together, or am I missing something?
>
> This is a hard problem. That we don’t have a solution yet is neither due 
> to laziness nor forgetfulness ;-)
>
> Regards,
>
> Roland
>
> 15 maj 2015 kl. 12:14 skrev Jakub Liska 
> >:
>
> I'm facing also this problem with passing a context through the stream. 
> Let say you have a stream like this : 
>
> download resource -> back it up -> parse it -> store errors -> process 
>> data -> ... -> persist data -> sink
>
>
> you kinda need some sort of context so that you know that if some latter 
> flows fails it was related to a particular resource from the first Flow and 
> you can log/handle it in Supervision.Decider. 
>
> I'm using HList for the In&Out Flow types which is basically the context 
> so that instead of gluing flow parts together with many case classes for 
> instance I use a growing HList. But it turned out to be quite a maintenance 
> disaster and it is getting out of control because it lost flexibility. I 
> think that each stream should optionally have something like a 
> Context/GlobalState accessible to all Flows, even if it was a god damn 
> Map[String, Any] because it is impossible to implement it manually for it 
> to be maintainable.
>
>
>
>
>
> *Dr. Roland Kuhn*
> *Akka Tech Lead*
> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
> twitter: @rolandkuhn
> <http://twitter.com/#!/rolandkuhn>
>  
>



[akka-user] [akka-stream] Troubleshooting push-pull signal/token disappearance in a complex stream

2015-07-17 Thread Jakub Liska
Hi, how do you guys track this down when your stream just starts hanging 
and you cannot reproduce it, because it only occurs in a complex stream?

I'm able to see a few hints thanks to 
"ActorMaterializerSettings#withDebugLogging(enable = true)" and a Log stage, 
which tell me which stream stages are hanging (terminated abruptly) and 
where the elements stop flowing, but that doesn't really reveal the problem.
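
(For reference, a minimal sketch of that debug setup; the DEBUG output only shows up if Akka's loglevel is DEBUG as well:)

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}

implicit val system = ActorSystem("debug")
// materializer with internal debug logging enabled; combine with a named .log() stage in the flow itself
implicit val mat = ActorMaterializer(
  ActorMaterializerSettings(system).withDebugLogging(enable = true))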

I have been trying to figure out this problem for almost 2 days:

https://github.com/akka/akka/issues/17913#issuecomment-121948347

I don't want to give up on the custom component, because this issue will 
definitely happen again someplace else, and my gut tells me that I had 
better learn how to deal with this situation...



Re: [akka-user] Akka HTTP (Scala) 1.0-RC[12]: Processor actor terminated abruptly

2015-07-17 Thread Jakub Liska
Ah, sorry, it requires the logger to be set to debug mode too. Thanks!



Re: [akka-user] Akka HTTP (Scala) 1.0-RC[12]: Processor actor terminated abruptly

2015-07-17 Thread Jakub Liska
I tried this already, but it doesn't seem to have any effect on logging...



Re: [akka-user] Akka HTTP (Scala) 1.0-RC[12]: Processor actor terminated abruptly

2015-07-16 Thread Jakub Liska
Samuel how did you manage to enable this logging : 

[DEBUG] [04/30/2015 22:36:01.921] [default-akka.actor.default-dispatcher-8] [akka://default/system/deadLetterListener] stopped
[DEBUG] [04/30/2015 22:36:01.922] [default-akka.actor.default-dispatcher-5] [akka://default/user/$a/flow-2-3-publisherSource-processor-mapConcat] stopped
[DEBUG] [04/30/2015 22:36:01.922] [default-akka.actor.default-dispatcher-6] [akka://default/user/$a/flow-2-9-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.retryMerge-flexiMerge] stopped
[DEBUG] [04/30/2015 22:36:01.922] [default-akka.actor.default-dispatcher-5] [akka://default/user/$a/flow-2-2-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute] stopped
[DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-12] [akka://default/user/$a/flow-2-4-publisherSource-Merge] stopped
[DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-14] [akka://default/user/$a/flow-2-1-publisherSource-Merge] stopped
[DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-10] [akka://default/user/$a/flow-2-11-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.RetrySplit-flexiRoute] stopped
[DEBUG] [04/30/2015 22:36:01.923] [default-akka.actor.default-dispatcher-12] [akka://default/user/$a/flow-2-12-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute] stopped

I have akka loglevel set to DEBUG and logback too, but these messages are 
not logged, thanks



[akka-user] Re: [akka-stream] flow bypassing techniques

2015-07-05 Thread Jakub Liska
It required a little bit of round-robin :-)

class RoundRobinBypassingMerge[A, B] extends FlexiMerge[A, FanInShape2[A, B, A]](new FanInShape2("RRBMerge"), OperationAttributes.name("RRBMerge")) {
  def createMergeLogic(p: PortT): MergeLogic[A] = new MergeLogic[A] {

    // emit one element from in0, then read (and drop) one element from in1
    val read1: State[A] = State[A](Read(p.in0)) { (ctx, input, element) =>
      ctx.emit(element)
      read2
    }
    val read2: State[B] = State[B](Read(p.in1)) { (ctx, input, element) =>
      read1
    }
    // once one input completes, keep draining the other one on its own
    val readRemaining1: State[A] = State[A](Read(p.in0)) { (ctx, input, element) =>
      ctx.emit(element)
      SameState
    }
    val readRemaining2: State[B] = State[B](Read(p.in1)) { (ctx, input, element) =>
      SameState
    }

    override def initialState: State[_] = read1

    override def initialCompletionHandling = CompletionHandling(
      onUpstreamFinish = { (ctx, input) ⇒
        ctx.changeCompletionHandling(defaultCompletionHandling)
        if (input eq p.in0) readRemaining2 else readRemaining1
      },
      onUpstreamFailure = { (ctx, _, cause) ⇒
        ctx.fail(cause)
        SameState
      })
  }
}





[akka-user] Re: [akka-stream] flow bypassing techniques

2015-07-05 Thread Jakub Liska
It still has a bug I cannot find: on more complicated streams it pushes elements A via

ctx.emit(element)

but they never arrive at the Sink that follows and the stream blocks indefinitely... Any idea?



[akka-user] Re: [akka-stream] flow bypassing techniques

2015-07-05 Thread Jakub Liska
FlexiMerge seems to be a valid solution to this problem : 

class BypassMerge[A, B] extends FlexiMerge[A, FanInShape2[A, B, A]](new FanInShape2("BypassMerge"), OperationAttributes.name("BypassMerge")) {
  def createMergeLogic(p: PortT): MergeLogic[A] = new MergeLogic[A] {

val readA: State[A] = State[A](Read(p.in0)) { (ctx, input, element) ⇒
  ctx.emit(element)
  readA
}

val readB: State[B] = State[B](Read(p.in1)) { (ctx, input, element) ⇒
  readB
}

override def initialCompletionHandling = CompletionHandling (
  onUpstreamFinish = { (ctx, input) ⇒
ctx.changeCompletionHandling(defaultCompletionHandling)
readA
  },
  onUpstreamFailure = { (ctx, _, cause) ⇒
ctx.fail(cause)
SameState
  }
)
override def initialState: State[_] = readB
  }
}

  "Bypass string source with more element" in {
val p = FlowGraph.closed(Sink.publisher[Int]) { implicit b =>
  import FlowGraph.Implicits._
  sink =>
val merge = b.add(new BypassMerge[Int, String])
Source(List(1,2)) ~> merge.in0
Source(List("1","2","3","4")) ~> merge.in1
merge.out ~> sink.inlet
}.run()


val s = TestSubscriber.manualProbe[Int]
p.subscribe(s)
val sub = s.expectSubscription()


sub.request(10)
s.expectNext(1)
s.expectNext(2)
s.expectComplete()
  }




  "Bypass string source with less elements" in {
val p = FlowGraph.closed(Sink.publisher[Int]) { implicit b =>
  import FlowGraph.Implicits._
  sink =>
val merge = b.add(new BypassMerge[Int, String])
Source(List(1,2,3,4)) ~> merge.in0
Source(List("1","2")) ~> merge.in1
merge.out ~> sink.inlet
}.run()


val s = TestSubscriber.manualProbe[Int]
p.subscribe(s)
val sub = s.expectSubscription()


sub.request(10)
s.expectNext(1)
s.expectNext(2)
s.expectNext(3)
s.expectNext(4)
s.expectComplete()
  }





[akka-user] [akka-stream] flow bypassing techniques

2015-07-05 Thread Jakub Liska
Hi,

sometimes I need to "ignore" the output of a flow and use its input further 
down the stream... There are two possible ways to do that I'm aware of:

  def bypass[I, O](flow: Flow[I, O, Any]): Flow[I, I, Any] = Flow() { implicit b =>
    import FlowGraph.Implicits._
    val broadcast = b.add(Broadcast[I](2))
    broadcast.out(0) ~> flow ~> Sink.ignore
    (broadcast.in, broadcast.out(1))
  }

This one has the disadvantage that possible errors/exceptions from the flow are not 
propagated - they are swallowed by Sink.ignore. The second way is zipping:

def bypass[I, O](flow: Flow[I, O, Any]): Flow[I, I, Any] = Flow() { implicit b =>
  import FlowGraph.Implicits._
  val bcast = b.add(Broadcast[I](2))
  val zip = b.add(Zip[I, O]())

  bcast.out(0) ~> zip.in0
  bcast.out(1) ~> flow ~> zip.in1

  (bcast.in, zip.out.map(_._1).outlet)
}


But it has the disadvantage that the bypassed flow must not change the shape of the 
stream (filter, mapConcat, etc.), otherwise the zip pairs up the wrong elements and 
the stream behaves unpredictably.

Does anybody know a better way of doing that ?



Re: [akka-user] Re: Akka stream is idling with merged ActorPublishers

2015-05-29 Thread Jakub Liska
Done, thank you  https://github.com/akka/akka/issues/17614



[akka-user] Re: Akka stream is idling with merged ActorPublishers

2015-05-29 Thread Jakub Liska
I got it - it wasn't related to akka-stream but to one of the services on a 
particular host the stream communicates with. Its throughput dropped by about 95% 
from one day to the next, and since the entire stream depended on it, the stream's 
throughput dropped by about 95% too...

It's pretty hard to detect such a state; it would really need some built-in 
profiling mechanism https://groups.google.com/forum/#!topic/akka-user/emBAWO2rxsU



Re: [akka-user] Re: Akka stream is idling with merged ActorPublishers

2015-05-29 Thread Jakub Liska

>
> The total number of request is tracked in totalNumber.


You mean "total number of request*s*"? Like "total number of requested 
elements that may come in x requests" ?

Anyway I changed it to : 

case Request(n) if totalDemand > 0 && isActive =>
  (1L to totalDemand).foldLeft(true) {
case (acc,i) => if (acc) pushNext else false
  }


And it is still idling ...



[akka-user] Re: Akka stream is idling with merged ActorPublishers

2015-05-29 Thread Jakub Liska
The documentation says : 

totalNumber : 

Total number of requested elements from the stream subscriber.
> This actor automatically keeps tracks of this amount based on
> incoming request messages and outgoing `onNext`.


n : 

n number of requested elements

 
As I see it, "n" represents "totalNumber" in this context, but 
totalNumber is also a stateful count kept for internal purposes...



[akka-user] Re: Akka stream is idling with merged ActorPublishers

2015-05-29 Thread Jakub Liska

>
>   def receive: Receive = {
> case Request(n) if totalDemand > 0 && isActive =>
>   (1L to Math.min(n, totalDemand)).foldLeft(true) {
>
>  

> What is the above line intending to do? Why are you taking a minimum of n 
> and totalDemand? Why are you not using totalDemand directly?

 

Well, I couldn't really say how many elements are actually demanded - "n" or 
"totalDemand"... "Request(n)" arrives saying "push me n elements", right? 
That is the actual demand.
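
A minimal sketch of what driving the loop off totalDemand alone would look like, reusing the pushNext helper from my original post (this is just my reading of the ActorPublisher contract, not a verified fix):

def receive: Receive = {
  case Request(_) =>
    // totalDemand is the sum of all outstanding Request(n) messages and is
    // decremented automatically by every onNext, so looping on it also covers
    // demand that arrived while we were still pushing
    while (totalDemand > 0 && pushNext) {}
  case Cancel =>
    context.stop(self)
}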



[akka-user] Akka stream is idling with merged ActorPublishers

2015-05-29 Thread Jakub Liska
Hey,

I have a Source that merges ActorPublishers, this is a simplification : 

Source[Foo]() { implicit b =>
  val actorSources = myActorPublisherArray
  val merge = b.add(Merge[Foo](actorSources.length))
  for (i <- 0 until actorSources.length) {
    b.addEdge(b.add(actorSources(i)), merge.in(i))
  }
  merge.out
}



class MyActorPublisher extends ActorPublisher[Foo] {
  var batchIterator = someBlockingIterator

  
  def pushNext: Boolean = {
if (batchIterator.hasNext) {

  val nextMnfResource = batchIterator.next()
  onNext(nextMnfResource)
  true
} else {
  onCompleteThenStop()
  false
}
  }
  def receive: Receive = {
case Request(n) if totalDemand > 0 && isActive =>
  (1L to Math.min(n, totalDemand)).foldLeft(true) {
case (acc,i) => if (acc) pushNext else false
  }
case Cancel =>
  context.stop(self)
  }
}



But the stream is idling most of the time at about 1% CPU usage, as if there 
were no `pull demand` for some reason... I've been trying to reproduce it and 
find the catch for 2 days already.

Any idea why that could be? I think it started with RC1 or RC2, when I set 
Flow#mapAsync(parallelism) as required in the following Flows. I removed all 
custom "Stages" and I don't really do anything except map and mapAsync in the 
flows...

Also it doesn't seem to be happening with 

  .withInputBuffer (
initialSize = 1,
maxSize = 1
  )


But then the throughput is insufficient...



Re: [akka-user] Re: ANNOUNCE: Akka Streams & HTTP 1.0-RC3

2015-05-28 Thread Jakub Liska
Hi, 

how about these two deps? Haven't they been deployed, or did their names change?

[warn]  ::
[warn]  ::  UNRESOLVED DEPENDENCIES ::
[warn]  ::
[warn]  :: com.typesafe.akka#akka-http-scala-experimental_2.11;1.0-RC3: not 
found
[warn]  :: 
com.typesafe.akka#akka-http-testkit-scala-experimental_2.11;1.0-RC3: not 
found
[warn]  ::
[



[akka-user] Re: [akka-stream] How to design stream profiling properly?

2015-05-28 Thread Jakub Liska
I have this weird problem with probably broken pull demand in a stream of 
this representation : 

Source[Foo]() { implicit b =>
  val actorSources = Array(100%CorrectlyImplementedActorPublishers)
  val merge = b.add(Merge[Foo](actorSources.length))
  for (i <- 0 until actorSources.length) {
    b.addEdge(b.add(actorSources(i)), merge.in(i))
  }
  merge.out
}


+ 6 subsequent Flows doing map/mapAsync with various parallelism, and a Sink.fold.

From the profiling I just did, it seems that for most of the processing time the 
stream is stuck at 1% CPU usage because it isn't pulling the next elements, but I 
cannot reproduce it...



[akka-user] [akka-stream] How to design stream profiling properly?

2015-05-28 Thread Jakub Liska
Hi,

having a stream with many Flow components often makes you wonder where 
all the time is being spent.

Any idea how to end up with something like a HashMap with entries 
[flowName/flowType, msSum]?

I can think of either a profiling function being explicitly passed to all 
Flow combinators that do some work, like map/mapAsync,

or implementing a PushPullStage that would be appended via "transform()" 
after each Flow - but I'm not at all sure that summing the times between 
individual pushes would give a sensible and correct result; imho it wouldn't...

Is there a better way to do this?
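
To make the first idea concrete, here is a rough sketch of a timing wrapper for the synchronous 1:1 case - the object/method names are made up for illustration and it only covers plain map stages, so treat it as a starting point only:

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

object StreamTimings {
  // accumulated wall-clock nanos per stage name, readable during or after the run
  val timings = new ConcurrentHashMap[String, AtomicLong]()

  def timed[A, B](name: String)(f: A => B): A => B = { a =>
    val start = System.nanoTime()
    try f(a)
    finally {
      timings.putIfAbsent(name, new AtomicLong(0L))
      timings.get(name).addAndGet(System.nanoTime() - start)
    }
  }
}

// usage, e.g.: Flow[Resource].map(StreamTimings.timed("parse")(parseResource))
// where parseResource stands for whatever the stage was doing before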



[akka-user] Re: [akka-stream] How to construct an empty/dummy partial graph?

2015-05-28 Thread Jakub Liska
I see,

I didn't know that a partial graph that exposes in & out ports can be used like 
a common Flow:

in ~> partialGraph ~> out

There is only this in documentation : 

partialGraph.runWith(Source(List(1)), Sink.head)

So I thought it was not possible... Thank you Endre !!!



[akka-user] [akka-stream] How to construct an empty/dummy partial graph?

2015-05-28 Thread Jakub Liska
Hi,

I cannot figure out how I would do something like:

bucketOpt match {
  case None =>
*// ??? How to return just a dummy partial graph ???*
  case Some(bucket) =>
Flow() { implicit b =>
  import FlowGraph.Implicits._

  val broadcast = b.add(Broadcast[Array[ResCtx]](2))

  val uniqueResourceFilter = Flow[Array[ResCtx]].transform(() => new 
UniqueResourceFilter)
  val backup = Flow[Iterable[Resource]].mapAsyncUnordered(4) { resources =>
Future.sequence(
  resources.map( res => S3BackupFlow.backup(bucket, res, res.name, 
deleteLocal))
)
  }
  broadcast.out(0) ~> uniqueResourceFilter ~> backup ~> Sink.ignore

  (broadcast.in, broadcast.out(1))
}
}


Also it is a mystery to me how to grab just an inlet stream, for doing 
something like : 

Flow() { implicit b =>
  import FlowGraph.Implicits._

  

  val uniqueResourceFilter = Flow[Array[ResCtx]].transform(() => new 
UniqueResourceFilter)

  val merge = b.add(Merge[Array[ResCtx]](2))

 

  ???in??? ~> filter ~> broadcast.out(0) ~> uniqueResourceFilter ~> backup ~> 
merge.in(0)

 ~> broadcast.out(1) ~> uniqueResourceFilter ~> backup ~> 
merge.in(1)


(???in???, merge.out)
}



Thanks, Jakub



Re: [akka-user] [akka streams] question on some time related use cases

2015-05-27 Thread Jakub Liska
Hi,

btw, can a Stage be stateful? Is reading/writing this field from a PushPullStage 
thread safe?

var state : Map[A,Cancellable] = Map.empty

Thanks, Jakub


On Friday, January 23, 2015 at 2:42:11 AM UTC+1, Frank Sauer wrote:
>
> Thanks for the pointers Endre,  I’ll explore those ideas.
>
> Frank
>
> On Jan 22, 2015, at 4:02 AM, Endre Varga  > wrote:
>
>
>
> On Thu, Jan 22, 2015 at 5:07 AM, Frank Sauer  > wrote:
>
>> Update, in a simple test scenario like so 
>>
>>   val ticks = Source(1 second, 1 second, () => "Hello")
>>
>>   val flow = ticks.transform(() => new FilterFor[String](10 seconds)(x => 
>> true)).to(Sink.foreach(println(_)))
>>
>>   flow.run()
>>
>> I'm seeing the following error, so this doesn't work at all and I'm not 
>> sure it is because of threading:
>>
>> java.lang.ArrayIndexOutOfBoundsException: -1
>> at 
>> akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$currentOp(Interpreter.scala:175)
>> at 
>> akka.stream.impl.fusing.OneBoundedInterpreter$State$class.push(Interpreter.scala:209)
>> at 
>> akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.push(Interpreter.scala:278)
>> at 
>> experiments.streams.time$FilterFor$$anonfun$1.apply$mcV$sp(time.scala:46)
>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>> at 
>> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at 
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> I think I'm violating the one very important rule mentioned in the docs - 
>> when the timer fires it calls a push on the context but there is also a 
>> pull going on concurrently(?) - and this is indeed breaking in spectacular 
>> ways as expected 
>>
>
> :)
>  
>
>>
>> I have no idea how to implement this correctly. It looked pretty simple 
>> at first, but alas... 
>>
>
> See my previous mail. The main problem here is mixing backpressured 
> streams (your data) and non-backpressured events (timer triggers) in a safe 
> fashion. Well, the main problem is not how to implement it, but how to 
> expose an API to users which is as safe as possible. We have groupedWithin, 
> takeWithin and dropWithin as timer based ops, but no customization for now.
>
> -Endre
>  
>
>>
>> On Wednesday, January 21, 2015 at 8:51:21 PM UTC-5, Frank Sauer wrote:
>>>
>>> Thanks, I came up with the following, but I have some questions:
>>>
>>> /**
>>>* Holds elements of type A for a given finite duration after a 
>>> predicate p first yields true and as long as subsequent
>>>* elements matching that first element (e.g. are equal) still satisfy 
>>> the predicate. If a matching element arrives during
>>>* the given FiniteDuration for which the predicate p does not hold, 
>>> the original element will NOT be pushed downstream.
>>>* Only when the timer expires and no matching elements have been seen 
>>> for which p does not hold, will elem be pushed
>>>* downstream.
>>>*
>>>* @param duration The polling interval during which p has to hold true
>>>* @param pThe predicate that has to remain true during the 
>>> duration
>>>* @param system   implicit required to schedule timers
>>>* @tparam A   type of the elements
>>>*/
>>>   class FilterFor[A](duration : FiniteDuration)(p: A => 
>>> Boolean)(implicit system: ActorSystem) extends PushStage[A,A] {
>>>
>>> var state : Map[A,Cancellable] = Map.empty
>>>
>>> override def onPush(elem: A, ctx: Context[A]): Directive = 
>>> state.get(elem) match {
>>>
>>>   case Some(timer) if !p(elem) => // pending timer but condition no 
>>> longer holds => cancel timer
>>>  timer.cancel()
>>>  state = state - elem
>>>  ctx.pull()
>>>
>>>case None if p(elem) => // no pending timer and predicate true -> 
>>> start and cache new timer
>>>  val timer = system.scheduler.scheduleOnce(duration) {
>>>// when timer fires, remove from state and push elem 
>>> downstream
>>>state = state - elem
>>>ctx.push(elem); // is this safe?
>>>  }
>>>  state = state + (elem -> timer)
>>>  ctx.pull()
>>>
>>>case _ => ctx.pull() // otherwise simply wait for the next 
>>> upstream element
>>> }
>>>
>>>   }
>>>
>>> My main concerns are these:
>>>
>>> 1) Is it safe to invoke ctx.push from the thread on which the timer 
>>> fires?
>>> 2) How do I react to upstream or downstream finish or cancel events - do 
>>> I have to?
>>> 3) Can I integrate this into the DSL without using transform, e.g. can I 
>>> somehow add a filterFor method on something via a pimp my library?
>>>
>>> Any and all pointers would be very much appreciated,
>>>
>>> Thank

Re: [akka-user] [Streams] Is ActorPublisher requested asynchronously or sequentially?

2015-05-25 Thread Jakub Liska
Yeah, but I'm willing to prefer correctness over performance in this case 
because a month ago I was pretty sure that it was almost impossible to 
implement a non-blocking/asynchronous ActorPublisher. I don't remember the 
reasons now but there were some real issues preventing that... There are a 
few threads in Akka-users group discussing it...

On Monday, May 25, 2015 at 1:12:03 PM UTC+2, √ wrote:
>
> Blocking is never ideal :)
>
> A non-blocking alternative might be to have it use pipeTo and send the 
> actor a message and then react to that message.
>
> On Mon, May 25, 2015 at 1:08 PM, Jakub Liska  > wrote:
>
>> Good thinking :-) Blocking the "scroll" async method right away seems to 
>> be ideal. Thank you
>>
>> On Monday, May 25, 2015 at 11:55:50 AM UTC+2, √ wrote:
>>
>>>
>>>
>>> On Mon, May 25, 2015 at 11:45 AM, Jakub Liska  
>>> wrote:
>>>
>>>> 1) But if you share ExecutionContext with the actor (both using 
>>>> dispatcher's thread) then there cannot be a concurrent execution - it's 
>>>> just a question of whether "Future#onComplete" callback executes before 
>>>> receive partial function returns
>>>>
>>>>
>>> The dispatcher has N threads, and it can (and will) execute the actor 
>>> concurrently with other things, which in this case is the onComplete 
>>> callback.
>>>  
>>>
>>>> 2) I guess you are right, that even when using dispatcher's thread the 
>>>> "Future#onComplete" is probably executed after actor's 'receive' function 
>>>> returns, so even though it is done so by the same thread, it can happen 
>>>> after another Request is processed in the mean time because it was queued 
>>>> prior to "Future#onComplete" callback...
>>>>
>>>> I think that the solution is to substitute "Future#onComplete" with 
>>>> "Future#map" and handling only Future error in "Future#recover" ... I'll 
>>>> try that out... Thank you
>>>>
>>>
>>> Substituting those won't solve the issue of closing over, and calling, 
>>> methods on the actor from another thread.
>>> Since what you are doing is blocking anyway, why not block first, and 
>>> then execute the logic in the actor itself?
>>>  
>>>
>>>>
>>>>
>>>> On Monday, May 25, 2015 at 11:26:09 AM UTC+2, √ wrote:
>>>>>
>>>>> Hi Jakub,
>>>>>
>>>>> Starting to read your email I definitely thought there must be 
>>>>> something mysterious at work!
>>>>>
>>>>> From what I can tell, there are a couple of compounding things here:
>>>>>
>>>>> 1) future.onComplete will be executed on another thread than the 
>>>>> actor, or "concurrently with the actor", this means that you can't close 
>>>>> over the actor and call methods on it from another thread, see: 
>>>>> http://doc.akka.io/docs/akka/2.3.11/additional/faq.html
>>>>>
>>>>> 2) when you call `Await` on the Future, you're only going to await it 
>>>>> having a value, not await its callbacks to finish execute.
>>>>> So:
>>>>>
>>>>> 1. val f = someFuture(…)
>>>>> 2. f.onComplete { … }
>>>>> 3. Await.result(f, …)
>>>>>
>>>>> When line 3 executes, onComplete could have already executed, is 
>>>>> (con)currently being executed or will be executed.
>>>>>
>>>>> Does that make sense?
>>>>>
>>>>>
>>>>> On Mon, May 25, 2015 at 11:03 AM, Jakub Liska  
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> in other words :
>>>>>>
>>>>>> def receive: Receive = {
>>>>>>   case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
>>>>>>
>>>>>> // can it happen that another Request message comes before this 
>>>>>> partial function returns (while this one is being processed) ?
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> I have an asynchronous ActorProvider that is scanning ElasticSearch 
>>>>>> index, but I'm calling "await" at

Re: [akka-user] [Streams] Is ActorPublisher requested asynchronously or sequentially?

2015-05-25 Thread Jakub Liska
Good thinking :-) Blocking on the "scroll" async method right away seems to be 
ideal. Thank you
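
A minimal sketch of what that blocking variant could look like, reusing scroll/lastScrollId/onNext from my original post (imports for Await and duration, the timeout policy and multi-batch pushing are all trimmed, so this is an illustration rather than the code that actually went in):

case Request(_) if totalDemand > 0 && isActive =>
  // block right away, so everything below runs on the actor's own thread
  val (sid, recs) = Await.result(scroll(lastScrollId), 600.seconds)
  if (recs.isEmpty) {
    onCompleteThenStop()               // end of scanning/scrolling
  } else {
    lastScrollId = sid
    onNext(recs)
  }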

On Monday, May 25, 2015 at 11:55:50 AM UTC+2, √ wrote:
>
>
>
> On Mon, May 25, 2015 at 11:45 AM, Jakub Liska  > wrote:
>
>> 1) But if you share ExecutionContext with the actor (both using 
>> dispatcher's thread) then there cannot be a concurrent execution - it's 
>> just a question of whether "Future#onComplete" callback executes before 
>> receive partial function returns
>>
>>
> The dispatcher has N threads, and it can (and will) execute the actor 
> concurrently with other things, which in this case is the onComplete 
> callback.
>  
>
>> 2) I guess you are right, that even when using dispatcher's thread the 
>> "Future#onComplete" is probably executed after actor's 'receive' function 
>> returns, so even though it is done so by the same thread, it can happen 
>> after another Request is processed in the mean time because it was queued 
>> prior to "Future#onComplete" callback...
>>
>> I think that the solution is to substitute "Future#onComplete" with 
>> "Future#map" and handling only Future error in "Future#recover" ... I'll 
>> try that out... Thank you
>>
>
> Substituting those won't solve the issue of closing over, and calling, 
> methods on the actor from another thread.
> Since what you are doing is blocking anyway, why not block first, and then 
> execute the logic in the actor itself?
>  
>
>>
>>
>> On Monday, May 25, 2015 at 11:26:09 AM UTC+2, √ wrote:
>>>
>>> Hi Jakub,
>>>
>>> Starting to read your email I definitely thought there must be something 
>>> mysterious at work!
>>>
>>> From what I can tell, there are a couple of compounding things here:
>>>
>>> 1) future.onComplete will be executed on another thread than the actor, 
>>> or "concurrently with the actor", this means that you can't close over the 
>>> actor and call methods on it from another thread, see: 
>>> http://doc.akka.io/docs/akka/2.3.11/additional/faq.html
>>>
>>> 2) when you call `Await` on the Future, you're only going to await it 
>>> having a value, not await its callbacks to finish execute.
>>> So:
>>>
>>> 1. val f = someFuture(…)
>>> 2. f.onComplete { … }
>>> 3. Await.result(f, …)
>>>
>>> When line 3 executes, onComplete could have already executed, is 
>>> (con)currently being executed or will be executed.
>>>
>>> Does that make sense?
>>>
>>>
>>> On Mon, May 25, 2015 at 11:03 AM, Jakub Liska  
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> in other words :
>>>>
>>>> def receive: Receive = {
>>>>   case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
>>>>
>>>> // can it happen that another Request message comes before this 
>>>> partial function returns (while this one is being processed) ?
>>>>
>>>> }
>>>>
>>>>
>>>> I have an asynchronous ActorProvider that is scanning ElasticSearch 
>>>> index, but I'm calling "await" at the end, so it is basically blocking :
>>>>
>>>> private var lastScrollId: String = _
>>>>
>>>>
>>>> def receive: Receive = {
>>>>   case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
>>>> def pushRecursively(n: Long, scrollId: String): Future[Option[String]] 
>>>> = {
>>>>   require(scrollId != null && scrollId.nonEmpty, "Scroll id must be 
>>>> present!")
>>>>   scroll(scrollId) flatMap {
>>>> case (sid, recs) if recs.isEmpty => // empty hits means end of 
>>>> scanning/scrolling
>>>>   Future.successful(Option.empty)
>>>> case (sid, recs) =>
>>>>   onNext(recs)
>>>>   if (n > 1)
>>>> pushRecursively(n-1, sid)
>>>>   else
>>>> Future.successful(Option(sid))
>>>>   }
>>>> }
>>>>
>>>> val f = pushRecursively(Math.min(demand, totalDemand), lastScrollId)
>>>> f onComplete {
>>>>   case Failure(ex) =>
>>>> log.error(ex, &q

Re: [akka-user] [Streams] Is ActorPublisher requested asynchronously or sequentially?

2015-05-25 Thread Jakub Liska
1) But if you share the ExecutionContext with the actor (both using the 
dispatcher's thread), then there cannot be concurrent execution - it's just a 
question of whether the "Future#onComplete" callback executes before the receive 
partial function returns.

2) I guess you are right that even when using the dispatcher's thread, 
"Future#onComplete" is probably executed after the actor's receive function 
returns, so even though it is run by the same thread, it can happen after another 
Request has been processed in the meantime, because that Request was queued prior 
to the "Future#onComplete" callback...

I think the solution is to substitute "Future#onComplete" with "Future#map" and 
handle only the Future's error in "Future#recover"... I'll try that out... Thank you

On Monday, May 25, 2015 at 11:26:09 AM UTC+2, √ wrote:
>
> Hi Jakub,
>
> Starting to read your email I definitely thought there must be something 
> mysterious at work!
>
> From what I can tell, there are a couple of compounding things here:
>
> 1) future.onComplete will be executed on another thread than the actor, or 
> "concurrently with the actor", this means that you can't close over the 
> actor and call methods on it from another thread, see: 
> http://doc.akka.io/docs/akka/2.3.11/additional/faq.html
>
> 2) when you call `Await` on the Future, you're only going to await it 
> having a value, not await its callbacks to finish execute.
> So:
>
> 1. val f = someFuture(…)
> 2. f.onComplete { … }
> 3. Await.result(f, …)
>
> When line 3 executes, onComplete could have already executed, is 
> (con)currently being executed or will be executed.
>
> Does that make sense?
>
>
> On Mon, May 25, 2015 at 11:03 AM, Jakub Liska  > wrote:
>
>> Hi,
>>
>> in other words :
>>
>> def receive: Receive = {
>>   case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
>>
>> // can it happen that another Request message comes before this partial 
>> function returns (while this one is being processed) ?
>>
>> }
>>
>>
>> I have an asynchronous ActorProvider that is scanning ElasticSearch 
>> index, but I'm calling "await" at the end, so it is basically blocking :
>>
>> private var lastScrollId: String = _
>>
>>
>> def receive: Receive = {
>>   case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
>> def pushRecursively(n: Long, scrollId: String): Future[Option[String]] = 
>> {
>>   require(scrollId != null && scrollId.nonEmpty, "Scroll id must be 
>> present!")
>>   scroll(scrollId) flatMap {
>> case (sid, recs) if recs.isEmpty => // empty hits means end of 
>> scanning/scrolling
>>   Future.successful(Option.empty)
>> case (sid, recs) =>
>>   onNext(recs)
>>   if (n > 1)
>> pushRecursively(n-1, sid)
>>   else
>> Future.successful(Option(sid))
>>   }
>> }
>>
>> val f = pushRecursively(Math.min(demand, totalDemand), lastScrollId)
>> f onComplete {
>>   case Failure(ex) =>
>> log.error(ex, "Unexpected ScanSource error")
>> onError(ex)
>> context.stop(self)
>>   case Success(sidOpt) => sidOpt match {
>> case None =>
>>   log.info("ScanSource just completed...")
>>   if (isCompleted)
>> log.warning("ScanSource already completed, I cannot figure out 
>> why this occurs!")
>>   else {
>> onComplete()
>> context.stop(self)
>>   }
>> case Some(sid) =>
>>   lastScrollId = sid
>>   }
>> }
>> f.await(600.seconds)
>>
>>   case Cancel =>
>> context.stop(self)
>> }
>>
>>
>> But as you can see, there is "log.warning" sayig that onComplete() was 
>> already called, which can happen only if ActorPublisher wasn't Requested 
>> sequentially.
>>
>> I think this implementation is correct and valid even though it is 
>> blocking actor's dispatcher thread. But I really cannot figure out how it 
>> can be "completed" twice...
>>

[akka-user] [Streams] Is ActorPublisher requested asynchronously or sequentially?

2015-05-25 Thread Jakub Liska
Hi,

in other words :

def receive: Receive = {
  case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>

// can it happen that another Request message comes before this partial 
function returns (while this one is being processed) ?

}


I have an asynchronous ActorProvider that is scanning ElasticSearch index, 
but I'm calling "await" at the end, so it is basically blocking :

private var lastScrollId: String = _


def receive: Receive = {
  case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
def pushRecursively(n: Long, scrollId: String): Future[Option[String]] = {
  require(scrollId != null && scrollId.nonEmpty, "Scroll id must be 
present!")
  scroll(scrollId) flatMap {
case (sid, recs) if recs.isEmpty => // empty hits means end of 
scanning/scrolling
  Future.successful(Option.empty)
case (sid, recs) =>
  onNext(recs)
  if (n > 1)
pushRecursively(n-1, sid)
  else
Future.successful(Option(sid))
  }
}

val f = pushRecursively(Math.min(demand, totalDemand), lastScrollId)
f onComplete {
  case Failure(ex) =>
log.error(ex, "Unexpected ScanSource error")
onError(ex)
context.stop(self)
  case Success(sidOpt) => sidOpt match {
case None =>
  log.info("ScanSource just completed...")
  if (isCompleted)
log.warning("ScanSource already completed, I cannot figure out why 
this occurs!")
  else {
onComplete()
context.stop(self)
  }
case Some(sid) =>
  lastScrollId = sid
  }
}
f.await(600.seconds)

  case Cancel =>
context.stop(self)
}


But as you can see, there is a "log.warning" saying that onComplete() was 
already called, which can happen only if the ActorPublisher wasn't requested 
sequentially.

I think this implementation is correct and valid even though it is blocking the 
actor's dispatcher thread. But I really cannot figure out how it can be 
"completed" twice...



Re: [akka-user] How to use Stashing in TypedActor?

2015-05-17 Thread Jakub Liska
Thank you Roland, I'll stick with untyped Actors because stashing is kind 
of essential for the way I'm dealing with backpressure and resilience right 
now. Future akka stream distributed support might be a good fit though.



[akka-user] How to use Stashing in TypedActor?

2015-05-17 Thread Jakub Liska
Hi, I can't find any documentation or source code reference on using stashing 
with a TypedActor. Would anybody please help me out here? UnrestrictedStash is 
an Actor type.



[akka-user] Re: Context of event stream elements

2015-05-15 Thread Jakub Liska
I'm also facing this problem of passing a context through the stream. Let's say 
you have a stream like this:

download resource -> back it up -> parse it -> store errors -> process data -> ... -> persist data -> sink

You kind of need some sort of context so that when some later flow fails, you 
know it was related to a particular resource from the first Flow and you can 
log/handle it in the Supervision.Decider.

I'm using an HList for the In/Out Flow types, which is basically the context - 
instead of gluing the flow parts together with many case classes, I use a growing 
HList. But it turned out to be quite a maintenance disaster and is getting out of 
control, because it lost all flexibility. I think each stream should optionally 
have something like a Context/GlobalState accessible to all Flows, even if it were 
just a Map[String, Any], because implementing this manually in a maintainable way 
is next to impossible.
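
For comparison, a minimal sketch of the tuple-based alternative - Ctx is a hypothetical case class of mine, not anything from Akka, and the helper name is made up:

case class Ctx(resource: String, errors: List[String] = Nil)

// each stage re-emits the context untouched and only transforms the payload,
// so a failure can always be attributed to the resource it came from
def withCtx[A, B](f: A => B) =
  Flow[(Ctx, A)].map { case (ctx, a) => (ctx, f(a)) }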



Re: [akka-user] Re: Akka Streams & HTTP 1.0-RC1 Announcement

2015-04-28 Thread Jakub Liska
Roland, you are right that those arguments apply to foldLeft only, but this is 
just a syntactic matter. mapAsync is one of the most frequently used stream 
combinators, and streams would lose a lot of readability this way...

On Tuesday, April 28, 2015 at 10:38:17 AM UTC+2, rkuhn wrote:
>
> More reasoning (but please correct me): for foldLeft the curried argument 
> list is needed in order to fix the function’s input type which would 
> otherwise need to be specified explicitly, plus it allows infix operator 
> notation (which is desirable for consistency with the :\ operator). Neither 
> of these arguments apply to mapAsync AFAICS.
>
> Regards,
>
> Roland 
>
> Sent from my iPhone
>
> On 28 Apr 2015, at 10:11, Roland Kuhn > 
> wrote:
>
> I'm on the phone right now and might be missing something but shouldn't 
> the following work?
>
> .mapAsync(4, { case …})
>
> Regards,
>
> Roland 
>
> Sent from my iPhone
>
> On 28 Apr 2015, at 07:37, Jakub Liska > 
> wrote:
>
> I'm deconstructing the argument on like 20 places in my application :
>
> flow.mapAsync { case res :: errors :: result :: HNil =>  ... }
>
> and now pattern matching will have to be used everywhere for the argument 
> to be deconstruced ... this interface is really unfortunate
>
>
> On Tuesday, April 28, 2015 at 7:24:34 AM UTC+2, Jakub Liska wrote:
>>
>> Hey,
>>
>> shouldn't the :
>>
>> Flow#mapAsync(parallelism: Int, f: Out ⇒ Future[T]): Repr[T, Mat]
>>
>> method have this signature :
>>
>> Flow#mapAsync(parallelism: Int)(f: Out ⇒ Future[T]): Repr[T, Mat]
>>
>> as scala collection foldLeft, so it could be called like : 
>>
>> Flow[Resource].mapAsync(4) { res => asyncCode }
>>
>> It is always a drag to call methods with function argument if multiple 
>> arguments are present...
>>



[akka-user] Re: Akka Streams & HTTP 1.0-RC1 Announcement

2015-04-27 Thread Jakub Liska
I'm deconstructing the argument in about 20 places in my application:

flow.mapAsync { case res :: errors :: result :: HNil => ... }

and now pattern matching will have to be used everywhere for the argument to be 
deconstructed... this interface is really unfortunate.


On Tuesday, April 28, 2015 at 7:24:34 AM UTC+2, Jakub Liska wrote:
>
> Hey,
>
> shouldn't the :
>
> Flow#mapAsync(parallelism: Int, f: Out ⇒ Future[T]): Repr[T, Mat]
>
> method have this signature :
>
> Flow#mapAsync(parallelism: Int)(f: Out ⇒ Future[T]): Repr[T, Mat]
>
> as scala collection foldLeft, so it could be called like : 
>
> Flow[Resource].mapAsync(4) { res => asyncCode }
>
> It is always a drag to call methods with function argument if multiple 
> arguments are present...
>



[akka-user] Re: Akka Streams & HTTP 1.0-RC1 Announcement

2015-04-27 Thread Jakub Liska
Hey,

shouldn't the :

Flow#mapAsync(parallelism: Int, f: Out ⇒ Future[T]): Repr[T, Mat]

method have this signature :

Flow#mapAsync(parallelism: Int)(f: Out ⇒ Future[T]): Repr[T, Mat]

as Scala collections' foldLeft does, so it could be called like:

Flow[Resource].mapAsync(4) { res => asyncCode }

It is always a drag to call a method that takes a function argument when other 
arguments are present...
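
In the meantime, a small pimp-my-library shim gives the curried call site - the wrapper name mapAsyncC is made up, the Repr/Mat types are written out naively for Flow only, and it simply delegates to the current two-argument overload, so this is just a sketch:

import scala.concurrent.Future
import akka.stream.scaladsl.Flow

object FlowSyntax {
  implicit class CurriedMapAsync[In, Out, Mat](val flow: Flow[In, Out, Mat]) extends AnyVal {
    def mapAsyncC[T](parallelism: Int)(f: Out => Future[T]): Flow[In, T, Mat] =
      flow.mapAsync(parallelism, f)
  }
}

// import FlowSyntax._
// Flow[Resource].mapAsyncC(4) { res => asyncCode }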



Re: [akka-user] akka-streams - How to define a Source from an arbitrary event stream?

2015-04-09 Thread Jakub Liska
I do it just via ActorPublisher; the "scroll" method basically loads 
ElasticSearch records asynchronously (a classic cursor thingy). It's a 
combination of request demand and an asynchronous source of events:

  def receive: Receive = {
    case Request(n) if totalDemand > 0 && n > 0 && isActive =>
      def pushRecursively(remaining: Long, scrollId: String): Future[Unit] = {
        scroll(scrollId) flatMap {
          case (sid, recs) if recs.isEmpty => // empty hits means end of scanning/scrolling
            onComplete()
            context.stop(self)
            Future.successful(())
          case (sid, recs) =>
            lastScrollId = sid
            val contexts = recs.map {
              case (recId, rec) => EsResource :: Map.empty[String, String] :: recId :: rec :: HNil
            }
            onNext(contexts)
            if (remaining > 1)
              pushRecursively(remaining - 1, sid)
            else
              Future.successful(())
        }
      }

      pushRecursively(Math.min(n, totalDemand), lastScrollId) onComplete {
        case Failure(ex) =>
          onError(ex)
          context.stop(self)
        case s =>
      }

    case Cancel =>
      context.stop(self)
  }
}



Re: [akka-user] [Streams] Is supervision strategy supposed to work with ActorPublishers?

2015-03-06 Thread Jakub Liska
Hi Björn,

so the error handling applies only to the stage operations like Map, MapAsync, 
Collect etc., right?

It kind of breaks stream design, in that you want to extract error handling out 
of the whole stream, not just out of those operations...

This way you basically have to implement error handling in 
ActorPublisher/ActorSubscriber again, because the publisher might throw a 
resumable exception while producing elements, and the subscriber while consuming 
them...

It'd be useful for stream error reporting - collecting errors, for instance...
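
Which in practice means hand-rolling something like this inside the publisher (a rough sketch only - it/produce are hypothetical placeholders for whatever produces the elements, decider stands for the Supervision.Decider already registered for the rest of the stream, and NonFatal/Supervision need their usual imports):

case Request(_) =>
  while (isActive && totalDemand > 0 && it.hasNext) {
    try onNext(produce(it.next()))
    catch {
      case NonFatal(e) if decider(e) == Supervision.Resume =>
        // drop the element and keep going, mimicking Supervision.Resume
      case NonFatal(e) =>
        onError(e)               // anything else fails the whole stream
        context.stop(self)
    }
  }
  if (isActive && !it.hasNext) onCompleteThenStop()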

On Wednesday, March 4, 2015 at 1:07:48 PM UTC+1, Björn Antonsson wrote:
>
> Hi,
>
> That is by design. Calling onError inside the ActorPublisher will 
> Terminate the stream with the failure (it's in the doc for the method). If 
> you on the other hand throw an exception during your processing I assume 
> that the SupervisionDecider will decide what to do.
>
> B/
>
> On 3 March 2015 at 23:22:21, Jakub Liska (liska...@gmail.com ) 
> wrote:
>
> Hey, 
>
> when ActorPublisher does : 
> "onError(exceptionRegisteredInSupervisionDecider)" then the stream just 
> fails with that exception. Supervision strategy doesn't work here. Is it 
> supposed to or it won't work for ActorPublishers?
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: 
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+...@googlegroups.com .
> To post to this group, send email to akka...@googlegroups.com 
> .
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
> --
> Björn Antonsson
> Typesafe Inc. <http://typesafe.com> – Reactive Apps on the JVM
> twitter: bantonsson <http://twitter.com/bantonsson>
>
> JOIN US. REGISTER TODAY! 
> <http://event.scaladays.org/scaladays-sanfran-2015>
> Scala <http://event.scaladays.org/scaladays-sanfran-2015>
> Days <http://event.scaladays.org/scaladays-sanfran-2015>
> March 16th-18th, <http://event.scaladays.org/scaladays-sanfran-2015>
> San Francisco <http://event.scaladays.org/scaladays-sanfran-2015>
>



[akka-user] [Streams] Is supervision strategy supposed to work with ActorPublishers?

2015-03-03 Thread Jakub Liska
Hey,

when ActorPublisher does : 
"onError(exceptionRegisteredInSupervisionDecider)" then the stream just 
fails with that exception. Supervision strategy doesn't work here. Is it 
supposed to or it won't work for ActorPublishers?



[akka-user] [Streams] Are there any limits on Stream merging?

2015-03-03 Thread Jakub Liska
Hey,

I'm trying to design stream processing of hundreds of thousands of files, row by 
row, reading the files lazily. That comes with the obligation to close each 
InputStream at the end, so creating an ActorPublisher per file that closes the 
underlying stream when it finishes seems like the best idea. But the streams must 
be merged into a single one. My question is: can I do something like this for 
hundreds of thousands of files, or is it a bad idea? I can't think of anything 
else right now. Thank you

Source[Row]() { implicit b =>
  val actorSources = files.map( file =>
    Source(Props(classOf[BatchActor], file))
  ).toArray

  val merge = b.add(Merge[Row](actorSources.length))

  for (i <- 0 until actorSources.length) {
    b.addEdge(b.add(actorSources(i)), merge.in(i))
  }

  merge.out
}
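
For comparison, a lazier alternative (a minimal sketch, my assumption, using 
this milestone's FlattenStrategy API): concatenate per-file sources one at a 
time instead of building one huge Merge, so only one file is open at any 
moment. rowIterator(file) is a hypothetical helper that opens the file and 
closes the InputStream once its iterator is exhausted.

val rows =
  Source(() => files.iterator)
    .map(file => Source(() => rowIterator(file)))
    .flatten(FlattenStrategy.concat)

The trade-off is that concat is strictly sequential, whereas Merge interleaves 
the sub-streams as they produce elements.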



Re: [akka-user] Problem with supervision strategy & akka-stream 1.0-M4

2015-03-03 Thread Jakub Liska
I suspect it is this issue https://github.com/akka/akka/issues/16979

The root cause just isn't printed out:

Cause: java.lang.IllegalStateException: Processor actor terminated abruptly



Re: [akka-user] Re: [Akka-stream] Distributed actor communication via Ask pattern or ActorPublisher/ActorSubscriber

2015-02-22 Thread Jakub Liska
Yeah, but as I said, those are remote actors, so this is not an option. 
Not even ActorPublisher/ActorSubscriber, because that doesn't work remotely 
either...

> LineSource ~> lineToRecord ~> indexRecord ~> bcast ~> sink.ignore
>  sink.ignore <~ responseHandler <~ bcast
>

I don't get this, you're broadcasting a stream to immediately ignore it? Or 
do you mean something like this:

LineSource ~> lineToRecord ~> indexRecord ~> bcast ~> anotherHandler ~> sink.ignore
                                             bcast ~> responseHandler ~> sink.ignore
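
Spelled out (a rough sketch against the later GraphDSL API; the 1.0-M4 
FlowGraph syntax differs, Rec is a hypothetical element type, and the named 
stages are assumed to be in scope as in this thread), that second layout would 
look roughly like:

import akka.stream.ClosedShape
import akka.stream.scaladsl.{ Broadcast, GraphDSL, RunnableGraph, Sink }

RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  val bcast = b.add(Broadcast[Rec](2))
  LineSource ~> lineToRecord ~> indexRecord ~> bcast ~> anotherHandler  ~> Sink.ignore
                                               bcast ~> responseHandler ~> Sink.ignore
  ClosedShape
})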

 



[akka-user] Re: [Akka-stream] Distributed actor communication via Ask pattern or ActorPublisher/ActorSubscriber

2015-02-22 Thread Jakub Liska
I just found out that:

> ActorPublisher and ActorSubscriber *cannot be used with remote actors*, 
> because if signals of the Reactive Streams protocol (e.g. request) are lost 
> the stream may deadlock.


but the same applies to https://github.com/akka/akka/issues/16416, right?



[akka-user] Re: [Akka-stream] Distributed actor communication via Ask pattern or ActorPublisher/ActorSubscriber

2015-02-22 Thread Jakub Liska
Hi Jim,

thank you for the brain dump, it all makes sense. Since you pointed out that 
until https://github.com/akka/akka/issues/16416 there is no straightforward 
request/response solution (without handling back-pressure yourself), how 
about using *ActorPublisher/ActorSubscriber* here? The documentation says 
they should work between cluster nodes and back-pressure would be built-in.

Although in my case the file producer is very, very slow and theoretically 
it should never be as fast as the consumer unless the consumer restarts or 
something, so back-pressure is not much of an issue. 

Also, you mentioned breaking things into little pieces. If you are constantly 
pulling files out of somewhere with a scheduler and you have an indexer that 
is indexing input from a few other services, then you have one persistent 
system that lasts over all scheduler cycles, and each tick means spawning a 
single batch Flow that downloads files and feeds them to the indexer. Imho 
the correct solution (not counting https://github.com/akka/akka/issues/16416) 
might really be creating a DownloadActorPublisher and an 
IndexingActorSubscriber and hooking them up for that single batch. 
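
A rough sketch of that wiring (everything beyond the two names from this 
thread is my assumption; written against the later Source.actorPublisher / 
Sink.actorSubscriber factories, the milestone API differs slightly, DataFile 
is a hypothetical element type, and an implicit materializer is assumed in 
scope):

val download = Source.actorPublisher[DataFile](Props[DownloadActorPublisher])
val indexing = Sink.actorSubscriber[DataFile](Props[IndexingActorSubscriber])
download.runWith(indexing)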

What do you think? Jakub



[akka-user] [Akka-stream] Distributed actor communication via Ask pattern or ActorPublisher/ActorSubscriber

2015-02-21 Thread Jakub Liska
Hi,

If ActorX needs to pass a load of data to a remote ActorY for processing, 
is this a correct thing to do?

Imagine the data is rows of huge files that are downloaded on ActorX and are 
to be processed and indexed on ActorY.

trait IndexReq {
  def source: Source[Row]
  def flow: Flow[Row, Rec] // processing from a row to a record
}

trait IndexRes[T] {
  def source: Source[T]
}



So ActorX loads rows 1 to x of the file as an immutable.Iterable and does:

actorY ? IndexReq(Source(data)) 

and ActorY materializes the Source and does:

source.grouped(Int.MaxValue).runWith(Sink.head) onComplete {
  case Success(results) => actorX ! IndexRes(Source(results))
}

Is this a correct solution to this problem? 

Or should actorX be an ActorPublisher and actorY an ActorSubscriber? But the 
distributed nature of that is not documented much, and it'd be more of an 
experiment for me.
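
One tweak on the ActorY side (a minimal sketch, my assumption, not a confirmed 
fix): pipe the materialized result back to sender() instead of replying from 
the onComplete callback, so the ask on the ActorX side gets completed and no 
actor state is touched from a foreign thread. Inside ActorY's receive, with 
import akka.pattern.pipe and import context.dispatcher in scope and an 
implicit materializer available:

source.grouped(Int.MaxValue)
  .runWith(Sink.head)
  .map(results => IndexRes(Source(results)))
  .pipeTo(sender())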



Re: [akka-user] FuncIterable & IterableSource is calling next concurrently

2015-02-21 Thread Jakub Liska
Hi, it was my fault. I finally understand the concept behind it and can see 
what is running concurrently/sequentially now. Thank you



Re: [akka-user] FuncIterable & IterableSource is calling next concurrently

2015-02-19 Thread Jakub Liska

>
> Yes, of course these sub-streams are running concurrently! If you don’t 
> want that, then why convert them to streams in the first place?
>

To be honest, I have real trouble telling when stuff is running concurrently 
and when it is sequential. For instance, in this stream, do I have to worry 
about thread-safety?

I'm asking because I'm still noticing that the non-thread-safe stuff in 
"loadDataFilesLazily" is still breaking sometimes.

Source(ftpClient.loadDataFilesLazily(logDir, mfRows))
  .groupBy { case (mfName, records) => mfName }
  .map { case (mfName, dataFileSource) =>
    dataFileSource
      .mapConcat { case (mfn, dataFile) => dataFile }
      .via(flow)
      .grouped(Int.MaxValue)
      .runWith(Sink.head)
      .map(validateMetaFileBatch(mfName, _))
      .recover {
        case ex: NoSuchElementException =>
          metaCache.add(mfName)
          Seq.empty[Try[R]]
      }
  }
  .transform { () =>
    CompleteWith(() => ftpClient.disconnect(client))
  }
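
One way to take the thread-safety question off the table (a minimal sketch, my 
assumption, not from this thread): confine every call to the non-thread-safe 
commons FTPClient to a single-threaded ExecutionContext, so the client is 
never entered from two threads at once, regardless of which dispatcher threads 
the stages run on. withFtp is a hypothetical wrapper around the ftpClient used 
above.

import java.util.concurrent.Executors
import org.apache.commons.net.ftp.FTPClient
import scala.concurrent.{ ExecutionContext, Future }

// all FTP work funnels through this one thread
val ftpEc: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

def withFtp[T](op: FTPClient => T): Future[T] =
  Future(op(ftpClient))(ftpEc)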
 



Re: [akka-user] FuncIterable & IterableSource is calling next concurrently

2015-02-18 Thread Jakub Liska
I'll try to reproduce it later, but I'm quite sure of it, because if I do 

val iterable = list.toStream.map( this.synchronize(non-thread-safe-operation) )

or 

source.map ( this.synchronize(...) )

all the concurrent access problems are suddenly gone... Which seems to be 
ultimate proof... 

Btw, is it possible that it is caused by the fact that the Source itself is 
created in a map operation?

Source(List(1, 2, 3).map { _ =>
  Source(concurrentAccessIterable).map(...)
}).flatten(FlattenStrategy.concat)



[akka-user] FuncIterable & IterableSource is calling next concurrently

2015-02-18 Thread Jakub Liska
Hey,

I'd need to do something like :

val iterable = list.toStream.map( non-thread-safe-operation )
val source = Source(iterable) // Source(() => iterable.iterator)
source.map (...)

but the problem is that there is concurrent access to the iterator's next 
method, and it breaks the non-thread-safe things the callback is using.

I tried to do it this way instead of Source.map ( 
this-should-not-run-concurrently-but-it-sometimes-does-even-if 
max-input-buffer-size = 1 ), 

because I think that Source.map sometimes runs the function concurrently, 
since it breaks non-thread-safe stuff like commons FTPClient too.

Btw, by "concurrently" I mean that multiple threads execute the 
callback at the same time.

I'm using the akka-stream M3 version.
