Hi all,

I just submitted PR 1527, which adds connection-retaining logic to
ConsumeKafka_0_10.
https://github.com/apache/nifi/pull/1527

Any feedback would be appreciated. Nick, it would be especially helpful
if you could confirm that this fix works as you expected.

Thanks,
Koji

On Wed, Feb 22, 2017 at 8:47 AM, Koji Kawamura <ijokaruma...@gmail.com> wrote:
> Hi Nick,
>
> I understand. I will keep adding code to iron out the
> implementation.
> Please let me know if you find anything by looking at the code. I'd
> like you to review the branch in detail once the PR is ready.
>
> Thanks!
> Koji
>
> On Tue, Feb 21, 2017 at 9:06 AM, Nick Carenza
> <nick.care...@thecontrolgroup.com> wrote:
>> Hey Koji, thanks for putting in the time.
>>
>> I have not had a chance to start working on this myself and I certainly
>> support any effort to resolve it. I'll take a look at your branch and play
>> around with it.
>>
>> Thanks!
>> Nick
>>
>> On Mon, Feb 20, 2017 at 2:30 AM, Koji Kawamura <ijokaruma...@gmail.com>
>> wrote:
>>>
>>> Hi Nick, Joe, Bryan,
>>>
>>> I confirmed that this is easily reproducible and I got exactly the
>>> same stacktrace.
>>>
>>> Also I was curious about how Kafka consumer's pause/resume API works,
>>> so I've gone further and done a simple experiment to see if it helps
>>> in this situation to retain consumer connection.
>>>
>>> The experimental code is in my remote branch here:
>>>
>>> https://github.com/ijokarumawak/nifi/commit/28ba134771ec7a7e810924f655662a29662ba9bf
>>>
>>> It seems to be working as expected. After back-pressure is engaged,
>>> ConsumeKafka is not triggered for a while, then when downstream
>>> recovers from back-pressure, ConsumeKafka resumes consuming messages
>>> without any loss or warning messages using the same consumer instance.
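
(A rough illustration of the pause/resume idea described above, not the code from the branch. `PausableConsumer` and `FakeConsumer` are hypothetical stand-ins so the sketch runs without a broker. With the real client the corresponding calls would be the consumer's pause, poll and resume methods on its assigned partitions, and the assumption is that a poll on a paused consumer returns no records while still keeping the group membership alive.)

```java
import java.util.ArrayList;
import java.util.List;

public class PauseResumeSketch {

    /** Hypothetical stand-in for the few consumer operations the idea relies on. */
    interface PausableConsumer {
        void pauseAll();      // real client: pause(...) on the assigned partitions
        void resumeAll();     // real client: resume(...) on the same partitions
        List<String> poll();  // real client: poll(timeout)
    }

    /** In-memory fake: a paused poll still counts as activity but returns nothing. */
    static class FakeConsumer implements PausableConsumer {
        private final List<String> pending = new ArrayList<>();
        private boolean paused = false;
        int pollCount = 0;

        void publish(String message) { pending.add(message); }

        @Override public void pauseAll() { paused = true; }

        @Override public void resumeAll() { paused = false; }

        @Override public List<String> poll() {
            pollCount++;
            if (paused) {
                return new ArrayList<>();
            }
            List<String> records = new ArrayList<>(pending);
            pending.clear();
            return records;
        }
    }

    /** Keep-alive: poll while paused so no records are fetched, then resume. */
    static List<String> retainConnection(PausableConsumer consumer) {
        consumer.pauseAll();
        try {
            return consumer.poll();
        } finally {
            consumer.resumeAll();
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.publish("m1");
        // While back-pressure is engaged, only the keep-alive runs.
        System.out.println(retainConnection(consumer).size()); // prints 0
        // Once downstream recovers, a normal poll still sees the message.
        System.out.println(consumer.poll());                   // prints [m1]
    }
}
```

The try/finally is there so that a failed keep-alive poll cannot leave the consumer paused for the next normal onTrigger.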
>>>
>>> Nick, have you already started working on a fix for this?
>>> If you haven't, and the approach using pause/resume sounds reasonable,
>>> I am going to add more synchronization and start/stop processor
>>> handling, and make it work properly with onTrigger so that it only
>>> tries to retain the connection when there hasn't been any polling
>>> activity for some period of time.
>>>
>>> What do you think?
>>> Any feedback would be appreciated. Thanks!
>>>
>>> Koji
>>>
>>> On Fri, Feb 10, 2017 at 7:22 AM, Nick Carenza
>>> <nick.care...@thecontrolgroup.com> wrote:
>>> > Thank you guys, I will look to see what I can do to contribute.
>>> >
>>> > On Thu, Feb 9, 2017 at 1:19 PM, Joe Witt <joe.w...@gmail.com> wrote:
>>> >>
>>> >> That said I think we can improve our handling of the consumer (kafka
>>> >> client) and session (nifi transactional logic) and solve the problem.
>>> >> It is related to our backpressure/consumer handling so we can fix
>>> >> that.
>>> >>
>>> >> Thanks
>>> >> Joe
>>> >>
>>> >> On Thu, Feb 9, 2017 at 1:38 PM, Bryan Bende <bbe...@gmail.com> wrote:
>>> >> > No data loss, but you may process the same message twice in NiFi.
>>> >> >
>>> >> > The ordering of operations is:
>>> >> >
>>> >> > 1) poll Kafka
>>> >> > 2) write received data to flow file
>>> >> > 3) commit NiFi session so data in flow file cannot be lost
>>> >> > 4) commit offsets to Kafka
>>> >> >
>>> >> > Doing it this way achieves at-least-once processing, which means you
>>> >> > can't ever lose data, but you can process data twice.
>>> >> >
>>> >> > If we committed the offsets before committing the flow file you would
>>> >> > never get duplicates, but you could lose a message if a crash happened
>>> >> > between committing the offsets and committing the NiFi session
>>> >> > (at-most-once processing).
>>> >> >
>>> >> > So the error is happening on #4 and NiFi has already produced a flow
>>> >> > file with the message, but then Kafka says it can't update the offset,
>>> >> > and then another consumer will likely pull that same message again and
>>> >> > produce another flow file with the same message.
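
(The four steps and the failure on #4 can be simulated in a few lines. This is only a sketch of the ordering described above: `FakeKafka`, `consumeOnce` and the in-memory "flow file" list are hypothetical names made up for the illustration and are not NiFi or Kafka APIs.)

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AtLeastOnceSketch {

    /** Hypothetical in-memory topic plus the committed offset of one consumer group. */
    static class FakeKafka {
        final List<String> log;
        int committedOffset = 0;
        boolean failNextCommit = false;

        FakeKafka(List<String> log) { this.log = log; }

        /** Returns every record after the last committed offset. */
        List<String> pollFromCommitted() {
            return new ArrayList<>(log.subList(committedOffset, log.size()));
        }

        /** Simulates the offset commit, optionally failing once. */
        void commit(int offset) {
            if (failNextCommit) {
                failNextCommit = false;
                throw new IllegalStateException("commit failed: group already rebalanced");
            }
            committedOffset = offset;
        }
    }

    /** One pass through steps 1 to 4 in the order described above. */
    static void consumeOnce(FakeKafka kafka, List<String> flowFiles) {
        List<String> records = kafka.pollFromCommitted();          // 1) poll Kafka
        List<String> session = new ArrayList<>(records);           // 2) write to flow file
        flowFiles.addAll(session);                                  // 3) commit the session
        try {
            kafka.commit(kafka.committedOffset + records.size());  // 4) commit offsets
        } catch (IllegalStateException e) {
            // The data is already safe downstream, so nothing is lost,
            // but the next poll starts from the old offset again.
            System.out.println("Duplicates are likely: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        FakeKafka kafka = new FakeKafka(Arrays.asList("a", "b"));
        List<String> flowFiles = new ArrayList<>();
        kafka.failNextCommit = true;
        consumeOnce(kafka, flowFiles);  // step 4 fails
        consumeOnce(kafka, flowFiles);  // same records are polled again
        System.out.println(flowFiles);  // prints [a, b, a, b]
    }
}
```

Swapping steps 3 and 4 in `consumeOnce` would give the at-most-once variant: no duplicates, but a crash between the two commits loses the polled records.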
>>> >> >
>>> >> >
>>> >> > On Thu, Feb 9, 2017 at 1:19 PM, Nick Carenza
>>> >> > <nick.care...@thecontrolgroup.com> wrote:
>>> >> >> That makes perfect sense. To be clear, is there any potential to lose
>>> >> >> messages in this scenario?
>>> >> >>
>>> >> >> On Thu, Feb 9, 2017 at 7:16 AM, Joe Witt <joe.w...@gmail.com> wrote:
>>> >> >>>
>>> >> >>> yeah this is probably a good case/cause for use of the pause concept
>>> >> >>> in kafka consumers.
>>> >> >>>
>>> >> >>> On Thu, Feb 9, 2017 at 9:49 AM, Bryan Bende <bbe...@gmail.com>
>>> >> >>> wrote:
>>> >> >>> > I believe you are running into this issue:
>>> >> >>> >
>>> >> >>> > https://issues.apache.org/jira/browse/NIFI-3189
>>> >> >>> >
>>> >> >>> > When back-pressure happens on the queue coming out of
>>> >> >>> > ConsumeKafka, this can last for longer than session.timeout.ms,
>>> >> >>> > and when the processor resumes executing it receives this error
>>> >> >>> > on the first execution. We should be able to implement some type
>>> >> >>> > of keep-alive so that even when the processor is not executing,
>>> >> >>> > there is a background thread, or some other way of keeping the
>>> >> >>> > connections alive.
>>> >> >>> >
>>> >> >>> > I believe any user-defined properties in the processor get passed
>>> >> >>> > to the Kafka consumer, so I believe you could add
>>> >> >>> > "session.timeout.ms" and set a much higher value as a possible
>>> >> >>> > workaround.
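
(For illustration, the workaround amounts to overriding one consumer property. The sketch below only builds the property set; `consumerOverrides` and the 10-second margin are hypothetical choices, not NiFi code. Two assumptions worth checking against your Kafka version: the client is expected to reject a request.timeout.ms that is not larger than session.timeout.ms, and the broker only accepts session timeouts within its group.min.session.timeout.ms and group.max.session.timeout.ms bounds.)

```java
import java.util.Properties;

public class SessionTimeoutSketch {

    /**
     * Builds the consumer overrides for a longer session timeout.
     * In NiFi these would be added as user-defined properties on the processor.
     */
    static Properties consumerOverrides(int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        // Assumption: request.timeout.ms has to stay above session.timeout.ms,
        // so it is raised together with it (the margin is arbitrary).
        props.setProperty("request.timeout.ms", Integer.toString(sessionTimeoutMs + 10_000));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerOverrides(60_000);
        System.out.println(props.getProperty("session.timeout.ms")); // prints 60000
        System.out.println(props.getProperty("request.timeout.ms")); // prints 70000
    }
}
```

A longer session timeout only widens the window; back-pressure that lasts longer than the new value would still trigger the same rebalance.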
>>> >> >>> >
>>> >> >>> > Thanks,
>>> >> >>> >
>>> >> >>> > Bryan
>>> >> >>> >
>>> >> >>> > On Thu, Feb 9, 2017 at 8:42 AM, Koji Kawamura
>>> >> >>> > <ijokaruma...@gmail.com>
>>> >> >>> > wrote:
>>> >> >>> >> Hello Nick,
>>> >> >>> >>
>>> >> >>> >> First, I assume "had a queue back up" means that back-pressure
>>> >> >>> >> was applied to a queue. Sorry if you meant something different.
>>> >> >>> >>
>>> >> >>> >> I tried to reproduce it with the following flow:
>>> >> >>> >> ConsumeKafka_0_10
>>> >> >>> >>   -- success: Back Pressure Object Threshold = 10
>>> >> >>> >>     -- UpdateAttribute (Stopped)
>>> >> >>> >>
>>> >> >>> >> Then I used ./bin/kafka-console-producer.sh to send 11 messages.
>>> >> >>> >> The result was that when NiFi received the 10th message,
>>> >> >>> >> back-pressure was engaged on the success relationship.
>>> >> >>> >> When I published the 11th message, NiFi didn't do anything.
>>> >> >>> >> This is expected behavior: because the downstream connection is
>>> >> >>> >> back-pressured, the processor won't be scheduled.
>>> >> >>> >>
>>> >> >>> >> After I started UpdateAttribute and the queued flow files went
>>> >> >>> >> through, ConsumeKafka was executed again and received the 11th
>>> >> >>> >> message.
>>> >> >>> >>
>>> >> >>> >> Also, I checked the ConsumerLease and ConsumeKafka_0_10 source
>>> >> >>> >> code. Those warning and error messages are logged because NiFi
>>> >> >>> >> received a KafkaException when it tried to commit offsets to
>>> >> >>> >> Kafka.
>>> >> >>> >>
>>> >> >>> >> Was there anything in the Kafka server logs? I suspect something
>>> >> >>> >> happened on the Kafka server side.
>>> >> >>> >>
>>> >> >>> >> Thanks,
>>> >> >>> >> Koji
>>> >> >>> >>
>>> >> >>> >> On Thu, Feb 9, 2017 at 11:54 AM, Nick Carenza
>>> >> >>> >> <nick.care...@thecontrolgroup.com> wrote:
>>> >> >>> >>> Hey team, I have a ConsumeKafka_0_10 running which normally
>>> >> >>> >>> operates without problems. I had a queue back up due to a
>>> >> >>> >>> downstream processor and I started getting these bulletins.
>>> >> >>> >>>
>>> >> >>> >>> 01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
>>> >> >>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2]
>>> >> >>> >>> Duplicates are likely as we were able to commit the process
>>> >> >>> >>> session but received an exception from Kafka while committing
>>> >> >>> >>> offsets.
>>> >> >>> >>>
>>> >> >>> >>> 01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
>>> >> >>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2]
>>> >> >>> >>> Exception while interacting with Kafka so will close the lease
>>> >> >>> >>> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
>>> >> >>> >>> due to org.apache.kafka.clients.consumer.CommitFailedException:
>>> >> >>> >>> Commit cannot be completed since the group has already
>>> >> >>> >>> rebalanced and assigned the partitions to another member. This
>>> >> >>> >>> means that the time between subsequent calls to poll() was
>>> >> >>> >>> longer than the configured session.timeout.ms, which typically
>>> >> >>> >>> implies that the poll loop is spending too much time message
>>> >> >>> >>> processing. You can address this either by increasing the
>>> >> >>> >>> session timeout or by reducing the maximum size of batches
>>> >> >>> >>> returned in poll() with max.poll.records.
>>> >> >>> >>> My max.poll.records is set to 10000 on my consumer and
>>> >> >>> >>> session.timeout.ms is the default 10000 on the server.
>>> >> >>> >>>
>>> >> >>> >>> Since there is no such thing as coincidences, I believe this
>>> >> >>> >>> has to do with it not being able to push received messages to
>>> >> >>> >>> the downstream queue.
>>> >> >>> >>>
>>> >> >>> >>> If my flow is backed up, I would expect the ConsumeKafka
>>> >> >>> >>> processor not to throw errors, but to continue to heartbeat
>>> >> >>> >>> with the Kafka server and resume consuming once it can commit
>>> >> >>> >>> to the downstream queue.
>>> >> >>> >>>
>>> >> >>> >>> Might I have the server or consumer misconfigured to handle
>>> >> >>> >>> this scenario, or should the consumer not be throwing this
>>> >> >>> >>> error?
>>> >> >>> >>>
>>> >> >>> >>> Thanks,
>>> >> >>> >>> - Nick
>>> >> >>
>>> >> >>
>>> >
>>> >
>>
>>
