For whoever finds this thread and struggles with the same problem... I've 
found a solution that fixed it for me.

implicit val materializer = ActorMaterializer(
    ActorMaterializerSettings(system)
      .withAutoFusing(false) // flows with LateKafka get messed up if this is true!
  )

Disabling auto fusing fixed this for me.  I'm not sure exactly why, but 
something in my code didn't play well with auto fusing.  I've got plenty of 
other Akka code that uses the default (presumably enabled) auto fusing with 
no issues, so it's something peculiar to this particular application. 

If you find yourself with the same issue, turn it off and see if that helps.
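
In case it helps anyone trying this: the same switch can probably be flipped from configuration instead of code. The sketch below assumes the akka-stream 2.4.x config key akka.stream.materializer.auto-fusing (check the reference.conf of your version), and the object and system names are made up for illustration. I have not confirmed it cures the problem described in this thread; it is only another way to turn fusing off for a test run.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory

object NoFusingFromConfig {
  // Assumption: this is the akka-stream 2.4.x key that controls auto fusing.
  // Everything else falls back to the normal application/reference config.
  val config = ConfigFactory
    .parseString("akka.stream.materializer.auto-fusing = off")
    .withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("no-fusing", config)

    // With no explicit settings, the materializer reads
    // akka.stream.materializer.* from the actor system's config.
    implicit val materializer = ActorMaterializer()

    // Print the effective value so you can confirm the override was picked up.
    println(system.settings.config.getBoolean("akka.stream.materializer.auto-fusing"))

    system.terminate()
  }
}
```

Doing it in config makes it easy to run the same build with fusing on and off and compare.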

On Saturday, August 6, 2016 at 11:56:55 AM UTC-5, tigerfoot wrote:
>
> Hi guys,
>
> Yeah, I knew the first thing y'all would need is a reproducer, but sadly 
> it's wired into a lot of heavy complexity, and... I can't even reproduce it 
> consistently.  It sometimes fails.
>
> I haven't (consciously) set auto-fusing or fuzzing--it's whatever the 
> defaults are out-of-the-box.
>
> filter and commitQ are here:
>
>       val filter = builder.add(Partition[CRec](2, (crec) => {
>         crec.value match {
>           case SendIntentMessage(m) if (m.payload.recipients.size > 0 &&
>               m.payload.recipients.head.media == media) => 0
>           case SendEdibleMessage(m) if (m.payload.recipients.size > 0 &&
>               m.payload.recipients.head.media == media) => 0
>           case _ => 1
>         }
>       }))
>       // LateKafka (async) commit of ConsumerRecord pulled off Kafka and
>       // carried thru stream.
>       val commitQ = Flow[CRec].map(late.commit(_))
>
> I'll keep digging and see if I can at least come up with some way to strip 
> it down enough to be practically sharable and hopefully still reproduce the 
> problem.
>
> On Saturday, August 6, 2016 at 1:58:16 AM UTC-5, drewhk wrote:
>>
>> Hi Greg,
>>
>> On Sat, Aug 6, 2016 at 1:11 AM, tigerfoot <gzo...@gmail.com> wrote:
>>
>>> I'm having a nasty issue I hope someone can help me with.
>>>
>>> I have some stream code like this:
>>>
>>>       val contentAssembly = Flow[CRec].map { crec =>
>>>         println("HERE!")
>>>         val x =
>>>           expression.render(crec.value).asInstanceOf[Message[OutputWrapper]]
>>>         println("X is "+x)
>>>         (crec, x)
>>>       }
>>>       val sendIt = Flow[(CRec, Message[OutputWrapper])].map {
>>>         case (crec, output) =>
>>>           println("THERE!")
>>>           sendAndNotifyFn(output)
>>>           crec
>>>       }
>>>
>>>       src ~> filter ~> contentAssembly ~> sendIt ~> commitQ ~> Sink.ignore
>>>              filter ~> commitQ ~> Sink.ignore
>>>
>>>
>>> The two interesting stages are contentAssembly and sendIt.  About half 
>>> the time this works just fine. 
>>> When it works, I see output from the "HERE!" message, the "X" message, 
>>> and the "THERE!" message.
>>>
>>> When it doesn't work I see HERE! and X but not THERE!.  As you can 
>>> see, there's nothing in my code that happens between the "X" message in 
>>> contentAssembly and the THERE! message in sendIt.
>>>
>>> My working theory is that somehow Akka Streams is sometimes losing the 
>>> supply/demand handshake between these two stages.
>>>
>>
>> I would be *very, very* surprised by that. What do filter and commitQ 
>> do? They seem to be fan-out and fan-in stages. I personally suspect the 
>> usual bounded-buffer deadlock caused by cycles.
>>
>> Is auto-fusing enabled or disabled? Is fuzzing enabled or disabled?
>>
>> We need a reproducer to be able to say more than this at this point.
>>
>> -Endre
>>  
>>
>>>
>>> I first noticed this on 2.4.7 then moved to 2.4.9-RC2 hoping someone had 
>>> found a bug, but no luck.
>>>
>>> What can I do to dig deeper and try to figure out what's going on 
>>> between contentAssembly and sendIt stages?
>>>
>>> Thanks,
>>> Greg
>>>
>>> -- 
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: 
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives: 
>>> https://groups.google.com/group/akka-user
>>> --- 
>>> You received this message because you are subscribed to the Google 
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send 
>>> an email to akka-user+...@googlegroups.com.
>>> To post to this group, send email to akka...@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
