Thank you guys, I will look to see what I can do to contribute.

On Thu, Feb 9, 2017 at 1:19 PM, Joe Witt <joe.w...@gmail.com> wrote:
> That said I think we can improve our handling of the consumer (kafka
> client) and session (nifi transactional logic) and solve the problem.
> It is related to our backpressure/consumer handling so we can fix
> that.
>
> Thanks
> Joe
>
> On Thu, Feb 9, 2017 at 1:38 PM, Bryan Bende <bbe...@gmail.com> wrote:
> > No data loss, but you may process the same message twice in NiFi.
> >
> > The ordering of operations is:
> >
> > 1) poll Kafka
> > 2) write received data to flow file
> > 3) commit NiFi session so data in flow file cannot be lost
> > 4) commit offsets to Kafka
> >
> > Doing it this way achieves at-least-once processing, which means you
> > can't ever lose data, but you can process data twice.
> >
> > If we committed the offsets before committing the flow file you would
> > never get duplicates, but you could lose a message if a crash happened
> > between committing the offset and committing the NiFi session
> > (at-most-once processing).
> >
> > So the error is happening on #4: NiFi has already produced a flow
> > file with the message, but then Kafka says it can't update the offset,
> > and then another consumer will likely pull that same message again and
> > produce another flow file with the same message.
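The ordering Bryan describes maps onto the plain Kafka consumer API roughly as follows. This is only a minimal sketch with manual offset commits, not NiFi's actual ConsumerLease code; the topic name, group id, and the persistToFlowFile() helper are illustrative stand-ins.

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class AtLeastOnceConsumerSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "nifi-demo-group");
            // offsets are committed manually, only after the data is safely persisted
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("my-topic"));
                while (true) {
                    // 1) poll Kafka
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        // 2) + 3) write the data somewhere durable (in NiFi: flow file + session commit)
                        persistToFlowFile(record.value());
                    }
                    try {
                        // 4) commit offsets last
                        consumer.commitSync();
                    } catch (CommitFailedException e) {
                        // The data is already persisted, so nothing is lost, but the same
                        // messages may be re-delivered after a rebalance: duplicates, not loss.
                    }
                }
            }
        }

        private static void persistToFlowFile(byte[] value) {
            // hypothetical stand-in for writing a flow file and committing the NiFi session
        }
    }

Reversing steps 3 and 4 would give at-most-once behavior instead: no duplicates, but a crash between the offset commit and the session commit would drop a message.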
> >
> > On Thu, Feb 9, 2017 at 1:19 PM, Nick Carenza
> > <nick.care...@thecontrolgroup.com> wrote:
> >> That makes perfect sense. To be clear, is there any potential to lose
> >> messages in this scenario?
> >>
> >> On Thu, Feb 9, 2017 at 7:16 AM, Joe Witt <joe.w...@gmail.com> wrote:
> >>> Yeah, this is probably a good case/cause for use of the pause concept
> >>> in Kafka consumers.
> >>>
> >>> On Thu, Feb 9, 2017 at 9:49 AM, Bryan Bende <bbe...@gmail.com> wrote:
> >>> > I believe you are running into this issue:
> >>> >
> >>> > https://issues.apache.org/jira/browse/NIFI-3189
> >>> >
> >>> > When back-pressure happens on the queue coming out of ConsumeKafka,
> >>> > it can last longer than session.timeout.ms, and when the processor
> >>> > resumes executing it receives this error on the first execution. We
> >>> > should be able to implement some type of keep-alive so that even when
> >>> > the processor is not executing there is a background thread, or some
> >>> > other way of keeping the connections alive.
> >>> >
> >>> > I believe any user-defined properties in the processor get passed to
> >>> > the Kafka consumer, so you could add "session.timeout.ms" with a much
> >>> > higher value as a possible workaround.
> >>> >
> >>> > Thanks,
> >>> >
> >>> > Bryan
> >>> >
> >>> > On Thu, Feb 9, 2017 at 8:42 AM, Koji Kawamura <ijokaruma...@gmail.com>
> >>> > wrote:
> >>> >> Hello Nick,
> >>> >>
> >>> >> First, I assume "had a queue back up" means a queue being
> >>> >> back-pressured. Sorry if that was a different meaning.
> >>> >>
> >>> >> I tried to reproduce it with the following flow:
> >>> >> ConsumeKafka_0_10
> >>> >> -- success: Back Pressure Object Threshold = 10
> >>> >> -- UpdateAttribute (Stopped)
> >>> >>
> >>> >> Then I used ./bin/kafka-console-producer.sh to send 11 messages.
> >>> >> The result was that when NiFi received the 10th message, back
> >>> >> pressure was enabled on the success relationship.
> >>> >> When I published the 11th message, NiFi didn't do anything.
> >>> >> This is expected behavior: because the downstream connection is
> >>> >> back-pressured, the processor won't be scheduled.
> >>> >>
> >>> >> After I started UpdateAttribute and the queued flow files went
> >>> >> through, ConsumeKafka was executed again and received the 11th
> >>> >> message.
> >>> >>
> >>> >> Also, I checked the ConsumerLease and ConsumeKafka_0_10 source code;
> >>> >> those warning and error messages are logged because NiFi received a
> >>> >> KafkaException when it tried to commit offsets to Kafka.
> >>> >>
> >>> >> Was there anything in the Kafka server logs? I suspect something
> >>> >> happened on the Kafka server side.
> >>> >>
> >>> >> Thanks,
> >>> >> Koji
> >>> >>
> >>> >> On Thu, Feb 9, 2017 at 11:54 AM, Nick Carenza
> >>> >> <nick.care...@thecontrolgroup.com> wrote:
> >>> >>> Hey team, I have a ConsumeKafka_0_10 running which normally operates
> >>> >>> without problems. I had a queue back up due to a downstream processor
> >>> >>> and I started getting these bulletins.
> >>> >>>
> >>> >>> 01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
> >>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Duplicates
> >>> >>> are likely as we were able to commit the process session but received
> >>> >>> an exception from Kafka while committing offsets.
> >>> >>>
> >>> >>> 01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
> >>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Exception
> >>> >>> while interacting with Kafka so will close the lease
> >>> >>> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
> >>> >>> due to org.apache.kafka.clients.consumer.CommitFailedException: Commit
> >>> >>> cannot be completed since the group has already rebalanced and
> >>> >>> assigned the partitions to another member. This means that the time
> >>> >>> between subsequent calls to poll() was longer than the configured
> >>> >>> session.timeout.ms, which typically implies that the poll loop is
> >>> >>> spending too much time message processing. You can address this
> >>> >>> either by increasing the session timeout or by reducing the maximum
> >>> >>> size of batches returned in poll() with max.poll.records.
> >>> >>>
> >>> >>> My max.poll.records is set to 10000 on my consumer and
> >>> >>> session.timeout.ms is the default 10000 on the server.
> >>> >>>
> >>> >>> Since there is no such thing as coincidences, I believe this has to
> >>> >>> do with it not being able to push received messages to the
> >>> >>> downstream queue.
> >>> >>>
> >>> >>> If my flow is backed up, I expect the ConsumeKafka processor not to
> >>> >>> throw errors but to continue to heartbeat with the Kafka server and
> >>> >>> resume consuming once it can commit to the downstream queue?
> >>> >>>
> >>> >>> Might I have the server or consumer misconfigured to handle this
> >>> >>> scenario, or should the consumer not be throwing this error?
> >>> >>>
> >>> >>> Thanks,
> >>> >>> - Nick
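For reference, the two suggestions in the thread, raising session.timeout.ms via the properties passed through to the consumer and Joe's idea of using the consumer pause concept so the client keeps polling (and therefore stays in the group) without fetching new records while downstream is back-pressured, could look roughly like this against the plain Kafka 0.10 client. The topic name, group id, property values, and the isBackPressured() check are assumptions for illustration, not NiFi code.

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PausedConsumerSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "nifi-demo-group");
            props.put("enable.auto.commit", "false");
            // raise the session timeout well above the default as a workaround
            // (the broker's group.max.session.timeout.ms caps what is accepted)
            props.put("session.timeout.ms", "60000");
            // smaller batches also shorten the time spent between polls
            props.put("max.poll.records", "500");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("my-topic"));
                boolean paused = false;
                while (true) {
                    if (isBackPressured() && !paused) {
                        // stop fetching new records but keep the group membership alive
                        consumer.pause(consumer.assignment());
                        paused = true;
                    } else if (!isBackPressured() && paused) {
                        // downstream has drained, start fetching again
                        consumer.resume(consumer.assignment());
                        paused = false;
                    }
                    // poll() must still be called regularly; while paused it returns no records
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                    records.forEach(record -> handle(record.value()));
                }
            }
        }

        private static boolean isBackPressured() {
            // hypothetical stand-in for "is the downstream NiFi connection full?"
            return false;
        }

        private static void handle(byte[] value) {
            // stand-in for producing a flow file and committing offsets
            // in the at-least-once order shown in the earlier sketch
        }
    }

Either measure only reduces the window in which the group coordinator decides the consumer is dead; the keep-alive behavior itself is what NIFI-3189 tracks on the NiFi side.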