No data loss, but you may process the same message twice in NiFi.

The ordering of operations is:

1) poll Kafka
2) write received data to flow file
3) commit NiFi session so data in flow file cannot be lost
4) commit offsets to Kafka

Doing it this way achieves at-least-once processing, which means you
can't ever lose data, but you can process data twice.

If we committed the offsets before committing the NiFi session you
would never get duplicates, but you could lose a message if a crash
happened between committing the offsets and committing the NiFi
session (at-most-once processing).
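
To make the two orderings concrete, here is a toy simulation in plain
Python. It is only a sketch and is not NiFi or Kafka client code: the
names Pipeline, run_once, Crash and simulate are made up for
illustration, a list stands in for a Kafka partition, and another list
stands in for flow files persisted by a session commit.

```python
class Crash(Exception):
    """Simulated failure between the two commit steps (hypothetical)."""


class Pipeline:
    """Toy stand-in for a consumer; not the real NiFi or Kafka API."""

    def __init__(self, log):
        self.log = list(log)          # messages in one "partition"
        self.committed_offset = 0     # what "Kafka" remembers for the group
        self.flow_files = []          # what "NiFi" has durably stored

    def run_once(self, offsets_first=False, crash_between=False):
        # 1) poll: everything after the last committed offset
        batch = self.log[self.committed_offset:]
        # 2) write received data to a not-yet-committed flow file
        pending = list(batch)
        next_offset = self.committed_offset + len(batch)

        def commit_session():
            self.flow_files.extend(pending)

        def commit_offsets():
            self.committed_offset = next_offset

        if offsets_first:
            first, second = commit_offsets, commit_session
        else:
            first, second = commit_session, commit_offsets

        first()
        if crash_between:
            # Work that was not committed yet is simply gone.
            raise Crash()
        second()


def simulate(offsets_first):
    """Crash once between the commits, restart, and return the flow files."""
    p = Pipeline(["m1", "m2", "m3"])
    try:
        p.run_once(offsets_first=offsets_first, crash_between=True)
    except Crash:
        pass  # "restart" and poll again from the committed offset
    p.run_once(offsets_first=offsets_first)
    return p.flow_files


if __name__ == "__main__":
    print("session first:", simulate(offsets_first=False))
    print("offsets first:", simulate(offsets_first=True))
```

With the session committed first, every message shows up twice after
the restart (duplicates, nothing lost). With the offsets committed
first, the restart polls from the new offset and the flow file list
stays empty (no duplicates, but the messages are lost).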

So the error is happening at step 4: NiFi has already produced a flow
file with the message, but then Kafka says it can't update the offset.
Another consumer will then likely pull that same message again and
produce another flow file with the same message.


On Thu, Feb 9, 2017 at 1:19 PM, Nick Carenza
<nick.care...@thecontrolgroup.com> wrote:
> That makes perfect sense. To be clear, is there any potential to lose
> messages in this scenario?
>
> On Thu, Feb 9, 2017 at 7:16 AM, Joe Witt <joe.w...@gmail.com> wrote:
>>
>> yeah this is probably a good case/cause for use of the pause concept
>> in kafka consumers.
>>
>> On Thu, Feb 9, 2017 at 9:49 AM, Bryan Bende <bbe...@gmail.com> wrote:
>> > I believe you are running into this issue:
>> >
>> > https://issues.apache.org/jira/browse/NIFI-3189
>> >
>> > When back-pressure happens on the queue coming out of ConsumeKafka,
>> > this can last for longer than session.timeout.ms, and when the
>> > processor resumes executing it receives this error on the first
>> > execution. We should be able to implement some type of keep-alive so
>> > that even when the processor is not executing, there is a background
>> > thread, or some other way of keeping the connections alive.
>> >
>> > I believe any user-defined properties in the processor get passed to
>> > the Kafka consumer, so I believe you could add "session.timeout.ms"
>> > and set a much higher value as a possible work around.
>> >
>> > Thanks,
>> >
>> > Bryan
>> >
>> > On Thu, Feb 9, 2017 at 8:42 AM, Koji Kawamura <ijokaruma...@gmail.com>
>> > wrote:
>> >> Hello Nick,
>> >>
>> >> First, I assume "had a queue back up" means a queue had
>> >> back-pressure applied. Sorry if you meant something different.
>> >>
>> >> I tried to reproduce it with the following flow:
>> >> ConsumeKafka_0_10
>> >>   -- success: Back Pressure Object Threshold = 10
>> >>     -- UpdateAttribute (Stopped)
>> >>
>> >> Then I used ./bin/kafka-console-producer.sh to send 11 messages.
>> >> The result was that when NiFi received the 10th message,
>> >> back-pressure was enabled on the success relationship.
>> >> When I published the 11th message, NiFi didn't do anything.
>> >> This is expected behavior: because the downstream connection is
>> >> back-pressured, the processor won't be scheduled.
>> >>
>> >> After I started UpdateAttribute and the queued flow files went
>> >> through, ConsumeKafka was executed again and received the 11th
>> >> message.
>> >>
>> >> Also, I checked the ConsumerLease and ConsumeKafka_0_10 source code.
>> >> Those warning and error messages are logged because NiFi received a
>> >> KafkaException when it tried to commit offsets to Kafka.
>> >>
>> >> Was there anything in the Kafka server logs? I suspect something
>> >> happened on the Kafka server side.
>> >>
>> >> Thanks,
>> >> Koji
>> >>
>> >> On Thu, Feb 9, 2017 at 11:54 AM, Nick Carenza
>> >> <nick.care...@thecontrolgroup.com> wrote:
>> >>> Hey team, I have a ConsumeKafka_0_10 running which normally operates
>> >>> without problems. I had a queue back up due to a downstream processor
>> >>> and I started getting these bulletins.
>> >>>
>> >>> 01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Duplicates
>> >>> are likely as we were able to commit the process session but received
>> >>> an exception from Kafka while committing offsets.
>> >>>
>> >>> 01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Exception
>> >>> while interacting with Kafka so will close the lease
>> >>> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
>> >>> due to org.apache.kafka.clients.consumer.CommitFailedException: Commit
>> >>> cannot be completed since the group has already rebalanced and
>> >>> assigned the partitions to another member. This means that the time
>> >>> between subsequent calls to poll() was longer than the configured
>> >>> session.timeout.ms, which typically implies that the poll loop is
>> >>> spending too much time message processing. You can address this either
>> >>> by increasing the session timeout or by reducing the maximum size of
>> >>> batches returned in poll() with max.poll.records.
>> >>>
>> >>> My max.poll.records is set to 10000 on my consumer and
>> >>> session.timeout.ms is the default 10000 on the server.
>> >>>
>> >>> Since there is no such thing as coincidences, I believe this has to do
>> >>> with it not being able to push received messages to the downstream
>> >>> queue.
>> >>>
>> >>> If my flow is backed up, I expect the ConsumeKafka processor not to
>> >>> throw errors but continue to heartbeat with the Kafka server and resume
>> >>> consuming once it can commit to the downstream queue?
>> >>>
>> >>> Might I have the server or consumer misconfigured to handle this
>> >>> scenario or should the consumer not be throwing this error?
>> >>>
>> >>> Thanks,
>> >>> - Nick
>
>
