Hi, Andrew,

Thanks for the reply.

10. The impact from doing server side filtering is that we lose zero-copy
transfer, which provides two potential benefits: (1) more direct/efficient
transfer from disk to network; (2) less memory usage in heap since we don't
need to copy the records to heap to send a fetch response. (1) may already
be lost if SSL is used. However, it would be useful to outline the impact
of (2). For example, do we need to use MemoryRecords in fetch responses?
How much additional heap memory will the server use? Do we need to cache
records in heap? If so, is the cache bounded?
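
To make the two paths concrete, here is a minimal sketch in plain Java NIO,
not the broker's actual code. The class and method names are made up for
illustration. sendZeroCopy() hands the transfer to FileChannel.transferTo(),
while sendViaHeap() first reads the bytes into a heap buffer, which is what
server side filtering would need in order to inspect the batches. Whether
the OS really avoids copies in the first path depends on the target channel
and the platform.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical names; an illustration of the two transfer styles only.
class FetchPathSketch {

    // Zero-copy style: this code never places record bytes in a heap buffer;
    // the transfer is delegated to FileChannel.transferTo().
    static long sendZeroCopy(Path segment, WritableByteChannel out) throws IOException {
        try (FileChannel in = FileChannel.open(segment, StandardOpenOption.READ)) {
            long position = 0;
            long size = in.size();
            while (position < size) {
                position += in.transferTo(position, size - position, out);
            }
            return position;
        }
    }

    // Filtering style: bytes are read into a bounded heap buffer first, so
    // heap usage per in-flight response is at least bufferBytes (must be > 0).
    static long sendViaHeap(Path segment, WritableByteChannel out, int bufferBytes)
            throws IOException {
        try (FileChannel in = FileChannel.open(segment, StandardOpenOption.READ)) {
            ByteBuffer heap = ByteBuffer.allocate(bufferBytes);
            long total = 0;
            while (in.read(heap) != -1) {
                heap.flip();
                // A filtering broker would inspect and drop batches here.
                while (heap.hasRemaining()) {
                    total += out.write(heap);
                }
                heap.clear();
            }
            return total;
        }
    }
}
```

In the second path the extra heap usage scales with the buffer size times
the number of concurrent fetch responses, which is why a bound matters.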

12. If the group is configured with share and a client tries to join as a
consumer, what error do we return in the RPC and in the client API? Ditto
for the reverse case where a share client tries to join a group configured
with consumer.

17. "What is the client going to do with the exception?" Well, the reason
that we added this option was for the case where the same data could be
obtained from some other place. Suppose that we use CDC to get
database changes into a Kafka topic. If the consumer is slow
and unconsumed data is deleted in Kafka because of retention, by receiving
an exception, the consumer will know that there is potential missing data
in Kafka and could bootstrap all data from the database first before
resuming the consumption from Kafka.
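
As a rough sketch of that client-side pattern (a toy in-memory model, not
the Kafka client API; ToyLog, OffsetLostException and consume() are
hypothetical names used only for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of "exception on lost offset, then bootstrap from the source".
class RetentionResetSketch {

    // Hypothetical stand-in for the error surfaced when the requested offset
    // has been removed by retention.
    static class OffsetLostException extends RuntimeException {
        final long logStartOffset;

        OffsetLostException(long requested, long logStartOffset) {
            super("offset " + requested + " is below log start offset " + logStartOffset);
            this.logStartOffset = logStartOffset;
        }
    }

    // Toy partition: offset -> change event. Retention drops the oldest entries.
    static class ToyLog {
        private final TreeMap<Long, String> records = new TreeMap<>();
        private long logStartOffset = 0;
        private long nextOffset = 0;

        void append(String value) {
            records.put(nextOffset++, value);
        }

        void applyRetention(long newLogStartOffset) {
            logStartOffset = newLogStartOffset;
            records.headMap(newLogStartOffset).clear();
        }

        List<String> readFrom(long offset) {
            if (offset < logStartOffset) {
                throw new OffsetLostException(offset, logStartOffset);
            }
            return new ArrayList<>(records.tailMap(offset).values());
        }
    }

    // On a lost offset, load the full database snapshot first, then resume
    // from the earliest offset that still exists in the log.
    static List<String> consume(ToyLog log, long position, List<String> databaseSnapshot) {
        List<String> seen = new ArrayList<>();
        try {
            seen.addAll(log.readFrom(position));
        } catch (OffsetLostException e) {
            seen.addAll(databaseSnapshot);
            seen.addAll(log.readFrom(e.logStartOffset));
        }
        return seen;
    }
}
```

Without the exception, the consumer in this model would silently resume at
the new start offset and never learn that it needs the snapshot.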

18.2 So, to serve a ShareAcknowledgeRequest, the leader needs to write
share records for the acknowledgement and wait for the records to be fully
replicated before sending a response? It would be useful to document that
in the section of handling ShareAcknowledgeRequest.
18.3 Could we document when the leader writes SHARE_CHECKPOINT vs
SHARE_DELTA?
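
For reference, my reading of the recovery model described in 23.1 below
(latest checkpoint plus all subsequent deltas) is roughly the following.
This is a simplified sketch under my own assumptions: StateRecord, Kind and
the string-valued states are hypothetical and are not the KIP's record
schema.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of rebuilding share-partition state on leader change.
class ShareStateRecoverySketch {

    enum Kind { CHECKPOINT, DELTA }

    // Hypothetical record shape: offset -> delivery state, as plain strings.
    static final class StateRecord {
        final Kind kind;
        final Map<Long, String> states;

        StateRecord(Kind kind, Map<Long, String> states) {
            this.kind = kind;
            this.states = states;
        }
    }

    // Find the most recent checkpoint, then apply every later delta in order.
    // Assumption: if no checkpoint exists, recovery starts from empty state.
    static Map<Long, String> recover(List<StateRecord> log) {
        int start = -1;
        for (int i = log.size() - 1; i >= 0; i--) {
            if (log.get(i).kind == Kind.CHECKPOINT) {
                start = i;
                break;
            }
        }
        Map<Long, String> state = new TreeMap<>();
        if (start >= 0) {
            state.putAll(log.get(start).states);
        }
        List<StateRecord> later = new ArrayList<>(log.subList(start + 1, log.size()));
        for (StateRecord r : later) {
            if (r.kind == Kind.DELTA) {
                state.putAll(r.states); // a delta overrides only the offsets it names
            }
        }
        return state;
    }
}
```

The number of deltas replayed after the last checkpoint is what drives
recovery time in this model, hence the question of when each is written.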

19. Could we document the difference in delivery guarantee between
share and regular consumer?

21. Do we lose zero-copy transfer for all consumers because of that (since
we don't know which topics contain control records)?

24. Could we document the index layout?

25. "The callback tells you whether the acknowledgements for the entire
topic-partition succeeded or failed, rather than each record individually."
The issue is that with implicit acknowledgement, it's not clear which
records are in the batch. For example, if the client just keeps calling
poll() with no explicit commits, when a callback is received, does the
client know which records are covered by the callback?

30. ListGroupsOptions
30.1 public ListGroupsOptions types(Set<GroupType> states);
  Should states be types?
30.2 ListConsumerGroupsOptions supports filtering by state. Should we
support it here too?

31. ConsumerGroupListing includes state. Should we include state in
GroupListing to be consistent?

Jun

On Mon, Feb 12, 2024 at 3:55 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi Jun
> Thanks for your comments.
>
> 10. For read-uncommitted isolation level, the consumer just reads all
> records. For read-committed isolation level, the share-partition leader
> does the filtering to enable correct skipping of aborted records. The
> consumers in a share group are not aware of the filtering, unlike
> consumers in consumer groups.
>
> 11. The “classic” type is the pre-KIP 848 consumer groups.
>
> 12. By setting the configuration for a group resource, you are saying
> “when a new group is created with this name, it must have this type”.
> It’s not changing the type of an existing group.
>
> 13. Good catch. The Server Assignor should be at group level. I will
> change it.
>
> 14. That is true. I have maintained it to keep similarity with consumer
> groups, but it is not currently exposed to clients. It might be best to
> remove it.
>
> 15. I had intended that SimpleAssignor implements
> org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.
> Actually, I think there’s benefit to using a new interface so that someone
> doesn’t inadvertently configure something like the RoundRobinAssignor for
> a share group. It wouldn’t go well. I will add a new interface to the KIP.
>
> 16. When an existing member issues a ShareGroupHeartbeatRequest to the new
> coordinator, the coordinator returns UNKNOWN_MEMBER_ID. The client then
> sends another ShareGroupHeartbeatRequest containing no member ID and
> epoch 0. The coordinator then returns the member ID.
>
> 17. I don’t think so. What is the client going to do with the exception?
> Share groups are intentionally removing some of the details of using Kafka
> offsets from the consumers. If the SPSO needs to be reset due to
> retention, it just does that automatically.
>
> 18. The proposed use of control records needs some careful thought.
> 18.1. They’re written by the share-partition leader, not the coordinator.
> 18.2. If the client commits the acknowledgement, it is only confirmed to
> the client once it has been replicated to the other replica brokers. So,
> committing an acknowledgement is very similar to sending a record to a
> topic in terms of the behaviour.
>
> 19. You are correct. The possibility of record duplication exists in
> failure scenarios. A future KIP will add EOS support for share groups.
>
> 20.1. Yes, an exception. I was thinking InvalidOffsetException. I will
> update the KIP with more detail about protocol error codes and API
> exceptions.
> 20.2. I think that’s a mistake. I’ll rectify it.
>
> 21. The message sets for the new control records would be filtered out for
> all consumers.
>
> 22. Fetch from follower is not supported. I will update the KIP.
>
> 23.1. I am not quite happy with the explanation of the checkpoint and
> delta records. Essentially, there needs to be one checkpoint and then any
> number of deltas. Then another checkpoint supersedes the previous records,
> and can have its own sequence of deltas, and so on. Because recovery
> requires the leader to read the latest checkpoint and all subsequent
> deltas, you want to take checkpoints frequently enough to speed up
> recovery, but infrequently enough to minimise the performance impact of
> reserializing all the state.
> 23.2. I’ll check the document again carefully, but the SHARE_DELTA should
> always contain DeliveryCount for every member of the States array.
>
> 24. I was anticipating adding to the index files which are part of each
> log segment.
>
> 25. The acknowledgements for each topic-partition are atomic. All this
> really means is that we perform the state checking and the state
> persistence atomically (one control record). The callback tells you
> whether the acknowledgements for the entire topic-partition succeeded or
> failed, rather than each record individually. I could have gone with a
> callback with a record-based interface. Would that be preferable, do you
> think? For one thing, that does give more flexibility for optimisations
> such as fetch pipelining in the future.
>
> 26. The metadata is unused. This is re-using an existing class
> (OffsetAndMetadata). Perhaps it would be better not to.
>
> 27. Yes, agreed. I will add it.
>
> Thanks,
> Andrew
>
> > On 9 Feb 2024, at 23:14, Jun Rao <j...@confluent.io.INVALID> wrote:
> >
> > Hi, Andrew,
> >
> > Thanks for the KIP. A few comments below.
> >
> > 10. ShareFetchResponse: To consume transactional data, currently
> > FetchResponse includes the AbortedTransactions fields for the client to
> > properly skip aborted records. ShareFetchResponse doesn't include that.
> > How do we prevent the consumer from reading aborted records in a share
> > group?
> >
> > 11. "adding "share"  to the existing group types of "consumer"  and
> > "classic" "
> > What's the "classic" type?
> >
> > 12. bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type
> > group --entity-name G1 --alter --add-config group.type=share
> > So, one could change the group type? What happens to the states
> > associated with the group (members, epoch, offsets, etc)?
> >
> > 13. Why is Server Assignor at member level, instead of group level?
> >
> > 14. Member.metadata: How is that being used? It isn't exposed to the
> > client.
> >
> > 15. What public interface does SimpleAssignor implement?
> >
> > 16. "This means that existing members will have to rejoin the share group
> > following a coordinator failover."
> > When an existing member issues a ShareGroupHeartbeatRequest to the new
> > coordinator, does the coordinator return UNKNOWN_MEMBER_ID and a new
> > memberId?
> >
> > 17. auto.offset.reset has the option to throw an exception to the client
> > if the current offset does not exist any more on the server (e.g. due to
> > retention). Should group.share.auto.offset.reset support that too?
> >
> > 18. SHARE_CHECKPOINT and SHARE_DELTA records:
> > 18.1 When does the coordinator write them?
> > 18.2 If the client commits the acknowledgement successfully, could
> > the acknowledgement be lost on the broker if the coordinator fails over?
> >
> > 19. In the current consumer model, coordinator failover doesn't cause
> > duplicate records in consumers. In the share group model, I guess this is
> > no longer true since we are not persisting the acquired state?
> >
> > 20. "The calls to KafkaShareConsumer.acknowledge(ConsumerRecord,
> > AcknowledgeType) must be issued in the order in which the records appear
> > in the ConsumerRecords object, which will be in order of increasing
> > offset for each share-partition."
> > 20.1  What happens if the acknowledge() call doesn't follow this? Does
> > the caller get an exception?
> > 20.2 The example with Acknowledge 119. It seems the acknowledgement is
> > out of order since records at offset 111-118 haven't been acknowledged?
> >
> > 21. "Indeed, these message sets are not returned to consumer". Are we
> > excluding those control records for non-shared consumers too?
> >
> > 22. The design doesn't seem to support fetching from the followers. This
> > might be ok, but it will be useful to explicitly mention this.
> >
> > 23. Examples with control records for SHARE_DELTA:
> > 23.1 Some of the state changes contain cumulative state instead of delta.
> > For example, "record 110 (available, delivery count 1), records 111-118
> > acquired, record 119 acknowledged" for "Acknowledge 119".
> > 23.2 SHARE_DELTA sometimes includes available records with DeliveryCount
> > of 0. But we don't do that for every record. What's the convention?
> >    {
> >      "BaseOffset": 111,
> >      "LastOffset": 118,
> >      "State": 0 (Available),
> >      "DeliveryCount": 0
> >    }
> >
> > 24. "when a broker becomes the leader of a share-partition, it must read
> > the most recent SHARE_CHECKPOINT": How does a broker find this
> > efficiently on restart?
> >
> > 25. AcknowledgeCommitCallback: How would an application use it? It
> > doesn't indicate which record's acknowledgement has failed.
> >
> > 26. AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId,
> > Map<TopicPartition, OffsetAndMetadata> offsets): How is the metadata
> > used? It doesn't seem there is an API to use it in either the client
> > application or the broker.
> >
> > 27. It would be useful to add a section on downgradability since the KIP
> > changes the record format in the internal offset topic.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Oct 11, 2023 at 8:25 AM Andrew Schofield <
> > andrew_schofield_j...@outlook.com> wrote:
> >
> >> Hi Jack,
> >> Thanks for your comments.
> >>
> >> I have added a new section on Log Retention which describes the
> >> behaviour of the SPSO as the LSO advances. That makes total sense
> >> and was an omission from the KIP.
> >>
> >> I have added the other ideas as potential future work. I do like the
> >> idea of having the SPSO influence the advancements of the LSO
> >> for topics which are primarily being used with share groups.
> >>
> >> I have published an updated version of the KIP.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>> On 4 Oct 2023, at 10:09, Jack Vanlightly <vanligh...@apache.org> wrote:
> >>>
> >>> I would like to see more explicit discussion of topic retention and
> >>> share groups. There are a few options here from simple to more
> >>> sophisticated. There are also topic-level and share-group level options.
> >>>
> >>> The simple thing would be to ensure that the SPSO of each share group
> >>> is bounded by the Log Start Offset (LSO) of each partition which itself
> >>> is managed by the retention policy. This is a topic-level control which
> >>> applies to all share-groups. I would say that this shared retention is
> >>> the largest drawback of modeling queues on shared logs and this is
> >>> worth noting.
> >>>
> >>> More sophisticated approaches can be to allow the LSO to advance not
> >>> (only) by retention policy but by the advancement of the lowest SPSO.
> >>> This can keep the amount of data lower by garbage collecting messages
> >>> that have been acknowledged by all share groups. Some people may like
> >>> that behaviour on those topics where share groups are the only
> >>> consumption model and no replay is needed.
> >>>
> >>> There are per-share-group possibilities such as share-group TTLs where
> >>> messages can be archived on a per share group basis.
> >>>
> >>> Thanks
> >>> Jack
> >>
> >>
>
>
