Re: [akka-user] [Streams] Are there any limits on Stream merging?
Hi Jakub,

On Tue, Mar 3, 2015 at 1:07 PM, Jakub Liska liska.ja...@gmail.com wrote:

> Hey,
>
> I'm trying to design stream processing of hundreds of thousands of files, row by row, reading the files lazily. The InputStream has to be closed at the end, so creating an ActorPublisher for each file, which closes the underlying stream when it is done, seems to be the best idea. But the streams must be merged into a single one.
>
> My question is: can I do something like this for hundreds of thousands of files, or is it a bad idea? I can't think of anything else right now. Thank you.
>
>     Source[Row]() { implicit b =>
>       // one actor-backed source per file
>       val actorSources = files.map(file => Source(Props(classOf[BatchActor], file))).toArray
>       // a single Merge junction with one input port per file
>       val merge = b.add(Merge[Row](actorSources.length))
>       for (i <- 0 until actorSources.length) {
>         b.addEdge(b.add(actorSources(i)), merge.in(i))
>       }
>       merge.out
>     }

It will probably work, but I am not sure how performant it will be. You could try merging in multiple stages instead of using just one Merge node (i.e. build a tree of merges). It might be faster that way.

-Endre

--
Read the docs: http://akka.io/docs/
Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups Akka User List group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
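The "tree of merges" idea from the reply above can be sketched without any Akka dependency. The helper below is hypothetical (the name `mergeTree` and the `fanIn` parameter are assumptions, not part of any Akka API): it combines inputs in groups of at most `fanIn`, level by level, so that no single merge step ever has more than `fanIn` inputs.

```scala
object MergeTree {
  /** Combine `inputs` level by level, in groups of at most `fanIn`.
    * `merge` may receive a one-element group (the leftover of a level),
    * so it should behave as the identity on a single input. */
  @annotation.tailrec
  def mergeTree[A](inputs: Vector[A], fanIn: Int)(merge: Vector[A] => A): A = {
    require(inputs.nonEmpty, "need at least one input")
    require(fanIn >= 2, "fanIn must be at least 2")
    if (inputs.size == 1) inputs.head
    else mergeTree(inputs.grouped(fanIn).map(merge).toVector, fanIn)(merge)
  }
}
```

For example, `MergeTree.mergeTree((1 to 10).toVector, 3)(_.sum)` reduces ten inputs through groups of three. With Akka Streams, the `merge` function would presumably be the place to wire one Merge junction per group; whether that is actually faster than a single wide Merge would need to be measured.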
Re: [akka-user] Streams M4: Reasons behind the new MAT type parameter/why does .to default to Keep.left?
Hi Giovanni,

1) The type needs to exist somewhere, and if your code doesn't care about it, you can accept a Source[T, _].

2) For consistency, one of the sides was chosen, and it's not that uncommon that you are interested in the materialized value of the Source. If you are interested in the materialized value of the Sink, you should add the Sink with runWith instead.

A more complete overview is available here.

B/

On 2 March 2015 at 22:54:53, Giovanni Alberto Caporaletti (paradi...@gmail.com) wrote:

> I'm trying to better understand the new API. If I got it right, the new type parameter of the various stream components represents the type of what's materialized by that component when the stream is run (materialized). E.g. a Source(actorProducerProps) has the ActorRef as its materialization.
>
> Now I have two questions:
>
> 1) In which cases could it be useful to know the materialization type of intermediate components? I mean, as a stream consumer (i.e. client, in a generic sense), if someone gives me a Source[T, M], I should only need to know that it produces T elements; the M type looks like an implementation detail. What am I missing?
>
> 2) Why does .to default to Keep.left? Aren't we almost always interested in the materialized result of Sinks (right)? I see a lot of [T, Unit] around.
>
> Thanks!
> G

--
Björn Antonsson
Typesafe Inc. – Reactive Apps on the JVM
twitter: bantonsson
JOIN US. REGISTER TODAY! Scala Days March 16th-18th, San Francisco
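As an illustration of the difference between `.to` (Keep.left) and `runWith` discussed above, here is a minimal sketch. It is written against the later stable Akka Streams API (Akka 2.6, where an implicit ActorSystem supplies the materializer and the "no value" type is NotUsed rather than Unit); it is an assumption that the same shape carries over to 1.0-M4 beyond the method names mentioned in the thread. The object and system names are made up for the example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object KeepDemo {
  def run(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("keep-demo")
    val source: Source[Int, NotUsed] = Source(1 to 10)
    val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

    // `to` keeps the Source's materialized value, so the Sink's Future is dropped.
    val left: NotUsed = source.to(sum).run()

    // `toMat` with Keep.right keeps the Sink's materialized value instead.
    val right: Future[Int] = source.toMat(sum)(Keep.right).run()

    // `runWith` attaches the Sink, runs the stream and returns the Sink's value.
    val viaRunWith: Future[Int] = source.runWith(sum)

    try (Await.result(right, 3.seconds), Await.result(viaRunWith, 3.seconds))
    finally system.terminate()
  }
}
```

Both futures complete with the sum of 1 to 10; the graph built with `.to` runs as well, but its result is not reachable from the caller.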
[akka-user] akka-persistence: persistAsync interaction with snapshots
Hi,

Sorry in advance if this has already been answered somewhere, I couldn't find it...

I have a PersistentActor, and I use persistAsync to persist its events. Now I want to add snapshots into the mix. From what I understand from the documentation, having a snapshot with a timestamp T means I wouldn't get any events that were persisted prior to T. But when are these timestamps generated? Is it possible that I will miss events? Is it possible that an event that was already taken into account in the snapshot will be processed again?

Thanks,
Tal
Re: [akka-user] Questions prior to attempting to implement a persistence snapshot
Hi Tim,

I guess you are talking about implementing your own SnapshotStore.

1) If you are trying to implement your own, then you can do whatever you like below the surface of the SnapshotStore API, but you can't (yet) have multiple snapshot stores (https://github.com/akka/akka/issues/15587), which I guess you know, since you opened that ticket.

2) I'm not sure that I follow what you're asking here. How does that relate to the SnapshotStore? The loadAsync method will get called when a persisted actor that has support for snapshots is recovered. Whatever your SnapshotStore decides to send back is up to you. If there is no snapshot saved, you can fabricate one if that suits your use case.

3) Inside a SnapshotStore, you're in full control of how you serialize and save your data. No magic is done for you.

B/

On 1 March 2015 at 12:14:31, Tim Pigden (tim.pig...@optrak.com) wrote:

> Hi,
>
> Some questions prior to implementing a snapshot:
>
> 1. Can I have different snapshot mechanisms for different data objects? The interface seems to allow it but I'm not certain.
>
> 2. I'm pretty sure the answer to this one is yes but I thought I'd check: can I instantiate a new object with a snapshot? That is, my initial complex data object comes from somewhere else and the first thing I need to do is snapshot. Cmds are meaningless without the incoming data object. I presume I can sort this out during construction.
>
> 3. If I write a custom snapshot, do I need to bother about the Akka serialization on my object? Does my snapshot plugin get given my object, or is the serialization somehow invoked prior to the saveAsync method being called? I've already got my own serialisation mechanism.
>
> Thanks
> Tim
Re: [akka-user] Shutting down an akka stream
Hi,

If you're on 1.0-M4, have you looked at runWith on Flow, which takes both a Source and a Sink and gives you a tuple of the materialized values?

B/

On 1 March 2015 at 23:23:58, Jelmer Kuperus (jkupe...@gmail.com) wrote:

> Suppose you have an Akka stream backed by an ActorPublisher that listens to a continuous stream of data from a messaging system. Then how do you cancel this stream?
>
> When you create the source you pass in the Props, so you don't have access to the ActorRef. So you can't, say, send it a Cancel message.
>
> Constructing the stream will give you a Future and not a Cancellable.
>
> I am looking for a way to gracefully shut down without losing any messages.
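A minimal sketch of the Flow.runWith suggestion, assuming the later stable Akka Streams API (Akka 2.6) rather than 1.0-M4. Source.maybe is used here only as a stand-in for a source whose materialized value can end the stream from the outside; it is not the ActorPublisher setup from the question, and it says nothing about in-flight messages of a real messaging system.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object StopHandleDemo {
  def run(): Done = {
    implicit val system: ActorSystem = ActorSystem("stop-demo")

    // runWith on a Flow returns both materialized values as a tuple:
    // the Source's Promise (our stop handle) and the Sink's completion Future.
    val (stop, finished): (Promise[Option[Int]], Future[Done]) =
      Flow[Int].runWith(Source.maybe[Int], Sink.ignore)

    // Completing the promise with None completes the stream without emitting.
    stop.success(None)

    try Await.result(finished, 3.seconds)
    finally system.terminate()
  }
}
```

The design point is that the caller ends up holding a handle from the source side and a completion signal from the sink side, so it can request the stop and then wait until everything downstream has finished.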
Re: [akka-user] ClusterSharding and node address/port
On Tue, Mar 3, 2015 at 11:37 AM, Björn Antonsson bjorn.antons...@typesafe.com wrote:

> Hi Brice.
>
> I just noticed the other discussion. You are right: the Region that is persisted contains an actor ref, and that serialized ref contains full address information and uid, so eventually it will be removed by the watch.
>
> B/
>
> On 3 March 2015 at 10:44:51, Björn Antonsson (bjorn.antons...@typesafe.com) wrote:
>
>> Hi Brice,
>>
>> Are you sure that it is the Sharding that is the issue and not something in the messages that you send to the sharded actors?
>>
>> As far as I can see, the sharding itself only persists string IDs of the entities, and if you don't include any address-specific information in there or persist actor refs in your sharded entries, you should be fine.
>>
>> B/
>>
>> On 27 February 2015 at 16:16:36, Brice Figureau (brice...@daysofwonder.com) wrote:
>>
>>> Hi,
>>>
>>> I was experimenting with the ClusterSharding system (the one in master with rememberEntries true) and persistence in a simplistic test that does the following:
>>>
>>> 1) Start a ClusterSharding region on one node only
>>> 2) Start an entry, and communicate with it
>>> 3) Shut down the ActorSystem
>>> 4) Start a new ActorSystem (same name)
>>> 5) Start the same ClusterSharding region
>>> 6) Expect the entry to be recreated by virtue of persistence and 'rememberEntries'
>>>
>>> To my surprise it failed. The reason is that I set netty.tcp.port=0 in my configuration, which assigns a random port to each ActorSystem. Of course, in 4) the new ActorSystem has a different port from the one created at the start of the test.
>>>
>>> This means that when replaying the ShardCoordinator events, the deserialized ActorRef points to a different node than the current one, which later creates issues since that node doesn't exist anymore.
>>>
>>> Now, the problem is not this simplistic test (that I can fix by assigning a proper fixed port). It's when this system is used in a production cluster, where there's no guarantee that the same node will always be present between restarts of the ShardCoordinator. For instance, if I completely shut down the production cluster and recreate a new one on different EC2 instances that have completely different IP addresses, then the ShardCoordinator wouldn't be able to be configured properly.
>>>
>>> Is it a bug or did I miss something?
>>>
>>> --
>>> Brice Figureau brice...@daysofwonder.com
Re: [akka-user] ClusterSharding and node address/port
Hi Brice,

Are you sure that it is the Sharding that is the issue and not something in the messages that you send to the sharded actors?

As far as I can see, the sharding itself only persists string IDs of the entities, and if you don't include any address-specific information in there or persist actor refs in your sharded entries, you should be fine.

B/

On 27 February 2015 at 16:16:36, Brice Figureau (brice...@daysofwonder.com) wrote:

> Hi,
>
> I was experimenting with the ClusterSharding system (the one in master with rememberEntries true) and persistence in a simplistic test that does the following:
>
> 1) Start a ClusterSharding region on one node only
> 2) Start an entry, and communicate with it
> 3) Shut down the ActorSystem
> 4) Start a new ActorSystem (same name)
> 5) Start the same ClusterSharding region
> 6) Expect the entry to be recreated by virtue of persistence and 'rememberEntries'
>
> To my surprise it failed. The reason is that I set netty.tcp.port=0 in my configuration, which assigns a random port to each ActorSystem. Of course, in 4) the new ActorSystem has a different port from the one created at the start of the test.
>
> This means that when replaying the ShardCoordinator events, the deserialized ActorRef points to a different node than the current one, which later creates issues since that node doesn't exist anymore.
>
> Now, the problem is not this simplistic test (that I can fix by assigning a proper fixed port). It's when this system is used in a production cluster, where there's no guarantee that the same node will always be present between restarts of the ShardCoordinator. For instance, if I completely shut down the production cluster and recreate a new one on different EC2 instances that have completely different IP addresses, then the ShardCoordinator wouldn't be able to be configured properly.
>
> Is it a bug or did I miss something?
>
> --
> Brice Figureau brice...@daysofwonder.com
Re: [akka-user] [Streams 1.0-M4] From Source[Future[T], Unit] to Source[T, Unit]?
Thanks Jim.

On Tuesday, 3 March 2015 at 01:29:04 UTC+1, Jim Hazen wrote:

> I think the answer is in the {Scala, Java}Docs of mapAsync:
>
>     Transform this stream by applying the given function to each of the elements as they pass through this processing step. The function returns a Future and the value of that future will be emitted downstreams. As many futures as requested elements by downstream may run in parallel and may complete in any order, but the elements that are emitted downstream are in the same order as received from upstream.
>
> The section on rate goes into greater detail: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M4/scala/stream-rate.html
>
> You can either go with the default demand as specified by your Materialization settings, use the default settings (default values in reference.conf), or manually specify a buffer downstream from your source to have greater control over buffering and generated demand.
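To make the mapAsync answer concrete, here is a minimal sketch of turning a source of futures into a source of plain values. It assumes the later stable Akka Streams API (Akka 2.6), where mapAsync takes an explicit `parallelism` argument; per the quoted Scaladoc, the 1.0-M4 version derives the number of running futures from downstream demand instead. The object and system names are made up for the example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FlattenFutures {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("map-async-demo")
    import system.dispatcher // execution context for the example futures

    val futures: Source[Future[Int], NotUsed] =
      Source(List(1, 2, 3)).map(n => Future(n * 10))

    // Each element already is a Future, so the function just hands it back;
    // mapAsync emits the completed values in upstream order.
    val flattened: Source[Int, NotUsed] =
      futures.mapAsync(parallelism = 4)(future => future)

    try Await.result(flattened.runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }
}
```

Because mapAsync preserves upstream order, the collected result keeps the order of the original list even if the futures complete out of order.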
Re: [akka-user] Questions prior to attempting to implement a persistence snapshot
1) Embarrassed cough. I kind of had a feeling you couldn't do it, but I completely forgot I reported it!

Thanks!
Tim

On 3 March 2015 at 15:52, Björn Antonsson bjorn.antons...@typesafe.com wrote:

> Hi Tim,
>
> I guess you are talking about implementing your own SnapshotStore.
>
> 1) If you are trying to implement your own, then you can do whatever you like below the surface of the SnapshotStore API, but you can't (yet) have multiple snapshot stores (https://github.com/akka/akka/issues/15587), which I guess you know, since you opened that ticket.
>
> 2) I'm not sure that I follow what you're asking here. How does that relate to the SnapshotStore? The loadAsync method will get called when a persisted actor that has support for snapshots is recovered. Whatever your SnapshotStore decides to send back is up to you. If there is no snapshot saved, you can fabricate one if that suits your use case.
>
> 3) Inside a SnapshotStore, you're in full control of how you serialize and save your data. No magic is done for you.
>
> B/
>
> On 1 March 2015 at 12:14:31, Tim Pigden (tim.pig...@optrak.com) wrote:
>
>> Hi,
>>
>> Some questions prior to implementing a snapshot:
>>
>> 1. Can I have different snapshot mechanisms for different data objects? The interface seems to allow it but I'm not certain.
>>
>> 2. I'm pretty sure the answer to this one is yes but I thought I'd check: can I instantiate a new object with a snapshot? That is, my initial complex data object comes from somewhere else and the first thing I need to do is snapshot. Cmds are meaningless without the incoming data object. I presume I can sort this out during construction.
>>
>> 3. If I write a custom snapshot, do I need to bother about the Akka serialization on my object? Does my snapshot plugin get given my object, or is the serialization somehow invoked prior to the saveAsync method being called? I've already got my own serialisation mechanism.
>>
>> Thanks
>> Tim

--
Tim Pigden
Optrak Distribution Software Limited
Re: [akka-user] [Streams 1.0-M4] Using Balance to partition to independent sinks
Hi,

First off, there is no support in akka-streams for serializing parts of a graph and sending them to other nodes for execution. That would be awesome to have, but it's not there yet.

Second, the shape of your graph is no longer a UniformFanOutShape, since you have connected a Source to the Balance and thus removed the inlet on the graph you are building. Then you return the balance, which doesn't have the same shape as the thing you are building.

Have you looked at the cookbook samples of how to balance out processing to a number of workers?

B/

On 2 March 2015 at 23:06:52, rmarsch (rmarsc...@localytics.com) wrote:

> Hi,
>
> I have a use case that I believe Akka Streams may be well suited for, but I'm not clearly seeing how to implement part of it from reading the documentation.
>
> I have a single, finite input source from an iterator stream. I want to use Balance to split this stream into N partitions and then be able to feed each of these stream partitions into a RunnableFlow that would be executed on its own node in a distributed system.
>
>     in ~> balance.out(0) ~> sink0
>           balance.out(1) ~> sink1
>           ...
>           balance.out(n) ~> sinkN
>
> I have some rough code sketched together, but I think I'm missing some concepts to tie everything together. The runtime complains about the inlet for the UniformInOutShape, which I'm assuming is what I have in the inputGraph. Any nudge in the right direction would be appreciated:
>
>     val iteratorBuilder: () => Iterator[T] = ???
>
>     val inputGraph: Graph[UniformFanOutShape[T,T], Unit] = FlowGraph.partial() { implicit builder: FlowGraph.Builder =>
>       import FlowGraph.Implicits._
>       val stream: SourceShape[T] = builder.add(Source(iteratorBuilder))
>       val balance = builder.add(Balance[T](numPartitions))
>       stream ~> balance
>       balance
>     }
>
>     partitions = (0 until numPartitions).map(i => {
>       val sink: Sink[T, Future[Stream[T]]] = Sink.fold(Stream.empty[T]) { (v, e) => v.+:(e) }
>       val outFlow: RunnableFlow[Future[Stream[T]]] = FlowGraph.closed(sink) { implicit builder: FlowGraph.Builder => sink =>
>         import FlowGraph.Implicits._
>         val balance = builder.add(inputGraph)
>         balance.out(i) ~> sink
>       }
>       new Partition(i, outFlow)
>     }).toArray
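The cookbook recipe mentioned in the reply (balancing work across a fixed pool of workers) looks roughly as follows in the later stable Akka Streams API (Akka 2.6, GraphDSL). This is a sketch under that assumption: the 1.0-M4 FlowGraph API used in the question differs in naming, and the example keeps everything in one JVM, since distributing parts of a graph to other nodes is not supported. Note also that it merges the worker outputs back into one stream rather than ending in N independent sinks.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BalancerDemo {
  /** Wrap `worker` so that incoming elements are spread over `workerCount` copies of it. */
  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balance = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
      val merge = b.add(Merge[Out](workerCount))
      // One edge per worker: balance -> worker (own async island) -> merge.
      for (_ <- 1 to workerCount) balance ~> worker.async ~> merge
      // The resulting shape still has one open inlet and one open outlet.
      FlowShape(balance.in, merge.out)
    })
  }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("balancer-demo")
    val doubled = Source(1 to 10).via(balancer(Flow[Int].map(_ * 2), 3)).runWith(Sink.seq)
    try Await.result(doubled, 3.seconds)
    finally system.terminate()
  }
}
```

Because the workers run concurrently, the merged output order is not guaranteed; only the set of results is. The key difference from the code in the question is that the partial graph leaves its inlet open, so its declared shape matches what is actually built.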
Re: [akka-user] Streaming large file upload using akka-http
Hi Soumya,

Yes, that can definitely be achieved. I've pulled some code out from a test and put it in a gist here: https://gist.github.com/bantonsson/881f831db93ec474f9bd

There is a fair amount of scaffolding, including creating a binary blob file and uploading it to the server via curl. The real action is in the methods loadSimpleForm and readBlob. The code for setting up the server is in withHttpServer.

Hope this helps.

B/

On 28 February 2015 at 01:35:42, Soumya Simanta (soumya.sima...@gmail.com) wrote:

> I have a very large file (a few GBs) that I want to stream into my application using a browser-based interface (HTTP).
>
> One possible solution is to use Play with Iteratee.
>
> I was wondering if that can be achieved with the current implementation of akka-streams/akka-http? If someone can point me to a recipe to get started with this, I would appreciate it.
>
> Thanks
> -Soumya
[akka-user] Help Understanding Benefits of Asynchronous/NIO Architecture
Sorry if this is perhaps too general for this group, but I've been struggling to find somewhere appropriate to ask. I have done quite a bit of reading but haven't yet found anything that answers my questions in a way my small brain can understand.

I understand what asynchronous and non-blocking I/O means. What I am trying to understand is when I can expect to see improvements in performance by adopting an asynchronous, NIO-based framework such as Akka.

I have the following assumptions; it would be great if anyone could agree or disagree. (Assume a request/response in a web application that has some blocking calls in the business tier.)

1. Even though some actors will process their messages and quickly delegate to other child actors asynchronously, there will be some child actors that eventually have to deal with blocking I/O at the boundaries of the application, right? These actors will still be running in a thread which will block for the I/O, right? Such actors would (at a high level) be equivalent to the workers in languages such as Node, right?

2. In an asynchronous architecture, I'm assuming that I will only see improvements in the speed of a request/response round trip if there are several transactions (as part of the request) that I can now parallelize (assuming multiple cores are available). That is, let's say I have the following scenarios for my request/response:

   - A single blocking call: even though the messages used to deal with this transaction will largely be passed around the system asynchronously, I will likely not see an improvement in the speed of the request/response round trip, since the single piece of I/O work will still take as long as it takes, right?

   - Multiple blocking calls that I can run in parallel: if (in servicing the request) I have several pieces of blocking work to do (maybe one web service call, one DB transaction, one cache call) AND I can run them in parallel, then I will likely see an improvement in the speed of the round trip, thanks to the divide-and-conquer approach to the I/O.

   - Multiple blocking calls that I CANNOT run in parallel: if the several pieces of blocking work need to run in sequence, then I am still unlikely to see an improvement in speed, since I cannot parallelize the I/O, right?

Many thanks
James
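
The difference between the last two scenarios can be tried out with plain Scala futures, without Akka. The following is only a sketch under stated assumptions: blockingCall is a hypothetical stand-in for a web service, DB, or cache call, the 300 ms delays are made up, and the global execution context is used for brevity.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingCallsDemo {
  // Hypothetical stand-in for a blocking call (web service, DB, cache).
  // `blocking` hints to the global pool that this thread is going to wait.
  def blockingCall(name: String, millis: Long): String = blocking {
    Thread.sleep(millis)
    name
  }

  // Runs `body` and returns its result together with the elapsed milliseconds.
  def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000)
  }

  // Sequential: each call starts only after the previous one has returned,
  // so the total time is roughly the sum of the individual delays.
  def sequential(): List[String] =
    List(blockingCall("ws", 300), blockingCall("db", 300), blockingCall("cache", 300))

  // Parallel: all three futures are started before any result is awaited,
  // so the total time is roughly that of the slowest call.
  def parallel(): List[String] = {
    val ws    = Future(blockingCall("ws", 300))
    val db    = Future(blockingCall("db", 300))
    val cache = Future(blockingCall("cache", 300))
    val all = for (a <- ws; b <- db; c <- cache) yield List(a, b, c)
    Await.result(all, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    val (_, seqMs) = timed(sequential())
    val (_, parMs) = timed(parallel())
    println(s"sequential: $seqMs ms, parallel: $parMs ms")
  }
}
```

Each individual call still takes as long as it takes; only the overlap between independent calls shortens the round trip, which matches the assumption in point 2.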
Re: [akka-user] akka http 1.0-M2 thread pegs a CPU core until connection closes on Windows
I just ran into this issue. What's the drawback of setting akka.io.tcp.windows-connection-abort-workaround-enabled = off?
Re: [akka-user] [Streams 1.0-M4] Compatibility with Akka-2.4-SNAPSHOT?
Is there an ETA for akka-http on 2.4? According to Mathias, Spray won't be ported to 2.4: https://groups.google.com/d/msg/spray-user/x0KdMn_7exE/B8Rp2xuSa2sJ and according to you akka-http also isn't yet ready for 2.4. I'd like to develop a REST service that takes advantage of cluster sharding with rememberEntries support and that's somewhat difficult at the moment.
Re: [akka-user] [Streams 1.0-M4] Using Balance to partition to independent sinks
Thank you for your response. I have to admit the use case I was trying for is unusual, and it depended on being able to serialize parts of the graph to other nodes. I did see the nice cookbook sample of balancing out processing to a number of workers, but in my case there would be no merge phase back to one output. The main goal I was trying to achieve was to distribute a finite stream of data out to a collection of Spark partitions without having to first store the entire stream data set in memory on the master.

On Tuesday, March 3, 2015 at 11:45:20 AM UTC-5, Björn Antonsson wrote:

Hi,

First off, there is no support in akka-streams for serializing parts of a graph and sending them to other nodes for execution. That would be awesome to have, but it's not there yet.

Second, the shape of your graph is no longer a UniformFanOutShape, since you have connected a Source to the Balance and thus removed the inlet on the graph you are building. Then you return the balance, which hasn't got the same shape as the thing you are building.

Have you looked at the cookbook (http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M4/scala/stream-cookbook.html) samples of how to balance out processing to a number of workers?

B/

On 2 March 2015 at 23:06:52, rmarsch (rmar...@localytics.com) wrote:

Hi,

I have a use case that I believe Akka Streams may be well suited for, but I'm not clearly seeing how to implement part of it from reading the documentation. I have a single, finite input source from an iterator stream. I want to use Balance to split this stream into N partitions and then be able to feed each of these stream partitions into a RunnableFlow that would be executed on its own node in a distributed system.

    in ~> balance.out(0) ~> sink0
          balance.out(1) ~> sink1
          ...
          balance.out(n) ~> sinkN

I have some rough code sketched together, but I think I'm missing some concepts to tie everything together. The runtime complains about the inlet for the UniformInOutShape, which I'm assuming is what I have in the inputGraph. Any nudge in the right direction would be appreciated:

    val iteratorBuilder: () => Iterator[T] = ???

    val inputGraph: Graph[UniformFanOutShape[T, T], Unit] =
      FlowGraph.partial() { implicit builder: FlowGraph.Builder =>
        import FlowGraph.Implicits._
        val stream: SourceShape[T] = builder.add(Source(iteratorBuilder))
        val balance = builder.add(Balance[T](numPartitions))
        stream ~> balance
        balance
      }

    partitions = (0 until numPartitions).map(i => {
      val sink: Sink[T, Future[Stream[T]]] =
        Sink.fold(Stream.empty[T]) { (v, e) => v.+:(e) }
      val outFlow: RunnableFlow[Future[Stream[T]]] =
        FlowGraph.closed(sink) { implicit builder: FlowGraph.Builder => sink =>
          import FlowGraph.Implicits._
          val balance = builder.add(inputGraph)
          balance.out(i) ~> sink
        }
      new Partition(i, outFlow)
    }).toArray
Re: [akka-user] GSOC(2015)
Hi Adam,

I'm sorry to say that we currently have no plans to participate in the Google Summer of Code.

Regards,
Roland

On 2 Mar 2015 at 22:49, adam kozuch (adam.koz...@gmail.com) wrote:

Hello,

I would like to ask whether anyone from the Akka team would like to be a Google Summer of Code mentor this year?

Cheers,
Adam Kozuch

Dr. Roland Kuhn
Akka Tech Lead
Typesafe (http://typesafe.com/) – Reactive apps on the JVM.
twitter: @rolandkuhn (http://twitter.com/#!/rolandkuhn)
Re: [akka-user] akka http 1.0-M2 thread pegs a CPU core until connection closes on Windows
Hi Jim,

Here is the original mailing list discussion https://groups.google.com/forum/#!topic/akka-user/WdXWjcnVWiQ and the corresponding ticket https://github.com/akka/akka/issues/15766

In short: when you set it to off on Windows and a client aborts, the server might in some cases not notice that the connection is closed until it tries to write to it.

B/

On 3 March 2015 at 21:14:15, Jim Newsham (jim.news...@gmail.com) wrote:

I just ran into this issue. What's the drawback of setting akka.io.tcp.windows-connection-abort-workaround-enabled = off?
[akka-user] [Streams] Is supervision strategy supposed to work with ActorPublishers?
Hey,

when an ActorPublisher calls onError(exceptionRegisteredInSupervisionDecider), the stream just fails with that exception. The supervision strategy doesn't apply here. Is it supposed to, or does it simply not work for ActorPublishers?
Re: [akka-user] akka-streams 1.0-m4 load balancer creation stuck
Hi Luis,

It should not get stuck but throw; in any case, this will not work:

    broadcast.out(i) ~> worker ~> merge.in(i)

You imported worker once, so you cannot use it N times. You can use builder.add to add it as many times as you need (the parametric import you used only matters if you want to expose the materialized value of the imported element). But since your worker is a Flow, why not just accept it as a Flow instead of a Graph[FlowShape[A, B], Unit]?

It is interesting why it hangs, though, instead of throwing an exception.

-Endre

With nrOfWorkers > 1, the creation of this partial graph gets stuck after the first iteration. Am I missing something? I was expecting this to create as many workers as desired.

Regards,
Luis
Re: [akka-user] Shutting down an akka stream
Hi Björn,

I am not sure I understand you. The piece of software I have created listens to messages posted to topics; when a message arrives we perform some operation (call an external system). The flow of messages will never stop. New ones will keep coming in. But we still want to be able to bring down a node. When we do, we want to first stop the producer from reading messages off a topic and then wait until there are no messages left in flight, so that we can shut down safely.

On Tuesday, 3 March 2015 12:12:06 UTC+1, Björn Antonsson wrote:

Hi,

If you're on 1.0-M4, have you looked at runWith on Flow, which takes both a Source and a Sink and gives you a tuple of the materialized values?

B/

On 1 March 2015 at 23:23:58, Jelmer Kuperus (jkup...@gmail.com) wrote:

Suppose you have an akka stream backed by an ActorPublisher that listens to a continuous stream of data from a messaging system. Then how do you cancel this stream? When you create the source you pass in the props, so you don't have access to the ActorRef. So you can't, say, send it a Cancel message. Constructing the stream will give you a future and not a Cancellable.

I am looking for a way to gracefully shut down without losing any messages.
Re: [akka-user] akka-streams 1.0-m4 load balancer creation stuck
Adding as many worker instances as I need using builder.add works, but with one caveat... this works:

    object Balancer {
      def apply[A, B, C](nrOfWorkers: Int, workerGraph: Graph[FlowShape[A, B], Unit]): Graph[FlowShape[A, B], Unit] =
        FlowGraph.partial() { implicit builder =>
          import FlowGraph.Implicits._
          val broadcast = builder.add(Broadcast[A](nrOfWorkers))
          val merge = builder.add(Merge[B](nrOfWorkers))
          val workers = (0 until nrOfWorkers).map(_ => builder.add(workerGraph))
          for (i <- 0 until nrOfWorkers) {
            broadcast ~> workers(i) ~> merge
          }
          FlowShape(broadcast.in, merge.out)
        }
    }

But this doesn't work...

    object Balancer {
      def apply[A, B, C](nrOfWorkers: Int, workerGraph: Graph[FlowShape[A, B], Unit]): Graph[FlowShape[A, B], Unit] =
        FlowGraph.partial() { implicit builder =>
          import FlowGraph.Implicits._
          val broadcast = builder.add(Broadcast[A](nrOfWorkers))
          val merge = builder.add(Merge[B](nrOfWorkers))
          for (i <- 0 until nrOfWorkers) {
            broadcast ~> builder.add(workerGraph) ~> merge
          }
          FlowShape(broadcast.in, merge.out)
        }
    }

And the error I get is:

    java.lang.IllegalArgumentException: requirement failed: The inlets [UniformFanOut.in] and outlets [UniformFanIn.out] must correspond to the inlets [UniformFanOut.in, UniformFanOut.in] and outlets [UniformFanIn.out, UniformFanIn.out])

2015-03-03 14:08 GMT+00:00 Luis Ángel Vicente Sánchez langel.gro...@gmail.com:

Thank you Endre! Yes, I have seen that example. In my use case, workers are also partial graphs, so I will use builder.add to import the flow as many times as I need and let you know whether that did the trick. If I find any other problem, I will simplify my use case so I can just use Flows.

2015-03-03 14:03 GMT+00:00 Endre Varga endre.va...@typesafe.com:

Btw, have you looked at the actual cookbook sample? http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M4/scala/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers

On Tue, Mar 3, 2015 at 2:56 PM, Endre Varga endre.va...@typesafe.com wrote:

Hi Luis,

It should not get stuck but throw; in any case, this will not work:

    broadcast.out(i) ~> worker ~> merge.in(i)

You imported worker once, so you cannot use it N times. You can use builder.add to add it as many times as you need (the parametric import you used only matters if you want to expose the materialized value of the imported element). But since your worker is a Flow, why not just accept it as a Flow instead of a Graph[FlowShape[A, B], Unit]?

It is interesting why it hangs, though, instead of throwing an exception.

-Endre

With nrOfWorkers > 1, the creation of this partial graph gets stuck after the first iteration. Am I missing something? I was expecting this to create as many workers as desired.

Regards,
Luis
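
One detail worth checking in the Balancer code from this thread: Broadcast emits every element to all of its outputs, so each worker would see every element, whereas the cookbook recipe linked in the thread uses Balance, which hands each element to exactly one worker. Below is a minimal sketch, assuming the 1.0-M4 DSL exactly as it appears in the version reported to work in this thread (FlowGraph.partial, builder.add, the ~> operator), with only Broadcast swapped for Balance. It needs akka-stream-experimental 1.0-M4 on the classpath and has not been checked against later releases.

```scala
import akka.stream.scaladsl.{ Balance, FlowGraph, Merge }
import akka.stream.{ FlowShape, Graph }

object Balancer {
  // Same structure as the variant reported to work in the thread,
  // but each element goes to one worker instead of to all of them.
  def apply[A, B](nrOfWorkers: Int, workerGraph: Graph[FlowShape[A, B], Unit]): Graph[FlowShape[A, B], Unit] =
    FlowGraph.partial() { implicit builder =>
      import FlowGraph.Implicits._

      val balance = builder.add(Balance[A](nrOfWorkers))
      val merge   = builder.add(Merge[B](nrOfWorkers))

      // Import one separate copy of the worker per output port,
      // before wiring, as in the working variant from the thread.
      val workers = (0 until nrOfWorkers).map(_ => builder.add(workerGraph))

      for (i <- 0 until nrOfWorkers) {
        balance ~> workers(i) ~> merge
      }

      FlowShape(balance.in, merge.out)
    }
}
```

Under these assumptions, something like Balancer(4, Flow[Int].map(_ * 2)) should yield a partial graph with one inlet and one outlet; note that Merge does not preserve the original element order.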
[akka-user] akka-streams 1.0-m4 load balancer creation stuck
I'm trying to create a generic load balancer and the code looks like this:

    import akka.stream.scaladsl.{ Merge, Broadcast, FlowGraph }
    import akka.stream.{ FlowShape, Graph }

    object Balancer {
      def apply[A, B, C](nrOfWorkers: Int, workerGraph: Graph[FlowShape[A, B], Unit]): Graph[FlowShape[A, B], Unit] =
        FlowGraph.partial(workerGraph) { implicit builder => worker =>
          import FlowGraph.Implicits._
          val broadcast = builder.add(Broadcast[A](nrOfWorkers))
          val merge = builder.add(Merge[B](nrOfWorkers))
          for (i <- 0 until nrOfWorkers) {
            broadcast.out(i) ~> worker ~> merge.in(i)
          }
          FlowShape(broadcast.in, merge.out)
        }
    }

With nrOfWorkers > 1, the creation of this partial graph gets stuck after the first iteration. Am I missing something? I was expecting this to create as many workers as desired.

Regards,
Luis
Re: [akka-user] Shutting down an akka stream
Hi,

I thought your original question was about getting hold of the ActorRef of your ActorPublisher so that you could communicate with it. The way to get the materialized values of both the Source and the Sink is to use runWith on the Flow and pass in the Source and the Sink. Here is some pseudocode:

    val (actorRef, whatever) = flow.runWith(actorProducerSource, whateverSink)
    actorRef ! Cancel

B/

On 3 March 2015 at 14:59:07, Jelmer Kuperus (jkupe...@gmail.com) wrote:

Hi Björn,

I am not sure I understand you. The piece of software I have created listens to messages posted to topics; when a message arrives we perform some operation (call an external system). The flow of messages will never stop. New ones will keep coming in. But we still want to be able to bring down a node. When we do, we want to first stop the producer from reading messages off a topic and then wait until there are no messages left in flight, so that we can shut down safely.

On Tuesday, 3 March 2015 12:12:06 UTC+1, Björn Antonsson wrote:

Hi,

If you're on 1.0-M4, have you looked at runWith on Flow, which takes both a Source and a Sink and gives you a tuple of the materialized values?

B/

On 1 March 2015 at 23:23:58, Jelmer Kuperus (jkup...@gmail.com) wrote:

Suppose you have an akka stream backed by an ActorPublisher that listens to a continuous stream of data from a messaging system. Then how do you cancel this stream? When you create the source you pass in the props, so you don't have access to the ActorRef. So you can't, say, send it a Cancel message. Constructing the stream will give you a future and not a Cancellable.

I am looking for a way to gracefully shut down without losing any messages.
Re: [akka-user] akka-streams 1.0-m4 load balancer creation stuck
Btw, have you looked at the actual cookbook sample? http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M4/scala/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers

On Tue, Mar 3, 2015 at 2:56 PM, Endre Varga endre.va...@typesafe.com wrote:

Hi Luis,

It should not get stuck but throw; in any case, this will not work:

    broadcast.out(i) ~> worker ~> merge.in(i)

You imported worker once, so you cannot use it N times. You can use builder.add to add it as many times as you need (the parametric import you used only matters if you want to expose the materialized value of the imported element). But since your worker is a Flow, why not just accept it as a Flow instead of a Graph[FlowShape[A, B], Unit]?

It is interesting why it hangs, though, instead of throwing an exception.

-Endre

With nrOfWorkers > 1, the creation of this partial graph gets stuck after the first iteration. Am I missing something? I was expecting this to create as many workers as desired.

Regards,
Luis
Re: [akka-user] Problem with supervision strategy akka-stream 1.0-M4
Hi,

I've looked at this, and it actually looks like a bug. I've opened a ticket here: https://github.com/akka/akka/issues/16982

Thanks for reporting this.

B/

On 28 February 2015 at 12:28:07, Igor Perevozchikov (iperevozchi...@gmail.com) wrote:

Hello!

I'm very inspired by the Reactive Streams idea and the Akka implementation. But I found a discouraging fact in 1.0-M4: why does the stream supervisor handle only one exception and ignore the ones that follow?

Example 1, with one bad element in the stream:

    val decider: Supervision.Decider = exception => exception match {
      case _: ArithmeticException   => Supervision.Resume
      case _: NumberFormatException => Supervision.Resume
      case _                        => Supervision.Stop
    }
    implicit val system = ActorSystem("system")
    implicit val materializer = ActorFlowMaterializer(
      ActorFlowMaterializerSettings(system)
        .withSupervisionStrategy(decider)
    )
    val source = Source(Vector("1a", "2", "3", "4")).map(_.toInt)
    val result = source.runWith(Sink.foreach(i => println(i)))

In this example everything was fine, and the console output was 2, 3, 4.

Example 2, with more than one bad element in the stream:

    val decider: Supervision.Decider = exception => exception match {
      case _: ArithmeticException   => Supervision.Resume
      case _: NumberFormatException => Supervision.Resume
      case _                        => Supervision.Stop
    }
    implicit val system = ActorSystem("system")
    implicit val materializer = ActorFlowMaterializer(
      ActorFlowMaterializerSettings(system)
        .withSupervisionStrategy(decider)
    )
    val source = Source(Vector("1a", "2a", "3a", "2", "3", "4")).map(_.toInt)
    val result = source.runWith(Sink.foreach(i => println(i)))

In this example, the stream raised an error:

    14:11:24.922 [system-akka.actor.default-dispatcher-6] ERROR akka.actor.OneForOneStrategy - For input string: "2a"
    java.lang.NumberFormatException: For input string: "2a"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.7.0_75]
        at java.lang.Integer.parseInt(Integer.java:492) ~[na:1.7.0_75]
        at java.lang.Integer.parseInt(Integer.java:527) ~[na:1.7.0_75]
        at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:247) ~[scala-library-2.11.5.jar:na]
        at scala.collection.immutable.StringOps.toInt(StringOps.scala:30) ~[scala-library-2.11.5.jar:na]
        at Application$$anonfun$3.apply(Application.scala:23) ~[classes/:na]
        at Application$$anonfun$3.apply(Application.scala:23) ~[classes/:na]
        at akka.stream.impl.fusing.Map.onPush(Ops.scala:16) ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
        at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter.scala:296) ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
        at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(Interpreter.scala:206) ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
        at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(Interpreter.scala:294) ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
        at akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:416) ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
        at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$6.push(Interpreter.scala:498) ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
        at akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:137) ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[scala-library-2.11.5.jar:na]
        at akka.stream.impl.SubReceive.apply(Transfer.scala:16) ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
        at akka.stream.impl.SubReceive.apply(Transfer.scala:12) ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[scala-library-2.11.5.jar:na]
        at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12) ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[scala-library-2.11.5.jar:na]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.11-2.3.9.jar:na]
        at akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:282) ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [akka-actor_2.11-2.3.9.jar:na]
        at akka.actor.ActorCell.invoke(ActorCell.scala:487) [akka-actor_2.11-2.3.9.jar:na]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) [akka-actor_2.11-2.3.9.jar:na]
        at akka.dispatch.Mailbox.run(Mailbox.scala:221) [akka-actor_2.11-2.3.9.jar:na]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231) [akka-actor_2.11-2.3.9.jar:na]
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.5.jar:na]
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.5.jar:na]
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.5.jar:na]
        at
Re: [akka-user] Dead letter JournalProtocol#WriteMessageSuccessful
Thank you very much, that explains my issue!

I am still not sure whether WriteMessageSuccessful can arrive after the actor is stopped; I can find no reference to it in the docs. I guess I will find out after running the code with PoisonPill instead of stop(self()).

Thank you!

Anders

On Monday, 2 March 2015 at 15:54:36 UTC+1, Björn Antonsson wrote:

Hi Anders,

How are you stopping your actor? There is a difference between system.stop(actorRef) and actorRef ! PoisonPill. Calling stop is more direct and will, as you say, send a message, but that is an internal system message that cuts the line and is processed before normal actor messages. If you, on the other hand, send the actor a PoisonPill, it will process all messages that it has in its mailbox up until the PoisonPill, and then shut down. Please note that there can of course be messages enqueued after the PoisonPill that won't be handled.

B/

On 27 February 2015 at 13:26:30, Anders Båtstrand (ande...@gmail.com) wrote:

Dear users,

Sometimes I find dead letter JournalProtocol#WriteMessageSuccessful in my logs after I have stopped my actor. As I understand it, stop involves a message send, so all write operations should be finished before the actor is stopped. How is it still possible to get this message AFTER my actor is gone?

Regards,
Anders

--
Björn Antonsson
Typesafe Inc. http://typesafe.com – Reactive Apps on the JVM
twitter: bantonsson http://twitter.com/bantonsson
JOIN US. REGISTER TODAY! Scala Days, March 16th-18th, San Francisco http://event.scaladays.org/scaladays-sanfran-2015

--
Read the docs: http://akka.io/docs/
Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
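The ordering difference Björn describes between stop and PoisonPill can be illustrated with a toy model. This is only a sketch in plain Java and not Akka code: the class ToyMailbox and its methods are hypothetical names invented for the illustration, and it assumes the simplified rule that a stop request overtakes every user message already queued, while PoisonPill waits its turn like any other message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model only: NOT Akka code, just an illustration of message ordering. */
final class ToyMailbox {
    static final String POISON_PILL = "PoisonPill";

    private final Deque<String> queue = new ArrayDeque<>();
    private boolean stopRequested = false;

    final List<String> processed = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    /** Enqueues an ordinary user message (PoisonPill is one of these). */
    void tell(String message) {
        queue.addLast(message);
    }

    /** Models stop(): a system message that overtakes everything already queued. */
    void stop() {
        stopRequested = true;
    }

    /** Drains the queue, recording what was handled and what became a dead letter. */
    void drain() {
        boolean stopped = stopRequested;
        while (!queue.isEmpty()) {
            String message = queue.pollFirst();
            if (stopped) {
                deadLetters.add(message);   // actor is gone: message is undeliverable
            } else if (POISON_PILL.equals(message)) {
                stopped = true;             // everything queued before it was handled
            } else {
                processed.add(message);
            }
        }
    }

    public static void main(String[] args) {
        ToyMailbox viaStop = new ToyMailbox();
        viaStop.tell("WriteMessageSuccessful");
        viaStop.stop();
        viaStop.drain();
        System.out.println("stop:       processed=" + viaStop.processed
                + " deadLetters=" + viaStop.deadLetters);

        ToyMailbox viaPill = new ToyMailbox();
        viaPill.tell("WriteMessageSuccessful");
        viaPill.tell(POISON_PILL);
        viaPill.tell("late message");
        viaPill.drain();
        System.out.println("PoisonPill: processed=" + viaPill.processed
                + " deadLetters=" + viaPill.deadLetters);
    }
}
```

In the stop case the already queued WriteMessageSuccessful ends up as a dead letter; in the PoisonPill case it is processed and only the message enqueued after the pill is lost. The model does not cover replies that the journal sends after the actor has already terminated, which is the open question in Anders's follow-up.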
Re: [akka-user] Problem with supervision strategy akka-stream 1.0-M4
I suspect it is this issue https://github.com/akka/akka/issues/16979 The root cause is just not printed out : Cause: java.lang.IllegalStateException: Processor actor terminated abruptly
Re: [akka-user] Best Workaround for: java.lang.ClassCastException: akka.actor.UnstartedCell cannot be cast to akka.actor.ActorCell
Hi Steve,

I guess that wrapping it is one option. Another option would be to open a ticket and backport/cherry-pick the fixes to 2.3.x for inclusion in future minor releases. The changes look binary compatible.

B/

On 2 March 2015 at 02:02:29, Steve Ramage (s...@sjrx.net) wrote:

Like in this bug report: https://github.com/akka/akka/issues/15409

I am getting that exception sporadically when running unit tests. It looks like it was fixed, but only for the next major release and not for 2.3.x. I was wondering what I should do in the interim. Should I just wrap every Inbox.create() with a helper method that catches that exception and keeps retrying until it succeeds?

Steve Ramage
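The wrapping helper Steve asks about could look roughly like the sketch below. It is a generic retry in plain Java with no Akka dependency; the class name RetryOnClassCast and the maxAttempts parameter are assumptions made for the illustration. It deliberately retries a bounded number of times instead of "until it succeeds", so that a permanent failure still surfaces in the test run.

```java
import java.util.function.Supplier;

/** Hypothetical helper, not part of Akka: retries an action that sporadically throws ClassCastException. */
final class RetryOnClassCast {
    private RetryOnClassCast() {}

    /**
     * Runs the attempt up to maxAttempts times and returns the first successful result.
     * Only ClassCastException is retried; any other exception propagates immediately.
     */
    static <T> T retry(Supplier<T> attempt, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        ClassCastException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (ClassCastException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed: rethrow the most recent failure
    }
}
```

In a test it would presumably be called as RetryOnClassCast.retry(() -> Inbox.create(system), 5), where system is the test's ActorSystem. Backporting the fix, as Björn suggests, removes the need for the wrapper altogether.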
Re: [akka-user] Akka IO, transferring Java object bytes to an actor
Hi,

To be able to deserialize the objects you need to use Java Serialization, and the ByteStrings contain the data in the order that you wrote it. One ByteString might contain both objects or, in the case of large objects, only part of an object.

Writing the object output stream straight to the remote server is not a good idea, and you would probably be better off with a protocol that has some proper framing that you can easily interpret without doing the deserialization.

Personally I would use akka-http or spray and expose the cache as a REST service. Then you could post your key-value pairs in a form and easily pick out the parts on the other end.

B/

On 2 March 2015 at 14:05:15, Ömer Faruk Gül (omergul...@gmail.com) wrote:

Hi,

I'm trying to transfer serialized Java objects to an Actor. What I want to do is to transfer a Java serialized object together with a key (think of it as a key-value cache). The server is written with Akka Actors and the client is Java.

On the actor side I get the ByteString objects, and when I immediately send them back to the Java client I can read them. There is no problem with this.

The problem is that I want to know which keys and objects are received at the actor level. But I only get ByteStrings, and I don't know which of them is the key and which is the object. Is there some kind of delimiter I should check for, so that I know when I am receiving the key and when I am receiving the related object?

The example Java code I'm using:

    InetAddress address = InetAddress.getByName("127.0.0.1");
    Socket socket = new Socket(address, 7000);

    ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
    outputStream.writeObject("key1");
    outputStream.writeObject(user);

    ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
    String str2 = (String) inputStream.readObject();
    System.out.println("Received string2 object: " + str2);
    User user2 = (User) inputStream.readObject();
    System.out.println("Received user2 object: " + user2.fullName());

And my data handler is the example given in the Akka.io socket introduction:

    class SimplisticHandler extends Actor with ActorLogging {
      override def preStart() = {
        log.info("SimplisticHandler started!")
      }

      def receive = {
        case Received(data) =>
          log.info("Received data: " + data)
          sender() ! Write(data)
        case PeerClosed => context.stop(self)
      }

      override def postStop() = {
        log.info("SimplisticHandler stopped!")
      }
    }
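The "proper framing" Björn recommends can be sketched as a simple length-prefixed protocol: every payload (the key, then the serialized object) is preceded by its length, so the receiver can split the byte stream into messages without deserializing anything, even when one chunk holds several payloads or only part of one. The class and method names below are hypothetical, and the 4-byte big-endian length header is an assumption chosen for the illustration, not something prescribed in the thread.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Akka: length-prefixed framing over a byte stream. */
final class LengthPrefixedFraming {
    private LengthPrefixedFraming() {}

    /** Prefixes the payload with its length as a 4-byte big-endian int. */
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Collects arbitrarily split chunks and emits every payload that is complete. */
    static final class Decoder {
        private byte[] buffer = new byte[0];

        List<byte[]> feed(byte[] chunk) {
            // Append the new chunk to whatever was left over from earlier chunks.
            byte[] joined = Arrays.copyOf(buffer, buffer.length + chunk.length);
            System.arraycopy(chunk, 0, joined, buffer.length, chunk.length);

            List<byte[]> frames = new ArrayList<>();
            int pos = 0;
            while (joined.length - pos >= 4) {
                int length = ByteBuffer.wrap(joined, pos, 4).getInt();
                if (length < 0) {
                    throw new IllegalStateException("negative frame length: " + length);
                }
                if (joined.length - pos - 4 < length) {
                    break; // payload not fully received yet
                }
                frames.add(Arrays.copyOfRange(joined, pos + 4, pos + 4 + length));
                pos += 4 + length;
            }
            // Keep the incomplete tail for the next call.
            buffer = Arrays.copyOfRange(joined, pos, joined.length);
            return frames;
        }
    }
}
```

On the client, the key and the serialized object would each be written through frame(...); on the server, the bytes of each received ByteString would be passed to one Decoder per connection, and the frames then alternate between key and value. A production protocol would also want an upper bound on the accepted frame length.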
[akka-user] Re: goto different state in a `PersistentView with FSM` (after recover) without triggering onTransition
I think I found a solution.

*isPersistent is false while receiving a SnapshotOffer, but true while receiving other persisted events from the journal.*

So a solution is to add an `if isPersistent` guard to every onTransition case that I don't want to be executed on the initial recovery from snapshots.

I pushed my experiments here, if anyone is interested: https://github.com/tabdulradi/akka-persistence-fsm/blob/master/src/main/scala/sample/persistence/ViewExample.scala

On Monday, March 2, 2015 at 12:01:06 PM UTC, Tamer Abdul-Radi wrote:

I have this use case:

    class MyPersistentViewFSM extends PersistentView with LoggingFSM[State, Data] {
      startWith(A, ???)

      when (A) {
        case Event(SnapshotOffer(_, _), _) =>
          val restoredState = ??? // Let's say we somehow figured out the last state was C
          val restoredData = ???
          goto(restoredState) using restoredData
        case Event(_, _) => goto(B)
      }

      when (B) {
        case Event(_, _) => goto(C)
      }

      when (C) {
        case _ =>
          saveSnapshot(???)
          stay()
      }
    }

This will trigger a state transition from `A -> C`, which is logically wrong because we have to pass through B first. So I am wondering whether a way to silently set the state to C, without triggering onTransition, would solve the case.
Re: [akka-user] akka-streams 1.0-m4 load balancer creation stuck
Hi Luis,

On Tue, Mar 3, 2015 at 3:08 PM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote:

Thank you Endre! Yes, I have seen that example. In my use case, workers are also partial graphs

That does not matter. All Flows are partial graphs with exactly the shape that you use, Graph[FlowShape[A, B], Unit]. Flows are just a convenience API for that particular shape. If you use anything that resembles a Flow, then you are better off wrapping it in a Flow. You can even convert a Graph[FlowShape[A, B], Unit] to a Flow:

    val myFlow = Flow() { implicit b =>
      val g = b.add(myGraph) // myGraph: Graph[FlowShape[A, B], Unit]
      (g.inlet, g.outlet)
    }

-Endre

and I will use builder.add to import the flow as many times as I need and let you know if that did the trick. If I find any other problem, I will simplify my use case so I can just use Flows.

2015-03-03 14:03 GMT+00:00 Endre Varga endre.va...@typesafe.com:

Btw, have you looked at the actual cookbook sample? http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M4/scala/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers

On Tue, Mar 3, 2015 at 2:56 PM, Endre Varga endre.va...@typesafe.com wrote:

Hi Luis,

It should not get stuck but throw; either way, this will not work:

    broadcast.out(i) ~> worker ~> merge.in(i)

You imported worker once; you cannot use it N times. You can use builder.add to add it as many times as you need (the parametric import you used only matters if you want to expose the materialized value of the imported element), but since your worker is a Flow, why not just accept it as a Flow instead of a Graph[FlowShape[A, B], Unit]? It is interesting why it hangs instead of throwing an exception, though.

-Endre

With nrOfWorkers > 1, the creation of this partial graph gets stuck after the first iteration. Am I missing something? I was expecting this to create as many workers as desired.

Regards,
Luis
Re: [akka-user] akka-streams 1.0-m4 load balancer creation stuck
Ok, then I will use the Flow API to simplify my code :) Thanks!