Totally agree, the second solution would work perfectly. But then I don't need a custom publisher.
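For the record, here is a minimal sketch of what I understand by the second solution: elements are emitted only when the subscriber has asked for them, so nothing needs to be dropped. It is written against the JDK's java.util.concurrent.Flow interfaces (the Reactive Streams interfaces shipped with Java 9+) rather than Akka, so it runs without dependencies. IteratorPublisher, BatchingSubscriber, run and the batch size of 5 are my own illustrative names and values, not Akka API, and the sketch is single-threaded only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class IteratorSourceSketch {

    /** Emits elements from an iterator, but only as many as the subscriber has requested. */
    static final class IteratorPublisher<T> implements Flow.Publisher<T> {
        private final Iterable<T> elements;

        IteratorPublisher(Iterable<T> elements) {
            this.elements = elements;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            Iterator<T> it = elements.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;      // outstanding requested elements
                private boolean emitting; // guards against re-entrant request() calls
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("non-positive request"));
                        return;
                    }
                    demand += n;
                    if (emitting) return; // the outer loop will see the increased demand
                    emitting = true;
                    while (demand > 0 && !done && it.hasNext()) {
                        demand--;
                        subscriber.onNext(it.next());
                    }
                    emitting = false;
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Collects elements and asks for a new batch only after the previous one has arrived. */
    static final class BatchingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        boolean completed;
        private final int batch;
        private int outstanding;
        private Flow.Subscription subscription;

        BatchingSubscriber(int batch) {
            this.batch = batch;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            outstanding = batch;
            s.request(batch);
        }

        @Override
        public void onNext(String item) {
            received.add(item.toLowerCase());
            if (--outstanding == 0) {
                outstanding = batch;
                subscription.request(batch);
            }
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException(t);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    /** Pushes `count` elements through the publisher with the given batch size. */
    static BatchingSubscriber run(int count, int batch) {
        List<String> input = IntStream.range(0, count)
                .mapToObj(i -> "Index " + i)
                .collect(Collectors.toList());
        BatchingSubscriber subscriber = new BatchingSubscriber(batch);
        new IteratorPublisher<>(input).subscribe(subscriber);
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber s = run(100, 5);
        // prints "received 100, completed true"
        System.out.println("received " + s.received.size() + ", completed " + s.completed);
    }
}
```

The publisher never holds a buffer of its own: the iterator is advanced only inside request(), which is why the buffer limit stops being a concern in this variant.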
I look at publishers as a way to integrate with legacy systems that push events, and of course I may need to tune the buffer limit a bit. I thought the consumer would signal greater demand at some point, that it would have some logic to detect that it can consume more, or something like that. But yes, my producer thread is filling up the publisher's mailbox and the consumer's demand event ends up somewhere at the end, so most of the messages are rejected. I thought that maybe the demand event would have some sort of higher priority.

On Friday, 7 August 2015 08:23:55 UTC+2, Patrik Nordwall wrote:
>
> You don't have any backpressure when sending the messages to the actor.
> That is done outside of Akka streams. It will happily send all 100 messages
> immediately (this is extremely fast) and fill up the buffer.
> If you would use an iterator based Source to emit the 100 elements instead
> of sending them via the ActorPublisher actor you would have backpressure
> all the way.
>
> What is it that you are trying to build?
>
> On Thu, Aug 6, 2015 at 9:42 PM, paweł kamiński <kam...@gmail.com> wrote:
>
>> thanks for your answer,
>>
>> I am more curious whether this is down to my configuration, an environment
>> where 3 threads compete for resources (which probably causes the problem
>> I've described), or whether this is a general problem with notifying the
>> publisher about demand.
>> In other words, if I have a fast consumer (and adding to a list is quite
>> fast), what is the throughput of the publisher? I cannot imagine dropping
>> almost all messages in a real-life scenario.
>>
>> I hope the above makes sense :)
>>
>> thanks for the link for the second question! I've somehow overlooked it!
>>
>> On Wednesday, 5 August 2015 11:25:41 UTC+2, Patrik Nordwall wrote:
>>>
>>> On Sun, Aug 2, 2015 at 1:42 PM, paweł kamiński <kam...@gmail.com> wrote:
>>>
>>>> hi,
>>>> I have a simple actor producer based on
>>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-integrations.html#ActorPublisher
>>>>
>>>> public class MessageProducer extends AbstractActorPublisher<Message>
>>>> {
>>>>     private final static Logger logger = LoggerFactory.getLogger(MessageProducer.class);
>>>>     private final ArrayDeque<Message> buf;
>>>>
>>>>     public MessageProducer(int maxBufferSize)
>>>>     {
>>>>         buf = new ArrayDeque<>(maxBufferSize);
>>>>         receive(ReceiveBuilder
>>>>             .match(Message.class, msg -> buf.size() == maxBufferSize,
>>>>                 msg -> {
>>>>                     if (logger.isTraceEnabled())
>>>>                     {
>>>>                         logger.trace("Denying {}. buffer size is {}.", msg, buf.size());
>>>>                     }
>>>>                     sender().tell(new Fail<>(msg.getMDC(), "denied"), self());
>>>>                 })
>>>>             .match(Message.class,
>>>>                 msg -> {
>>>>                     buf.addLast(msg);
>>>>                     drain(totalDemand());
>>>>                 })
>>>>             .match(ActorPublisherMessage.Request.class, request -> drain(totalDemand()))
>>>>             .match(ActorPublisherMessage.Cancel.class, cancel -> stop())
>>>>             .match(ActorPublisherMessage.SubscriptionTimeoutExceeded.class, cancel -> stop())
>>>>             .match(Status.Success.class, cancel -> stop())
>>>>             .match(PoisonPill.class, cancel -> stop())
>>>>             .matchAny(this::unhandled)
>>>>             .build()
>>>>         );
>>>>     }
>>>>
>>>>     @Override
>>>>     public Duration subscriptionTimeout()
>>>>     {
>>>>         return Duration.create(1, TimeUnit.SECONDS);
>>>>     }
>>>>
>>>>     private void drain(long demand)
>>>>     {
>>>>         final int bufferSize = buf.size();
>>>>
>>>>         logger.debug("Stream is active {}. {}", isActive(), bufferSize);
>>>>         if (!isActive())
>>>>         {
>>>>             return;
>>>>         }
>>>>
>>>>         long maxItems = min(demand, bufferSize);
>>>>
>>>>         logger.trace("Draining buffer with {} items. demand is {}.", maxItems, demand);
>>>>         Stream
>>>>             .iterate(0, i -> i + 1)
>>>>             .limit(maxItems)
>>>>             .forEach(i -> {
>>>>                 Message msg = buf.poll();
>>>>                 logger.trace("Sending message {}.", msg);
>>>>                 onNext(msg);
>>>>             });
>>>>     }
>>>>
>>>>     private void stop() {
>>>>         context().stop(self());
>>>>     }
>>>> }
>>>>
>>>> and I am creating the stream
>>>>
>>>> // I construct the producer with a 5 element buffer, but actually it is irrelevant.
>>>> final Source<Message, ActorRef> stringSource = Source.actorPublisher(producerProps);
>>>> final ActorRef producerRef = stringSource
>>>>     .map(msg -> msg.toString().toLowerCase())
>>>>     .to(Sink.foreach(item -> {
>>>>         logger.info("got message {}", item);
>>>>         messages.add(item);
>>>>     }))
>>>>     .run(materializer);
>>>>
>>>> final int requestedElementsCount = 100;
>>>> Thread thread = new Thread(() -> {
>>>>     Stream.iterate(0, i -> i + 1)
>>>>         .limit(requestedElementsCount)
>>>>         .forEach(i -> {
>>>>             producerRef.tell(new Result<>("Index " + i), noSender());
>>>>             // sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
>>>>         });
>>>> });
>>>> thread.start();
>>>>
>>>> After starting the thread I wait for *messages* to reach
>>>> *requestedElementsCount* elements, but it never happens unless I add the
>>>> sleep to my thread above.
>>>>
>>>> 1) I cannot figure out why that is. First of all, MessageProducer is active,
>>>> but I can see in the logs that demand is 0, and then the buffer fills up and
>>>> further messages are denied. Is this my system/jvm/etc?
>>>>
>>>> I thought that the producer, the message publisher and the consumer run on
>>>> different threads and there should be no problem with consuming 100
>>>> messages, mapping them and putting the items into the *messages* list.
>>>>
>>> You immediately send 100 messages to the MessageProducer. Those will
>>> possibly be received before the demand has been requested and therefore
>>> they are dropped by your actor.
>>>
>>>> here is a sample output
>>>>
>>>> 13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 1
>>>> 13:21:04.029 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 0. *// <---- even though there are elements in the buffer, demand is 0*
>>>> 13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 2
>>>> 13:21:04.034 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 0.
>>>> 13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 3
>>>> 13:21:04.034 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 0.
>>>> 13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 4
>>>> 13:21:04.035 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 0.
>>>> 13:24:42.813 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 5
>>>> 13:21:04.035 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 0.
>>>>
>>>> ...
>>>>
>>>> 13:34:02.449 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Denying Result{MDC='949a8a9f-6e22-4941-be8d-340453dadeb9',value=Index 9}. buffer size is 5. *<-- lots of messages are dropped*
>>>> 13:34:02.453 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Denying Result{MDC='949a8a9f-6e22-4941-be8d-340453dadeb9',value=Index 10}. buffer size is 5.
>>>> 13:34:02.453 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-7] TRACE c.f.d.a.s.p.actor.MessageProducer - Denying Result{MDC='949a8a9f-6e22-4941-be8d-340453dadeb9',value=Index 11}. buffer size is 5.
>>>>
>>>> ...
>>>>
>>>> 13:24:42.818 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE c.f.d.a.s.p.actor.MessageProducer - Stream is active true. 5 *// <---- now the subscriber wakes up and sends demand, but there are no more than 5 messages :/*
>>>> 13:21:04.052 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 4 items. demand is 4.
>>>> 13:21:04.055 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 1 items. demand is 2.
>>>> 13:21:04.055 [PRODUCER_AKKA_SYSTEM-akka.actor.default-dispatcher-10] TRACE c.f.d.a.s.p.actor.MessageProducer - Draining buffer with 0 items. demand is 3.
>>>>
>>>> 2) the other thing I don't understand is how to get the value from the sink
>>>> if I change it to
>>>>
>>>> final ActorRef producerRef = stringSource
>>>>     .map(msg -> msg.toString().toLowerCase())
>>>>     .to(Sink.head()) // <---- changed from foreach to head
>>>>     .run(materializer);
>>>>
>>>> head will return a future with the first element encountered in the stream,
>>>> but since run returns an ActorRef I don't see a way to get that future.
>>>>
>>> I think you will find the answer here:
>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/java/stream-flows-and-basics.html#Combining_materialized_values
>>>
>>> /Patrik
>>>
>>>> --
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>>> ---
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to akka-user+...@googlegroups.com.
>>>> To post to this group, send email to akka...@googlegroups.com.
>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>> --
>>> Patrik Nordwall
>>> Typesafe <http://typesafe.com/> - Reactive apps on the JVM
>>> Twitter: @patriknw
>
> --
> Patrik Nordwall
> Typesafe <http://typesafe.com/> - Reactive apps on the JVM
> Twitter: @patriknw
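
P.S. For anyone finding this thread later, here is a toy model of the ordering problem, in plain Java without Akka. It assumes a strictly FIFO mailbox processed by one thread, uses a String entry for a data message and a Long entry as a stand-in for a Request(n) demand signal, and applies the same deny-when-full rule as the MessageProducer quoted above. MailboxOrderingSketch, Outcome and the demand value of 100 are made up for illustration; a real stream would request in smaller batches.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class MailboxOrderingSketch {

    /** Counters for one simulated run. */
    static final class Outcome {
        final int delivered;
        final int denied;

        Outcome(int delivered, int denied) {
            this.delivered = delivered;
            this.denied = denied;
        }
    }

    /** Builds a mailbox with the demand signal either before or after the data messages. */
    static List<Object> mailbox(int messages, long demand, boolean demandFirst) {
        List<Object> mailbox = new ArrayList<>();
        if (demandFirst) {
            mailbox.add(demand);
        }
        for (int i = 0; i < messages; i++) {
            mailbox.add("Index " + i);
        }
        if (!demandFirst) {
            mailbox.add(demand);
        }
        return mailbox;
    }

    /** Processes the mailbox in FIFO order with a bounded buffer of maxBufferSize elements. */
    static Outcome run(List<Object> mailbox, int maxBufferSize) {
        ArrayDeque<String> buf = new ArrayDeque<>();
        long demand = 0;
        int delivered = 0;
        int denied = 0;
        for (Object message : mailbox) {
            if (message instanceof Long) {
                demand += (Long) message;      // demand signal from downstream
            } else if (buf.size() == maxBufferSize) {
                denied++;                      // buffer full: message is rejected
                continue;
            } else {
                buf.addLast((String) message); // buffer the data message
            }
            // Drain as much as current demand allows.
            while (demand > 0 && !buf.isEmpty()) {
                buf.poll();
                demand--;
                delivered++;
            }
        }
        return new Outcome(delivered, denied);
    }

    public static void main(String[] args) {
        Outcome last = run(mailbox(100, 100L, false), 5);
        Outcome first = run(mailbox(100, 100L, true), 5);
        // prints "demand last: delivered 5, denied 95"
        System.out.println("demand last: delivered " + last.delivered + ", denied " + last.denied);
        // prints "demand first: delivered 100, denied 0"
        System.out.println("demand first: delivered " + first.delivered + ", denied " + first.denied);
    }
}
```

With the demand signal queued behind the 100 data messages only the 5 buffered elements get through, which matches what the log above shows; with the same demand queued first, nothing is denied.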