Hey Koji, thanks for putting in the time.

I have not had a chance to start working on this myself and I certainly
support any effort to resolve it. I'll take a look at your branch and play
around with it.

Thanks!
Nick

On Mon, Feb 20, 2017 at 2:30 AM, Koji Kawamura <ijokaruma...@gmail.com>
wrote:

> Hi Nick, Joe, Bryan,
>
> I confirmed that this is easily reproducible and I got exactly the
> same stacktrace.
>
> Also, I was curious about how the Kafka consumer's pause/resume API
> works, so I went further and did a simple experiment to see whether it
> helps retain the consumer connection in this situation.
>
> The experimental code is in my remote branch here:
> https://github.com/ijokarumawak/nifi/commit/28ba134771ec7a7e810924f655662a29662ba9bf
>
> It seems to work as expected. After back-pressure is engaged,
> ConsumeKafka is not triggered for a while; then, when the downstream
> recovers from back-pressure, ConsumeKafka resumes consuming messages
> without any loss or warning messages, using the same consumer instance.
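>
> For reference, here is a minimal sketch of the pause/resume idea
> against the plain Kafka 0.10 consumer API (the back-pressure flag and
> method name are illustrative, not the actual code in the branch):
>
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> void pollOrKeepAlive(KafkaConsumer<byte[], byte[]> consumer,
>                      boolean downstreamBackPressured) {
>     if (downstreamBackPressured) {
>         // Pause all assigned partitions but keep calling poll(); the
>         // consumer still heartbeats and stays in the group, and no
>         // records are returned while paused.
>         consumer.pause(consumer.assignment());
>         consumer.poll(0);
>     } else {
>         consumer.resume(consumer.assignment());
>         ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>         // ... write records to flow files and commit as usual
>     }
> }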
>
> Nick, have you already started working on fixing this?
> If you haven't, and the approach using pause/resume sounds reasonable,
> I am going to add more synchronization and start/stop processor
> handling, and make it work more properly with onTrigger so that it
> only tries to retain the connection when there hasn't been any polling
> activity for some period of time.
>
> What do you think?
> Any feedback would be appreciated. Thanks!
>
> Koji
>
> On Fri, Feb 10, 2017 at 7:22 AM, Nick Carenza
> <nick.care...@thecontrolgroup.com> wrote:
> > Thank you guys, I will look to see what I can do to contribute.
> >
> > On Thu, Feb 9, 2017 at 1:19 PM, Joe Witt <joe.w...@gmail.com> wrote:
> >>
> >> That said, I think we can improve our handling of the consumer (kafka
> >> client) and session (nifi transactional logic) and solve the problem.
> >> It is related to our backpressure/consumer handling, so we can fix
> >> that.
> >>
> >> Thanks
> >> Joe
> >>
> >> On Thu, Feb 9, 2017 at 1:38 PM, Bryan Bende <bbe...@gmail.com> wrote:
> >> > No data loss, but you may process the same message twice in NiFi.
> >> >
> >> > The ordering of operations is:
> >> >
> >> > 1) poll Kafka
> >> > 2) write received data to flow file
> >> > 3) commit NiFi session so data in flow file cannot be lost
> >> > 4) commit offsets to Kafka
> >> >
> >> > Doing it this way achieves at-least-once processing, which means you
> >> > can never lose data, but you may process data twice.
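> >> >
> >> > In pseudo-Java terms (writeToFlowFile is an illustrative helper, not
> >> > the actual processor code), the ordering looks like:
> >> >
> >> > // At-least-once: commit the NiFi session before the Kafka offsets.
> >> > ConsumerRecords<byte[], byte[]> records = consumer.poll(100); // 1) poll
> >> > FlowFile flowFile = writeToFlowFile(session, records);        // 2) write
> >> > session.commit();                                             // 3) NiFi commit
> >> > consumer.commitSync();                                        // 4) Kafka offsets
> >> > // A crash between 3) and 4) re-delivers the records: duplicates,
> >> > // never loss. Swapping 3) and 4) would give at-most-once instead.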
> >> >
> >> > If we committed the offsets before committing the flow file you would
> >> > never get duplicates, but you could lose a message if a crash happened
> >> > between committing the offset and committing the NiFi session
> >> > (at-most-once processing).
> >> >
> >> > So the error is happening at step 4: NiFi has already produced a flow
> >> > file with the message, but then Kafka says it can't update the offset,
> >> > and another consumer will likely pull that same message again and
> >> > produce another flow file with the same message.
> >> >
> >> >
> >> > On Thu, Feb 9, 2017 at 1:19 PM, Nick Carenza
> >> > <nick.care...@thecontrolgroup.com> wrote:
> >> >> That makes perfect sense. To be clear, is there any potential to lose
> >> >> messages in this scenario?
> >> >>
> >> >> On Thu, Feb 9, 2017 at 7:16 AM, Joe Witt <joe.w...@gmail.com> wrote:
> >> >>>
> >> >>> yeah this is probably a good case/cause for use of the pause concept
> >> >>> in kafka consumers.
> >> >>>
> >> >>> > On Thu, Feb 9, 2017 at 9:49 AM, Bryan Bende <bbe...@gmail.com> wrote:
> >> >>> > I believe you are running into this issue:
> >> >>> >
> >> >>> > https://issues.apache.org/jira/browse/NIFI-3189
> >> >>> >
> >> >>> > When back-pressure happens on the queue coming out of ConsumeKafka,
> >> >>> > this can last for longer than session.timeout.ms, and when the
> >> >>> > processor resumes executing it receives this error on the first
> >> >>> > execution. We should be able to implement some type of keep-alive
> >> >>> > so that even when the processor is not executing, there is a
> >> >>> > background thread, or some way of keeping the connections alive.
> >> >>> >
> >> >>> > I believe any user-defined properties in the processor get passed
> >> >>> > to the Kafka consumer, so you could add "session.timeout.ms" and
> >> >>> > set a much higher value as a possible workaround.
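> >> >>> >
> >> >>> > Against a plain consumer the equivalent would look like this
> >> >>> > (bootstrap.servers, group.id, and the timeout value are only
> >> >>> > illustrative):
> >> >>> >
> >> >>> > import java.util.Properties;
> >> >>> > import org.apache.kafka.clients.consumer.KafkaConsumer;
> >> >>> > import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> >> >>> >
> >> >>> > Properties props = new Properties();
> >> >>> > props.put("bootstrap.servers", "localhost:9092");
> >> >>> > props.put("group.id", "my-group");
> >> >>> > // Raise the session timeout well above the longest expected
> >> >>> > // back-pressure pause (the default is 10000 ms); the broker's
> >> >>> > // group.max.session.timeout.ms may also need raising to match.
> >> >>> > props.put("session.timeout.ms", "60000");
> >> >>> > KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
> >> >>> >     props, new ByteArrayDeserializer(), new ByteArrayDeserializer());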
> >> >>> >
> >> >>> > Thanks,
> >> >>> >
> >> >>> > Bryan
> >> >>> >
> >> >>> > On Thu, Feb 9, 2017 at 8:42 AM, Koji Kawamura
> >> >>> > <ijokaruma...@gmail.com>
> >> >>> > wrote:
> >> >>> >> Hello Nick,
> >> >>> >>
> >> >>> >> First, I assume "had a queue back up" means a queue with
> >> >>> >> back-pressure engaged. Sorry if that was a different meaning.
> >> >>> >>
> >> >>> >> I tried to reproduce it with the following flow:
> >> >>> >> ConsumeKafka_0_10
> >> >>> >>   -- success: Back Pressure Object Threshold = 10
> >> >>> >>     -- UpdateAttribute (Stopped)
> >> >>> >>
> >> >>> >> Then I used ./bin/kafka-console-producer.sh to send 11 messages.
> >> >>> >> The result was that when NiFi received the 10th message,
> >> >>> >> back-pressure was enabled on the success relationship.
> >> >>> >> When I published the 11th message, NiFi didn't do anything.
> >> >>> >> This is expected behavior: because the downstream connection is
> >> >>> >> back-pressured, the processor won't be scheduled.
> >> >>> >>
> >> >>> >> After I started UpdateAttribute and the queued flow files went
> >> >>> >> through, ConsumeKafka was executed again and received the 11th
> >> >>> >> message.
> >> >>> >>
> >> >>> >> Also, I checked the ConsumerLease and ConsumeKafka_0_10 source
> >> >>> >> code; those warning and error messages are logged because NiFi
> >> >>> >> received a KafkaException when it tried to commit offsets to
> >> >>> >> Kafka.
> >> >>> >>
> >> >>> >> Was there anything in the Kafka server logs? I suspect something
> >> >>> >> happened on the Kafka server side.
> >> >>> >>
> >> >>> >> Thanks,
> >> >>> >> Koji
> >> >>> >>
> >> >>> >> On Thu, Feb 9, 2017 at 11:54 AM, Nick Carenza
> >> >>> >> <nick.care...@thecontrolgroup.com> wrote:
> >> >>> >>> Hey team, I have a ConsumeKafka_0_10 running which normally
> >> >>> >>> operates without problems. I had a queue back up due to a
> >> >>> >>> downstream processor and I started getting these bulletins.
> >> >>> >>>
> >> >>> >>> 01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
> >> >>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2]
> >> >>> >>> Duplicates are likely as we were able to commit the process
> >> >>> >>> session but received an exception from Kafka while committing
> >> >>> >>> offsets.
> >> >>> >>>
> >> >>> >>> 01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
> >> >>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2]
> >> >>> >>> Exception while interacting with Kafka so will close the lease
> >> >>> >>>
> >> >>> >>> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
> >> >>> >>> due to org.apache.kafka.clients.consumer.CommitFailedException:
> >> >>> >>> Commit cannot be completed since the group has already rebalanced
> >> >>> >>> and assigned the partitions to another member. This means that
> >> >>> >>> the time between subsequent calls to poll() was longer than the
> >> >>> >>> configured session.timeout.ms, which typically implies that the
> >> >>> >>> poll loop is spending too much time message processing. You can
> >> >>> >>> address this either by increasing the session timeout or by
> >> >>> >>> reducing the maximum size of batches returned in poll() with
> >> >>> >>> max.poll.records.
> >> >>> >>>
> >> >>> >>> My max.poll.records is set to 10000 on my consumer and
> >> >>> >>> session.timeout.ms is the default 10000 on the server.
> >> >>> >>>
> >> >>> >>> Since there is no such thing as coincidences, I believe this
> >> >>> >>> has to do with it not being able to push received messages to
> >> >>> >>> the downstream queue.
> >> >>> >>>
> >> >>> >>> If my flow is backed up, I would expect the ConsumeKafka
> >> >>> >>> processor not to throw errors but to continue to heartbeat with
> >> >>> >>> the Kafka server and resume consuming once it can commit to the
> >> >>> >>> downstream queue.
> >> >>> >>>
> >> >>> >>> Might I have the server or consumer misconfigured to handle
> >> >>> >>> this scenario, or should the consumer not be throwing this
> >> >>> >>> error?
> >> >>> >>>
> >> >>> >>> Thanks,
> >> >>> >>> - Nick
> >> >>
> >> >>
> >
> >
>
