Hey Vahid,

On point #1 below: since the expiration timer starts ticking after the
> group becomes Empty, the expire_timestamp of group offsets will be set
> when that transition occurs. In normal cases that expire_timestamp is
> calculated as "current timestamp" + "broker's offset retention". Then if
> an old client provides a custom retention, we probably need a way to
> store that custom retention (and use it once the group becomes Empty).
> One place to store it is in the group metadata message, but the issue is
> we would be introducing a new field only for backward compatibility (new
> clients don't overwrite the broker's retention), unless we somehow want
> to support this retention on a per-group basis. What do you think?


Here's what I was thinking. The current offset commit schema looks like
this:

OffsetCommit =>
  Offset => Long
  Metadata => String
  CommitTimestamp => Long
  ExpireTimestamp => Long

If a client asks for an explicit retention timeout, then we can continue
using this schema and provide the current behavior: the offsets will be
retained until they are individually expired.

For newer clients or those that request the default retention, we can bump
the schema and remove ExpireTimestamp.

OffsetCommit =>
  Offset => Long
  Metadata => String
  CommitTimestamp => Long
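
To make this concrete, here's a rough sketch in Scala of how the broker
could choose between the two value schemas per commit (hypothetical names,
not the actual coordinator code; I'm assuming a negative requested
retention is the sentinel for "use the broker default"):

sealed trait OffsetCommitValue
case class OffsetCommitValueV1(offset: Long, metadata: String,
                               commitTimestamp: Long,
                               expireTimestamp: Long) extends OffsetCommitValue
case class OffsetCommitValueV2(offset: Long, metadata: String,
                               commitTimestamp: Long) extends OffsetCommitValue

def buildCommitValue(offset: Long, metadata: String, now: Long,
                     requestedRetentionMs: Long): OffsetCommitValue =
  if (requestedRetentionMs >= 0)
    // Old client with an explicit retention: keep the old schema and
    // stamp a per-offset expiration, preserving the current behavior.
    OffsetCommitValueV1(offset, metadata, now, now + requestedRetentionMs)
  else
    // Default retention: drop ExpireTimestamp; expiration is driven by
    // group state plus the broker's offsets.retention.minutes.
    OffsetCommitValueV2(offset, metadata, now)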

We also need to bump the version of the group metadata schema to include
the timestamp of the state change. There are two cases: standalone "simple"
consumers and consumer groups.

1) For standalone consumers, we'll expire based on the commit timestamp of
the offset message. Internally, the group will be Empty and have no
transition timestamp, so the expiration criterion is that (now - commit
timestamp) exceeds the configured retention time.

2) For consumer groups, we'll expire based on the timestamp that the group
transitioned to Empty.

This way, changing the retention time config will affect all existing
groups except those from older clients that request an explicit retention
time. Would that work?
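
To spell out the combined rule, here's a sketch of the expiration check
under the assumptions above (illustrative Scala, not the actual
GroupMetadataManager code):

def shouldExpire(now: Long,
                 retentionMs: Long,                     // offsets.retention.minutes
                 commitTimestamp: Long,                 // from the offset message
                 emptySinceTimestamp: Option[Long],     // group's Empty transition time
                 explicitExpireTimestamp: Option[Long]  // only set by old clients
                ): Boolean =
  explicitExpireTimestamp match {
    // Old clients: honor the per-offset expire timestamp they requested.
    case Some(expireTs) => now >= expireTs
    case None => emptySinceTimestamp match {
      // Case 2: a consumer group that has transitioned to Empty.
      case Some(emptySince) => now - emptySince >= retentionMs
      // Case 1: a standalone consumer with no transition timestamp; fall
      // back to the commit timestamp of the offset itself.
      case None => now - commitTimestamp >= retentionMs
    }
  }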

On point #3: as you mentioned, currently there is no "notification"
> mechanism for GroupMetadataManager in place when a subscription change
> occurs. The member subscription, however, is available in the group
> metadata, and a poll approach could be used to check group subscriptions
> on a regular basis and expire stale offsets (if there are topics the
> group is no longer subscribed to). This can be done as part of the offset
> cleanup scheduled task, which by default does not run very frequently.
> Were you thinking of a different method for capturing the subscription
> change?


Yes, I think that can work. So we would expire offsets for a consumer
group individually once they have reached the retention time, if the group
is not Empty but is no longer subscribed to the corresponding topics. Is
that right?
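
In other words, something like the following could run inside the existing
offset cleanup task (the types and fields are illustrative, not the actual
GroupMetadataManager internals):

case class GroupState(isEmpty: Boolean,
                      subscribedTopics: Set[String],
                      commitTimestamps: Map[(String, Int), Long]) // (topic, partition) -> commit ts

def staleOffsets(group: GroupState, now: Long, retentionMs: Long): Set[(String, Int)] =
  if (group.isEmpty)
    Set.empty // Empty groups are handled by the state-transition path instead
  else
    // For a live group, individually expire offsets whose topic the group
    // no longer subscribes to, once they have aged past the retention.
    group.commitTimestamps.filter { case ((topic, _), commitTs) =>
      !group.subscribedTopics.contains(topic) && now - commitTs >= retentionMs
    }.keySet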


Thanks,
Jason




On Fri, Mar 2, 2018 at 3:36 PM, Vahid S Hashemian <vahidhashem...@us.ibm.com
> wrote:

> Hi Jason,
>
> I'm thinking through some of the details of the KIP with respect to your
> feedback and the decision to keep the expire-timestamp for each group
> partition in the offset message.
>
> On point #1 below: since the expiration timer starts ticking after the
> group becomes Empty, the expire_timestamp of group offsets will be set
> when that transition occurs. In normal cases that expire_timestamp is
> calculated as "current timestamp" + "broker's offset retention". Then if
> an old client provides a custom retention, we probably need a way to
> store that custom retention (and use it once the group becomes Empty).
> One place to store it is in the group metadata message, but the issue is
> we would be introducing a new field only for backward compatibility (new
> clients don't overwrite the broker's retention), unless we somehow want
> to support this retention on a per-group basis. What do you think?
>
> On point #3: as you mentioned, currently there is no "notification"
> mechanism for GroupMetadataManager in place when a subscription change
> occurs. The member subscription, however, is available in the group
> metadata, and a poll approach could be used to check group subscriptions
> on a regular basis and expire stale offsets (if there are topics the
> group is no longer subscribed to). This can be done as part of the offset
> cleanup scheduled task, which by default does not run very frequently.
> Were you thinking of a different method for capturing the subscription
> change?
>
> Thanks.
> --Vahid
>
>
>
>
> From:   Jason Gustafson <ja...@confluent.io>
> To:     dev@kafka.apache.org
> Date:   02/18/2018 01:16 PM
> Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> Consumer Group Offsets
>
>
>
> Hey Vahid,
>
> Sorry for the late response. The KIP looks good. A few comments:
>
> 1. I'm not quite sure I understand how you are handling old clients. It
> sounds like you are saying that old clients need to change configuration?
> I'd suggest 1) if an old client requests the default expiration, then we
> use the updated behavior, and 2) if the old client requests a specific
> expiration, we enforce it from the time the group becomes Empty.
>
> 2. Does this require a new version of the group metadata message format?
> I think we need to add a new field to indicate the time that the group
> state changed to Empty. This will allow us to resume the expiration timer
> correctly after a coordinator change. Alternatively, we could reset the
> expiration timeout after every coordinator move, but it would be nice to
> have a definite bound on offset expiration.
>
> 3. The question about removal of offsets for partitions which are no
> longer in use is interesting. At the moment, it's difficult for the
> coordinator to know that a partition is no longer being fetched because it
> is agnostic to subscription state (the group coordinator is used for more
> than just consumer groups). Even if we allow the coordinator to read
> subscription state to tell which topics are no longer being consumed, we
> might need some additional bookkeeping to keep track of /when/ the
> consumer stopped subscribing to a particular topic. Or maybe we can reset
> this expiration timer after every coordinator change when the new
> coordinator reads the offsets and group metadata? I am not sure how common
> this use case is and whether it needs to be solved as part of this KIP.
>
> Thanks,
> Jason
>
>
>
> On Thu, Feb 1, 2018 at 12:40 PM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
>
> > Thanks James for sharing that scenario.
> >
> > I agree it makes sense to be able to remove offsets for the topics that
> > are no longer "active" in the group.
> > I think it becomes important to determine what it means for a topic to
> > be no longer active: if we use per-partition expiration, we would
> > manually choose a retention time that works for the particular scenario.
> >
> > That works, but since we are manually intervening and specifying a
> > per-partition retention, why not do the intervention in some other way:
> >
> > One alternative for this intervention, which preserves the simplicity
> > of the protocol suggested in the KIP, is to improve upon the recently
> > introduced DELETE_GROUPS API and allow for deletion of offsets of
> > specific topics in the group. This is what the old ZooKeeper-based group
> > management supported anyway; we would just be bringing the group
> > deletion features of the Kafka-based group management to parity with the
> > ZooKeeper-based one.
> >
> > So, instead of deciding in advance when the offsets should be removed we
> > would instantly remove them when we are sure that they are no longer
> > needed.
> >
> > Let me know what you think.
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> > From:   James Cheng <wushuja...@gmail.com>
> > To:     dev@kafka.apache.org
> > Date:   02/01/2018 12:37 AM
> > Subject:        Re: [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer Group Offsets
> >
> >
> >
> > Vahid,
> >
> > Under rejected alternatives, we had decided that we did NOT want to do
> > per-partition expiration, and instead we wait until the entire group is
> > empty and then (after the right time has passed) expire the entire group
> > at once.
> >
> > I thought of one scenario that might benefit from per-partition
> > expiration.
> >
> > Let's say I have topics A B C... Z. So, I have 26 topics, all of them
> > single partition, so 26 partitions. Let's say I have mirrormaker
> > mirroring those 26 topics. The group will then have 26 committed
> > offsets.
> >
> > Let's say I then change the whitelist on mirrormaker so that it only
> > mirrors topic Z, but I keep the same consumer group name. (I imagine
> > that is a common thing to do?)
> >
> > With the proposed design for this KIP, the committed offsets for topics
> > A through Y will stay around as long as this mirroring group name
> > exists.
> >
> > In the current implementation (prior to this KIP), I believe that
> > committed offsets for topics A through Y will expire.
> >
> > How much do we care about this case?
> >
> > -James
> >
> > > On Jan 23, 2018, at 11:44 PM, Jeff Widman <j...@jeffwidman.com> wrote:
> > >
> > > Bumping this as I'd like to see it land...
> > >
> > > It's one of the "features" that tends to catch Kafka n00bs unawares
> > > and typically results in message skippage/loss, whereas the proposed
> > > solution yields much more intuitive behavior.
> > >
> > > Plus it's more wire-efficient, because consumers no longer need to
> > > commit offsets for partitions that have no new messages just to keep
> > > those offsets alive.
> > >
> > > On Fri, Jan 12, 2018 at 10:21 AM, Vahid S Hashemian <
> > > vahidhashem...@us.ibm.com> wrote:
> > >
> > >> There has been no further discussion on this KIP for about two
> > >> months. So I thought I'd provide the scoop hoping it would spark
> > >> additional feedback and move the KIP forward.
> > >>
> > >> The KIP proposes a method to preserve group offsets as long as the
> > >> group is not in Empty state (even when offsets are committed very
> > >> rarely), and to start the offset expiration of the group as soon as
> > >> the group becomes Empty.
> > >> It suggests dropping the `retention_time` field from the
> > >> `OffsetCommit` request and, instead, enforcing it via the broker
> > >> config `offsets.retention.minutes` for all groups. In other words,
> > >> all groups will have the same retention time.
> > >> The KIP presumes that this global retention config would suffice for
> > >> common use cases and would not lead to, e.g., unmanageable offset
> > >> cache size (for groups that don't need to stay around that long). It
> > >> suggests opening another KIP if this global retention setting proves
> > >> to be problematic in the future. It was suggested earlier in the
> > >> discussion thread that the KIP should propose a per-group retention
> > >> config to circumvent this risk.
> > >>
> > >> I look forward to hearing your thoughts. Thanks!
> > >>
> > >> --Vahid
> > >>
> > >>
> > >>
> > >>
> > >> From:   "Vahid S Hashemian" <vahidhashem...@us.ibm.com>
> > >> To:     dev <dev@kafka.apache.org>
> > >> Date:   10/18/2017 04:45 PM
> > >> Subject:        [DISCUSS] KIP-211: Revise Expiration Semantics of
> > Consumer
> > >> Group Offsets
> > >>
> > >>
> > >>
> > >> Hi all,
> > >>
> > >> I created a KIP to address the group offset expiration issue reported
> > in
> > >> KAFKA-4682:
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets
> > >>
> > >>
> > >> Your feedback is welcome!
> > >>
> > >> Thanks.
> > >> --Vahid
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> > > --
> > >
> > > *Jeff Widman*
> > > jeffwidman.com <http://www.jeffwidman.com/>
> > > | 740-WIDMAN-J (943-6265)
> > > <><
> >
> >
> >
> >
> >
> >
>
>
>
>
>
