Thanks, Jiangjie. Understanding more about the auto-commit behavior and why it's resilient to these scenarios is a big help.
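For reference, the settings involved here on the 0.8.x high-level consumer look roughly like this (a sketch only; the connection values are placeholders and the 60s commit interval is illustrative):

```properties
# Placeholder connection values -- substitute your own.
zookeeper.connect=localhost:2181
group.id=example-group

# Periodic auto offset commit (the behavior discussed below):
auto.commit.enable=true
auto.commit.interval.ms=60000

# A short ZK session timeout makes "ZK expired" rebalances more likely
# under GC pauses; lengthening it is one mitigation worth testing.
zookeeper.session.timeout.ms=6000
```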
We're going to do some deeper investigation and testing. I'll report back
when I have more information.

Thanks,
Cliff

On Thu, Oct 22, 2015 at 11:48 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> Hi Cliff,
>
> If auto.commit.enable is set to true, the offset will be committed in
> the following cases, in addition to the periodic offset commit:
>
> 1. During a consumer rebalance, before releasing partition ownership.
> If consumer A owns partition P before the rebalance, it will commit the
> offset for partition P during the rebalance. If consumer B becomes the
> new owner of partition P after the rebalance, it will start from the
> committed offset, so there will be no duplicate messages.
> 2. When the consumer closes.
>
> A rebalance will be triggered in the following cases:
> 1. A consumer joins or leaves the group.
> 2. A topic/partition change occurs on the subscribed topics (e.g.
> partition expansion for a topic, or a new topic is created and the
> consumer is using a wildcard that matches the new topic name).
>
> To answer your questions:
> A simple consumer should not interfere with a high-level consumer
> because it does not have any group management embedded.
>
> Typically a single high-level consumer group will not rebalance unless
> there is a topic/partition change. However, it is possible that the
> consumer itself dropped out of the group and rejoined. This typically
> happens when you have a ZooKeeper session timeout. In that case, you
> should see "ZK expired" in your log. You can search for that and see if
> that is the problem.
>
> Jiangjie (Becket) Qin
>
> On Thu, Oct 22, 2015 at 1:14 PM, Cliff Rhyne <crh...@signal.co> wrote:
>
>> We did some more testing with logging turned on (I figured out why it
>> wasn't working). We tried increasing the JVM memory capacity on our
>> test server (it's lower than in production) and increasing the
>> ZooKeeper timeouts. Neither changed the results.
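A quick way to check for the session expirations Jiangjie mentions is to grep the consumer log for "ZK expired". A sketch (the sample log lines here are fabricated stand-ins; point LOG at your real consumer log instead of the temp file):

```shell
# Fabricated sample lines, for illustration only.
LOG=$(mktemp)
printf '%s\n' \
  '2015-10-22 11:48:01 INFO ZK expired; releasing partition ownership' \
  '2015-10-22 11:48:02 INFO begin rebalancing consumer group try #0' \
  > "$LOG"

grep -c 'ZK expired' "$LOG"   # count of session expirations -> 1
grep -ci 'rebalanc' "$LOG"    # count of rebalance mentions  -> 1
```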
>> With trace logging enabled, we saw that we were getting rebalances
>> even though there is only one high-level consumer running (there
>> previously was a simple consumer that was told to disconnect, but that
>> consumer only checked the offsets and never consumed data).
>>
>> - Is there possibly a race condition where the simple consumer has a
>> hold on a partition, shutdown is called before starting a high-level
>> consumer, but shutdown is done asynchronously?
>> - What are the various things that can cause a consumer rebalance,
>> other than adding/removing high-level consumers?
>>
>> Thanks,
>> Cliff
>>
>> On Wed, Oct 21, 2015 at 4:20 PM, Cliff Rhyne <crh...@signal.co> wrote:
>>
>>> Hi Kris,
>>>
>>> Thanks for the tip. I'm going to investigate this further. I checked,
>>> and we have fairly short ZK timeouts and run with a smaller memory
>>> allocation in the two environments where we encounter this issue.
>>> I'll let you all know what I find.
>>>
>>> I saw this ticket https://issues.apache.org/jira/browse/KAFKA-2049
>>> that seems to be related to the problem (but would only inform that
>>> an issue occurred). Are there any other open issues that could be
>>> worked on to improve Kafka's handling of this situation?
>>>
>>> Thanks,
>>> Cliff
>>>
>>> On Wed, Oct 21, 2015 at 2:53 PM, Kris K <squareksc...@gmail.com> wrote:
>>>
>>>> Hi Cliff,
>>>>
>>>> One other case I observed in my environment is when there were GC
>>>> pauses on one of the high-level consumers in the group.
>>>>
>>>> Thanks,
>>>> Kris
>>>>
>>>> On Wed, Oct 21, 2015 at 10:12 AM, Cliff Rhyne <crh...@signal.co> wrote:
>>>>
>>>>> Hi James,
>>>>>
>>>>> There are two scenarios we run:
>>>>>
>>>>> 1. Multiple partitions with one consumer per partition. This rarely
>>>>> has starting/stopping of consumers, so the pool is very static.
>>>>> There is a configured consumer timeout, which causes a
>>>>> ConsumerTimeoutException to be thrown before the test starts. We
>>>>> handle this exception and then resume consuming.
>>>>> 2. Single partition with one consumer. This consumer is started by
>>>>> a triggered condition (the number of messages pending in the Kafka
>>>>> topic, or a schedule). The consumer is stopped after processing
>>>>> completes.
>>>>>
>>>>> In both cases, as I understand it, there shouldn't be a rebalance,
>>>>> since either a) all consumers are running or b) there's only one
>>>>> consumer/partition. Also, the same consumer group is used by all
>>>>> consumers in scenarios 1 and 2. Is there a good way to investigate
>>>>> whether rebalances are occurring?
>>>>>
>>>>> Thanks,
>>>>> Cliff
>>>>>
>>>>> On Wed, Oct 21, 2015 at 11:37 AM, James Cheng <jch...@tivo.com> wrote:
>>>>>
>>>>>> Do you have multiple consumers in a consumer group?
>>>>>>
>>>>>> I think that when a new consumer joins the consumer group, the
>>>>>> existing consumers stop consuming during the group rebalance, and
>>>>>> when they start consuming again, they consume from the last
>>>>>> committed offset.
>>>>>>
>>>>>> You should get more verification on this, though. I might be
>>>>>> remembering wrong.
>>>>>>
>>>>>> -James
>>>>>>
>>>>>> On Oct 21, 2015, at 8:40 AM, Cliff Rhyne <crh...@signal.co> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> My team and I are looking into a problem where the Java
>>>>>>> high-level consumer delivers duplicate messages if we turn
>>>>>>> auto-commit off (using version 0.8.2.1 of the server and Java
>>>>>>> client). The expected sequence of events is:
>>>>>>>
>>>>>>> 1.
>>>>>>> Start the high-level consumer and initialize a KafkaStream to
>>>>>>> get a ConsumerIterator.
>>>>>>> 2. Consume n items (could be 10,000, could be 1,000,000) from the
>>>>>>> iterator.
>>>>>>> 3. Commit the new offsets.
>>>>>>>
>>>>>>> What we are seeing is that during step 2, some number of the n
>>>>>>> messages are returned by the iterator in duplicate (in some
>>>>>>> cases, we've seen n*5 messages consumed). The problem appears to
>>>>>>> go away if we turn on auto-commit (and committing offsets to
>>>>>>> Kafka helped too), but auto-commit conflicts with our
>>>>>>> offset-rollback logic. The issue seems to happen more in our test
>>>>>>> environment on a lower-cost cloud provider.
>>>>>>>
>>>>>>> Diving into the Java and Scala classes, including
>>>>>>> ConsumerIterator, it's not obvious what event causes a duplicate
>>>>>>> offset to be requested or returned (there's even a loop in that
>>>>>>> class that is supposed to exclude duplicate messages). I tried
>>>>>>> turning on trace logging, but my log4j config isn't getting the
>>>>>>> Kafka client logs to write out.
>>>>>>>
>>>>>>> Does anyone have suggestions of where to look or how to enable
>>>>>>> logging?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cliff
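On the open logging question in the original post, a minimal log4j.properties sketch that should surface the old consumer's rebalance activity (the logger names assume the 0.8.x client's kafka.* packages; adjust the appender to taste):

```properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%t] %c - %m%n

# The old high-level consumer logs under the kafka.consumer package;
# TRACE there should show rebalance begin/end, ownership release, and
# ConsumerIterator activity.
log4j.logger.kafka.consumer=TRACE
log4j.logger.kafka=DEBUG
```

Note that this file must actually be on the client application's classpath (or passed via -Dlog4j.configuration) for the Kafka client logs to appear.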