Hi Roland

Thank you for your prompt answer. I see your point. Let me clarify how 
the protocol works, and hopefully you can tell me whether I will have an 
issue or not.

The protocol is the Beats protocol <https://www.elastic.co/products/beats> 
(successor to Lumberjack), which log clients use to send logs into Logstash.

   1. A Beats client first sends a window size (typically 10 messages) and 
   then the stream of log messages.
   2. When a full window of messages has been processed, the server side 
   sends back an ack message.
   3. If the ack is not received, the log messages are retried after a 
   back-off period.
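
To make the three steps concrete, here is a minimal, library-free sketch of 
the server side of that exchange. It is only a model: the names Frame, 
WindowSize, Log, Ack and Receiver are made up for illustration, and carrying 
a sequence number in the ack is an assumption, not a description of the real 
Beats wire format.

```scala
// Library-free model of the window/ack exchange. All names are hypothetical
// and do not reflect the real Beats frames or the actual implementation.
object BeatsWindowSketch {
  sealed trait Frame
  final case class WindowSize(count: Int) extends Frame
  final case class Log(seq: Int, line: String) extends Frame
  // Assumption: the ack carries the sequence number of the last log in the window.
  final case class Ack(seq: Int)

  // Receiver state: the announced window and how many of its logs have arrived.
  final case class Receiver(window: Int = 0, received: Int = 0) {
    // Feed one frame; returns the next state and an ack once the window is full.
    def handle(frame: Frame): (Receiver, Option[Ack]) = frame match {
      case WindowSize(n) => (Receiver(n, 0), None)
      case Log(seq, _) =>
        val count = received + 1
        if (count == window) (Receiver(window, 0), Some(Ack(seq)))
        else (copy(received = count), None)
    }
  }

  // Run a sequence of frames and collect every ack that was produced.
  def run(frames: Seq[Frame]): Seq[Ack] = {
    val (_, acks) = frames.foldLeft((Receiver(), Vector.empty[Ack])) {
      case ((state, out), frame) =>
        val (next, ack) = state.handle(frame)
        (next, out ++ ack.toList)
    }
    acks
  }
}
```

In this model a window of 3 followed by three logs yields exactly one ack, 
while a window of 3 followed by only two logs yields none, which is the case 
where the client would retry after its back-off period.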

I did have issues during the initial implementation of the protocol, where 
the ack was never received on the client side. At that point I needed 
upstream to pull elements eleven times to get the ack, which is symmetrical 
to the number of messages emitted in the flow. This did not work, so I 
created a custom graph stage that requests bytes until the received window 
is filled and then emits all elements with just one upstream pull 
(asymmetrical).
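
The asymmetry can be modelled without Akka as follows. This is a simplified 
sketch and not the actual GraphStage: serveOnePull and Served are 
hypothetical names, the Iterator stands in for the upstream, and the model 
works on whole elements rather than bytes. It only shows that one downstream 
request is answered with a full window after several upstream reads.

```scala
// Library-free model of the pull asymmetry; not the Akka GraphStage API.
// `serveOnePull` and `Served` are hypothetical names used for illustration.
object WindowCollectorSketch {
  // What a single downstream pull receives, plus how many upstream reads it cost.
  final case class Served[A](batch: Vector[A], upstreamPulls: Int)

  // Serve ONE downstream pull: keep reading from upstream until `window`
  // elements are buffered or upstream is exhausted, then hand over one batch.
  def serveOnePull[A](upstream: Iterator[A], window: Int): Served[A] = {
    val buffer = Vector.newBuilder[A]
    var pulls = 0
    while (pulls < window && upstream.hasNext) {
      buffer += upstream.next()
      pulls += 1
    }
    Served(buffer.result(), pulls)
  }
}
```

With a window of 10, a single call reads ten elements from the iterator and 
returns them as one batch, so the downstream side only has to ask once.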

I have assumed that having to pull twice or more from upstream will bring 
back my former problem, where the client does not get the ack, but I'm not 
familiar enough with the Stream TCP implementation to know this for sure.

Information on how TCP pull demand works in the above scenario would be 
much appreciated.

Regards, Magnus

On Monday, 2 May 2016 at 07:07:36 UTC+2, rkuhn wrote:
>
> Hi Magnus,
>
> Not all stages are capable of always knowing in advance (i.e. without 
> actually asking for more elements) that the stream is about to end. Having 
> additional demand is necessary in general if you need the completion signal.
>
> You say this "breaks the protocol"—which protocol do you mean here? The 
> Reactive Streams demand signal is an implementation detail and you cannot 
> rely on particular requests being made or not made anyway.
>
> Regards, Roland 
>
> On 01 May 2016, at 22:16, Magnus Andersson <mag...@magnusart.com> wrote:
>
> Hi
>
> I'm having some trouble combining Akka Streams and Akka Persistence. I 
> have a connectionHandler flow that I use for incoming TCP streams; it 
> looks like this:
>
> val connectionHandler = protocol.via( persist ).via( reply )
>
> When I run the following test, it fails at expectComplete (after 3 seconds):
>
> testSource
>       .via( connectionHandler )
>       .runWith( TestSink.probe[ByteString] )
>       .request( 1 )
>       .expectNext( ack )
>       .expectComplete()
>
> However when I change the connectionHandler and remove the persist flow 
> stage, the test completes without any errors! 
> (see further below for details of filterAck and persist flows).
>
> val connectionHandler = protocol.via( filterAck ).via( reply )
>
> Now here comes the next part. If I modify my test to request 2 messages it 
> completes successfully. But this would break the protocol I'm implementing. 
>
> testSource
>       .via( connectionHandler )
>       .runWith( TestSink.probe[ByteString] )
>       .request( 2 )
>       .expectNext( ack )
>       .expectComplete()
>
> To me this looks like it is related to the MapAsync stage implementation. 
> I'm *guessing* the line below from the MapAsync stage is the culprit 
> (Ops.scala#L835 
> <https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala#L835>).
>
> override def onUpstreamFinish(): Unit = if (todo == 0) completeStage()
>
> *Question: How can I work around this? I need to persist my messages and 
> this test flow is supposed to complete after emitting one message.*
>
> Best regards
> Magnus Andersson
>
> Relevant code to explain the connectionHandler flow differences:
>   private[this] def persistEvent( logOrAck: Either[Beats.Log, Beats.Ack] ) =
>     ( persisterManager ? logOrAck ).mapTo[Option[Beats.Ack]]
>
>   // Used in first test: resulted in failed test
>   private[ingestion] val persist: Flow[Either[Beats.Log, Beats.Ack], Beats.Ack, NotUsed] =
>     Flow[Either[Beats.Log, Beats.Ack]]
>       .mapAsync( 1 )( persistEvent )
>       .filter( _.isDefined )
>       .map( _.get )
>
>   // Used in second test to bypass persistence: resulted in successful test
>   private[ingestion] val filterAck: Flow[Either[Beats.Log, Beats.Ack], Beats.Ack, NotUsed] =
>     Flow[Either[Beats.Log, Beats.Ack]]
>       .filter( _.isRight )
>       .map( _.right.get )
>
>
> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: 
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+...@googlegroups.com.
> To post to this group, send email to akka...@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>
