Re: [akka-user] Custom Sink:with CompletionStage - onPush() only get one element, why?

2017-05-01 Thread Guofeng Zhang
. To do this you must use the async > callback as described in this section of the docs: > http://doc.akka.io/docs/akka/2.5.0/java/stream/stream-customize.html#Using_asynchronous_side-channels > > -- > Johan > Akka Team > > On Tue, Apr 25, 2017 at 7:51 PM, Guofeng Zhang

[akka-user] Custom Sink:with CompletionStage - onPush() only get one element, why?

2017-04-25 Thread Guofeng Zhang
Hi, I created a custom sink following the sample in Custom stream processing. When onPush() is implemented like the sample: public void onPush() throws Exception { Integer element = grab(in);

[akka-user] Stream stopped silently, using mapAsyncUnordered()

2017-03-17 Thread Guofeng Zhang
Hi, Here is my code: Source missedProductSource = Source.from(missedProducts) ; final RunnableGraph missedTo = missedProductSource.mapAsyncUnordered(5, p->{ try { return stopProductDisplay(p.productNo, p.id); } catch

Re: [akka-user] Re: How to stream java 8's CompletionStage

2016-12-28 Thread Guofeng Zhang
batching/windowing operators in your stream if the default mapAsync > behavior does not match what you need. > > Tal > > > On Sunday, December 25, 2016 at 11:04:25 AM UTC+2, Guofeng Zhang wrote: >> >> Hi, >> >> I have an Akka stream. In the middle of the stream

[akka-user] How to stream java 8's CompletionStage

2016-12-25 Thread Guofeng Zhang
Hi, I have an Akka stream. In the middle of the stream, map() calls a remote service (WSClient), which returns a CompletionStage. I want the source not to emit the next element before the CompletionStage has finished. To be more efficient, I want the source to emit a batch of elements, then wait for

[akka-user] How could I know if a streams completed from the outside

2016-09-26 Thread Guofeng Zhang
Hi, I have defined a publisher actor and subscriber actor. How could I know if the stream has been completed from the outside (where I create the stream)? Thanks for your help. Guofeng -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >>

[akka-user] scheduleOnce() not fire message?

2016-08-19 Thread Guofeng Zhang
Hi, I use the following statements in preStart() to fire a message at a specific time: Logger.info("delaying " + d ) ; context().system().scheduler().scheduleOnce(d, self(), Tick.tick, getContext().dispatcher(), this.self()); In my test case, it works well. But when run in Play 2.5.4, I see

[akka-user] How materialized value is defined?

2016-08-16 Thread Guofeng Zhang
Hi, Methods like toMat() can expose a value when a stream is materialized. How can a materialized value be defined in a processor or flow? An example or a link to the akka-stream source would be very helpful. Thanks, Guofeng

[akka-user] How to deal with "connection pool shut down unexpectedly" ?

2016-08-03 Thread Guofeng Zhang
Hi, I want to capture this exception in my application, but it cannot be captured using CompletionStage.exceptionally(). How can I capture it? The following is the code: final HttpRequest request = HttpRequest.create(uri).addHeader(range) ; final CompletionStage

Re: [akka-user] Re: Proper way to shut down an outgoing http request stream?

2016-07-28 Thread Guofeng Zhang
Trying to consume the response entities before shutting down the pool: entity.dataBytes.runWith(Sink.ignore).onComplete({data=> Http().shutdownAllConnectionPools().onComplete { _ ⇒ System.out.println("connection pool closed") } }) On Fri, Jul 29, 2016 at

[akka-user] Re: Cannot figure out why ActorPublisher stopped?

2016-07-27 Thread Guofeng Zhang
The log is attached. On Wed, Jul 27, 2016 at 9:50 AM, Guofeng Zhang <guofen...@gmail.com> wrote: > Hi, > > I tried a sample that uses range requests in an ActorPublisher to download a > large file. But only one range is downloaded. The publisher is stopped when >

[akka-user] How to know the sequence number of a persisted message?

2016-07-16 Thread Guofeng Zhang
Hi, to delete messages, I need to know toSequenceNr. But how can I find out a message's sequence number when I am persisting it? Thanks for your help. Guofeng

Re: [akka-user] Better way to use CompletableFuture in an Actor

2016-04-26 Thread Guofeng Zhang
for your help. guofeng On Mon, Apr 25, 2016 at 10:43 PM, Guofeng Zhang <guofen...@gmail.com> wrote: > You are right, I use java. > > Following your instruction, I have done it well. > > Thanks! > > Guofeng > > > On Sun, Apr 24, 2016 at 11:51 PM, Konrad Malaws

Re: [akka-user] Better way to use CompletableFuture in an Actor

2016-04-25 Thread Guofeng Zhang
> > Read about stash, pipe and become in the documentation (search box will > help :-)) > > http://doc.akka.io/docs/akka/2.4.4/java/untyped-actors.html#Ask__Send-And-Receive-Future > etc > > -- > Konrad `ktoso` Malawski > Akka <http://akka.io> @ Lightbend <ht

[akka-user] Is it possible to define a stream for set difference?

2016-04-24 Thread Guofeng Zhang
Hi, I have two sources, suppose source A is [A, B, C, X, Y, Z]. Source B is [A, B, D, W, Y, Z]. Is it possible to define a stream that produce [C, X]? Thanks. Guofeng
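For illustration only, here is how the requested result can be computed in plain Java, outside of any stream. This is a minimal sketch, not an Akka Streams solution: the class and method names (SetDifferenceSketch, difference) are hypothetical, and it assumes source B is finite and small enough to collect into memory before source A is filtered.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Akka: computes the elements of A that are not in B.
public class SetDifferenceSketch {

    // Assumes b fits in memory; a is then filtered in its original order.
    public static <T> List<T> difference(List<T> a, List<T> b) {
        Set<T> exclude = new HashSet<>(b);
        List<T> result = new ArrayList<>();
        for (T item : a) {
            if (!exclude.contains(item)) {
                result.add(item);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sourceA = List.of("A", "B", "C", "X", "Y", "Z");
        List<String> sourceB = List.of("A", "B", "D", "W", "Y", "Z");
        System.out.println(difference(sourceA, sourceB)); // prints [C, X]
    }
}
```

The same shape would presumably carry over to a stream: drain B into a set first, then filter A against it. Whether that is acceptable depends on how large B is.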

Re: [akka-user] How to use akka-stream to transform a csv file?

2016-04-16 Thread Guofeng Zhang
ll print: H, 2A, 3A, 4A Thanks again! Guofeng For the following sample: On Fri, Apr 15, 2016 at 1:35 PM, Richard Rodseth <rrods...@gmail.com> wrote: > Take a look at prefixAndTail > > > http://doc.akka.io/docs/akka/2.4.4/scala/stream/stages-overview.html#prefixAndTail

Re: [akka-user] Re: akka-stream: How to define a Processor in Java

2016-04-16 Thread Guofeng Zhang
> -- > Johan Andrén > Akka Team, Lightbend Inc. > > > > On Wednesday, April 13, 2016 at 8:58:45 AM UTC+2, Guofeng Zhang wrote: >> >> Hi, >> >> I am learning akka-stream, so I want to understand the low-level detail. >> I found the following post is

[akka-user] How to use akka-stream to transform a csv file?

2016-04-14 Thread Guofeng Zhang
Hi, I have a CSV file like the following: ID,Product name,Price ,Sleeve,57.97 ,Jacket,83.44 I want to transform it to: ID,Product name,Price ,Sleeve,579.7 ,Jacket,834.4 That is, the header line (the first line) is not changed, but the prices in other lines are multiplied by
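The transformation itself, independent of Akka Streams, could be sketched in plain Java as below. This is only an illustration under assumptions: the class and method names (CsvPriceScaler, scalePrices) are hypothetical, the factor of 10 is inferred from the sample rows (57.97 becomes 579.7), the price is assumed to be the last column, and fields are assumed to contain no quoted commas.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: keeps the header line and scales the last column of every other line.
public class CsvPriceScaler {

    public static List<String> scalePrices(List<String> lines, BigDecimal factor) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            String line = lines.get(i);
            if (i == 0) {
                out.add(line); // header line passes through unchanged
                continue;
            }
            // Naive split; -1 keeps empty fields such as a blank ID column.
            String[] cols = line.split(",", -1);
            int last = cols.length - 1;
            BigDecimal price = new BigDecimal(cols[last].trim());
            cols[last] = price.multiply(factor).stripTrailingZeros().toPlainString();
            out.add(String.join(",", cols));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = List.of("ID,Product name,Price", ",Sleeve,57.97", ",Jacket,83.44");
        // Factor 10 is an assumption taken from the sample output.
        scalePrices(input, BigDecimal.TEN).forEach(System.out::println);
    }
}
```

BigDecimal is used instead of double so that 57.97 scales to exactly 579.7 rather than a binary floating-point approximation.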