Hey Koji, thanks for putting in the time. I have not had a chance to start working on this myself and I certainly support any effort to resolve it. I'll take a look at your branch and play around with it.
Thanks!
Nick

On Mon, Feb 20, 2017 at 2:30 AM, Koji Kawamura <ijokaruma...@gmail.com> wrote:
> Hi Nick, Joe, Bryan,
>
> I confirmed that this is easily reproducible and I got exactly the
> same stack trace.
>
> I was also curious about how the Kafka consumer's pause/resume API
> works, so I went further and ran a simple experiment to see whether it
> helps retain the consumer connection in this situation.
>
> The experiment code is in my remote branch here:
> https://github.com/ijokarumawak/nifi/commit/28ba134771ec7a7e810924f655662a29662ba9bf
>
> It seems to work as expected. After back-pressure engages, ConsumeKafka
> is not triggered for a while; then, when the downstream recovers from
> back-pressure, ConsumeKafka resumes consuming messages, without any
> loss or warning messages, using the same consumer instance.
>
> Nick, have you already started working on a fix for this?
> If you haven't, and the approach using pause/resume sounds reasonable,
> I am going to add more synchronization and start/stop processor
> handling, and make it work more properly with onTrigger so that it only
> tries to retain the connection when there hasn't been any polling
> activity for some period of time.
>
> What do you think?
> Any feedback would be appreciated. Thanks!
>
> Koji
>
> On Fri, Feb 10, 2017 at 7:22 AM, Nick Carenza
> <nick.care...@thecontrolgroup.com> wrote:
> > Thank you guys, I will look to see what I can do to contribute.
> >
> > On Thu, Feb 9, 2017 at 1:19 PM, Joe Witt <joe.w...@gmail.com> wrote:
> >>
> >> That said, I think we can improve our handling of the consumer (Kafka
> >> client) and session (NiFi transactional logic) and solve the problem.
> >> It is related to our back-pressure/consumer handling, so we can fix
> >> that.
> >>
> >> Thanks
> >> Joe
> >>
> >> On Thu, Feb 9, 2017 at 1:38 PM, Bryan Bende <bbe...@gmail.com> wrote:
> >> > No data loss, but you may process the same message twice in NiFi.
> >> >
> >> > The ordering of operations is:
> >> >
> >> > 1) poll Kafka
> >> > 2) write the received data to a flow file
> >> > 3) commit the NiFi session so the data in the flow file cannot be lost
> >> > 4) commit the offsets to Kafka
> >> >
> >> > Doing it this way achieves at-least-once processing, which means you
> >> > can't ever lose data, but you can process data twice.
> >> >
> >> > If we committed the offsets before committing the flow file you would
> >> > never get duplicates, but you could lose a message if a crash
> >> > happened between committing the offsets and committing the NiFi
> >> > session (at-most-once processing).
> >> >
> >> > So the error is happening at #4: NiFi has already produced a flow
> >> > file with the message, but then Kafka says it can't update the
> >> > offsets, and another consumer will likely pull that same message
> >> > again and produce another flow file with the same message.
> >> >
> >> >
> >> > On Thu, Feb 9, 2017 at 1:19 PM, Nick Carenza
> >> > <nick.care...@thecontrolgroup.com> wrote:
> >> >> That makes perfect sense. To be clear, is there any potential to
> >> >> lose messages in this scenario?
> >> >>
> >> >> On Thu, Feb 9, 2017 at 7:16 AM, Joe Witt <joe.w...@gmail.com> wrote:
> >> >>>
> >> >>> Yeah, this is probably a good case/cause for use of the pause
> >> >>> concept in Kafka consumers.
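
(For reference, a minimal self-contained sketch of the pause/resume idea
discussed above, written against the plain Kafka 0.10 consumer API. This is
not the code from Koji's branch: the bootstrap servers, group id, and topic
are placeholders, and downstreamBackPressured() / process() stand in for
NiFi's back-pressure check and session handling.)

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PauseResumeSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder
            props.put("group.id", "pause-resume-sketch");      // placeholder
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic")); // placeholder
                while (true) {
                    if (downstreamBackPressured()) {
                        // Pause all assigned partitions but keep calling poll():
                        // the poll returns no records while paused, yet keeps the
                        // consumer heartbeating so the group does not rebalance.
                        consumer.pause(consumer.assignment());
                        consumer.poll(100);
                    } else {
                        consumer.resume(consumer.assignment());
                        ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                        process(records);      // i.e. write flow files, commit the session
                        consumer.commitSync(); // then commit offsets (at-least-once)
                    }
                }
            }
        }

        // Placeholder for NiFi's check of the downstream connection.
        private static boolean downstreamBackPressured() {
            return false;
        }

        // Placeholder for writing records to flow files and committing the session.
        private static void process(ConsumerRecords<byte[], byte[]> records) {
        }
    }
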
> >> >>>
> >> >>> On Thu, Feb 9, 2017 at 9:49 AM, Bryan Bende <bbe...@gmail.com> wrote:
> >> >>> > I believe you are running into this issue:
> >> >>> >
> >> >>> > https://issues.apache.org/jira/browse/NIFI-3189
> >> >>> >
> >> >>> > When back-pressure happens on the queue coming out of ConsumeKafka,
> >> >>> > it can last for longer than session.timeout.ms, and when the
> >> >>> > processor resumes executing it receives this error on the first
> >> >>> > execution. We should be able to implement some type of keep-alive
> >> >>> > so that even when the processor is not executing there is a
> >> >>> > background thread, or some other way of keeping the connection
> >> >>> > alive.
> >> >>> >
> >> >>> > I believe any user-defined properties in the processor get passed
> >> >>> > to the Kafka consumer, so you could add "session.timeout.ms" set
> >> >>> > to a much higher value as a possible workaround.
> >> >>> >
> >> >>> > Thanks,
> >> >>> >
> >> >>> > Bryan
> >> >>> >
> >> >>> > On Thu, Feb 9, 2017 at 8:42 AM, Koji Kawamura
> >> >>> > <ijokaruma...@gmail.com> wrote:
> >> >>> >> Hello Nick,
> >> >>> >>
> >> >>> >> First, I assume "had a queue back up" means a queue had
> >> >>> >> back-pressure engaged. Sorry if that was a different meaning.
> >> >>> >>
> >> >>> >> I tried to reproduce it with the following flow:
> >> >>> >> ConsumeKafka_0_10
> >> >>> >> -- success: Back Pressure Object Threshold = 10
> >> >>> >> -- UpdateAttribute (Stopped)
> >> >>> >>
> >> >>> >> Then I used ./bin/kafka-console-producer.sh to send 11 messages.
> >> >>> >> The result was that when NiFi received the 10th message,
> >> >>> >> back-pressure was enabled on the success relationship.
> >> >>> >> When I published the 11th message, NiFi didn't do anything.
> >> >>> >> This is expected behavior: because the downstream connection is
> >> >>> >> back-pressured, the processor won't be scheduled.
> >> >>> >>
> >> >>> >> After I started UpdateAttribute and the queued flow files went
> >> >>> >> through, ConsumeKafka was executed again and received the 11th
> >> >>> >> message.
> >> >>> >>
> >> >>> >> Also, I checked the ConsumerLease and ConsumeKafka_0_10 source
> >> >>> >> code; those warning and error messages are logged because NiFi
> >> >>> >> received a KafkaException when it tried to commit offsets to
> >> >>> >> Kafka.
> >> >>> >>
> >> >>> >> Was there anything in the Kafka server logs? I suspect something
> >> >>> >> happened on the Kafka server side.
> >> >>> >>
> >> >>> >> Thanks,
> >> >>> >> Koji
> >> >>> >>
> >> >>> >> On Thu, Feb 9, 2017 at 11:54 AM, Nick Carenza
> >> >>> >> <nick.care...@thecontrolgroup.com> wrote:
> >> >>> >>> Hey team, I have a ConsumeKafka_0_10 running which normally
> >> >>> >>> operates without problems. I had a queue back up due to a
> >> >>> >>> downstream processor and I started getting these bulletins.
> >> >>> >>>
> >> >>> >>> 01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
> >> >>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2]
> >> >>> >>> Duplicates are likely as we were able to commit the process
> >> >>> >>> session but received an exception from Kafka while committing
> >> >>> >>> offsets.
> >> >>> >>>
> >> >>> >>> 01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
> >> >>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2]
> >> >>> >>> Exception while interacting with Kafka so will close the lease
> >> >>> >>> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
> >> >>> >>> due to org.apache.kafka.clients.consumer.CommitFailedException:
> >> >>> >>> Commit cannot be completed since the group has already rebalanced
> >> >>> >>> and assigned the partitions to another member. This means that
> >> >>> >>> the time between subsequent calls to poll() was longer than the
> >> >>> >>> configured session.timeout.ms, which typically implies that the
> >> >>> >>> poll loop is spending too much time message processing. You can
> >> >>> >>> address this either by increasing the session timeout or by
> >> >>> >>> reducing the maximum size of batches returned in poll() with
> >> >>> >>> max.poll.records.
> >> >>> >>>
> >> >>> >>> My max.poll.records is set to 10000 on my consumer and
> >> >>> >>> session.timeout.ms is the default 10000 on the server.
> >> >>> >>>
> >> >>> >>> Since there is no such thing as a coincidence, I believe this
> >> >>> >>> has to do with it not being able to push received messages to
> >> >>> >>> the downstream queue.
> >> >>> >>>
> >> >>> >>> If my flow is backed up, I would expect the ConsumeKafka
> >> >>> >>> processor not to throw errors but to continue to heartbeat with
> >> >>> >>> the Kafka server and resume consuming once it can commit to the
> >> >>> >>> downstream queue.
> >> >>> >>>
> >> >>> >>> Might I have the server or consumer misconfigured to handle this
> >> >>> >>> scenario, or should the consumer not be throwing this error?
> >> >>> >>>
> >> >>> >>> Thanks,
> >> >>> >>> - Nick
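
(As a concrete shape for the stopgap Bryan suggests above: user-defined
properties on ConsumeKafka_0_10 are passed through to the Kafka consumer, so
the session timeout can be raised with a dynamic property on the processor.
The value below is illustrative only, and it must fall within the broker's
group.min.session.timeout.ms / group.max.session.timeout.ms range, so the
broker configuration may need adjusting as well.)

    # Dynamic property on ConsumeKafka_0_10, forwarded to the Kafka consumer
    session.timeout.ms=60000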