Re: [akka-user] Akka Stream Getting Stuck (2.4.9-RC2)

2016-08-10 Thread Endre Varga
Hi,



On Wed, Aug 10, 2016 at 10:26 PM, tigerfoot  wrote:

> For whoever finds this thread and struggles with the same problem... I've
> found a solution that fixed it for me.
>
> implicit val materializer = ActorMaterializer(
> ActorMaterializerSettings(system)
>   .withAutoFusing(false) // flows with LateKafka get messed up if this
> is true!
>   )
>
> Disabling auto fusing fixed this for me.  Not exactly sure what or why,
> but something in my code didn't play well with auto fusing.
>

This basically reinforces my previous hunch, namely, that this is a classic
deadlock scenario. Somewhere you introduced a bounded-buffer deadlock.
Likely in your fan-out-then-merge undirected cycle. The reason is why I
think this is the case is because turning off fusing will simply introduce
buffer space that might be able to absorb enough elements to reduce the
chance of a deadlock.

You can easily test this: add a buffer stage in the arms of the undirected
cycle, set them to some large capacity and see what happens. I bet that you
will not see the error anymore even with auto-fusing on.

-Endre


>  I've got plenty of other Akka code that uses the default, presumably
> enabled, auto fusing with no issues, so it's something peculiar to this
> particular application.
>
> If you find yourself with the same issue, turn it off and see if that
> helps.
>
> On Saturday, August 6, 2016 at 11:56:55 AM UTC-5, tigerfoot wrote:
>>
>> Hi guys,
>>
>> Yeah, I knew the first thing ya'll would need is a reproducer, but sadly
>> its wired into a lot of heavy complexity, and... I can't even reproduce it
>> consistently.  It sometimes fails.
>>
>> I haven't (consciously) set auto-fusing or fuzzing--it's whatever the
>> defaults are out-of-the-box.
>>
>> filter and commitQ are is here:
>>
>>   val filter = builder.add(Partition[CRec](2, (crec) => {
>> crec.value match {
>>   case SendIntentMessage(m) if (m.payload.recipients.size > 0 &&
>> m.payload.recipients.head.media == media) => 0
>>   case SendEdibleMessage(m) if (m.payload.recipients.size > 0 &&
>> m.payload.recipients.head.media == media) => 0
>>   case _ => 1
>> }
>>   }))
>>  val commitQ = Flow[CRec].map(late.commit(_))  // LateKafka (async)
>> commit of ConsumerRecord pulled off Kafka and carried thru stream.
>>
>> I'll keep digging and see if I can at least come up with some way to
>> strip it down enough to be practically sharable and hopefully still
>> reproduce the problem.
>>
>> On Saturday, August 6, 2016 at 1:58:16 AM UTC-5, drewhk wrote:
>>>
>>> Hi Greg,
>>>
>>> On Sat, Aug 6, 2016 at 1:11 AM, tigerfoot  wrote:
>>>
 I'm having a nasty issue I hope someone can help me with.

 I have some stream code like this:

   val contentAssembly = Flow[CRec].map { crec =>
 println("HERE!")
 val x = expression.render(crec.value).
 asInstanceOf[Message[OutputWrapper]]
 println("X is "+x)
 (crec, x)
   }
   val sendIt = Flow[(CRec, Message[OutputWrapper])].map { case
 (crec, output) => println("THERE!"); sendAndNotifyFn(output); crec }

   src ~> filter ~> contentAssembly ~> sendIt ~> commitQ ~>
 Sink.ignore
  filter ~> commitQ ~> Sink.ignore


 The two interesting stages are contentAssembly and sendIt.  About 1/2
 the time this works just fine.
 When it works, I see output from the "HERE!" message, the "X" message,
 and the "THERE" message.

 When it doesn't work I see HERE! and X but not the THERE.  As you can
 see, there nothing in my code that happens between the "X" message in
 contentAssembly and the THERE message in sendIt.

 My working theory is that somehow Akka Streams is sometimes losing the
 supply/demand handshake between these two stages.

>>>
>>> I would be *very very *surprised by that. What does filter and commitQ
>>> do, they seem to be fan-out and fan-in stages. I personally suspect the
>>> usual bounded-buffer deadlock caused by cycles.
>>>
>>> Is auto-fusing enabled or disabled? Is fuzzing enabled or disabled?
>>>
>>> We need a reproducer to be able to say more than this at this point.
>>>
>>> -Endre
>>>
>>>

 I first noticed this on 2.4.7 then moved to 2.4.9-RC2 hoping someone
 had found a bug, but no luck.

 What can I do to dig deeper and try to figure out what's going on
 between contentAssembly and sendIt stages?

 Thanks,
 Greg

 --
 >> Read the docs: http://akka.io/docs/
 >> Check the FAQ: http://doc.akka.io/docs/akka/c
 urrent/additional/faq.html
 >> Search the archives: https://groups.google.com/grou
 p/akka-user
 ---
 You received this message because you are subscribed to the Google
 Groups "Akka User List" group.
 To unsubscribe from this group and stop receiving emails from it, send
 an email to akka-user+.

Re: [akka-user] Akka Stream Getting Stuck (2.4.9-RC2)

2016-08-10 Thread tigerfoot
For whoever finds this thread and struggles with the same problem... I've 
found a solution that fixed it for me.

implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system)
  .withAutoFusing(false) // flows with LateKafka get messed up if this 
is true!
  )

Disabling auto fusing fixed this for me.  Not exactly sure what or why, but 
something in my code didn't play well with auto fusing.  I've got plenty of 
other Akka code that uses the default, presumably enabled, auto fusing with 
no issues, so it's something peculiar to this particular application. 

If you find yourself with the same issue, turn it off and see if that helps.

On Saturday, August 6, 2016 at 11:56:55 AM UTC-5, tigerfoot wrote:
>
> Hi guys,
>
> Yeah, I knew the first thing ya'll would need is a reproducer, but sadly 
> its wired into a lot of heavy complexity, and... I can't even reproduce it 
> consistently.  It sometimes fails.
>
> I haven't (consciously) set auto-fusing or fuzzing--it's whatever the 
> defaults are out-of-the-box.
>
> filter and commitQ are is here:
>
>   val filter = builder.add(Partition[CRec](2, (crec) => {
> crec.value match {
>   case SendIntentMessage(m) if (m.payload.recipients.size > 0 && 
> m.payload.recipients.head.media == media) => 0
>   case SendEdibleMessage(m) if (m.payload.recipients.size > 0 && 
> m.payload.recipients.head.media == media) => 0
>   case _ => 1
> }
>   }))
>  val commitQ = Flow[CRec].map(late.commit(_))  // LateKafka (async) 
> commit of ConsumerRecord pulled off Kafka and carried thru stream.
>
> I'll keep digging and see if I can at least come up with some way to strip 
> it down enough to be practically sharable and hopefully still reproduce the 
> problem.
>
> On Saturday, August 6, 2016 at 1:58:16 AM UTC-5, drewhk wrote:
>>
>> Hi Greg,
>>
>> On Sat, Aug 6, 2016 at 1:11 AM, tigerfoot  wrote:
>>
>>> I'm having a nasty issue I hope someone can help me with.
>>>
>>> I have some stream code like this:
>>>
>>>   val contentAssembly = Flow[CRec].map { crec =>
>>> println("HERE!")
>>> val x = 
>>> expression.render(crec.value).asInstanceOf[Message[OutputWrapper]]
>>> println("X is "+x)
>>> (crec, x)
>>>   }
>>>   val sendIt = Flow[(CRec, Message[OutputWrapper])].map { case 
>>> (crec, output) => println("THERE!"); sendAndNotifyFn(output); crec }
>>>
>>>   src ~> filter ~> contentAssembly ~> sendIt ~> commitQ ~> 
>>> Sink.ignore 
>>>  filter ~> commitQ ~> Sink.ignore
>>>
>>>
>>> The two interesting stages are contentAssembly and sendIt.  About 1/2 
>>> the time this works just fine. 
>>> When it works, I see output from the "HERE!" message, the "X" message, 
>>> and the "THERE" message.
>>>
>>> When it doesn't work I see HERE! and X but not the THERE.  As you can 
>>> see, there nothing in my code that happens between the "X" message in 
>>> contentAssembly and the THERE message in sendIt.
>>>
>>> My working theory is that somehow Akka Streams is sometimes losing the 
>>> supply/demand handshake between these two stages.
>>>
>>
>> I would be *very very *surprised by that. What does filter and commitQ 
>> do, they seem to be fan-out and fan-in stages. I personally suspect the 
>> usual bounded-buffer deadlock caused by cycles.
>>
>> Is auto-fusing enabled or disabled? Is fuzzing enabled or disabled?
>>
>> We need a reproducer to be able to say more than this at this point.
>>
>> -Endre
>>  
>>
>>>
>>> I first noticed this on 2.4.7 then moved to 2.4.9-RC2 hoping someone had 
>>> found a bug, but no luck.
>>>
>>> What can I do to dig deeper and try to figure out what's going on 
>>> between contentAssembly and sendIt stages?
>>>
>>> Thanks,
>>> Greg
>>>
>>> -- 
>>> >> Read the docs: http://akka.io/docs/
>>> >> Check the FAQ: 
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >> Search the archives: 
>>> https://groups.google.com/group/akka-user
>>> --- 
>>> You received this message because you are subscribed to the Google 
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send 
>>> an email to akka-user+...@googlegroups.com.
>>> To post to this group, send email to akka...@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this 

Re: [akka-user] Akka Stream Getting Stuck (2.4.9-RC2)

2016-08-06 Thread tigerfoot
Hi guys,

Yeah, I knew the first thing ya'll would need is a reproducer, but sadly 
its wired into a lot of heavy complexity, and... I can't even reproduce it 
consistently.  It sometimes fails.

I haven't (consciously) set auto-fusing or fuzzing--it's whatever the 
defaults are out-of-the-box.

filter and commitQ are is here:

  val filter = builder.add(Partition[CRec](2, (crec) => {
crec.value match {
  case SendIntentMessage(m) if (m.payload.recipients.size > 0 && 
m.payload.recipients.head.media == media) => 0
  case SendEdibleMessage(m) if (m.payload.recipients.size > 0 && 
m.payload.recipients.head.media == media) => 0
  case _ => 1
}
  }))
 val commitQ = Flow[CRec].map(late.commit(_))  // LateKafka (async) 
commit of ConsumerRecord pulled off Kafka and carried thru stream.

I'll keep digging and see if I can at least come up with some way to strip 
it down enough to be practically sharable and hopefully still reproduce the 
problem.

On Saturday, August 6, 2016 at 1:58:16 AM UTC-5, drewhk wrote:
>
> Hi Greg,
>
> On Sat, Aug 6, 2016 at 1:11 AM, tigerfoot > 
> wrote:
>
>> I'm having a nasty issue I hope someone can help me with.
>>
>> I have some stream code like this:
>>
>>   val contentAssembly = Flow[CRec].map { crec =>
>> println("HERE!")
>> val x = 
>> expression.render(crec.value).asInstanceOf[Message[OutputWrapper]]
>> println("X is "+x)
>> (crec, x)
>>   }
>>   val sendIt = Flow[(CRec, Message[OutputWrapper])].map { case (crec, 
>> output) => println("THERE!"); sendAndNotifyFn(output); crec }
>>
>>   src ~> filter ~> contentAssembly ~> sendIt ~> commitQ ~> 
>> Sink.ignore 
>>  filter ~> commitQ ~> Sink.ignore
>>
>>
>> The two interesting stages are contentAssembly and sendIt.  About 1/2 the 
>> time this works just fine. 
>> When it works, I see output from the "HERE!" message, the "X" message, 
>> and the "THERE" message.
>>
>> When it doesn't work I see HERE! and X but not the THERE.  As you can 
>> see, there nothing in my code that happens between the "X" message in 
>> contentAssembly and the THERE message in sendIt.
>>
>> My working theory is that somehow Akka Streams is sometimes losing the 
>> supply/demand handshake between these two stages.
>>
>
> I would be *very very *surprised by that. What does filter and commitQ 
> do, they seem to be fan-out and fan-in stages. I personally suspect the 
> usual bounded-buffer deadlock caused by cycles.
>
> Is auto-fusing enabled or disabled? Is fuzzing enabled or disabled?
>
> We need a reproducer to be able to say more than this at this point.
>
> -Endre
>  
>
>>
>> I first noticed this on 2.4.7 then moved to 2.4.9-RC2 hoping someone had 
>> found a bug, but no luck.
>>
>> What can I do to dig deeper and try to figure out what's going on between 
>> contentAssembly and sendIt stages?
>>
>> Thanks,
>> Greg
>>
>> -- 
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Akka Stream Getting Stuck (2.4.9-RC2)

2016-08-05 Thread Endre Varga
Hi Greg,

On Sat, Aug 6, 2016 at 1:11 AM, tigerfoot  wrote:

> I'm having a nasty issue I hope someone can help me with.
>
> I have some stream code like this:
>
>   val contentAssembly = Flow[CRec].map { crec =>
> println("HERE!")
> val x = expression.render(crec.value).asInstanceOf[Message[
> OutputWrapper]]
> println("X is "+x)
> (crec, x)
>   }
>   val sendIt = Flow[(CRec, Message[OutputWrapper])].map { case (crec,
> output) => println("THERE!"); sendAndNotifyFn(output); crec }
>
>   src ~> filter ~> contentAssembly ~> sendIt ~> commitQ ~> Sink.ignore
>  filter ~> commitQ ~> Sink.ignore
>
>
> The two interesting stages are contentAssembly and sendIt.  About 1/2 the
> time this works just fine.
> When it works, I see output from the "HERE!" message, the "X" message, and
> the "THERE" message.
>
> When it doesn't work I see HERE! and X but not the THERE.  As you can see,
> there nothing in my code that happens between the "X" message in
> contentAssembly and the THERE message in sendIt.
>
> My working theory is that somehow Akka Streams is sometimes losing the
> supply/demand handshake between these two stages.
>

I would be *very very *surprised by that. What does filter and commitQ do,
they seem to be fan-out and fan-in stages. I personally suspect the usual
bounded-buffer deadlock caused by cycles.

Is auto-fusing enabled or disabled? Is fuzzing enabled or disabled?

We need a reproducer to be able to say more than this at this point.

-Endre


>
> I first noticed this on 2.4.7 then moved to 2.4.9-RC2 hoping someone had
> found a bug, but no luck.
>
> What can I do to dig deeper and try to figure out what's going on between
> contentAssembly and sendIt stages?
>
> Thanks,
> Greg
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.