Reposted here:
https://discuss.lightbend.com/t/aggregating-a-million-records-using-akka-streams/4781
On Thursday, August 8, 2019 at 11:02:13 AM UTC+2, Aditya pavan Kumar wrote:
>
> I am running a simulation which generates a million records every second.
> I’m writing them to Kafka and reading th
I see. Thanks.
With the provided code I couldn't reproduce the issue at least in the quick
tests I did. Could you run
jmap -histo:live on the command line when some memory has accrued and
send the output here (or in private)?
Johannes
On Thu, Nov 22, 2018 at 1:16 PM Sean Gibbons
wrote:
> An
Hi Sean,
thanks for the comprehensive report. What do you mean with a native vs
non-native TLS server? Is the example app for the "native TLS" server?
Johannes
On Wednesday, November 21, 2018 at 6:26:09 PM UTC+1, Sean Gibbons wrote:
>
> Hi all,
>
> I have been working with a native TLS Akka HTT
That the entity directive is part of the picture could be a hint that
indeed streaming requests might be the cause of this. In spray, there was
no request streaming enabled by default and the engine just collected the
complete stream into a buffer and dispatched it to the app only after
everything
list of closed issues can be found on the 10.1.0-RC1
<https://github.com/akka/akka-http/milestone/26?closed=1>, 10.1.0-RC2
<https://github.com/akka/akka-http/milestone/34?closed=1>, and 10.1.0
<https://github.com/akka/akka-http/milestone/35?closed=1>milestones on
GitHub.
For
closed issues can be found on the 10.1.0-RC2 milestone
<https://github.com/akka/akka-http/milestone/34?closed=1> milestones on
GitHub.
For this release we had the help of 23 contributors – thank you all very
much!
commits added removed
35 507 215 Johannes Rudolph
13
Hi,
in akka-stream, processing is usually run in a fused fashion, i.e. without
further configuration one stream will run in a single actor so all
operations are run sequentially. In such a synchronous scenario, there's
little room for elements to ever get dropped because the actorRef stage
bas
uch!
commits added removed
4448414860 Arnout Engelen
321196 613 Johannes Rudolph
22 7202789 Josep Prat
3 122 349 Jonas Fonseca
2 28 17 Pavel Boldyrev
1 6 4 Johan Andrén
1 128 4 Martynas Mickevičius
1 111
I wonder if you could start a timer when you enter the trace block and then
e.g. after 200ms trigger one or multiple stack dumps (using JMX or just by
printing out the result of `Thread.getAllStackTraces`). It's not super
likely that something will turn up but it seems like a simple enough thing
Hi Gary,
did you find out what's going on by now? If I understand correctly, you get
latency spikes as soon as you use the `entity[as[String]]` directive? Could
you narrow down if there's anything special to those requests? I guess you
monitor your GC times?
Johannes
On Wednesday, November 1,
Hi Evgeny,
you discovered one of the reasons for the magnet pattern. If you use
`requireParam("param".as[Int]) { abc => ... }` then the `{ abc => }` block
is mistaken as the implicit argument of `requireParam`. So, either you are
ok with that and require users to use extra parentheses
(`(requir
I tried your code and it doesn't OOM for me. Have you tried it outside of a
test suite? It might be that the test infrastructure is collecting all the
data when you use something as `reponse.entity`. If that doesn't
help, try capturing a heap dump on OOM and see where the memory is spent.
Joh
I missed this post before.
I'd like to add another point. Akka Http hasn't been performance tested on
a 40 core machine. The high idle CPU percecntage means that either Akka /
Akka Http is not configured correctly for this amount of cores or that
there are actual contention issues at these lev
Cool, thanks for sharing, nice stuff :) We know, btw., that some pieces of
the architecture are not completely optimal but are consequences of the
history of the projects. E.g. it could make sense to write a streams-only
implementation of the TCP layer instead of putting it on top of the actor
Correct, it will limit parallelism. I usually see the streams
infrastructure more as a control channel that makes sure that data flows
correctly. These kind of control things shouldn't require much overall CPU
share so it should not matter so much. If you want to do CPU-intensive work
you need
Hi Simon,
as Johan said, you shouldn't use `get` to wait for the result of future.
This just synchronously blocks the thread from doing any other useful work.
Instead, you can asynchronously handle the result of the future once it is
available. Because it is so common, we have a pattern for thi
Hi Unmesh,
On Wednesday, September 27, 2017 at 3:01:24 PM UTC+2, Unmesh Joshi wrote:
>
> I was trying to go through the code to understand how GraphStages receive
> actor messages. I see that GraphStageActor is a actor not created like
> normal actors. I looks like all the messages to GraphStage
Hi Kilic,
Try looking at stack traces during the busy periods (e.g. use `jstack` on
the command line to gather some), that should give you a clue what's going
on. In the picture you sent in your first email there were actually only 8
regular pool threads. Are there times where more is going on
ied that but there are 2 places where declare the
> materializers. Both are declared as vals. I will verify the number of
> materializer instances on my heap-dump to confirm.
>
> On Tue, 26 Sep 2017 at 13:24 Johannes Rudolph com> wrote:
>
>> On Tue, Sep 26, 2017 at 7:18 A
Oops, one should read the whole question before answering... Just saw that
you already tried that. Unfortunately, it seems that this is indeed a
shortcoming of the current model.
I guess with a bit of fiddling you could try making all of those
marshallers marshal to `Future[HttpResponse]` inste
Hi Mantis,
you are right, `Marshaller.withFixedContentType` is a bit restricted in
that regard. Fortunately, it is only a shortcut for simpler cases and the
full asynchronous functionality is available for marshallers. Try something
like
Marshaller[Iterator[Data], HttpResponse] { implicit ec =
On Tue, Sep 26, 2017 at 7:18 AM, Patrik Nordwall
wrote:
> If the names are StreamSupervisor- I think it can be that a new
> Materializer is created for each request. I don’t know if that is done by
> your application or by Akka Http. Does that ring any bells? Do you have any
> creation of stream
Hi Eduardo,
cluster sharding has at-most-once delivery (as most of Akka) so losing some
messages is to be expected. Persistent actor can opt-in to at-least-once
delivery (see
http://doc.akka.io/docs/akka/current/scala/persistence.html#at-least-once-delivery),
for other actors, you need to mak
Hi Bartosz,
I can look into the heap dump. You can send it to me privately. If that's
not possible could you post an histogram? It would be great if that could
be filtered once for subclasses of `Actor` (which will probably be
dominated by `ActorGraphInterpreter`) and once filtered by `GraphSta
Hi Bwmat,
On Saturday, September 9, 2017 at 2:32:13 AM UTC+2, Bwmat wrote:
>
> The type is just Object, and it's not documented in the linked javadoc.
>
It is undefined. It needs to have this type so that users can pass in
sources with any materialized value. The Akka Http implementation will
m
Hi Dominic,
it depends on what you mean with "high-latency" API. If you mean that some
external service is called which takes a long while, then you need to
ensure that executing this external call does not block the thread and the
thread can be used for other tasks while waiting for the result
Hi Jerry,
On Tue, Aug 22, 2017 at 12:20 PM, Jerry Tworek
wrote:
> Do I understand it correctly, that in this case cachedHostConnectionPool
> is basically unusable? I assume it will always be executed in a separate
> stage from the next stage, that actually consumes the request, and it can
> alwa
Hi Jerry,
your explanation is spot-on. You need to be make sure that the entities of
all responses are consumed. In your case, that may not happen because of a
race-condition: `take(2)` will cancel the stream and responses might get
discarded between the first and the second `mapAsyncUnordered`
Hi Christophe,
yes, that's correct. There seems to be no way to model custom cookie
attributes right now. Using RawHeader is the right workaround for now. I
filed https://github.com/akka/akka-http/issues/1354 to discuss improvements.
Johannes
On Monday, August 14, 2017 at 10:31:11 AM UTC+2, Ch
Hi Jeff,
if you don't read the response bodies of all the responses, your pipeline
will stall because the super pool connection are still waiting for your
code to actually read the responses. In your example, try to add
`x.discardEntityBytes` (or actually read the entity) inside of the
`Sink.f
Hi Yannick,
if you want to log the complete request contents, then there is no other
way than to collect anything into memory (actually, that's a consequence of
logging, not of the API).
In that case, you can use toStrict method or the toStrictEntity directive
at the root of your routing tree
Dear fast application restarters,
we just released sbt-revolver 0.9.0 which is the first version of
sbt-revolver cross-built for sbt 0.13.x and 1.0.x. Thanks go to Olli
Helenius / @liff who contributed the sbt 1.0 compatibility changes
(#62).
We also merged a long-standing PR that allows to custo
Dear fast application restarters,
we just released sbt-revolver 0.9.0 which is the first version of
sbt-revolver cross-built for sbt 0.13.x and 1.0.x. Thanks go to Olli
Helenius / @liff who contributed the sbt 1.0 compatibility changes
(#62).
We also merged a long-standing PR that allows to custo
Hi Vanger,
thanks for the report. Have you changed the value of
akka.http.host-connection-pool.min-connections
(https://github.com/akka/akka-http/blob/master/akka-http-core/src/main/resources/reference.conf#L233)?
That would explain the behavior where the pool tries to keep connections
alive
Hi Diego,
it seems it is an oversight that the dispatcher for at least the actor
parts of the pool infrastructure cannot be configured anywhere. I created a
ticket to track adding this feature:
https://github.com/akka/akka-http/issues/1278
Thanks,
Johannes
On Thursday, June 22, 2017 at 12:4
Hi Shayan,
this seems like an uncommon usage for an HTTP client. Basically you want to
connect to a server that presents a certificate for the wrong host name.
This is unsupported out of the box because it would be an unsafe thing to
do in general.
The way you tried it does not work because th
On Thursday, July 13, 2017 at 2:56:52 PM UTC+2, Justin du coeur wrote:
>
> (I should note: I don't use Akka Pub/Sub myself, but I'm wondering whether
> Cluster Sharding actually fits your use case well. Depending on the
> details, it might.)
>
Yep, I guess that's true. With cluster sharding eac
On Thursday, July 13, 2017 at 1:08:19 PM UTC+2, Alexander Lukyanchikov
wrote:
>
> *The only question, is it capable to manage tens of millions of topics?
> Would it perform better then our current solution?*
>
No, most likely it currently won't scale up to 1 million active topics. In
Akka's pub
On Wednesday, July 12, 2017 at 9:08:52 PM UTC+2, Jeff wrote:
>
> As for the issue of complexity, it's actually not as complex as it sounds.
> I'm using Http().superPool() to make api requests and I wanted to avoid
> having to create a separate stream for every single iteration of api
> request w
Hi Jeff,
your API seems quite complex. I don't know the purpose of it so I cannot
suggest anything but I'd try to simplify. :)
That said, your problem seems to be that you cannot write a concrete type
that would express the dependency between the two components of the tuple
`(RequestBuilder, P
Hi Michael,
On Monday, July 10, 2017 at 9:01:00 AM UTC+2, Michael Pisula wrote:
>
> As far as I saw from the source code, it could point to a problem with
> header parsing, but I am not exactly sure what could cause the problem.
>
The place in the code is actually misleading, as it the error is
ion 2,3,4 *, din't tell me the reason ( the stack ), id dont
> hnow how solve this problem.
>
> 在 2017年6月7日星期三 UTC+8下午5:00:07,Johannes Rudolph写道:
>>
>>
>> The rest of the stack here will tell you where the problem comes from:
>>
>> On Wed, Jun 7, 2017 at 10:40
The rest of the stack here will tell you where the problem comes from:
On Wed, Jun 7, 2017 at 10:40 AM, wrote:
> * situation 1 : ( has tell me the stack, and tell me *Shutdown finished *
> )*
> INFO | jvm 1| 2017/04/07 15:49:52 | java.lang.NoClassDefFoundError:
> Lws/protos/EnumsProtos$Har
Hi,
my colleague Arnout just found out that the error will only be logged to
stderr if you enable the `akka.jvm-exit-on-fatal-error` setting. Can you
try enabling this setting and then run again?
I also filed an issue to improve the logging of fatal
errors: https://github.com/akka/akka/issue
Hi,
the hostname setting is a bit misnamed. It defines the interface the server
binds to.
So, you can put "0.0.0.0" in there to make sure the management interface is
bound on all interfaces (but make sure not to expose it publicly) or put
some other interface address in there.
I filed https:/
Hi,
in cases of fatal errors, the error and stack trace is logged to stderr
(not using the logging framework). Note that in some cases, the logging
itself may fail (that's why the error is fatal: after it happens, the state
of the JVM might be corrupted and operations like logging may fail for
Hi Ivan,
I guess it depends on how you want to use this connection exactly. As TCP
is a stream-based protocol there's usually some state associated with a
connection that needs to be regarded when a new connection is opened.
Therefore, there cannot be a general solution to the problem that woul
Hi,
try this case class structure instead:
case class Customer(name: String,jsonData: JsValue)
Johannes
On Saturday, May 13, 2017 at 10:47:52 AM UTC+2, vishal...@exadatum.com
wrote:
>
>
> I am trying to create the REST service using akka Http and slick . I am
> trying to persist the Raw j
Hi Andrew,
here's a general idea at how it could work:
If you model each authentication method as a `Directive1[Session]` that
returns the session (or user, principal, etc.) for that authentication
method and all of the directive return the a value of the same type or a
type with a common supe
Hi Andrew,
your observation is correct. Server side TLS configuration is only possible
through code right now. We have tickets to track improving documentation
and maybe adding the configuration based approach
https://github.com/akka/akka-http/issues/55
https://github.com/akka/akka-http/issues/
Hi Thibault,
you are right, there's currently no built-in way to do this. To achieve it,
you could e.g. copy the Jackson marshaller from the sources to use a custom
media type. See
here:
https://github.com/akka/akka-http/blob/5932237a86a432d623fafb1e84eeeff56d7485fe/akka-http-marshallers-java/
Hi Florian,
can you clarify what needs improvement? Is that about client or server side?
Johannes
On Saturday, May 13, 2017 at 10:47:52 AM UTC+2, Florian Rosenberg wrote:
>
> I'm seeing a similar problem, but not wit the URI but with the
> Strict-Transport-Header, it seems to be invalid, changi
Hi Manuel,
how did you determine that the problem is related to Akka? What did the
profiling say and where's the bottleneck now?
Answering your questions:
On Friday, April 28, 2017 at 12:10:50 AM UTC+2, Manuel wrote:
I guess that non-blocking code using Futures and work stealing by the
> fork
Hi Richard,
the behavior you describe is as expected. The ask ActorRef has no relation
to the connection and so watching it doesn't have the desired effect.
The routing DSL doesn't even know about the concept of connections. It only
handles single requests and doesn't get notified in any way wh
Thanks Julian for sharing the example. Indeed, using the BroadcastHub is
the recommended way to implement something like this.
On Saturday, April 22, 2017 at 1:09:19 AM UTC+2, Julian Howarth wrote:
>
> I may have misunderstood what you want to achieve, but you don't have to
> use actors if you'd
Hi Thibault,
if you have the body of the response already as a Source,
you can create a response with a chunked entity from it like this:
HttpResponse.create()
.withEntity(HttpEntities.create(contentType, source))
Johannes
On Thursday, April 20, 2017 at 9:56:47 AM UTC+2, Thibault Meyer wrote
Hi Gavin,
the current version of Play is not compatible with Akka 2.5.0. This has
already been fixed for the upcoming Play 2.6 series. I created a ticket to
investigate if the compatibility fix should be backported to Play 2.5.x as
well: https://github.com/playframework/playframework/issues/725
I think that's a valid analogy. A while ago we were investigating similar
things and also arrived at that analogy. In practice, it turns out that
fused streams, i.e. multiple stream components that run in one actor will
skew measurements.
Not sure how much sense these analogies make when going
I think there are a few things that should be treated separately:
* How to organize bigger route structures?
* How to implement some parts of the route structure as actors?
As Roland wrote before, a Route is just an alias for a function
`RequestContext => Future[RouteResult]`. This gives you u
Hi,
that sounds as if you haven't configured Akka HTTP to open an HTTPS server.
It's hard to say though, as you didn't post any code ;) What code did you
use to configure Akka HTTP to use HTTPS?
Johannes
On Tuesday, April 18, 2017 at 2:40:08 PM UTC+2, Abdeali Chandanwala wrote:
>
> Hi Arnout
Hi Alan,
On Friday, March 17, 2017 at 12:25:09 PM UTC+1, Alan Burlison wrote:
>
> pathPrefix("root") {
>concat(
> pathPrefix("service1") { service1.route },
> pathPrefix("service2") { service2.route }
>)
> }
>
> That works fine with a path of say "/root/service1", but
Hi Alan,
yes, that's a valid use case. The route tree is usually traversed
completely until a Route matches. But if you know that the search can be
cut short, then `seal` is definitely a good solution. Another solution
would be to use only `handleRejections` which would have the advantage that
Hi Brice,
we previously recommended the stream variant
(`Source.single(request).mapAsync(1))`) but it turned out that this is not
a good idea as materialization has some cost and the API is more complex
than using `Http.singleRequest()` directly. So, using
`Http.singleRequest()` is the right w
Hi Brice,
we changed the default behavior in akka-http. Now, all directives (i.e.
also `Directive0`) behave as `dynamic` did in spray before.
Johannes
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/addition
Hi Dimitry,
that message is a different one. It means that your code is issuing
requests with an explicit header `RawHeader("User-Agent", ...)` somewhere.
Johannes
On Thursday, February 23, 2017 at 8:10:03 PM UTC+1,
dmitriy...@alisagaming.com wrote:
>
> Hi everyone.
>
> I've been using akka wi
Hello,
the error is thrown
here:
https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala#L303.
I agree it's hard to see why that error would happen. One thing that could
have happened is that a connection attempt was very slow and so the
idle-t
Hi Johnson,
if you use a `cachedHostConnectionPool`, the pool itself is a shared
resource that is managed by the infrastructure. If you don't do anything
else, the pool will shutdown itself after
the akka.http.host-connection-pool.idle-timeout has passed. If you want
manual control, the `HostC
Hi,
XMPP is an application-level protocol while TCP is a transport-level
protocol. In fact, XMPP usually runs *on top of* TCP. So, yes, you can use
Akka TCP to implement XMPP.
Comparing XMPP to what you probably have right now is that in your current
application you used your own custom protoc
This SO questions lists a few alternatives about how to achieve that with a
reasonably new JVM:
http://stackoverflow.com/questions/12096403/java-shutting-down-on-out-of-memory-error
On Wednesday, February 22, 2017 at 11:27:21 AM UTC+1, Hippolyte Léger wrote:
>
> Hello,
> so I have a fairly large
Hi,
injecting frames manually is not supported. Not supported means that we
don't offer an API that would allow users to do that.
That said, there's an internal API that allows to specify a frame handler
directly:
* You can case the `UpgradeToWebsocket` header to
`UpgradeToWebsocketLowLevel`
Hi Elliot,
the reason is not the type parameter directly, but the `: ClassTag` context
bound which introduces an implicit parameter list to your method. So, after
unfolding this syntactic sugar your withActor method basically looks like
this:
def withActor[A <: Actor](message: Any)(implicit ev
Hi,
you are applying the `alsoTo` clause to the wrong side of the connection.
You need to put it on the Sink-side. The way you have written it, the
`alsoTo` clause waits for your `Source.actorRef` to close the connection.
Try using
singleWebSocketRequest(...,
Flow[Message].alsoTo(Sink.onCompl
Hi,
these internal tests basically use it:
https://github.com/akka/akka-http/blob/master/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala
See
https://github.com/akka/akka-http/blob/master/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSet
Hi Elmar,
could you first update to the latest version of akka-http, 10.0.3? We fixed
a few issues with the connection pool since akka 2.4.7.
Johannes
On Wednesday, February 15, 2017 at 5:12:18 PM UTC+1, Elmar Weber wrote:
>
> Hello,
>
> we are observing an issue with akka-http client side sinc
Hi Gang,
disassociation usually means some kind of connection error. Unfortunately,
akka currently doesn't say anything about why a connection was closed. This
will be improved in the next version.
See https://github.com/akka/akka/pull/22278
Johannes
On Wednesday, February 8, 2017 at 12:04:19
Hi David,
the 503 is generated by the timeout logic, see the
`akka.http.server.request-timeout` setting. See the timeout directives for
ways to change it based on the
request:
http://doc.akka.io/docs/akka-http/10.0.3/scala/http/routing-dsl/directives/timeout-directives/index.html
Regarding th
Yes, this seems to be a bug in sbt-assembly which gets triggered if you use
shading. See https://github.com/sbt/sbt-assembly/issues/205
>
>
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>
Hi Eric,
we'd like to look into that. It looks as if a streams materializer is
holding on to some memory but we need more info to see what it keeps
exactly to.
Is this a reproducible scenario? Could you share the memory dump (in
private) with us? Otherwise, could you send the list of top consu
an be found on the 10.0.3 milestone
<https://github.com/akka/akka-http/milestone/19> on github.
commits added removed
26 1607 175 Konrad `ktoso` Malawski
24 1445 1029 Johannes Rudolph
18673 209 Jonas Fonseca
13 2812 821 Josep Prat
5
On Monday, January 2, 2017 at 1:42:27 PM UTC+1, Alan Burlison wrote:
>
> One question:
>
> headerValueByName("X-Auth-Key")).flatMap { (user, pass) =>
> // <- user flatMap for custom directives
>
> doesn't compile as the resulting value of type Directive[(String,
> String)] doesn't have a fl
Hi Alan,
here are a few steps to get there:
* provide all your different authentication directives as values of type
`Directive1[T]` with T being the type of the principal found after
successful authentication.
* To build those, use either one of the existing authentication directives
starti
Hi Gaurav,
two small remarks:
* Try using `Http().singleRequest` instead of
`Souce.single().via(poolClientFlow).run`. Materialization (= `run` /
`runWith`) has a certain cost that can be avoided using
`Http().singleRequest`.
* Using `(1 to 1000).par.foreach` to run the benchmark may lead to
d issues can be found on the 10.0.1 milestones on github.
For this release we had the help of 21 committers – thank you all very much!
Credits:
commits added removed
14 429 310 Johannes Rudolph
7207 16 Konrad `ktoso` Malawski
6238 109 Josep Prat
Hi Julian,
can you post more information about your code and what happens exactly when
you run it? What kind of key material do you have and how do you load it?
Are you using akka-http on the client side or on the server side?
Johannes
On Thursday, November 10, 2016 at 9:50:08 AM UTC+1, Julian
Using mapConcat instead may even be faster ;)
On Wednesday, October 19, 2016 at 5:05:22 PM UTC+2,
vladysla...@rtsmunity.com wrote:
>
> Okay, so the issue was really in Framing performance. Changing Framing
> stage for
>
> flatMapConcat(chunk -> Source.from(Arrays.asList(chunk.split("\n".
>
>
Hi Vladyslav,
this sounds like a worst-case scenario for the Framing stages: 45M lines
and each line 2-3 characters long will put a lot of pressure on the streams
infrastructure and the framing stage. It might still make sense to
benchmark and optimize the Framing stage. One optimization could
Hi Victor,
good point. I think the Scaladoc is wrong there. Could you raise an issue
at akka/akka?
Johannes
On Tuesday, October 18, 2016 at 2:28:14 PM UTC+2, Victor wrote:
>
> Hi,
>
> It's written in the ScalaDoc of the *MergeHub.source* method that:
>
> If one of the inputs fails the Sink, the
Thanks Rafał for these explanations. Just a small correction:
On Tuesday, October 11, 2016 at 3:58:09 PM UTC+2, Rafał Krzewski wrote:
>
> An alternative solution would be looking up websocket buffering settings
> and jacking it up enough to receive all messages as Strict :)
>
Unfortunately, no,
Great, thanks Richard for tackling this and André for the explanations!
On Saturday, October 15, 2016 at 10:05:59 PM UTC+2, Richard Imaoka wrote:
>
> Sorry I had read the full discussion but I think I didn't correctly
> understand what is allowed in Akka, and what is RFC 3986 compliant.
> Now it'
Hi Ronny,
I agree that error message is little helpful. Maybe you are sending plain
Strings to the actor created by Source.actorRef? (Still we might want to
improve the error message in that case.)
Johannes
On Monday, September 19, 2016 at 12:08:52 AM UTC+2, Ronny Bräunlich wrote:
>
> Hi every
Klymko wrote:
>
> Basically I'd like to prepend something, but at the moment first
> ByteString received, I mean that prepended value should not be in the
> stream in case of connection failure.
>
> On Tuesday, September 20, 2016 at 1:44:56 PM UTC+3, Johannes Rudolph w
Hi Victor,
isn't that the same as the identity flow
`Flow.apply[T]` (or just `Flow[T]`)
?
But maybe I am missing something?
Johannes
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>
Hi Simon,
you could also try to split up the asynchronous computation and the actual
filtering like this:
def filterFunc(e: E): Future[Result]
xyz.mapAsync(n)(e => filterFunc(e).map(res => e -> res))
.via(statefulFilterGraphStage)
And then implement the `statefulFilterGraphStage` with whate
Hi Yaroslav,
no worries, this is a good question. It depends a bit on what kind of logic
you want to trigger. If you just want to do something, you can use
`mapMaterializedValue` to do something with the `OutgoingConnection` object
before materialization is complete.
If you want to prepend som
Hi Paul,
a combinator to achieve something like this has been proposed several times
but I think there hasn't been consensus how to implement it exactly. The
latest approach is discussed here:
https://github.com/akka/akka-stream-contrib/issues/50
Johannes
On Thursday, September 15, 2016 at 8:
Hi Kunai,
On Thursday, September 15, 2016 at 7:49:18 AM UTC+2, Kunal Deshpande wrote:
>
> Few questions on back pressure
> 1. While using Flows in akka-streams using .via will a downstream flow
> apply back pressure to a flow upstream or is back pressure only signaled to
> a Source?
>
Backpress
Hi,
On Tuesday, September 13, 2016 at 4:26:55 PM UTC+2, rrodseth wrote:
> Is this the right way to debug the missing conversion?
> val prm = implicitly[Future[PimpedResult[(StatusCode,
Result[StatusDTO])]] => ToResponseMarshallable]
Try
implicitly[ToResponseMarshaller[Future[PimpedResult[(St
Hi Henry,
couldn't you just use ActorSelection or `actorFor`? You could then start
with the most specific path and strip segments from the end until you find
an already existing actor to handle the rest of the URI.
Johannes
On Monday, February 29, 2016 at 10:39:42 PM UTC+1, Henry Story wrote:
On Monday, February 1, 2016 at 11:28:58 AM UTC+1, Johannes Rudolph wrote:
>
> Http.bindAndHandle(currentRoute(), ...)
>
Actually, I just figured that this won't work, because currentRoute() will
only be evaluated once.
You will need a simple wrapper which is evaluated anew for
Hi Ubaldo,
yes, that's certainly possible. A `Route` is just a regular Java object, so
you can choose a new `Route` to do the processing for every new request.
You just need to make sure that you switch out the Route in a thread-safe
way.
E.g.
val routeHolder = new AtomicReference[Route](init
1 - 100 of 163 matches
Mail list logo