Re: [akka-user] [akka-stream] Decrease buffer size for some stages

2015-01-22 Thread Endre Varga
Hi Alexey,



On Thu, Jan 22, 2015 at 4:15 AM, Alexey Romanchuk <
alexey.romanc...@gmail.com> wrote:

> Hey!
>
> I have a stream that processes incoming messages, assembles a big message
> "pack" and sends it to another system over the network. Incoming messages are
> relatively small, and I use big buffers to improve throughput for all stages
> that handle these small messages. At the very end of the stream I have a
> "message collector" which aggregates messages and periodically produces
> message "packs" which must be sent over the network.
>
> The system looks like this:
>
> (Input) -> (Complex flow for small messages) -> (Aggregator) -> (Network
> sender)
>
> I increased the buffer size to 128 to improve throughput, but the problem is
> in the network sender. The network sender can resend data several times in
> case of network failure. I want to force a buffer of only 1 element before
> the network sender so that backpressure is applied effectively to the input.
> With the same buffer size there, the stream can be flooded with a lot of huge
> aggregator packs.
>
> I know that there is a "buffer" flow combinator, but it looks like it works
> only if I want to increase the buffer size before some stage, and does not
> work for decreasing the buffer size.
>

You can control the internal buffer size for stages, as explained
here:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-rate.html#Internal_buffers_and_their_effect


    val flow =
      Flow[Int]
        .section(OperationAttributes.inputBuffer(initial = 1, max = 1)) { sectionFlow =>
          // the buffer size of this map is 1
          sectionFlow.map(_ * 2)
        }
        .map(_ / 2) // the buffer size of this map is the default


-Endre


>
> Could you advise the right way to achieve this kind of backpressure?
>
> Thanks!
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka streams] question on some time related use cases

2015-01-22 Thread Endre Varga
Hi Frank,

On Thu, Jan 22, 2015 at 2:51 AM, Frank Sauer  wrote:

> Thanks, I came up with the following, but I have some questions:
>
> /**
>  * Holds elements of type A for a given finite duration after a predicate p
>  * first yields true and as long as subsequent elements matching that first
>  * element (e.g. are equal) still satisfy the predicate. If a matching element
>  * arrives during the given FiniteDuration for which the predicate p does not
>  * hold, the original element will NOT be pushed downstream. Only when the
>  * timer expires and no matching elements have been seen for which p does not
>  * hold, will elem be pushed downstream.
>  *
>  * @param duration The polling interval during which p has to hold true
>  * @param p        The predicate that has to remain true during the duration
>  * @param system   implicit required to schedule timers
>  * @tparam A       type of the elements
>  */
> class FilterFor[A](duration: FiniteDuration)(p: A => Boolean)(implicit system: ActorSystem)
>     extends PushStage[A, A] {
>
>   var state: Map[A, Cancellable] = Map.empty
>
>   override def onPush(elem: A, ctx: Context[A]): Directive =
>     state.get(elem) match {
>
>       // pending timer but condition no longer holds => cancel timer
>       case Some(timer) if !p(elem) =>
>         timer.cancel()
>         state = state - elem
>         ctx.pull()
>
>       // no pending timer and predicate true -> start and cache new timer
>       case None if p(elem) =>
>         val timer = system.scheduler.scheduleOnce(duration) {
>           // when timer fires, remove from state and push elem downstream
>           state = state - elem
>           ctx.push(elem); // is this safe?
>         }
>         state = state + (elem -> timer)
>         ctx.pull()
>
>       // otherwise simply wait for the next upstream element
>       case _ => ctx.pull()
>     }
>
> }
>
> My main concerns are these:
>
> 1) Is it safe to invoke ctx.push from the thread on which the timer fires?
>

No, it is absolutely forbidden. The golden rule for stages is that in a
handler:
 - *Exactly one* method should be called on the
 - *currently* passed Context
 - *exactly once*
 - *as the last statement* in the handler
 - *with the type matching* the expected return type of the handler

The only exceptions are isHolding and isFinished because they are query
methods.

Calling any of these methods externally will not work, because the context
is not thread-safe, and it violates the rules above.

You can approximate the behavior you want without firing a timer: record
the time of the first occurrence of the event, then check the elapsed time
whenever a new element arrives. Obviously this only works if enough
elements are flowing, but you can easily inject some filler elements. You
can take this recipe and modify it to fit your needs:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#Injecting_keep-alive_messages_into_a_stream_of_ByteStrings
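
A minimal sketch of the bookkeeping behind the timestamp-based approach, kept independent of the stage API so that it could be driven from onPush with the current time. The names HoldTracker, key, onElement and expire are made up for illustration and are not part of Akka; the key function is an added assumption that stands in for "matching elements".

```scala
import scala.concurrent.duration.FiniteDuration

// Hypothetical helper, not part of Akka: remembers when an element with a given
// key first satisfied the predicate and releases it once `hold` has elapsed.
final class HoldTracker[A, K](hold: FiniteDuration, key: A => K)(p: A => Boolean) {
  private val holdNanos = hold.toNanos
  // key -> (first element that satisfied p, time it was seen, in nanoseconds)
  private var pending = Map.empty[K, (A, Long)]

  /** Feed a regular element; returns the held elements that are now due. */
  def onElement(elem: A, nowNanos: Long): List[A] = {
    val k = key(elem)
    if (!p(elem)) pending = pending - k // condition no longer holds: forget the key
    else if (!pending.contains(k)) pending = pending.updated(k, (elem, nowNanos))
    expire(nowNanos)
  }

  /** Feed a filler element: nothing is recorded, only the elapsed time is checked. */
  def expire(nowNanos: Long): List[A] = {
    val (due, rest) = pending.partition { case (_, (_, since)) => nowNanos - since >= holdNanos }
    pending = rest
    due.values.map(_._1).toList
  }
}
```

Inside a stage, onElement would be called for real elements and expire for the injected filler elements, using something like System.nanoTime() as the clock. How the due elements are then emitted depends on the stage type and is left out here.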

We will have more flexible tools though to handle timers in the future.

If you don't require your alerts to be a Stream itself, then you can
alternatively use an actor and "ask" to process the events by using:

   myEvents.mapAsync(ev => alertingActor ? ev)

The actor needs to reply to the incoming events so the stream continues to
be pulled. The actor is free to schedule timers however it wants and fire
alerts whenever it wants. Please note that events should be sequenced
because mapAsync fires multiple asks in parallel. If the events don't
already carry sequence numbers, you can simply add a stage that attaches
them before the mapAsync.
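
One way the receiving side could make use of such sequence numbers is sketched below, assuming they are consecutive and start at 0. Resequencer and offer are hypothetical names, not Akka API; the class only shows the reordering logic an actor might keep in its state.

```scala
// Hypothetical helper: restores the original order of events that were tagged
// with consecutive sequence numbers (starting at 0) before being sent in parallel.
final class Resequencer[A] {
  private var next = 0L                    // sequence number expected next
  private var waiting = Map.empty[Long, A] // events that arrived too early

  /** Accepts one tagged event and returns every event that is now deliverable in order. */
  def offer(seq: Long, event: A): List[A] = {
    waiting = waiting.updated(seq, event)
    val ready = List.newBuilder[A]
    while (waiting.contains(next)) {
      ready += waiting(next)
      waiting = waiting - next
      next += 1
    }
    ready.result()
  }
}
```

An event that arrives ahead of its turn is parked until the gap before it has been filled, so the alerting logic always sees events in stream order.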



> 2) How do I react to upstream or downstream finish or cancel events - do I
> have to?
>

No, only if you want to do something special as a response for those
events. Otherwise the default behavior is just to shut down the stage and
propagate the termination signal.

-Endre


> 3) Can I integrate this into the DSL without using transform, e.g. can I
> somehow add a filterFor method on something via a pimp my library?
>
> Any and all pointers would be very much appreciated,
>
> Thanks,
>
> Frank
>
> On Friday, January 16, 2015 at 11:52:03 AM UTC-5, Akka Team wrote:
>>
>> Hi Frank!
>> We do not have such operations off-the-shelf, however they are easily
>> implementable by using custom stream processing stages:
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-customize.html
>>
>> Be sure to refer to the cookbook for some inspiration on how to implement
>> your own stages:
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html
>>
>> Hope this helps, and feel free to ask for help in case you get stuck :-)
>>
>> --
>> Konrad
>>
>> On Thu, Jan 15, 2015 at 3:57 AM, Frank Sauer  wrote:
>>
>>> I have two uses cases that I'm used to from using CEP systems like Esper
>>> and I'm trying to figure out if I can implements

Re: [akka-user] [akka streams] question on some time related use cases

2015-01-22 Thread Endre Varga
On Thu, Jan 22, 2015 at 5:07 AM, Frank Sauer  wrote:

> Update, in a simple test scenario like so
>
>   val ticks = Source(1 second, 1 second, () => "Hello")
>
>   val flow = ticks
>     .transform(() => new FilterFor[String](10 seconds)(x => true))
>     .to(Sink.foreach(println(_)))
>
>   flow.run()
>
> I'm seeing the following error, so this doesn't work at all and I'm not
> sure it is because of threading:
>
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$currentOp(Interpreter.scala:175)
>   at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.push(Interpreter.scala:209)
>   at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.push(Interpreter.scala:278)
>   at experiments.streams.time$FilterFor$$anonfun$1.apply$mcV$sp(time.scala:46)
>   at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>   at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> I think I'm violating the one very important rule mentioned in the docs -
> when the timer fires it calls a push on the context but there is also a
> pull going on concurrently(?) - and this is indeed breaking in spectacular
> ways as expected
>

:)


>
> I have no idea how to implement this correctly. It looked pretty simple at
> first, but alas...
>

See my previous mail. The main problem here is mixing backpressured streams
(your data) and non-backpressured events (timer triggers) in a safe
fashion. Well, the main problem is not how to implement it, but how to
expose an API to users which is as safe as possible. We have groupedWithin,
takeWithin and dropWithin as timer based ops, but no customization for now.

-Endre



Re: [akka-user] Cluster unreachable and a lot of cluster connections

2015-01-22 Thread Johannes Berg
Okay, I increased the load further and now I see the same problem again. It 
seems to just have gotten a bit better in that it doesn't happen as fast, 
but with enough load it happens.

To re-iterate, I have Akka 2.3.9 on all (8) nodes and 
auto-down-unreachable-after = off on all nodes and I don't do any manual 
downing anywhere, still the leader log prints this:

2015-01-22 10:35:37 + - [INFO] - from Cluster(akka://system) in 
system-akka.actor.default-dispatcher-2 
Cluster Node [akka.tcp://system@ip1:port1] - Leader is removing unreachable 
node [akka.tcp://system@ip2:port2]

and the node(s) under load is(are) removed from the cluster (quarantined). 
How is this possible?

On Wednesday, January 21, 2015 at 5:53:06 PM UTC+2, drewhk wrote:
>
> Hi Johannes,
>
> See the milestone here: 
> https://github.com/akka/akka/issues?q=milestone%3A2.3.9+is%3Aclosed
>
> The tickets cross reference the PRs, too, so you can look at the code 
> changes. The issue that probably hit you is 
> https://github.com/akka/akka/issues/16623 which manifested as system 
> message delivery errors on some systems, but actually was caused by 
> accidentally duplicated internal actors (a regression).
>
> -Endre
>
> On Wed, Jan 21, 2015 at 4:47 PM, Johannes Berg wrote:
>
>> Upgrading to 2.3.9 does indeed seem to solve my problem. At least I 
>> haven't experienced them yet.
>>
>> Now I'm curious what the fixes were, is there somewhere a change summary 
>> between versions or where is it listed what bugs have been fixed in which 
>> versions?
>>
>> On Wednesday, January 21, 2015 at 11:31:02 AM UTC+2, drewhk wrote:
>>>
>>> Hi Johannes,
>>>
>>> We just released 2.3.9 with important bugfixes. I recommend to update 
>>> and see if the problem is still persisting.
>>>
>>> -Endre
>>>
>>> On Wed, Jan 21, 2015 at 10:29 AM, Johannes Berg  
>>> wrote:
>>>
>>>> Many connections seem to be formed in the case when the node has been
>>>> marked down for unreachability even though it's still alive and it tries
>>>> to connect back into the cluster. The removed node prints:
>>>>
>>>> "Address is now gated for 5000 ms, all messages to this address will be
>>>> delivered to dead letters. Reason: The remote system has quarantined this
>>>> system. No further associations to the remote system are possible until
>>>> this system is restarted."
>>>>
>>>> It doesn't seem to close the connections properly even though it opens
>>>> new ones continuously.
>>>>
>>>> Anyway that's a separate issue that I'm not that concerned about right
>>>> now. I've now realized I don't want to use automatic downing; instead I
>>>> would like to allow nodes to go unreachable and come back to reachable
>>>> even if it takes quite some time, and to manually stop the process and
>>>> down the node in case of an actual crash.
>>>>
>>>> Consequently I've put
>>>>
>>>> auto-down-unreachable-after = off
>>>>
>>>> in the config. Now I have the problem that nodes still are removed,
>>>> this is from the leader node log:
>>>>
>>>> 08:50:14.087UTC INFO [system-akka.actor.default-dispatcher-4]
>>>> Cluster(akka://system) - Cluster Node [akka.tcp://system@ip1:port1] -
>>>> Leader is removing unreachable node [akka.tcp://system@ip2:port2]
>>>>
>>>> I can understand my node is marked unreachable because it's under heavy
>>>> load but I don't understand what could cause it to be removed. I'm not
>>>> doing any manual downing and have the auto-down set to off, what else
>>>> could trigger the removal?
>>>>
>>>> Using the akka-cluster script I can see that the node has most other
>>>> nodes marked as unreachable (including the leader) and that it has another
>>>> leader than other nodes.
>>>>
>>>> My test system consists of 8 nodes.
>>>>
>>>> About the unreachability I'm not having long GC pauses and not sending
>>>> large blobs, but I'm sending very many smaller messages as fast as I can.
>>>> If I just hammer it fast enough it will end up unreachable which I can
>>>> accept, but I need to get it back to reachable.
>>>>
>>>> On Thursday, December 11, 2014 at 11:22:41 AM UTC+2, Björn Antonsson
>>>> wrote:
>>>>>
>>>>> Hi Johannes,
>>>>>
>>>>> On 9 December 2014 at 15:29:53, Johannes Berg (jber...@gmail.com)
>>>>> wrote:
>>>>>
>>>>> Hi! I'm doing some load tests in our system and getting problems that
>>>>> some of my nodes are marked as unreachable even though the processes are
>>>>> up. I'm seeing it going a few times from reachable to unreachable and
>>>>> back before staying unreachable, saying connection gated for 5000ms
>>>>> and staying silently that way.
>>>>>
>>>>> Looking at the connections made to one of the seed nodes I see that I
>>>>> have several hundreds of connections from other nodes except the failing
>>>>> ones. Is this normal? There are several (hundreds) just between two
>>>>> nodes. When are connections formed between cluster nodes and when are
>>>>> they taken down?
>>>>>
>>>>>
>>>>> Several hundred connec

[akka-user] Running with akka-remote: ClassNotFound

2015-01-22 Thread Ashesh Ambasta
The setup is quite basic:

Here's my application.conf:
akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}


And my build.sbt:
name := "random-app"

version := "1.0"

scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.3.8",
  "com.typesafe.akka" %% "akka-remote" % "2.3.8",
  "centralapp-core" %% "centralapp-core" % "1.0",
  "io.spray" %% "spray-can" % "1.3.2",
  "io.spray" %% "spray-json" % "1.3.2",
  "io.spray" %% "spray-client" % "1.3.2"
)


However, when trying to run the application, I get the following exception:
Exception in thread "main" java.lang.ClassNotFoundException: akka.remote.RemoteActorRefProvider
Why?



Re: [akka-user] received Supervise from unregistered child ... this will not end well

2015-01-22 Thread Marco Luca Sbodio
Hi Viktor,

after upgrading to akka 2.3.9 my multi-jvm-test crashes with this error:

10:49:07.654UTC ERROR 
[MySystemMultiNodeTest-akka.actor.default-dispatcher-21] 
[akka.actor.ActorSystemImpl] [ActorSystem(MySystemMultiNodeTest)] - 
Uncaught error from thread 
[MySystemMultiNodeTest-akka.actor.default-dispatcher-21] shutting down JVM 
since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.AbstractMethodError: akka/actor/Actor.aroundPreStart()V
    at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.10-2.3.9.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.9.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.9.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279) [akka-actor_2.10-2.3.9.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) [akka-actor_2.10-2.3.9.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231) [akka-actor_2.10-2.3.9.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na]

Has something changed with multi-jvm/multi-node testing from akka 2.2.4 to
akka 2.3.9?
Any clue?

Thank you in advance,
Marco

On Wednesday, 21 January 2015 19:15:51 UTC, √ wrote:
>
> Hi Marco,
>
> Please upgrade to 2.3.9 if you haven't already, there was a couple of 
> remoting-related issues fixed there.
>
> On Wed, Jan 21, 2015 at 6:45 PM, Marco Luca Sbodio wrote:
>
>> I haven't.
>>
>> I've managed to figure out that sometimes the following code
>>
>> [[
>> int nextStepNumber = planSteps[0].getStepNumber();
>> Address nextAddress = planSteps[0].getPeer().getAddress();
>> PlanStep[] nextPlanSteps = new PlanStep[planSteps.length];
>> System.arraycopy(planSteps, 0, nextPlanSteps, 0, planSteps.length);
>> firstWorker = getContext().actorOf(
>>     Worker.mkProps(sink, nextPlanSteps)
>>         .withDeploy(new Deploy(new RemoteScope(nextAddress))),
>>     Worker.actorName + nextStepNumber);
>> firstWorker.tell(CommonApi.START, getSelf());
>> log.debug("started first worker");
>> Chunk chunk = new Chunk(0, new byte[] {});
>> firstWorker.tell(chunk, getSelf());
>> log.debug("empty chunk sent to first worker");
>> ]]
>>
>> generate such an error:
>>
>> [[
>> 17:20:08.195UTC ERROR 
>> [MySystemMultiNodeTest-akka.actor.default-dispatcher-16] 
>> [org.mysystem.actor.SubPlanner] 
>> [akka://MySystemMultiNodeTest/user/the-engine/executor-6eb81c68-e91d-47cd-bb26-be53eee90f63/planner/sub-planner-11]
>> - received Supervise from unregistered child
>> Actor[akka.tcp://mysystemmultinodet...@nd06.domain.com:3002/remote/akka.tcp/mysystemmultinodet...@nd03.domain.com:3001/user/the-engine/executor-6eb81c68-e91d-47cd-bb26-be53eee90f63/planner/sub-planner-11/worker0#-1680645824],
>> this will not end well
>> ]]
>>
>> tracing back "sub-planner-11" in my logs I find the two messages
>> log.debug("started first worker");
>> firstWorker.tell(chunk, getSelf());
>>
>> and then I get the error ... and I have no clue why
>>
>> Thank you in advance for any help/suggestion.
>>
>> Cheers,
>> Marco
>>
>> On Tuesday, 20 January 2015 18:12:08 UTC, √ wrote:
>>>
>>> Have you closed over "context.actorOf" and execute it within a Future or 
>>> similar?
>>>
>>> On Tue, Jan 20, 2015 at 6:47 PM, Marco Luca Sbodio  
>>> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> while testing my system I'm randomly getting error messages similar to
>>>> the one in the subject of this topic. Here's an example:
>>>>
>>>> 17:27:59.265UTC ERROR
>>>> [MySystemMultiNodeTest-akka.actor.default-dispatcher-16]
>>>> [org.mysystem.actor.Worker]
>>>> [akka://MySystemMultiNodeTest/user/the-engine/executor-7b7690ee-f31d-45f1-93ef-79cba01fe604/planner/sub-planner-951/worker0]
>>>> - received Supervise from unregistered child
>>>> Actor[akka.tcp://MySystemMultinodet...@nd06.domain.com:3002/remote/akka.tcp/mysystemmultinodet...@nd03.domain.com:3001/user/the-engine/executor-7b7690ee-f31d-45f1-93ef-79cba01fe604/planner/sub-planner-951/worker0/worker1.4#-430862452],
>>>> this will not end well
>>>>
>>>> I really have no clue what might cause these errors, and what the
>>>> consequences are ("this will not end well"). I've tried searching on the
>>>> Web, but didn't find anything that helped me.
>>>>
>>>> I'm using akka 2.2.3
>>>>
>>>> Any help is highly appre

Re: [akka-user] [akka-stream] Decrease buffer size for some stages

2015-01-22 Thread Alexey Romanchuk
It is exactly what I have tried to achieve! You guys did amazing work with 
all akka streams. Thanks! :)



Re: [akka-user] [akka-stream] Decrease buffer size for some stages

2015-01-22 Thread Endre Varga
Hi Alexey,

On Thu, Jan 22, 2015 at 12:31 PM, Alexey Romanchuk <
alexey.romanc...@gmail.com> wrote:

> It is exactly what I have tried to achieve! You guys did amazing work with
> all akka streams. Thanks! :)
>

You might also be interested in this ticket:
https://github.com/akka/akka/issues/16610

It is about a batcher element that is a custom DetachedStage that
aggregates elements while:
 - the downstream is backpressuring
 - a certain batch limit is not yet reached

There are currently the following alternatives:
 - grouped(N), but it always waits for N elements (except when stream
terminates)
 - groupedWithin(T), it always batches elements in the time-window T, but
has no cap, and always waits for that time even if downstream is ready to
consume
 - conflate, which always batches elements while downstream is
backpressuring, but has no capacity limit (never backpressures upstream)

The ticket I linked is to basically have a conflate variant that can
backpressure the upstream if batch size limit is reached (similar to what
buffer does).
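
To make the intended behavior concrete, here is a rough sketch of the batching logic only, without any stage API. BoundedBatcher, offer, poll and canAccept are hypothetical names and this is not the design from the ticket: elements accumulate while downstream is not asking, and once the batch limit is reached the upstream has to wait.

```scala
// Hypothetical sketch: batches elements until downstream asks for them and
// refuses new elements once maxBatch is reached (i.e. backpressures upstream).
final class BoundedBatcher[A](maxBatch: Int) {
  require(maxBatch > 0, "maxBatch must be positive")
  private var batch = Vector.empty[A]

  /** True while another element may be accepted; false means upstream must wait. */
  def canAccept: Boolean = batch.size < maxBatch

  /** Upstream delivers an element; returns false and keeps the batch unchanged if it is full. */
  def offer(elem: A): Boolean =
    if (canAccept) { batch = batch :+ elem; true } else false

  /** Downstream signals demand: hand over whatever has accumulated, if anything. */
  def poll(): Option[Vector[A]] =
    if (batch.isEmpty) None
    else { val out = batch; batch = Vector.empty; Some(out) }
}
```

Compared with conflate, the difference is the false result from offer: it is the point where a real stage would stop pulling from upstream until downstream has taken the current batch.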

-Endre




Re: [akka-user] BalancingPool versus RoundRobinPool + BalancingDispatcher

2015-01-22 Thread Roland Kuhn
Hi Jean,

> On 20 Jan 2015, at 15:26, Jean Helou wrote:
> 
> Hello Roland, 
>  
> thanks for taking the time to answer. 
> 
> your description very much sounds like an Actor should be handling the pool, 
> not a router.
> 
> Alright, however I am still curious as to why the BalancingPool doesn't 
> handle resize and why RoundRobinPool + BalancingDispatcher is deprecated. 

The BalancingDispatcher is a very particular setup: a thread pool of size N 
with N actors that all pull from the same queue. Thread pool sizes are not 
changeable at runtime in Akka.
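
To make the shape of that setup concrete, here is a plain-JDK analogy (an illustrative sketch, not Akka code; the class and method names are made up for this example). N worker threads all take work from one shared queue, so whichever worker is free picks up the next job:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK analogy only: N workers on N threads share a single queue.
class SharedQueueWorkers {
    // Sentinel job that tells exactly one worker to exit.
    private static final Runnable STOP = () -> {};

    // Runs `jobs` tasks on `workers` threads that all pull from one queue
    // and returns how many tasks were executed.
    static int runAll(int workers, int jobs) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger done = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        Runnable job = queue.take(); // idle workers block here
                        if (job == STOP) return;
                        job.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            threads.add(t);
        }
        try {
            for (int i = 0; i < jobs; i++) queue.put(done::incrementAndGet);
            for (int i = 0; i < workers; i++) queue.put(STOP); // one sentinel per worker
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return done.get();
    }
}
```

The number of threads is fixed when the workers are started, which mirrors the point above: in this arrangement the pool size is part of the setup rather than something adjusted while running.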

>  
> You can use the DefaultResizer stand-alone (for determining what to do) and 
> you can also use the RoundRobinRoutingLogic stand-alone (for determining 
> where to send to); then the only missing piece is to place these inside an 
> Actor that will then start new Actors (easy) or stop Actors in a coordinated 
> fashion (straight-forward: just send a termination request and take it out of 
> the list of routees you pass into the RoutingLogic).
> 
>  starting & stopping actors is indeed easy, using the RoundRobinRoutingLogic 
> is fairly straightforward. Implementing the management messages is starting 
> to be a bit of a bother. Using DefaultResizer for anything else than a fixed 
> size actor based pool is really not trivial.
> 
> As I said in my introduction, I experimented with a fixed size pool, but 
> ultimately I would prefer an elastic pool able to spawn and despawn actors 
> depending on request pressure.
> 
> Hopefully the following implements the fixed size pool with round-robin 
> behavior and an enforced fixed size, and can be safely combined with 
> routeeProps having a BalancingDispatcher.

As I said BalancingDispatcher has a statically configured number of threads. 
Best performance is achieved by matching this number to the number of actors, 
which avoids (most of) the thread hopping that would otherwise occur.

> 
> package actors.browser
>  
> import akka.actor.{Terminated, Props, ActorLogging, Actor}
> import akka.routing._
>  
> import scala.collection.immutable.IndexedSeq
>  
> class RoundRobinFixedSizeActorPool(routeeProps: Props, nbActors: Int)
>     extends Actor with ActorLogging {
>   val resizer = DefaultResizer(nbActors, nbActors)
>   val routingLogic = RoundRobinRoutingLogic()
>  
>   // `1 to n` starts exactly n routees (`1 until n` would start only n - 1)
>   var routees: IndexedSeq[Routee] = (1 to resizer.lowerBound).map { _ =>
>     val routee = context.actorOf(routeeProps)
>     context.watch(routee)
>     ActorRefRoutee(routee)
>   }
>  
>   override def receive: Receive = {
>     // Replace a terminated routee so the pool keeps its fixed size
>     case Terminated(actor) if routees.contains(ActorRefRoutee(actor)) =>
>       routees = routees.filterNot(_ == ActorRefRoutee(actor))
>       val routee = context.actorOf(routeeProps)
>       context.watch(routee)
>       routees = routees :+ ActorRefRoutee(routee)
>     case GetRoutees =>
>       sender() ! Routees(routees)
>     case evt => routingLogic.select(evt, routees).send(evt, sender())
>   }
> }
> object RoundRobinFixedSizeActorPool {
>   def props(props: Props, nbActors: Int) =
>     Props(new RoundRobinFixedSizeActorPool(props, nbActors))
> }
> 
> How would you recommend going from there to a pressure sensitive actor pool 
> oscillating between a min and a max ? 
> 
> Does the following look correct (I am unsure how to write tests for this ... 
> ) ?
> 
> package actors.browser
>  
> import java.util.concurrent.atomic.AtomicLong
>  
> import akka.actor._
> import akka.routing._
>  
> import scala.collection.immutable.IndexedSeq
>  
> class RoundRobinActorPool(routeeProps: Props, minRoutees: Int, maxRoutees: 
> Int) extends Actor with ActorLogging {
>   val resizer: Resizer = DefaultResizer(minRoutees, maxRoutees)
>   val nbMsgs = new AtomicLong()

This should be a simple Long, no need to be atomic within an Actor.

>   val routingLogic = RoundRobinRoutingLogic()
>  
>   var routees: IndexedSeq[Routee] = IndexedSeq()
>   upsize(minRoutees)
>  
>   override def receive: Receive = {
> case Terminated(actor) if routees.contains(ActorRefRoutee(actor)) =>
>   routees = routees.filterNot(_ == ActorRefRoutee(actor))

The unhandled Terminated messages will then go to the routees which is probably 
not intended.

> case GetRoutees =>
>   sender() ! Routees(routees)
> case evt =>
>   if (resizer.isTimeForResize(nbMsgs.incrementAndGet())) {
> resizer.resize(routees)
>   }

This should move into the resize() method below (which should also be called).

>  
>   routingLogic.select(evt, routees).send(evt, sender())
>   }
>  
>   private def resize(delta: Int) = {
> if (delta < 0) downsize(delta)
> else upsize(delta)
>   }
>  
>   private def downsize(delta: Int) = {
> val (killed, remaining) = this.routees.splitAt(delta)
> this.routees = remaining
> killed.foreach(_.send(PoisonPill, self))
>   }
>  
>   private def upsize(delta: Int) = {
> routees = routees ++ (1 until minRoutees).map { _ =>
>   val routee = context.actorOf(routeeProps)
>   context.watch(routee)
>   ActorRefR

Re: [akka-user] Dispatcher assignment issue for Http Server

2015-01-22 Thread Roland Kuhn
Hi Randy,

without setting max-pool-size-max (which defaults to 64) you will not be able 
to see more than 64 threads. May I ask why you want to create 100 threads for 
handling a single stream of incoming connections? I would expect at least 99 of 
them to be idle at all times. If you want to perform blocking operations in the 
flows that handle each of those connections then I’d recommend to use a 
dedicated dispatcher for those.
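
As a sketch of what that means for the configuration quoted below (values are illustrative and this has not been checked against 1.0-M2): the thread-pool-executor section accepts max-pool-size-* settings next to the core-pool-size-* ones, so both bounds would be raised together.

```
myhttpRequestHandler.dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    core-pool-size-min = 100
    core-pool-size-factor = 2.0
    core-pool-size-max = 100
    # Raise the upper bound as well; as noted above, it defaults to 64
    max-pool-size-min = 100
    max-pool-size-max = 100
  }
  throughput = 5
}
```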

Regards,

Roland

> On 20 Jan 2015, at 17:21, Randy Fox wrote:
> 
> Nope.  Did I find a bug or am I doing something wrong?
> 
> On Saturday, January 17, 2015 at 1:21:12 PM UTC-8, √ wrote:
> Alright. Did you manage to sort it out?
> 
> -- 
> Cheers,
> √
> 
> On 16 Jan 2015 16:56, "Randy Fox" wrote:
> I expected the thread pool to be pegged at 100 threads, but watching in 
> visual vm showed 24 threads (default).  Poking around the object structure in 
> the debugger, I found nested deep under the flowmaterializer was a maxsize of 
> 64 (also default).
> 
> -r
> 
> On Thursday, January 15, 2015 at 5:34:05 PM UTC-8, Randy Fox wrote:
> I am trying to assign a dispatcher to be used for my http server (using the 
> new Akka HTTP Server) and can’t get it to honor my pool size settings.  Logs 
> show it is using the dispatcher, but visualvm shows it is not using the core 
> pool size settings.   Looks like it might be using the defaults for a 
> thread-pool-executor.
> 
> 
> 
> All I did was modify the example:
> 
> import akka.actor.ActorSystem
> import akka.http.Http
> import akka.stream.{FlowMaterializer, MaterializerSettings}
>  
> implicit val system = ActorSystem()
> // Run the stream on the custom dispatcher defined in the configuration below
> implicit val materializer = FlowMaterializer(
>   MaterializerSettings(system).withDispatcher("myhttpRequestHandler.dispatcher"))
>  
> val serverBinding = Http(system).bind(interface = "localhost", port = 8080)
> serverBinding.connections.foreach { connection => // foreach materializes the source
>   println("Accepted new connection from " + connection.remoteAddress)
> }
> …
> myhttpRequestHandler {
>   dispatcher {
>     type = Dispatcher
>     executor = "thread-pool-executor"
>     name = httprequesthandler-dispatcher
>     thread-pool-executor {
>       core-pool-size-min = 100
>       core-pool-size-factor = 2.0
>       core-pool-size-max = 100
>     }
>     throughput = 5
>   }
> }
> 
> [INFO] [2015-01-15 17:24:27,516] [DUDE-myhttpRequestHandler.dispatcher-79] 
> HttpRequestHandler(akka://DUDE): Accepted new connection from 
> /127.0.0.1:54046 
> What am I missing?
> 
> Thanks,
> 
> Randy Fox
> 
> 



Dr. Roland Kuhn
Akka Tech Lead
Typesafe  – Reactive apps on the JVM.
twitter: @rolandkuhn
 


Re: [akka-user] Handling boundary data in chunked requests

2015-01-22 Thread Roland Kuhn
If I understand correctly, you’ll want to use akka-http, which has support 
for handling multipart/form-data. The downside is that akka-http is still 
being developed, but you can try out milestone 1.0-M2 to see if that works for 
you.

Regards,

Roland

> On 21 Jan 2015, at 01:52, Yogesh Pandit wrote:
> 
> Hello,
> 
> I am using spray-can 1.2.1 and have a service to upload data in chunks.
> 
> It seems to work fine, except that the following boundary data is written 
> to the uploaded file. How can I avoid writing it into the file? Reading the 
> whole file is not an option due to its large size.
> I have used the following example - 
> https://github.com/spray/spray/tree/master/examples/spray-can/simple-http-server/src/main/scala/spray/examples
> 
> 
> --WebKitFormBoundary5mkhOpb3GjmIWL7a
> Content-Disposition: form-data; name="datafile"; filename="file.csv"
> Content-Type: text/csv
> 
> 
> 
> --WebKitFormBoundary5mkhOpb3GjmIWL7a--
> 
> Thank you,
> -Yogesh
> 



Dr. Roland Kuhn
Akka Tech Lead
Typesafe  – Reactive apps on the JVM.
twitter: @rolandkuhn
 



Re: [akka-user] received Supervise from unregistered child ... this will not end well

2015-01-22 Thread Viktor Klang
Hi Marco,

you'll need to update all Akka dependencies to the 2.3.9 version and make
sure that your dependencies that depend on akka transitively are built for
Akka 2.3.x

On Thu, Jan 22, 2015 at 11:57 AM, Marco Luca Sbodio 
wrote:

> Hi Viktor,
>
> after upgrading to akka 2.3.9 my multi-jvm-test crashes with this error:
>
> 10:49:07.654UTC ERROR
> [MySystemMultiNodeTest-akka.actor.default-dispatcher-21]
> [akka.actor.ActorSystemImpl] [ActorSystem(MySystemMultiNodeTest)] -
> Uncaught error from thread
> [MySystemMultiNodeTest-akka.actor.default-dispatcher-21] shutting down JVM
> since 'akka.jvm-exit-on-fatal-error' is enabled
> java.lang.AbstractMethodError: akka/actor/Actor.aroundPreStart()V
>     at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.10-2.3.9.jar:na]
>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.9.jar:na]
>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.9.jar:na]
>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279) [akka-actor_2.10-2.3.9.jar:na]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220) [akka-actor_2.10-2.3.9.jar:na]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231) [akka-actor_2.10-2.3.9.jar:na]
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na]
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na]
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na]
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na]
>
> Has something changed with multi-jvm/multi-node testing from akka 2.2.4 to
> akka 2.3.9?
> Any clue?
>
> Thank you in advance,
> Marco
>
> On Wednesday, 21 January 2015 19:15:51 UTC, √ wrote:
>>
>> Hi Marco,
>>
>> Please upgrade to 2.3.9 if you haven't already; there were a couple of
>> remoting-related issues fixed there.
>>
>> On Wed, Jan 21, 2015 at 6:45 PM, Marco Luca Sbodio 
>> wrote:
>>
>>> I haven't.
>>>
>>> I've managed to figure out that sometimes the following code
>>>
>>> [[
>>> int nextStepNumber = planSteps[0].getStepNumber();
>>> Address nextAddress = planSteps[0].getPeer().getAddress();
>>> PlanStep[] nextPlanSteps = new PlanStep[planSteps.length];
>>> System.arraycopy(planSteps, 0, nextPlanSteps, 0, planSteps.length);
>>> firstWorker = getContext().actorOf(
>>>     Worker.mkProps(sink, nextPlanSteps)
>>>         .withDeploy(new Deploy(new RemoteScope(nextAddress))),
>>>     Worker.actorName + nextStepNumber);
>>> firstWorker.tell(CommonApi.START, getSelf());
>>> log.debug("started first worker");
>>> Chunk chunk = new Chunk(0, new byte[] {});
>>>
>>> firstWorker.tell(chunk, getSelf());
>>> log.debug("empty chunk sent to first worker");
>>> ]]
>>>
>>> generate such an error:
>>>
>>> [[
>>> 17:20:08.195UTC ERROR 
>>> [MySystemMultiNodeTest-akka.actor.default-dispatcher-16]
>>> [org.mysystem.actor.SubPlanner] [akka://MySystemMultiNodeTest/
>>> user/the-engine/executor-6eb81c68-e91d-47cd-bb26-
>>> be53eee90f63/planner/sub-planner-11] - received Supervise from
>>> unregistered child Actor[akka.tcp://MySystemMulti
>>> nodet...@nd06.domain.com:3002/remote/akka.tcp/
>>> mysystemmultinodet...@nd03.domain.com:3001/user/the-
>>> engine/executor-6eb81c68-e91d-47cd-bb26-be53eee90f63/
>>> planner/sub-planner-11/worker0#-1680645824], this will not end well
>>> ]]
>>>
>>> tracing back "sub-planner-11" in my logs I find the two messages
>>> log.debug("started first worker");
>>> firstWorker.tell(chunk, getSelf());
>>>
>>> and then I get the error ... and I have no clue why
>>>
>>> Thank you in advance for any help/suggestion.
>>>
>>> Cheers,
>>> Marco
>>>
>>> On Tuesday, 20 January 2015 18:12:08 UTC, √ wrote:

 Have you closed over "context.actorOf" and execute it within a Future
 or similar?

 On Tue, Jan 20, 2015 at 6:47 PM, Marco Luca Sbodio wrote:

> Hello everyone,
>
> while testing my system I'm randomly getting error messages similar to
> the one in the subject of this topic. Here's an example:
>
> 17:27:59.265UTC ERROR 
> [MySystemMultiNodeTest-akka.actor.default-dispatcher-16]
> [org.mysystem.actor.Worker] [akka://MySystemMultiNodeTest/
> user/the-engine/executor-7b7690ee-f31d-45f1-93ef-79cba01fe60
> 4/planner/sub-planner-951/worker0] - received Supervise from
> unregistered child Actor[akka.tcp://MySystemMulti
> nodet...@nd06.domain.com:3002/remote/akka.tcp/MySystemMultiN
> odet...@nd03.domain.com:3001/user/the-engine/executor-7b7690ee-f31d-
> 45f1-93ef-79cba01fe60

Re: [akka-user] Cluster unreachable and a lot of cluster connections

2015-01-22 Thread Viktor Klang
Endre, could it be due to pending-to-send system message overflow?

On Thu, Jan 22, 2015 at 11:45 AM, Johannes Berg  wrote:

> Okay, I increased the load further and now I see the same problem again.
> It seems to just have gotten a bit better in that it doesn't happen as
> fast, but with enough load it happens.
>
> To re-iterate, I have Akka 2.3.9 on all (8) nodes and
> auto-down-unreachable-after = off on all nodes and I don't do any manual
> downing anywhere, still the leader log prints this:
>
> 2015-01-22 10:35:37 + - [INFO] - from Cluster(akka://system) in
> system-akka.actor.default-dispatcher-2
> Cluster Node [akka.tcp://system@ip1:port1] - Leader is removing
> unreachable node [akka.tcp://system@ip2:port2]
>
> and the node(s) under load is(are) removed from the cluster (quarantined).
> How is this possible?
>
> On Wednesday, January 21, 2015 at 5:53:06 PM UTC+2, drewhk wrote:
>>
>> Hi Johannes,
>>
>> See the milestone here: https://github.com/akka/
>> akka/issues?q=milestone%3A2.3.9+is%3Aclosed
>>
>> The tickets cross reference the PRs, too, so you can look at the code
>> changes. The issue that probably hit you is https://github.com/akka/
>> akka/issues/16623 which manifested as system message delivery errors on
>> some systems, but actually was caused by accidentally duplicated internal
>> actors (a regression).
>>
>> -Endre
>>
>> On Wed, Jan 21, 2015 at 4:47 PM, Johannes Berg  wrote:
>>
>>> Upgrading to 2.3.9 does indeed seem to solve my problem. At least I
>>> haven't experienced them yet.
>>>
>>> Now I'm curious what the fixes were, is there somewhere a change summary
>>> between versions or where is it listed what bugs have been fixed in which
>>> versions?
>>>
>>> On Wednesday, January 21, 2015 at 11:31:02 AM UTC+2, drewhk wrote:

 Hi Johannes,

 We just released 2.3.9 with important bugfixes. I recommend to update
 and see if the problem is still persisting.

 -Endre

 On Wed, Jan 21, 2015 at 10:29 AM, Johannes Berg 
 wrote:

> Many connections seem to be formed in the case when the node has been
> marked down for unreachability even though it's still alive and it tries to
> connect back into the cluster. The removed node prints:
>
> "Address is now gated for 5000 ms, all messages to this address will
> be delivered to dead letters. Reason: The remote system has quarantined
> this system. No further associations to the remote system are possible
> until this system is restarted."
>
> It doesn't seem to close the connections properly even though it opens
> new ones continuously.
>
> Anyway, that's a separate issue that I'm not that concerned about right
> now. I've now realized I don't want to use automatic downing; instead I
> would like to allow nodes to go unreachable and come back to reachable,
> even if it takes quite some time, and to manually stop the process and
> down the node in case of an actual crash.
>
> Consequently I've put
>
> auto-down-unreachable-after = off
>
> in the config. Now I have the problem that nodes still are removed,
> this is from the leader node log:
>
> 08:50:14.087UTC INFO [system-akka.actor.default-dispatcher-4]
> Cluster(akka://system) - Cluster Node [akka.tcp://system@ip1:port1] -
> Leader is removing unreachable node [akka.tcp://system@ip2:port2]
>
> I can understand my node is marked unreachable because it's under heavy
> load, but I don't understand what could cause it to be removed. I'm not
> doing any manual downing and have auto-down set to off, so what else could
> trigger the removal?
>
> Using the akka-cluster script I can see that the node has most other
> nodes marked as unreachable (including the leader) and that it has another
> leader than other nodes.
>
> My test system consists of 8 nodes.
>
> About the unreachability: I'm not having long GC pauses and not sending
> large blobs, but I'm sending very many smaller messages as fast as I can.
> If I just hammer it fast enough it will end up unreachable, which I can
> accept, but I need to get it back to reachable.
>
> On Thursday, December 11, 2014 at 11:22:41 AM UTC+2, Björn Antonsson
> wrote:
>>
>> Hi Johannes,
>>
>> On 9 December 2014 at 15:29:53, Johannes Berg (jber...@gmail.com)
>> wrote:
>>
>> Hi! I'm doing some load tests in our system and getting problems that
>> some of my nodes are marked as unreachable even though the processes are
>> up. I'm seeing it going a few times from reachable to unreachable and 
>> back
>> a few times before staying unreachable saying connection gated for 5000ms
>> and staying silently that way.
>>
>> Looking at the connections made to one of the seed nodes I see that I
>> have several hundreds of connections from other nodes except the failing
>> ones. Is

Re: [akka-user] Cluster unreachable and a lot of cluster connections

2015-01-22 Thread Endre Varga
Without detailed logs I cannot say. If there were a system message
buffer overflow, it would cry loudly in the logs. Also, the log says that an
unreachable node is being removed, so there should be events happening
before the unreachability. This might be something else entirely. The full
config would also help, but a reproducer would be best.

-Endre

On Thu, Jan 22, 2015 at 2:56 PM, Viktor Klang 
wrote:

> Endre, could it be due to pending-to-send system message overflow?
>
> On Thu, Jan 22, 2015 at 11:45 AM, Johannes Berg 
> wrote:
>
>> Okay, I increased the load further and now I see the same problem again.
>> It seems to just have gotten a bit better in that it doesn't happen as
>> fast, but with enough load it happens.
>>
>> To re-iterate, I have Akka 2.3.9 on all (8) nodes and
>> auto-down-unreachable-after = off on all nodes and I don't do any manual
>> downing anywhere, still the leader log prints this:
>>
>> 2015-01-22 10:35:37 + - [INFO] - from Cluster(akka://system) in
>> system-akka.actor.default-dispatcher-2
>> Cluster Node [akka.tcp://system@ip1:port1] - Leader is removing
>> unreachable node [akka.tcp://system@ip2:port2]
>>
>> and the node(s) under load is(are) removed from the cluster
>> (quarantined). How is this possible?
>>
>> On Wednesday, January 21, 2015 at 5:53:06 PM UTC+2, drewhk wrote:
>>>
>>> Hi Johannes,
>>>
>>> See the milestone here: https://github.com/akka/
>>> akka/issues?q=milestone%3A2.3.9+is%3Aclosed
>>>
>>> The tickets cross reference the PRs, too, so you can look at the code
>>> changes. The issue that probably hit you is https://github.com/akka/
>>> akka/issues/16623 which manifested as system message delivery errors on
>>> some systems, but actually was caused by accidentally duplicated internal
>>> actors (a regression).
>>>
>>> -Endre
>>>
>>> On Wed, Jan 21, 2015 at 4:47 PM, Johannes Berg 
>>> wrote:
>>>
 Upgrading to 2.3.9 does indeed seem to solve my problem. At least I
 haven't experienced them yet.

 Now I'm curious what the fixes were, is there somewhere a change
 summary between versions or where is it listed what bugs have been fixed in
 which versions?

 On Wednesday, January 21, 2015 at 11:31:02 AM UTC+2, drewhk wrote:
>
> Hi Johannes,
>
> We just released 2.3.9 with important bugfixes. I recommend to update
> and see if the problem is still persisting.
>
> -Endre
>
> On Wed, Jan 21, 2015 at 10:29 AM, Johannes Berg 
> wrote:
>
>> Many connections seem to be formed in the case when the node has been
>> marked down for unreachability even though it's still alive and it tries to
>> connect back into the cluster. The removed node prints:
>>
>> "Address is now gated for 5000 ms, all messages to this address will
>> be delivered to dead letters. Reason: The remote system has quarantined
>> this system. No further associations to the remote system are possible
>> until this system is restarted."
>>
>> It doesn't seem to close the connections properly even though it
>> opens new ones continuously.
>>
>> Anyway, that's a separate issue that I'm not that concerned about
>> right now. I've now realized I don't want to use automatic downing; instead
>> I would like to allow nodes to go unreachable and come back to reachable,
>> even if it takes quite some time, and to manually stop the process and
>> down the node in case of an actual crash.
>>
>> Consequently I've put
>>
>> auto-down-unreachable-after = off
>>
>> in the config. Now I have the problem that nodes still are removed,
>> this is from the leader node log:
>>
>> 08:50:14.087UTC INFO [system-akka.actor.default-dispatcher-4]
>> Cluster(akka://system) - Cluster Node [akka.tcp://system@ip1:port1]
>> - Leader is removing unreachable node [akka.tcp://system@ip2:port2]
>>
>> I can understand my node is marked unreachable because it's under
>> heavy load, but I don't understand what could cause it to be removed. I'm
>> not doing any manual downing and have auto-down set to off, so what else
>> could trigger the removal?
>>
>> Using the akka-cluster script I can see that the node has most other
>> nodes marked as unreachable (including the leader) and that it has 
>> another
>> leader than other nodes.
>>
>> My test system consists of 8 nodes.
>>
>> About the unreachability: I'm not having long GC pauses and not
>> sending large blobs, but I'm sending very many smaller messages as fast as
>> I can. If I just hammer it fast enough it will end up unreachable, which I
>> can accept, but I need to get it back to reachable.
>>
>> On Thursday, December 11, 2014 at 11:22:41 AM UTC+2, Björn Antonsson
>> wrote:
>>>
>>> Hi Johannes,
>>>
>>> On 9 December 2014 at 15:29:53, Johannes Berg (jber...@gmail.com)
>>> wro

Re: [akka-user] Cluster unreachable and a lot of cluster connections

2015-01-22 Thread Patrik Nordwall
If it's quarantined it will be removed from cluster. Please include the log 
entry that says that it is quarantined, if any.

/Patrik

> On 22 Jan 2015, at 14:56, Viktor Klang wrote:
> 
> Endre, could it be due to pending-to-send system message overflow?
> 
>> On Thu, Jan 22, 2015 at 11:45 AM, Johannes Berg  wrote:
>> Okay, I increased the load further and now I see the same problem again. It 
>> seems to just have gotten a bit better in that it doesn't happen as fast, 
>> but with enough load it happens.
>> 
>> To re-iterate, I have Akka 2.3.9 on all (8) nodes and 
>> auto-down-unreachable-after = off on all nodes and I don't do any manual 
>> downing anywhere, still the leader log prints this:
>> 
>> 2015-01-22 10:35:37 + - [INFO] - from Cluster(akka://system) in 
>> system-akka.actor.default-dispatcher-2 
>> Cluster Node [akka.tcp://system@ip1:port1] - Leader is removing unreachable 
>> node [akka.tcp://system@ip2:port2]
>> 
>> and the node(s) under load is(are) removed from the cluster (quarantined). 
>> How is this possible?
>> 
>>> On Wednesday, January 21, 2015 at 5:53:06 PM UTC+2, drewhk wrote:
>>> Hi Johannes,
>>> 
>>> See the milestone here: 
>>> https://github.com/akka/akka/issues?q=milestone%3A2.3.9+is%3Aclosed
>>> 
>>> The tickets cross reference the PRs, too, so you can look at the code 
>>> changes. The issue that probably hit you is 
>>> https://github.com/akka/akka/issues/16623 which manifested as system 
>>> message delivery errors on some systems, but actually was caused by 
>>> accidentally duplicated internal actors (a regression).
>>> 
>>> -Endre
>>> 
 On Wed, Jan 21, 2015 at 4:47 PM, Johannes Berg  wrote:
 Upgrading to 2.3.9 does indeed seem to solve my problem. At least I 
 haven't experienced them yet.
 
 Now I'm curious what the fixes were, is there somewhere a change summary 
 between versions or where is it listed what bugs have been fixed in which 
 versions?
 
> On Wednesday, January 21, 2015 at 11:31:02 AM UTC+2, drewhk wrote:
> Hi Johannes,
> 
> We just released 2.3.9 with important bugfixes. I recommend to update and 
> see if the problem is still persisting.
> 
> -Endre
> 
>> On Wed, Jan 21, 2015 at 10:29 AM, Johannes Berg  
>> wrote:
>> Many connections seem to be formed in the case when the node has been 
>> marked down for unreachability even though it's still alive and it tries 
>> to connect back into the cluster. The removed node prints:
>> 
>> "Address is now gated for 5000 ms, all messages to this address will be 
>> delivered to dead letters. Reason: The remote system has quarantined 
>> this system. No further associations to the remote system are possible 
>> until this system is restarted."
>> 
>> It doesn't seem to close the connections properly even though it opens 
>> new ones continuously.
>> 
>> Anyway, that's a separate issue that I'm not that concerned about right 
>> now. I've now realized I don't want to use automatic downing; instead I 
>> would like to allow nodes to go unreachable and come back to reachable, 
>> even if it takes quite some time, and to manually stop the process and 
>> down the node in case of an actual crash.
>> 
>> Consequently I've put
>> 
>> auto-down-unreachable-after = off
>> 
>> in the config. Now I have the problem that nodes still are removed, this 
>> is from the leader node log:
>> 
>> 08:50:14.087UTC INFO [system-akka.actor.default-dispatcher-4] 
>> Cluster(akka://system) - Cluster Node [akka.tcp://system@ip1:port1] - 
>> Leader is removing unreachable node [akka.tcp://system@ip2:port2]
>> 
>> I can understand my node is marked unreachable because it's under heavy 
>> load, but I don't understand what could cause it to be removed. I'm not 
>> doing any manual downing and have auto-down set to off, so what else could 
>> trigger the removal?
>> 
>> Using the akka-cluster script I can see that the node has most other 
>> nodes marked as unreachable (including the leader) and that it has 
>> another leader than other nodes.
>> 
>> My test system consists of 8 nodes.
>> 
>> About the unreachability: I'm not having long GC pauses and not sending 
>> large blobs, but I'm sending very many smaller messages as fast as I 
>> can. If I just hammer it fast enough it will end up unreachable, which I 
>> can accept, but I need to get it back to reachable.
>> 
>>> On Thursday, December 11, 2014 at 11:22:41 AM UTC+2, Björn Antonsson 
>>> wrote:
>>> Hi Johannes,
>>> 
 On 9 December 2014 at 15:29:53, Johannes Berg (jber...@gmail.com) 
 wrote:
 
 Hi! I'm doing some load tests in our system and getting problems that 
 some of my nodes are marked as unreachable even though the processes 
 are up. I'm seeing it going a few times from reac

Re: [akka-user] Cluster unreachable and a lot of cluster connections

2015-01-22 Thread Johannes Berg
Thanks for the tip on what to look for. My logs are huge, so it's a bit of 
a jungle. Anyway, I found this:

10:34:23.701UTC ERROR[system-akka.actor.default-dispatcher-2] Remoting - 
Association to [akka.tcp://system@ip2:port2] with UID [-1637388952] 
irrecoverably failed. Quarantining address.
akka.remote.ResendBufferCapacityReachedException: Resend buffer capacity of 
[1000] has been reached.
    at akka.remote.AckedSendBuffer.buffer(AckedDelivery.scala:121) ~[akka-remote_2.11-2.3.9.jar:na]
    at akka.remote.ReliableDeliverySupervisor.akka$remote$ReliableDeliverySupervisor$$tryBuffer(Endpoint.scala:388) ~[akka-remote_2.11-2.3.9.jar:na]
    at akka.remote.ReliableDeliverySupervisor.akka$remote$ReliableDeliverySupervisor$$handleSend(Endpoint.scala:372) ~[akka-remote_2.11-2.3.9.jar:na]
    at akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:279) ~[akka-remote_2.11-2.3.9.jar:na]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.11-2.3.9.jar:na]
    at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:188) ~[akka-remote_2.11-2.3.9.jar:na]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) ~[akka-actor_2.11-2.3.9.jar:na]
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) ~[akka-actor_2.11-2.3.9.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) ~[akka-actor_2.11-2.3.9.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:221) ~[akka-actor_2.11-2.3.9.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231) ~[akka-actor_2.11-2.3.9.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.5.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) ~[scala-library-2.11.5.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) ~[scala-library-2.11.5.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.5.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.5.jar:na]

Where ip2 and port2 are the same as in my previous post, and this happened 
on a node (ip3:port3) which also had high load.

After this the ip2:port2 node started to print:
10:34:24.234UTC WARN [system-akka.actor.default-dispatcher-2] Remoting - 
Tried to associate with unreachable remote address 
[akka.tcp://system@ip3:port3]. Address is now gated for 5000 ms, all 
messages to this address will be delivered to dead letters. Reason: The 
remote system has quarantined this system. No further associations to the 
remote system are possible until this system is restarted.

On ip3:port3 I also later see:
10:34:25.180UTC ERROR[system-akka.actor.default-dispatcher-2] Remoting - Association to [akka.tcp://system@ip2:port2] with UID [-1637388952] irrecoverably failed. Quarantining address.
java.lang.IllegalStateException: Error encountered while processing system message acknowledgement buffer: [3 {0, 1, 2, 3}] ack: ACK[2114, {}]
    at akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:287) ~[akka-remote_2.11-2.3.9.jar:na]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.11-2.3.9.jar:na]
    at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:188) ~[akka-remote_2.11-2.3.9.jar:na]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) ~[akka-actor_2.11-2.3.9.jar:na]
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) ~[akka-actor_2.11-2.3.9.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) ~[akka-actor_2.11-2.3.9.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:221) ~[akka-actor_2.11-2.3.9.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231) ~[akka-actor_2.11-2.3.9.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.5.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) ~[scala-library-2.11.5.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) ~[scala-library-2.11.5.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.5.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.5.jar:na]
Caused by: java.lang.IllegalArgumentException: Highest SEQ so far was 3 but cumulative ACK is 2114
    at akka.remote.AckedSendBuffer.acknowledge(AckedDelivery.scala:103) ~[akka-remote_2.11-2.3.9.jar:na]
    at akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:283) ~[akka-remote_2.11-2.3.9.jar:na]
    ... 12 common frames omitted


Maybe this explains something? What should I do about this?

On Thursday, January 22, 2015 at 4:05:52 PM UT

Re: [akka-user] Cluster unreachable and a lot of cluster connections

2015-01-22 Thread Patrik Nordwall
You can try to increase akka.remote.system-message-buffer-size config
setting. Default is 1000.
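
As a rough sketch of that change in application.conf: the value 20000 below is
only an illustrative assumption, not a recommendation, and should be sized from
the largest burst of system messages you expect.

```hocon
akka.remote {
  # Default is 1000 (the capacity reported in the exception above).
  # 20000 is a hypothetical value chosen purely for illustration.
  system-message-buffer-size = 20000
}
```

A larger buffer only moves the point at which the resend buffer overflows; it
does not remove it.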
/Patrik


Re: [akka-user] Cluster unreachable and a lot of cluster connections

2015-01-22 Thread Johannes Berg

I will try that but it seems that will only help to a certain point and 
when I push the load further it will hit it again.

I hit this within a minute after I put on the load which is a bit annoying 
to me. I'm fine with it becoming unreachable as long as I can get it back 
to reachable when it has crunched through the load. Will it still buffer up 
system messages even though it's unreachable? At what rate are system 
messages typically sent?

As it is now it's easy to take down the system before you have any chance 
of spinning up new nodes.


On Thursday, January 22, 2015 at 5:32:33 PM UTC+2, Patrik Nordwall wrote:
>
> You can try to increase akka.remote.system-message-buffer-size config 
> setting. Default is 1000.
> /Patrik

[akka-user] Akka persistence (ES) - unique persistenceId design

2015-01-22 Thread Sebastian Bach
Hi!

How do you generate your persistenceIds?
One common approach is UUID.randomUUID().toString(). But in my opinion it 
could be worth it to choose a smarter generation strategy.
Some use cases based on key segments are:

   - Akka cluster sharding
   - NoSQL DB sharding
   - Lexicographical order of PK in (query model) RDBMS indexes (see "Why 
   GUID primary keys are a database’s worst nightmare")
   - Creation of topics in event logs (e.g. Kafka)
   - More convenient preselection in processing of event logs (e.g. by akka 
   streams, Spark)
   - Routing of messages in the actor hierarchy.

MongoDB has, e.g., its ObjectId (not very useful for 
the above examples).
And in the book IDDD by *Vaughn Vernon* I've found this example of a 
unique entity identity:

   - Bounded context name (abbreviation)
   - Entity name (abbreviation)
   - Creation date
   - First segment of a random UUID

A variation of this could also include a tenant id or a creation time.
On the other hand the persistenceId shouldn't be too long, because it will 
be in every single event in the log (and other places).
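
A minimal sketch of the segment-based scheme listed above, assuming a "-"
separator and short abbreviations that do not themselves contain "-". The
object and parameter names (PersistenceIds, generate, context, entity) are
hypothetical and not part of Akka:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.UUID

object PersistenceIds {
  // yyyyMMdd keeps the date segment short and lexicographically sortable.
  private val dateFormat = DateTimeFormatter.BASIC_ISO_DATE

  /** Builds "<context>-<entity>-<creation date>-<first UUID segment>". */
  def generate(context: String,
               entity: String,
               created: LocalDate,
               uuid: UUID = UUID.randomUUID()): String = {
    // The first segment of a UUID string is the 8 hex digits before the first '-'.
    val firstSegment = uuid.toString.takeWhile(_ != '-')
    Seq(context, entity, created.format(dateFormat), firstSegment).mkString("-")
  }
}
```

Note that the first UUID segment carries only 32 random bits, so uniqueness
within one context, entity and day is probabilistic rather than guaranteed.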

What do you think? What information is most valuable to be encoded in the 
persistenceId 
for future use?

Best regards
Sebastian

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Cluster unreachable and a lot of cluster connections

2015-01-22 Thread Johannes Berg
Also I've made the failure detection a bit less sensitive since it very 
quickly went unreachable before:

akka.cluster.failure-detector {
  threshold = 16.0
  acceptable-heartbeat-pause = 6s
  min-std-deviation = 200 ms
  expected-response-after = 9 s
}

Does this have any impact on how much space I need to reserve in the system 
message resend buffer? (related to the question if system messages are 
buffered up while it's unreachable)

On Thursday, January 22, 2015 at 5:53:49 PM UTC+2, Johannes Berg wrote:
>
>
> I will try that but it seems that will only help to a certain point and 
> when I push the load further it will hit it again.
>
> I hit this within a minute after I put on the load which is a bit annoying 
> to me. I'm fine with it becoming unreachable as long as I can get it back 
> to reachable when it has crunched through the load. Will it still buffer up 
> system messages even though it's unreachable? At what rate are system 
> messages typically sent?
>
> As it is now it's easy to take down the system before you have any chance 
> of spinning up new nodes.

Re: [akka-user] Cluster unreachable and a lot of cluster connections

2015-01-22 Thread Endre Varga
Hi Johannes,

On Thu, Jan 22, 2015 at 4:53 PM, Johannes Berg  wrote:

>
> I will try that but it seems that will only help to a certain point and
> when I push the load further it will hit it again.
>

There is no system message traffic between two Akka systems by default. For
one system to send system messages to another, you need to use either remote
deployment or deathwatch on remote actors. Which one are you using? What is
the scenario?

The main issue is that, whatever the rate of system message delivery, we
*cannot* backpressure remote deployment or the number of watched remote
actors that die. For any delivery buffer there is a large enough "mass actor
extinction event" that will just fill it up. You can, though, increase the
buffer size to cover the largest burst you expect (for example, if you know
the peak number of remotely watched actors and the peak rate at which they
die).


>
> I hit this within a minute after I put on the load which is a bit annoying
> to me. I'm fine with it becoming unreachable as long as I can get it back
> to reachable when it has crunched through the load.
>

That means a larger buffer size. If no sysmsg buffer size can absorb your
load, then you have to rethink your remote deployment/watch strategy
(whichever feature you use).


> Will it still buffer up system messages even though it's unreachable?
>

After quarantine there is no system message delivery; everything is
dropped. There is no recovery from quarantine; that is its purpose. If any
system message is lost between two systems (and here they are dropped
because the buffer is full), then the systems are in an undefined state,
especially with remote deployment, so they quarantine each other.


> At what rate are system messages typically sent?
>

They are sent at the rate at which you remote-deploy or remotely watch
actors, or at the rate at which remotely watched actors die. On the wire it
depends: user messages share the same TCP connection with the system
messages, which might also reduce the available throughput.

You can tune the dispatcher of remoting by adding more threads to it; you
might also increase the netty thread pool:
http://doc.akka.io/docs/akka/2.3.9/general/configuration.html#akka-remote

You might want to set the system-message-ack-piggyback-timeout setting to a
lower value, like 100ms.
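
A sketch of those three knobs together. The key names are assumed from the
2.3 reference configuration on the page linked above and should be checked
against it; the thread counts are hypothetical and only illustrate the shape
of the settings.

```hocon
akka.remote {
  # Lower value suggested above, so acknowledgements are sent sooner.
  system-message-ack-piggyback-timeout = 100 ms

  # More threads for the remoting dispatcher (hypothetical count).
  default-remote-dispatcher.fork-join-executor {
    parallelism-min = 4
    parallelism-max = 4
  }

  # Larger netty I/O worker pools (hypothetical counts).
  netty.tcp {
    server-socket-worker-pool.pool-size-max = 4
    client-socket-worker-pool.pool-size-max = 4
  }
}
```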

-Endre


> As it is now it's easy to take down the system before you have any chance
> of spinning up new nodes.

[akka-user] Creating custom source materializations

2015-01-22 Thread Jeremy Stone
Is there any way at the moment to create custom sources with specific 
custom materializations?

Essentially we want to be able to create custom flexible scheduling Sources 
(similar to TickSource) that materialize to Cancellable so that we can stop 
them.

I can see how to create an ActorPublisher and then a Source from this using 
a PropsSource, but this materializes to the actor ref.

My initial thoughts were therefore to create another implementation of 
KeyedActorFlowSource. The problem is that ActorBasedFlowMaterializer 
methods (especially actorOf) are inaccessible as this is an internal API 
class.

Thanks in advance.

Jeremy



[akka-user] Akka-Http: How to connect a Route with an IncomingConnection?

2015-01-22 Thread Philippe Milot
So I'm following the documentation on the new Akka-Http experimental 
modules, and I'm trying to get a basic Hello World using the Routing DSL. 

I've defined my route as follows:

  val route: Route = {
    complete(HttpResponse(entity = "Hello, world!"))
  }

I can bind and accept HTTP connections:

serverBinding.connections.foreach { connection =>
  println("Accepted new connection from " + connection.remoteAddress)
  // How do I "run" my route from here??
}

I feel pretty dumb because I can't find anywhere in the documentation 
explaining how to plug the two together. I tried scraping the API docs to 
find some class or method that does it, but all I found was a call to 
Route.handlerFlow(route) which requires some RoutingSetup implicit variable 
that is very difficult to instantiate. I couldn't find any Typesafe 
Activator template that shows how to do this either.

I feel pretty dumb because I'm probably missing something really obvious. 
Please help!

 - Phili



Re: [akka-user] Akka-Http: How to connect a Route with an IncomingConnection?

2015-01-22 Thread Endre Varga
Hi, Philippe,

On Thu, Jan 22, 2015 at 6:28 PM, Philippe Milot  wrote:

> So I'm following the documentation on the new Akka-Http experimental
> modules, and I'm trying to get a basic Hello World using the Routing DSL.
>
> I've defined my route as follows:
>
>   val route: Route = {
>     complete(HttpResponse(entity = "Hello, world!"))
>   }
>
> I can bind and accept HTTP connections:
>
> serverBinding.connections.foreach { connection =>
>   println("Accepted new connection from " + connection.remoteAddress)
>   // How do I "run" my route from here??
> }
>
> I feel pretty dumb because I can't find anywhere in the documentation
> explaining how to plug the two together.
>

We are filling the holes in the docs slowly (but surely), so some things are
still missing. In the meantime, you can look at this little test server to
get a feel for how to plug things together:
https://github.com/akka/akka/blob/release-2.3-dev/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala

-Endre


> I tried scraping the API docs to find some class or method that does it,
> but all I found was a call to Route.handlerFlow(route) which requires
> some RoutingSetup implicit variable that is very difficult to instantiate.
> I couldn't find any Typesafe Activator template that shows how to do this
> either.
>
> I feel pretty dumb because I'm probably missing something really obvious.
> Please help!
>
>  - Phili


Re: [akka-user] Dispatcher assignment issue for Http Server

2015-01-22 Thread Randy Fox
 

Hi Roland,

Thanks for the response.  The 100 threads were just to verify that I could 
control the number of threads.  I didn’t realize it was using a bounded task 
queue.

How do I configure a dedicated dispatcher for the operations in the flows 
that handle each connection as opposed to inheriting it from the 
FlowMaterializer?  That is what I was really trying to do.  Currently they 
seem to share the dispatcher.  It is not because I am blocking, but because 
I want to know how to control config on everything within these higher 
level constructs and not have to use the default.

Cheers,

Randy

On Thursday, January 22, 2015 at 4:08:23 AM UTC-8, rkuhn wrote:
>
> Hi Randy,
>
> without setting max-pool-size-max (which defaults to 64) you will not be 
> able to see more than 64 threads. May I ask why you want to create 100 
> threads for handling a single stream of incoming connections? I would 
> expect at least 99 of them to be idle at all times. If you want to perform 
> blocking operations in the flows that handle each of those connections then 
> I’d recommend to use a dedicated dispatcher for those.
>
> Regards,
>
> Roland
>
> On 20 Jan 2015, at 17:21, Randy Fox wrote:
>
> Nope.  Did I find a bug or am I doing something wrong?
>
> On Saturday, January 17, 2015 at 1:21:12 PM UTC-8, √ wrote:
>>
>> Alright. Did you manage to sort it out?
>>
>> -- 
>> Cheers,
>> √
>> On 16 Jan 2015 16:56, "Randy Fox"  wrote:
>>
>>> I expected the thread pool to be pegged at 100 threads, but watching in 
>>> VisualVM showed 24 threads (the default).  Poking around the object structure 
>>> in the debugger, I found, nested deep under the FlowMaterializer, a 
>>> max size of 64 (also the default).
>>>
>>> -r
>>>
>>> On Thursday, January 15, 2015 at 5:34:05 PM UTC-8, Randy Fox wrote:

 I am trying to assign a dispatcher to be used for my HTTP server (using 
 the new Akka HTTP Server) and can’t get it to honor my pool size settings. 
  
 Logs show it is using the dispatcher, but VisualVM shows it is not using 
 the core pool size settings.   Looks like it might be using the defaults 
 for a thread-pool-executor.


 All I did was modify the example:

 import akka.actor.ActorSystem
 import akka.http.Http
 import akka.stream.{ FlowMaterializer, MaterializerSettings }

 implicit val system = ActorSystem()
 // Run the stream stages on the custom dispatcher defined in the config below
 implicit val materializer = FlowMaterializer(
   MaterializerSettings(system).withDispatcher("myhttpRequestHandler.dispatcher"))

 val serverBinding = Http(system).bind(interface = "localhost", port = 8080)
 serverBinding.connections.foreach { connection => // foreach materializes the source
   println("Accepted new connection from " + connection.remoteAddress)
 }
 …

 myhttpRequestHandler {
   dispatcher {
     type = Dispatcher
     executor = "thread-pool-executor"
     name = httprequesthandler-dispatcher
     thread-pool-executor {
       core-pool-size-min = 100
       core-pool-size-factor = 2.0
       core-pool-size-max = 100
     }
     throughput = 5
   }
 }

 [INFO] [2015-01-15 17:24:27,516] [DUDE-myhttpRequestHandler.dispatcher-79] 
 HttpRequestHandler(akka://DUDE): Accepted new connection from /127.0.0.1:54046

 What am I missing?

 Thanks,

 Randy Fox

>>>
>>
>
>
>
> *Dr. Roland Kuhn*
> *Akka Tech Lead*
> Typesafe  – Reactive apps on the JVM.
> twitter: @rolandkuhn
> 
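
A minimal sketch of the hint about max-pool-size-max in the thread above, assuming the Akka 2.3 thread-pool-executor settings, where the maximum pool size is a separate group of keys from the core pool size. The dispatcher path and the value 100 come from the thread; the three max-pool-size-* lines and the object name DispatcherConfigSketch are assumptions added for illustration, and the config is parsed with the Typesafe Config library that Akka already depends on.

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical helper object, not part of Akka or of the original post.
object DispatcherConfigSketch {
  val config = ConfigFactory.parseString(
    """
      |myhttpRequestHandler.dispatcher {
      |  type = Dispatcher
      |  executor = "thread-pool-executor"
      |  thread-pool-executor {
      |    core-pool-size-min = 100
      |    core-pool-size-factor = 2.0
      |    core-pool-size-max = 100
      |    # Assumed addition: raise the upper bound as well (64 by default)
      |    max-pool-size-min = 100
      |    max-pool-size-factor = 2.0
      |    max-pool-size-max = 100
      |  }
      |  throughput = 5
      |}
    """.stripMargin)

  // The executor section as it would be seen when the dispatcher is looked up
  val pool = config.getConfig("myhttpRequestHandler.dispatcher.thread-pool-executor")
}
```

Reading the values back, for example with DispatcherConfigSketch.pool.getInt("max-pool-size-max"), is a cheap way to check that the section is parsed as intended before looking at thread counts in VisualVM.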

Re: [akka-user] akka persistence and need for intercept of persisted messages

2015-01-22 Thread Roland Kuhn
Hi Ketil,

This is a decision that you can make according to your operational 
requirements. I expect that it will typically be a mixture of the two, since 
switching to a new application version is quicker if no journal rewrite is 
needed, while in the long term you want to prune the old formats where possible to cap 
the code ugliness and duplication.

Regards,

Roland 

Sent from my iPhone

> On 21 Jan 2015, at 13:18, Ketil Johannessen wrote:
> 
> Hi,
> Just to clarify: are you proposing an actor/application with the sole purpose 
> of converting messages from old version to new version, or that the existing 
> application with its actors should support multiple versions of the messages?
> 
>> On Tuesday, January 20, 2015 at 1:08:38 PM UTC+1, rkuhn wrote:
>> Hi Ketil,
>> 
>> during development you will commonly find and fix mistakes or add features 
>> which will require you to scrap all the (wrong) history, but that is fine 
>> because nothing is really durable at that time. When you go to production 
>> the use of persistence implies that you need to keep those data around—if 
>> not then why would you persist them? So the only solution is to remain able 
>> to read old data, which is most reliably implemented by never removing or 
>> changing an old event type and only add new ones. While you can of course 
>> convert all stored events on disk into a new format after a change, this 
>> should not be a requirement for your program to run—it is more a maintenance 
>> optimization.
>> 
>> The best way to convert stored events is to replay them by an Actor that 
>> will persist them in the new format, currently that would be a 
>> PersistentView.
>> 
>> Regards,
>> 
>> Roland
>> 
>>> On 16 Jan 2015, at 14:50, Ketil Johannessen wrote:
>>> 
>>> Hi!
>>> 
>>> In my current project we are using akka-persistence with LevelDB. Our 
>>> application is completely event sourced using akka-persistence, i.e there 
>>> are no other mechanism for persisting business state in use. 
>>> 
>>> However, we have identified the need to be able to convert persisted 
>>> messages when deploying a new version of the application with changes 
>>> affecting the persisted data structure (i.e. new attributes, deleted 
>>> attributes, changed datatypes). As of now, we must completely discard our 
>>> business data every time we deploy a new version, in order to avoid 
>>> exceptions when persisted messages are replayed upon startup.
>>> 
>>> I would believe that many other projects face the same problem, and believe 
>>> that this could be solved with a general mechanism in akka persistence.
>>> 
>>> I have done a poc on "daisy-chaining" persistence plugins, by creating my 
>>> own plugin ("proxy-plugin") and have this plugin forward all messages to 
>>> the actual persistence plugin (leveldb or cassandra). In addition we are 
>>> using our own serializer, serializing all messages to json strings prior to 
>>> persisting (to avoid class version issues).
>>> 
>>> In my own proxy-plugin I can now register mapper classes (these 
>>> implement a common trait with an apply() method) that intercept messages 
>>> and convert the persisted JSON-based messages from previous versions to the 
>>> currently deployed one. This way we can keep our persisted business messages 
>>> across versions of the message classes, without the need for discarding old data. 
>>> The PoC now works for journals, but not yet for snapshots.
>>> 
>>> There are also other cross-cutting concerns related to persistence where 
>>> such a feature could be useful, for instance auditing
>>> 
>>> Is this something that could be implemented in akka-persistence?
>>> 
>>> 
>>> 
>> 
>> 
>> 
>> Dr. Roland Kuhn
>> Akka Tech Lead
>> Typesafe – Reactive apps on the JVM.
>> twitter: @rolandkuhn
> 
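
A minimal sketch of the "keep the old event types readable" approach discussed in this thread, assuming plain case classes and no Akka dependency. All names here (UserEvent, NameChangedV1, NameChangedV2, EventUpcaster) are hypothetical and do not come from the thread; the point is only that the old type stays in the code base and is mapped to the newest format when it is read.

```scala
// Hypothetical event types, for illustration only.
sealed trait UserEvent

// Old format: kept so that journals written by earlier releases still replay.
final case class NameChangedV1(name: String) extends UserEvent

// New format introduced by a later release.
final case class NameChangedV2(firstName: String, lastName: String) extends UserEvent

object EventUpcaster {
  /** Maps any stored event to the newest format; current events pass through unchanged. */
  def upcast(event: UserEvent): NameChangedV2 = event match {
    case NameChangedV1(name) =>
      // Split on the first run of whitespace; a single word becomes the first name
      val parts = name.trim.split("\\s+", 2)
      NameChangedV2(parts(0), if (parts.length > 1) parts(1) else "")
    case current: NameChangedV2 => current
  }
}
```

A replaying actor could apply EventUpcaster.upcast to every event before updating its state, which keeps the conversion in one place whether or not the journal is ever rewritten.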

[akka-user] [akka-stream] Periodic source of sources

2015-01-22 Thread Boris Lopukhov
Hi all!

How can I create a Source[Source[String]] that periodically emits a 
Source[String] at the specified interval, but only if the previous 
Source[String] has completed?



Re: [akka-user] Akka Persistence - Views with multiple processors

2015-01-22 Thread Yann Simon
Hi Patrik,

On Sunday, April 20, 2014 at 4:59:22 PM UTC+2, Patrik Nordwall wrote:
>
>
>
>
> On Sun, Apr 20, 2014 at 2:47 PM, Olger Warnier wrote:
>
>> Hi Patrik, 
>>
>> Sounds like an interesting approach, storing some meta-data at the view 
>> may help to check / show the reliability of the system. 
>>
>> At this moment the events are sent to a processor per node that publishes 
>> the event (distributed pub sub) 
>>
>
> That sounds good, as well.
>  
>
>> When you talk about view, that's the akka-persistence view ? 
>>
>
> Yes, persistence.View and persistence.Processor
>  
>
>> So more or less, the sub processors could send messages to the View and 
>> when there is a Persist() around it, it will be stored. 
>>
>
> I'm not sure I understand what you mean here. Let me clarify my proposal 
> with an example. Let's say we have a User aggregate root with some profile 
> information that can be updated. The user is represented by a User 
> EventsourcedProcessor actor, which is sharded. On the query side we want to 
> be able to search users by first and last name, i.e. we want to store all 
> users in a relational database table on the query side.
>
> The User actor persists FirstNameChanged, and inside the persist block it 
> sends a Persistent(FirstNameChanged) message to the AllUsers Processor. On 
> the query side we have an AllUsersView connected to that processor. When 
> AllUsersView receives FirstNameChanged it updates the db table.
>
> To handle lost messages between User and AllUsers you might want to send 
> an acknowledgement from AllUsers to User, and have a retry mechanism in 
> User. I would implement that myself in User, but PersistentChannel might be 
> an alternative.
>

Let's say the current version in production only has User actors.
Now we want to deliver a new version that includes the new Query with the 
AllUsers Processor.
How can we be sure that AllUsers receives all the events needed to 
construct its state?
 
Thanks in advance,
Yann

>
>
>  
>
>>
>> Is that a correct understanding ?
>>
>> Kind regards, 
>>
>> Olger
>>
>>
>> On Sunday, April 20, 2014 2:32:07 PM UTC+2, Patrik Nordwall wrote:
>>
>>> Hi Olger,
>>>
>>> What if you keep the sharded event sourced actors (+10k), but let them 
>>> also send the events to one or a few processors. Then you can connect the 
>>> views/streams to these processors.
>>>
>>> If you don't like storing the events twice you can instead store some 
>>> meta-data (processor-id, seq-no,timestamp) and have a view that creates 
>>> sub-views on demand from the replayed meta-data. The sub-views would 
>>> forward to the parent aggregated view.
>>>
>>> /Patrik
>>>
>>> On 19 Apr 2014, at 20:46, Olger Warnier wrote:
>>>
>>>
>>> Hi Martin, 
>>>
>>> Had to think about it a little, hereby my follow up. (hope you don't 
>>> mind the continued discussion, it helps me a lot in defining the right 
>>> approach, thanks for that)
>>>
>>> On Saturday, April 19, 2014 7:11:23 AM UTC+2, Martin Krasser wrote:

  Hi Olger,

 installing 10k views/producers won't scale, at least not with the 
 current implementation. Here are some alternatives:

>>> Interesting, what would need to change to have it scale?
>>> (Idea is to have the eventsourcedprocessors reflect a DDD style 
>>> Aggregate Root instance and have those distributed using cluster sharding) 
>>>  
>>>

 - Maybe a custom journal plugin is what you need: a plugin that 
 delegates all write/read requests to the actual journal actor and that 
 additionally updates a database with the events to be written. This 
 essentially installs a single "listener" per ActorSystem (this is to some 
 extent comparable to a database trigger that executes additional commands. 
 If the backend datastore supports that directly, I recommend implementing 
 the trigger there, if possible). 

>>>
>>> I am not sure if I understand it. The basic idea is to have the 
>>> 'events' stored via the eventsourcedprocessor published to 'n' views. 
>>> The actual number of views that need to listen to these events is not known 
>>> up front (people can add their own views; at system startup it will be 
>>> clear). 
>>> As every eventsourced actor is actually an AggregateRoot (in DDD terms) 
>>> and thereby an instance with its own state, the changes in 
>>> these states need to be aggregated (that can be done with the streaming as 
>>> you mention) and published to the views that are interested (subscribed). 
>>> Doing this by hand in the aggregate root actor is not a problem; 
>>> you then write your own listener actor, which populates a view data 
>>> store. Still, I have the feeling that the actual 'View' (or ViewProducer) 
>>> could be implemented in such a way that this is done by the view.
>>>  
>>>

 - Instead of having thousands of processors, what speaks against 
 combining them into a single processor (or only a few) per node?

>>> This w

[akka-user] Inability to route through a proxy.

2015-01-22 Thread Tim St. Clair
Greetings folks - 

I'm currently trying to run Spark master through a proxy and receiving an 
error that I can't seem to bypass. 

ERROR EndpointWriter: dropping message [class 
akka.actor.ActorSelectionMessage] for non-local recipient 
[Actor[akka.tcp://sparkMaster@10.254.118.158:7077/]] arriving at 
[akka.tcp://sparkMaster@10.254.118.158:7077] inbound addresses are 
[akka.tcp://sparkMaster@spark-master:7077]

The spark-master is running inside a container which is on a 192.168 
subnet, but all traffic from the slaves is routed via iptables through a 
load-balanced proxy 10.254.118.158.  

Is there any easy way to disable what appears to be IP validation?  

Cheers,
Tim



Re: [akka-user] Handling boundary data in chunked requests

2015-01-22 Thread Yogesh
Not sure if I can use akka-http, but will definitely take a look. Thank you.

I basically need to strip those few "header" and "boundary" lines from my
content. Is there some way I can do it with spray?

On Thu, Jan 22, 2015 at 4:10 AM, Roland Kuhn  wrote:

> If I understand correctly then you’ll want to use akka-http which has
> support for handling multipart/form-data. This has the downside that
> akka-http is still being developed, but you can try out milestone 1.0-M2 to
> see if that works for you.
>
> Regards,
>
> Roland
>
> On 21 Jan 2015, at 01:52, Yogesh Pandit wrote:
>
> Hello,
>
> I am using spray-can 1.2.1 and have a service to upload data in chunks.
>
> It seems to be working fine except that the following boundary data is being
> written to the uploaded file. How can I avoid writing this into the file?
> Reading the whole file is not an option due to its large size.
> I have used the following example -
> https://github.com/spray/spray/tree/master/examples/spray-can/simple-http-server/src/main/scala/spray/examples
>
>
> --WebKitFormBoundary5mkhOpb3GjmIWL7a
> Content-Disposition: form-data; name="datafile"; filename="file.csv"
> Content-Type: text/csv
>
> 
>
> --WebKitFormBoundary5mkhOpb3GjmIWL7a--
>
> Thank you,
> -Yogesh
>
>
>
>
>
> *Dr. Roland Kuhn*
> *Akka Tech Lead*
> Typesafe  – Reactive apps on the JVM.
> twitter: @rolandkuhn
> 
>
>



Re: [akka-user] Handling boundary data in chunked requests

2015-01-22 Thread Roland Kuhn
AFAICS that would have nothing to do with spray itself: you get some 
ByteStrings and you want to remove some parts of them. What you need is an 
implementation of RFC 2388, and 
akka-http includes one out of the box while spray does not.

Regards,

Roland

> On 22 Jan 2015, at 22:15, Yogesh wrote:
> 
> Not sure if I can use akka-http, but will definitely take a look. Thank you.
> 
> I basically need to strip those few "header" and "boundary" lines from my 
> content. Is there some way I can do it with spray?
> 
> On Thu, Jan 22, 2015 at 4:10 AM, Roland Kuhn wrote:
> If I understand correctly then you’ll want to use akka-http which has support 
> for handling multipart/form-data. This has the downside that akka-http is 
> still being developed, but you can try out milestone 1.0-M2 to see if that 
> works for you.
> 
> Regards,
> 
> Roland
> 
>> On 21 Jan 2015, at 01:52, Yogesh Pandit wrote:
>>
>> Hello,
>> 
>> I am using spray-can 1.2.1 and have a service to upload data in chunks.
>> 
>> It seems to be working fine except that the following boundary data is being 
>> written to the uploaded file. How can I avoid writing this into the file? 
>> Reading the whole file is not an option due to its large size.
>> I have used the following example - 
>> https://github.com/spray/spray/tree/master/examples/spray-can/simple-http-server/src/main/scala/spray/examples
>>  
>> 
>> 
>> 
>> --WebKitFormBoundary5mkhOpb3GjmIWL7a
>> Content-Disposition: form-data; name="datafile"; filename="file.csv"
>> Content-Type: text/csv
>> 
>> 
>> 
>> --WebKitFormBoundary5mkhOpb3GjmIWL7a--
>> 
>> Thank you,
>> -Yogesh
>> 
> 
> 
> 
> Dr. Roland Kuhn
> Akka Tech Lead
> Typesafe  – Reactive apps on the JVM.
> twitter: @rolandkuhn
>  
> 
> 
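
A rough sketch of "removing some parts" of the received bytes, as described in the reply above. It is not an RFC 2388 implementation: it assumes a single-part upload whose part headers fit entirely in the first chunk and whose closing delimiter fits entirely in the last chunk. The object and method names are made up for illustration, and plain byte arrays stand in for ByteStrings so that the example has no dependencies.

```scala
// Hypothetical helper, not part of spray or akka-http.
object MultipartStripSketch {
  // A blank line (CRLF CRLF) ends the part headers
  private val HeaderEnd: Seq[Byte] = "\r\n\r\n".getBytes("US-ASCII").toSeq

  /** Drops everything up to and including the first blank line of the first chunk. */
  def stripPartHeaders(firstChunk: Array[Byte]): Array[Byte] = {
    val idx = firstChunk.toSeq.indexOfSlice(HeaderEnd)
    if (idx < 0) firstChunk else firstChunk.drop(idx + HeaderEnd.length)
  }

  /** Drops the closing delimiter (CRLF, two dashes, boundary, two dashes) and anything after it. */
  def stripClosingBoundary(lastChunk: Array[Byte], boundary: String): Array[Byte] = {
    val closing: Seq[Byte] = ("\r\n--" + boundary + "--").getBytes("US-ASCII").toSeq
    val idx = lastChunk.toSeq.lastIndexOfSlice(closing)
    if (idx < 0) lastChunk else lastChunk.take(idx)
  }
}
```

The boundary string itself would have to be taken from the boundary parameter of the request's Content-Type header. If a delimiter can straddle two chunks, a small carry-over buffer between chunks would be needed, which is the kind of detail a real multipart parser handles.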

Re: [akka-user] Akka-Http: How to connect a Route with an IncomingConnection?

2015-01-22 Thread Philippe Milot
Thank you! Works like a charm. I love the module so far, looks very 
promising!

On Thursday, January 22, 2015 at 12:31:59 PM UTC-5, drewhk wrote:
>
> Hi, Philippe,
>
> On Thu, Jan 22, 2015 at 6:28 PM, Philippe Milot wrote:
>
>> So I'm following the documentation on the new Akka-Http experimental 
>> modules, and I'm trying to get a basic Hello World using the Routing DSL. 
>>
>> I've defined my route as follows:
>>
>>   val route: Route = {
>>     complete(HttpResponse(entity = "Hello, world!"))
>>   }
>>
>> I can bind and accept HTTP connections:
>>
>>   serverBinding.connections.foreach { connection =>
>>     println("Accepted new connection from " + connection.remoteAddress)
>>     // How do I "run" my route from here??
>>   }
>>
>> I feel pretty dumb because I can't find anywhere in the documentation 
>> explaining how to plug the two together. 
>>
>
> We are filling the holes in the docs slowly (but surely) so things are 
> still missing. You can, though, look at this little test server to get a feel 
> for how to plug things together: 
> https://github.com/akka/akka/blob/release-2.3-dev/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala
>
> -Endre
>  
>
>> I tried scraping the API docs to find some class or method that does it, 
>> but all I found was a call to Route.handlerFlow(route) which requires 
>> some RoutingSetup implicit variable that is very difficult to instantiate. 
>> I couldn't find any Typesafe Activator template that shows how to do this 
>> either.
>>
>> I feel pretty dumb because I'm probably missing something really obvious. 
>> Please help!
>>
>>  - Phili
>>
>>
>
>



Re: [akka-user] Re: sent/received messages excluding the deathwatch heartbeats?

2015-01-22 Thread Sam Halliday
Thanks Martynas,

It's a shame that this would be needed. It feels very much like the 
heartbeats should be considered part of the system layer when it comes to 
logging, and therefore already filtered. Seeing heartbeats when debugging 
makes it incredibly difficult to see anything.

My concern with a custom logger like this is that it will quickly get out 
of sync with the stock one and I might be missing out on various 
performance/bug fixes as akka is released. That said, the ability to see 
genuine messages in the logs when debugging is invaluable, so it might be 
worth it.

On Wednesday, 21 January 2015 16:29:07 UTC, Martynas Mickevičius wrote:
>
> Hi Sam,
>
> you can use custom event listener to display the log messages that you are 
> interested in.
>
> Here you can find a simple example demonstrating that. And there is a 
> documentation section explaining this. 
>
> On Mon, Jan 19, 2015 at 11:02 AM, Sam Halliday wrote:
>
>> Is this something that would be suitable for an RFE?
>>
>> More generally it would be good to have an option to only log userland 
>> messages in remoting.
>>
>>
>> On Friday, 16 January 2015 17:41:55 UTC, Sam Halliday wrote:
>>>
>>> Hi all,
>>>
>>> I'm trying to debug a remoting issue and I need to look at the logs of 
>>> the messages that are sent.
>>>
>>> However, the logs are completely filled up with the send/receive of 
>>> deathwatch messages.
>>>
>>> Without turning off deathwatch, is there any way to exclude it from the 
>>> logs?
>>>
>>> Latest akka stable, logging config looks like
>>>
>>> akka {
>>>   remote {
>>>     log-sent-messages = on
>>>     log-received-messages = on
>>>     log-frame-size-exceeding = 1000b
>>>
>>>     log-remote-lifecycle-events = off
>>>
>>>     watch-failure-detector {
>>>       # Our timeouts can be so high, the PHI model basically doesn't
>>>       # know what to do, so just use a single timeout. 
>>>       # http://doc.akka.io/docs/akka/snapshot/scala/remoting.html
>>>       acceptable-heartbeat-pause = 60 s
>>>     }
>>>   }
>>>
>>>   actor {
>>>     debug {
>>>       receive = on
>>>       #autoreceive = on
>>>       #lifecycle = on
>>>       #fsm = on
>>>       #event-stream = on
>>>       unhandled = on
>>>     }
>>>   }
>>> }
>>>
>>
>
>
>
> -- 
> Martynas Mickevičius
> Typesafe  – Reactive 
>  Apps on the JVM
>  
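
A minimal sketch of the custom event listener idea suggested in this thread, written against the akka.event.Logging messages of Akka 2.3. The class and object names are hypothetical, and the check for the substring "Heartbeat" is only an assumption about how the deathwatch traffic shows up in the log text; the actual wording should be checked against real log output.

```scala
import akka.actor.Actor
import akka.event.Logging.{ InitializeLogger, LogEvent, LoggerInitialized }

// Hypothetical filter; the "Heartbeat" substring is an assumption, not a documented format.
object HeartbeatFilter {
  def isHeartbeat(text: String): Boolean = text.contains("Heartbeat")
}

// Logger actor that prints every log event except those that look like heartbeats.
class HeartbeatFilteringLogger extends Actor {
  def receive: Receive = {
    // Handshake required of every logger listed under akka.loggers
    case InitializeLogger(_) => sender() ! LoggerInitialized
    case event: LogEvent if !HeartbeatFilter.isHeartbeat(String.valueOf(event.message)) =>
      println(s"[${event.logSource}] ${event.message}")
    case _: LogEvent => // heartbeat-related entry, dropped
  }
}
```

Such a logger would be registered by its fully qualified class name in the akka.loggers list of the configuration. Because it only implements the logger protocol and does not copy the stock logger's code, it should be less prone to drifting out of sync with Akka releases, though that remains an expectation rather than a guarantee.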



[akka-user] Re: Inability to route through a proxy.

2015-01-22 Thread jay vyas
To add some color.

1) When we run w/ -Dakka.remote.untrusted-mode=on, we see dropping message 
[class akka.actor.ActorSelectionMessage] for *unknown* recipient

[Actor[akka.tcp://sparkMaster@10.254.230.67:7077/]] arriving at 
[akka.tcp://sparkMaster@10.254.230.67:7077] inbound addresses are 
[akka.tcp://sparkMaster@spark-master:7077]

2) When we run w/ -Dakka.remote.untrusted-mode=off, we see dropping message 
[class akka.actor.ActorSelectionMessage] for *non-local* recipient 

[Actor[akka.tcp://sparkMaster@10.254.118.158:7077/]] arriving at 
[akka.tcp://sparkMaster@10.254.118.158:7077] inbound addresses are 
[akka.tcp://sparkMaster@spark-master:7077]

On Thursday, January 22, 2015 at 4:00:42 PM UTC-5, Tim St. Clair wrote:
>
> Greetings folks - 
>
> I'm currently trying to run Spark master through a proxy and receiving an 
> error that I can't seem to bypass. 
>
> ERROR EndpointWriter: dropping message [class 
> akka.actor.ActorSelectionMessage] for non-local recipient 
> [Actor[akka.tcp://sparkMaster@10.254.118.158:7077/]] arriving at 
> [akka.tcp://sparkMaster@10.254.118.158:7077] inbound addresses are 
> [akka.tcp://sparkMaster@spark-master:7077]
>
> The spark-master is running inside a container which is on a 192.168 
> subnet, but all traffic from the slaves is routed via iptables through a 
> load-balanced proxy 10.254.118.158.  
>
> Is there any easy way to disable what appears to be IP validation?  
>
> Cheers,
> Tim
>



[akka-user] How to serialize an Any?

2015-01-22 Thread Kevin Esler
I am puzzled by something: Akka messages can be of type Any, yet
akka.serialization.Serialization.serialize() requires an AnyRef as the
type of the object to be serialized.

So how does Akka serialize a message that is not an AnyRef?

(Reason for asking: I plan to use Akka serialization and would like to
apply it to values that may not be AnyRef.)
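
One likely answer, which this thread does not confirm: on the JVM a value
that is statically typed as Any is already boxed at runtime (an Int becomes a
java.lang.Integer), so it can be cast to AnyRef before it is handed to a
serializer. The sketch below shows that cast using plain Java serialization
instead of Akka's Serialization extension. The names AnySerializationSketch,
toBytes, fromBytes and roundTrip are hypothetical helpers, not Akka API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object AnySerializationSketch {
  // Hypothetical helper: accepts only AnyRef, mirroring the signature discussed above.
  def toBytes(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Hypothetical helper: reads back whatever object was written by toBytes.
  def fromBytes(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }

  // A value typed as Any is boxed at runtime, so the cast to AnyRef succeeds
  // for primitives such as Int and Double as well as for ordinary objects.
  def roundTrip(message: Any): Any =
    fromBytes(toBytes(message.asInstanceOf[AnyRef]))
}
```

For example, AnySerializationSketch.roundTrip(42) goes through
java.lang.Integer and compares equal to 42 afterwards.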



Re: [akka-user] Feedback on Akka Streams 1.0-M2 Documentation

2015-01-22 Thread Sam Halliday
On Wednesday, 21 January 2015 08:21:15 UTC, drewhk wrote:
>> I believe it may be possible to use the current 1.0-M2 to address
>> my bugbear by using the Actor integration to write an actor that
>> has N instances behind a router, but it feels hacky to have to
>> use an Actor at all. What is really missing is a Junction that
>> multiplies the next Flow into N parallel parts that run on
>> separate threads.
>
> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers

I actually missed this when reading the docs... it's a gem buried
in a sea of examples! :-) Is there anything like this in the "top
down" style documentation?

A convenience method to set up this sort of thing is exactly what
I mean. I should imagine that fanning out a Flow for
embarrassingly parallel processing is a common enough pattern that
one would want to be able to do this in a one liner. You note
something about work in this area later on (quoted out of order):

> In the future we will allow users to add explicit markers where
> the materializer needs to cut up chains/graphs into concurrent
> entities.

This sounds very good. Is there a ticket I can subscribe to for
updates? Is there a working name for the materializer so that I
know what to look out for?


> Also, you can try mapAsync or mapAsyncUnordered for similar
> tasks.

It would be good to read some discussion about these that goes
further than the API docs. Do they just create a Future and
therefore have no way to tell a fast producer to slow down? How
does one achieve pushback from these? Pushback on evaluation of
the result is essential, not on the ability to create/schedule
the futures. I would very much like to see some documentation that
explains where this has an advantage over plain ol' Scala Stream
with a map{Future{...}}.
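
To make "pushback on evaluation of the result" concrete, here is a minimal
plain-Scala model. It is not how mapAsync is implemented and it does not use
Akka Streams; it assumes a simple batching strategy in which the input
iterator is only advanced once the previous batch of futures has completed.
The names BoundedFutures and mapBounded are hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object BoundedFutures {
  /**
   * Applies `f` to every input while keeping at most `parallelism` futures in flight.
   * The input iterator is only advanced after the previous batch has completed,
   * so a fast producer is slowed down by the speed of result evaluation.
   */
  def mapBounded[A, B](inputs: Iterator[A], parallelism: Int)(f: A => Future[B])(
      implicit ec: ExecutionContext): Vector[B] = {
    require(parallelism > 0, "parallelism must be positive")
    inputs
      .grouped(parallelism) // lazily pull at most `parallelism` inputs at a time
      .flatMap { batch =>
        val inFlight = Future.sequence(batch.map(f))
        Await.result(inFlight, Duration.Inf) // block until the whole batch is done
      }
      .toVector
  }
}
```

Results come back in input order. A sliding window would keep the workers
busier than whole batches do, but the batch form keeps the sketch short.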


>> In general, I felt that the documentation was missing any
>> commentary on concurrency and parallelisation. I was left
>> wondering what thread everything was happening on.
>
> ... as the default all stream processing element is backed by
>  an actor ...

The very fact that each component is backed by an Actor is news
to me. This wasn't at all obvious from the docs and actually
the "integration with actors" section made me think that streams
must be implemented completely differently if it needs an
integration layer! Actually, the "actor integration" really
means "low level streaming actors", doesn't it? I would strongly
recommend making this a lot clearer as it helps to de-mystify the
implementation.

Now knowing that each vertex in the graph is backed by an actor,
I wonder if in "balancing work to fixed pools" the Balance is
just adding a router actor with a balance strategy? The
convenience method I suggested above could simply create a router
to multiple instances of the same actor with a parallelism bound.
I'm not entirely sure why one would need a Merge strategy for
that, although the option to preserve input order at the output
would be good for some circumstances (which could apply pushback
in preference to growing the queue of out-of-order results).

In addition, this revelation helps me to understand the
performance considerations of using akka-streams. Knowing this,
it would only be appropriate for something I would've considered
suitable (from a performance point of view) for hand-crafted flow
control in akka before streams was released. The main advantage
of akka-streams is therefore that it has dramatically lowered the
barrier of entry for writing Actor code with flow control.



Thanks for this explanation Endre, I hope to see even more
innovation in the coming releases and improved documentation.

Best regards,
Sam 



Re: [akka-user] Re: sent/received messages excluding the deathwatch heartbeats?

2015-01-22 Thread Sam Halliday
Thanks Patrik,

That's always an option with logback, but I'd rather not have to go there.

Grep in this case is not an option as the logs are coming through in a
Jenkins console and very noisy. I'd like to be able to just read it
off. The custom event handler sounds like it might be a good option
but I'd also like you to consider making logging heartbeat messages
optional as the value in seeing them is really rather low compared to
being able to see our domain messages.

Best regards,
Sam

On 21 January 2015 at 21:37, Patrik Nordwall  wrote:
> You could also use slf4j logger and define the filter in the logback config
> (or whatever impl you prefer).
>
> An alternative; use grep/egrep to filter out the relevant information from
> the log file after the fact.
>
> /Patrik
>
> On 21 Jan 2015, at 17:29, Martynas Mickevičius wrote:
>
> Hi Sam,
>
> you can use custom event listener to display the log messages that you are
> interested in.
>
> Here you can find a simple example demonstrating that. And there is a
> documentation section explaining this.
>
> On Mon, Jan 19, 2015 at 11:02 AM, Sam Halliday 
> wrote:
>>
>> Is this something that would be suitable for an RFE?
>>
>> More generally it would be good to have an option to only log userland
>> messages in remoting.
>>
>>
>> On Friday, 16 January 2015 17:41:55 UTC, Sam Halliday wrote:
>>>
>>> Hi all,
>>>
>>> I'm trying to debug a remoting issue and I need to look at the logs of
>>> the messages that are sent.
>>>
>>> However, the logs are completely filled up with the send/receive of
>>> deathwatch messages.
>>>
>>> Without turning off deathwatch, is there any way to exclude it from the
>>> logs?
>>>
>>> Latest akka stable, logging config looks like
>>>
>>> akka {
>>>   remote {
>>>     log-sent-messages = on
>>>     log-received-messages = on
>>>     log-frame-size-exceeding = 1000b
>>>
>>>     log-remote-lifecycle-events = off
>>>
>>>     watch-failure-detector {
>>>       # Our timeouts can be so high, the PHI model basically doesn't
>>>       # know what to do, so just use a single timeout.
>>>       # http://doc.akka.io/docs/akka/snapshot/scala/remoting.html
>>>       acceptable-heartbeat-pause = 60 s
>>>     }
>>>   }
>>>
>>>   actor {
>>>     debug {
>>>       receive = on
>>>       #autoreceive = on
>>>       #lifecycle = on
>>>       #fsm = on
>>>       #event-stream = on
>>>       unhandled = on
>>>     }
>>>   }
>>> }
>>>
>
>
>
>
> --
> Martynas Mickevičius
> Typesafe – Reactive Apps on the JVM
>


Re: [akka-user] Re: Distributed cache with Akka

2015-01-22 Thread Kane Rogers
Patrik, you can do no wrong.

On Tuesday, 20 January 2015 22:15:39 UTC+11, Patrik Nordwall wrote:
>
> Another alternative is to use akka-data-replication.
>
> /Patrik
>
> On Tue, Jan 20, 2015 at 3:16 AM, Soumya Simanta wrote:
>
>> I would recommend using Redis based on personal experience. The current 
>> stable version is not distributed but the new 3.0 version that will be out 
>> in a few weeks supports clustering. 
>> https://groups.google.com/forum/#!topic/redis-db/_DqcFW8EAOA
>>
>> There are many Scala APIs for connecting to Redis.
>>
>> Here are some other options. I personally don't have much experience with 
>> these. 
>> MapDB http://www.mapdb.org/
>> Hazelcast http://hazelcast.com/
>> Chronicle Map http://openhft.net 
>>
>>
>>
>> On Monday, January 19, 2015 at 8:42:09 PM UTC-5, as...@indexia.co wrote:
>>>
>>> Hey,
>>>
>>> I'm trying to build a small akka app that supports authentication via 
>>> tokens,
>>>
>>> What would be the simplest approach to store user tokens in a 
>>> distributed map cache for all akka nodes 
>>> as I can't guarantee that users will be served by the same akka node,
>>>
>>>
>>> Thanks.
>>>
>
>
>
> -- 
>
> Patrik Nordwall
> Typesafe  -  Reactive apps on the JVM
> Twitter: @patriknw
>
>  



Re: [akka-user] [akka streams] question on some time related use cases

2015-01-22 Thread Frank Sauer
Thanks for the pointers Endre,  I’ll explore those ideas.

Frank

> On Jan 22, 2015, at 4:02 AM, Endre Varga  wrote:
> 
> 
> 
> On Thu, Jan 22, 2015 at 5:07 AM, Frank Sauer wrote:
> Update, in a simple test scenario like so 
> 
>   val ticks = Source(1 second, 1 second, () => "Hello")
> 
>   val flow = ticks.transform(() => new FilterFor[String](10 seconds)(x => true)).to(Sink.foreach(println(_)))
> 
>   flow.run()
> 
> I'm seeing the following error, so this doesn't work at all and I'm not sure 
> it is because of threading:
> 
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$currentOp(Interpreter.scala:175)
>   at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.push(Interpreter.scala:209)
>   at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.push(Interpreter.scala:278)
>   at experiments.streams.time$FilterFor$$anonfun$1.apply$mcV$sp(time.scala:46)
>   at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>   at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> I think I'm violating the one very important rule mentioned in the docs - 
> when the timer fires it calls a push on the context but there is also a pull 
> going on concurrently(?) - and this is indeed breaking in spectacular ways as 
> expected 
> 
> :)
>  
> 
> I have no idea how to implement this correctly. It looked pretty simple at 
> first, but alas... 
> 
> See my previous mail. The main problem here is mixing backpressured streams 
> (your data) and non-backpressured events (timer triggers) in a safe fashion. 
> Well, the main problem is not how to implement it, but how to expose an API 
> to users which is as safe as possible. We have groupedWithin, takeWithin and 
> dropWithin as timer based ops, but no customization for now.
> 
> -Endre
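
As an illustration of the hold-then-emit logic without any timer thread, here
is a minimal plain-Scala model. It is not an Akka Streams stage and not the
API Endre refers to. It assumes that expiry is checked on the same call path
that delivers elements or clock ticks, so no second thread ever touches the
state. The names HoldFor, onElement and onTick are hypothetical.

```scala
/** Single-threaded model: an element is emitted once it has been held for `holdMillis`. */
final class HoldFor[A](holdMillis: Long)(p: A => Boolean) {
  // element -> time (in ms) at which it becomes due for emission
  private var deadlines: Map[A, Long] = Map.empty

  /** Handles one upstream element seen at `nowMillis`; returns the elements that are now due. */
  def onElement(elem: A, nowMillis: Long): Vector[A] = {
    if (!p(elem)) {
      deadlines = deadlines - elem // condition no longer holds: cancel the pending hold
    } else if (!deadlines.contains(elem)) {
      deadlines = deadlines + (elem -> (nowMillis + holdMillis)) // start a new hold
    }
    drainExpired(nowMillis)
  }

  /** Handles a clock tick that carries no element; returns the elements that are now due. */
  def onTick(nowMillis: Long): Vector[A] = drainExpired(nowMillis)

  // Removes every hold whose deadline has passed and returns those elements, oldest first.
  private def drainExpired(nowMillis: Long): Vector[A] = {
    val (due, pending) = deadlines.partition { case (_, deadline) => deadline <= nowMillis }
    deadlines = pending
    due.toVector.sortBy(_._2).map(_._1)
  }
}
```

In a real stream the ticks would have to arrive through the stream itself,
for example from a tick source merged with the data, which is an assumption
of this sketch rather than something the thread prescribes.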
>  
> 
> On Wednesday, January 21, 2015 at 8:51:21 PM UTC-5, Frank Sauer wrote:
> Thanks, I came up with the following, but I have some questions:
> 
> /**
>  * Holds elements of type A for a given finite duration after a predicate p
>  * first yields true and as long as subsequent elements matching that first
>  * element (e.g. are equal) still satisfy the predicate. If a matching element
>  * arrives during the given FiniteDuration for which the predicate p does not
>  * hold, the original element will NOT be pushed downstream. Only when the
>  * timer expires and no matching elements have been seen for which p does not
>  * hold, will elem be pushed downstream.
>  *
>  * @param duration The polling interval during which p has to hold true
>  * @param p        The predicate that has to remain true during the duration
>  * @param system   implicit required to schedule timers
>  * @tparam A       type of the elements
>  */
> class FilterFor[A](duration: FiniteDuration)(p: A => Boolean)(implicit system: ActorSystem)
>     extends PushStage[A, A] {
>
>   var state: Map[A, Cancellable] = Map.empty
>
>   override def onPush(elem: A, ctx: Context[A]): Directive =
>     state.get(elem) match {
>
>       // pending timer but condition no longer holds => cancel timer
>       case Some(timer) if !p(elem) =>
>         timer.cancel()
>         state = state - elem
>         ctx.pull()
>
>       // no pending timer and predicate true -> start and cache new timer
>       case None if p(elem) =>
>         val timer = system.scheduler.scheduleOnce(duration) {
>           // when timer fires, remove from state and push elem downstream
>           state = state - elem
>           ctx.push(elem); // is this safe?
>         }
>         state = state + (elem -> timer)
>         ctx.pull()
>
>       // otherwise simply wait for the next upstream element
>       case _ => ctx.pull()
>     }
> }
> 
> My main concerns are these:
> 
> 1) Is it safe to invoke ctx.push from the thread on which the timer fires?
> 2) How do I react to upstream or downstream finish or cancel events - do I 
> have to?
> 3) Can I integrate this into the DSL without using transform, e.g. can I 
> somehow add a filterFor method on something via a pimp my library?
> 
> Any and all pointers would be very much appreciated,
> 
> Thanks,
> 
> Frank
> 
> On Friday, January 16, 2015 at 11:52:03 AM UTC-5, Akka Team wrote:
> Hi Frank!
> We do not have such operations off-the-shelf, however they are easily 
> implementable by using custom stream processing stages:
> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/s

[akka-user] What is the best practice enforce Type Check for Actor during compilation

2015-01-22 Thread Sean Zhong
Suppose we have a service actor:

class Service extends Actor {
  def receive: Receive = {
    case RequestA(args) => doSomething()
    case RequestB(args) => doAnotherThing()
  }
}

// Some client is using RequestA indirectly, like this:

class Source(val request: RequestA)

// A client is trying to send RequestA to the service
class Client(source: Source) {
  def query: Unit = {
    service ? source.request
  }
}

Then one day, Service decides to NO longer support RequestA.
The above code will still compile, as the client doesn't know the Service
has changed its interface, and will still send the invalid command.

Do you have a recommended coding practice to follow, or tools to use, so that
it is easier to identify and track this kind of error?
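
One common approach, not proposed in this thread, is to put every message the
service accepts under a single sealed trait and to send messages only through
a thin typed facade. The sketch below is plain Scala without Akka. The names
ProtocolSketch, ServiceRequest, ServiceClient and handle are hypothetical, and
`deliver` stands in for something like an actor reference's tell.

```scala
object ProtocolSketch {
  // Every message the service accepts extends this one sealed trait.
  sealed trait ServiceRequest
  final case class RequestA(args: String) extends ServiceRequest
  final case class RequestB(args: String) extends ServiceRequest

  // Hypothetical typed facade: callers can only send ServiceRequest values.
  final class ServiceClient(deliver: Any => Unit) {
    def send(request: ServiceRequest): Unit = deliver(request)
  }

  // Stand-in for the actor's receive block. Because the trait is sealed, the
  // compiler warns when a case is missing from this match.
  def handle(request: ServiceRequest): String = request match {
    case RequestA(args) => s"A($args)"
    case RequestB(args) => s"B($args)"
  }
}
```

With this layout, dropping RequestA from the protocol means deleting the case
class, which breaks every client that still constructs or stores it at compile
time. Removing only the case in handle produces a non-exhaustive match warning.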



[akka-user] Re: How to identify that akka cluster has formed?

2015-01-22 Thread Ngoc Dao
You can try Glokka:
https://github.com/xitrum-framework/glokka

It lets you name your actors in a cluster, then get the actors back by name.


On Monday, January 5, 2015 at 7:36:58 PM UTC+9, Krishna Kadam wrote:
>
> Hi patrik,
>>
> I am doing a masters project in which I have streaming data and want to 
> send a particular type of message to the same actor every time, so that 
> certain type of messages are processed by akka actors in their arrival 
> order. Also I want to deploy these actors on multiple machines for 
> execution purpose. Can I scale this process by deploying it to multiple
> nodes? Also, can I get better performance by deploying a single actor system
> to multiple nodes for execution?
>  
>
>> Thanks & Regards
>> Krishna Kadam
>>
>



[akka-user] Re: [akka-stream] Junctions documentation and specs

2015-01-22 Thread Alexey Romanchuk
Hey!

Also, it is not straightforward how backpressure works with Broadcast.
What if one of the outputs is busy and another one is requesting
new elements? As far as I understand from the sources (Broadcast and
outputBunch.AllOfMarkedOutputs), Broadcast will emit new elements
downstream only when all substreams demand a new element. Is that correct?
I think we should add this to the docs.
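
The behaviour described above, which this thread does not confirm, can be
modelled in a few lines of plain Scala: an element is emitted only when every
output has outstanding demand, so the slowest output limits all the others.
This is a sketch of the idea, not Akka's Broadcast implementation, and the
names BroadcastModel, request, emittable and emit are hypothetical.

```scala
/** Models a broadcast that emits only when every output has outstanding demand. */
final class BroadcastModel(outputs: Int) {
  require(outputs > 0, "at least one output is required")

  // Outstanding demand per output, indexed from 0.
  private val demand = Array.fill(outputs)(0L)

  /** Output `output` signals that it can accept `n` more elements. */
  def request(output: Int, n: Long): Unit = demand(output) += n

  /** Number of elements that may be emitted right now: limited by the slowest output. */
  def emittable: Long = demand.min

  /** Emits one element to all outputs if allowed; returns whether it was emitted. */
  def emit(): Boolean =
    if (emittable > 0) {
      for (i <- demand.indices) demand(i) -= 1
      true
    } else false
}
```

In this model, demand of 5 on one output and 0 on the other yields nothing
until the second output also requests an element.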

Thanks!


On Wednesday, January 21, 2015 at 9:01:00 AM UTC+6, Alexey Romanchuk wrote:
>
> Hi again, hakkers!
>
> I cannot find any documentation about the conditions under which junctions
> become completed and how junctions propagate errors. Something like "Merge
> completes when all of its inputs complete". Also, there are no such cases in
> the unit tests.
>
> Am I missing something? Sure, this information can be found in the sources,
> but I think it should be documented.
>



[akka-user] ClusterClient load balancing

2015-01-22 Thread Hengky Sucanda
Hi Guys,

I'm currently developing an application using Play and Akka, and am currently
deploying one frontend node with multiple backend nodes. The frontend
node connects to the backend using ClusterClient. My question is, does
ClusterClient have a load-balancing mechanism so that it can properly distribute
messages to the other backend nodes? From what I have read in the docs,
ClusterClient will send a message to a random destination when
multiple entries match the path. Is there a way for me to load-balance
message sending between the frontend and the backend if I am using ClusterClient?



Re: [akka-user] Cluster unreachable and a lot of cluster connections

2015-01-22 Thread Johannes Berg
Thanks for the answers, this really explains a lot. I will go back to my 
abyss and rethink some things. See below some answers/comments.

On Thursday, January 22, 2015 at 6:31:01 PM UTC+2, drewhk wrote:
>
> Hi Johannes,
>
> On Thu, Jan 22, 2015 at 4:53 PM, Johannes Berg wrote:
>
>>
>> I will try that but it seems that will only help to a certain point and 
>> when I push the load further it will hit it again.
>>
>
> There is no system message traffic between two Akka systems by default, to 
> have a system send system messages to another you either need to use remote 
> deployment or deathwatch on remote actors. Which one are you using? What is 
> the scenario?
>

I do use deathwatch on remote actors, and the number of deathwatches I have
is linear in the load I put on the system, so I guess that explains the
increased number of system messages under load.
 

>
> The main issue is that whatever is the rate of system message delivery we 
> *cannot* backpressure remote deployment or how many watched remote actors 
> die. For any delivery buffer there is a large enough "mass actor extinction 
> event" that will just fill it up. You can increase the buffer size though 
> up to that point where you expect that a burst maximum is present (for 
> example you know the peak number of remote watched actors and the peak rate 
> of them dying).
>

Thinking about these features more closely, I can see that they may
require acked delivery, but I would have expected something that grows
indefinitely until out of memory, like unbounded inboxes. It's not apparent from
the documentation about deathwatching that you need to consider some buffer
size if you are watching very many actors that may be created or die at a
very fast rate (maybe a note about this could be added to the docs?). At a
quick glance at the feature, you don't expect it to be limited by anything
other than normal actor message sending and receiving. Furthermore, I
wouldn't have expected that a buffer overflow due to deathwatching would cause a
node to get quarantined and removed from the cluster; instead I would
expect some deathwatching to fail to work correctly. Causing the node to
go down in case of a buffer overflow seems a bit dangerous considering DDoS
attacks, even though it maybe makes the system behave more consistently.
 

>  
>
>>
>> I hit this within a minute after I put on the load which is a bit 
>> annoying to me. I'm fine with it becoming unreachable as long as I can get 
>> it back to reachable when it has crunched through the load. 
>>
>
> That means a higher buffer size. If there is no sysmsg buffer size that 
> can absorb your load then you have to rethink your remote deployment/watch 
> strategy (whichever feature you use).
>

Now that I know what's causing the increasing rate of system messages, I
will certainly rethink my deathwatch strategy and/or limit the load
based on the configured buffer size.
 

>  
>
>> Will it still buffer up system messages even though it's unreachable? 
>>
>
> After quarantine there is no system message delivery, everything is 
> dropped. There is no recovery from quarantine that is its purpose. If there 
> is any lost system message between two systems (and here they are dropped 
> due to the buffer being full) then they are in an undefined state, 
> especially with remote deployment, so they quarantine each other. 
>

After quarantine I understand there's no system message delivery, but when
it's just unreachable it buffers them up, right? I think the documentation
about deathwatching and remote deployment should note which buffer may need
tweaking and what can happen if the buffer overflows.
 

>  
>
>> At what rate are system messages typically sent?
>>
>
> They are sent at the rate you are remote deploying or watching actors 
> remotely or at the rate remote watched actors die. On the wire it depends, 
> and user messages share the same TCP connection with the system messages 
> which might also reduce available throughput. 
>
> You can tune the dispatcher of remoting by adding more threads to it, you
> might also increase the netty threadpool: 
> http://doc.akka.io/docs/akka/2.3.9/general/configuration.html#akka-remote
>
> You might want to set the system-message-ack-piggyback-timeout setting to 
> a lower value, like 100ms.
>  
> -Endre
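
For reference, the two settings named in this thread would sit in
application.conf roughly as follows. The buffer size of 20000 is an arbitrary
illustrative value, not a recommendation made in the thread. The 100 ms
timeout is the value Endre suggests above.

```hocon
akka.remote {
  # The thread gives 1000 as the default; size this for the expected peak burst
  # of remote watch/deploy traffic. 20000 is only an illustrative value.
  system-message-buffer-size = 20000

  # Lower value as suggested in the thread.
  system-message-ack-piggyback-timeout = 100 ms
}
```

As Endre notes, no buffer size absorbs an arbitrarily large burst, so this
only raises the threshold rather than removing it.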
>
>
>> As it is now it's easy to take down the system before you have any chance 
>> of spinning up new nodes.
>>
>>
>> On Thursday, January 22, 2015 at 5:32:33 PM UTC+2, Patrik Nordwall wrote:
>>>
>>> You can try to increase akka.remote.system-message-buffer-size config 
>>> setting. Default is 1000.
>>> /Patrik
>>>
>>> On Thu, Jan 22, 2015 at 3:41 PM, Johannes Berg  
>>> wrote:
>>>
 Thanks for the tip for what to look for, my logs are huge so it's a bit 
 of a jungle. Anyway I found this:

 10:34:23.701UTC ERROR[system-akka.actor.default-dispatcher-2] Remoting 
 - Association to [akka.tcp://system@ip2

[akka-user] Can not get distributed pub-sub extension to work...

2015-01-22 Thread Thiago Souza
Hello all,

    I'm having a hard time trying to get the samples
at http://doc.akka.io/docs/akka/snapshot/contrib/distributed-pub-sub.html 
to work.

I've implemented the Subscriber and the Publisher as described in the 
article and created 2 systems with the following configurations:

seed ActorSystem (where Publisher is registered):

akka {
  extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = on
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    seed-nodes = ["akka.tcp://ClusterSystem@127.0.0.1:2551"]
    auto-down-unreachable-after = 10s
  }
}

worker ActorSystem (where Subscriber is registered):

akka {
  extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = on
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = ["akka.tcp://ClusterSystem@127.0.0.1:2551"]
    auto-down-unreachable-after = 10s
  }
}

But after sending "hello world" to Publisher nothing is printed. Here 
is the full log:

[INFO] [01/22/2015 20:43:59.631] [main] [Remoting] Starting remoting
[INFO] [01/22/2015 20:43:59.771] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2551]
[INFO] [01/22/2015 20:43:59.772] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:2551]
[INFO] [01/22/2015 20:43:59.782] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Starting up...
[INFO] [01/22/2015 20:43:59.847] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [01/22/2015 20:43:59.847] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Started up successfully
[INFO] [01/22/2015 20:43:59.850] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [01/22/2015 20:43:59.851] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Metrics collection has started successfully
[INFO] [01/22/2015 20:43:59.859] [ClusterSystem-akka.actor.default-dispatcher-14] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles []
[INFO] [01/22/2015 20:43:59.881] [main] [Remoting] Starting remoting
[INFO] [01/22/2015 20:43:59.888] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:54670]
[INFO] [01/22/2015 20:43:59.888] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:54670]
[INFO] [01/22/2015 20:43:59.889] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:54670] - Starting up...
[INFO] [01/22/2015 20:43:59.891] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:54670] - Started up successfully
[INFO] [01/22/2015 20:43:59.892] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:54670] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [01/22/2015 20:43:59.892] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:54670] - Metrics collection has started successfully
[INFO] [01/22/2015 20:43:59.904] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/subscriber1] subscribing
*[INFO] [01/22/2015 20:44:00.048] 
[ClusterSystem-akka.actor.default-dispatcher-2] 
[Cluster(akka://ClusterSystem)] Cluster Node 
[akka.tcp://ClusterSystem@127.0.0.1:2551] - Node 
[akka.tcp://ClusterSystem@127.0.0.1:54670] is JOINING, roles []*
*[INFO] [01/22/2015 20:44:00.165] 
[ClusterSystem-akka.actor.default-dispatcher-14] 
[Cluster(akka://ClusterSystem)] Cluster Node 
[akka.tcp://ClusterSystem@127.0.0.1:54670] - Welcome from 
[akka.tcp://Clus