Hey Dong,
I see what you mean. This would have been clearer if the committed offset
was the offset of the last consumed record. I don't feel too strongly about
it either. Perhaps we can use the more concise name and just rely on the
documentation to explain its usage. It should be rare that users
Hey Jason,
Thanks for the reply. Regarding 3), I am thinking that both "Offset" and
"LastLeaderEpoch" in the OffsetCommitRequest are associated with the last
consumed messages. The value of "Offset" is not necessarily the offset of the
next message due to log compaction. Since we are naming "Offset"
Hi Dong,
Thanks for the comments.
1) Yes, makes sense.
2) This is an interesting point. The suggestion made more sense in the
initial version of the KIP, but I think you are right that we should use
the same fencing semantics we use for the Fetch and OffsetForLeaderEpoch
APIs. Just like a
Hey Jun,
57. It's a fair point. I could go either way, but I'm slightly inclined to
just document the new API for now. We'll still support seeking to an offset
with corresponding epoch information, so deprecating the old seek() seems
like overkill.
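Just to make the comparison concrete, here is a rough sketch (Java) of how
the old and new seek variants would sit side by side. The OffsetAndMetadata
constructor carrying an optional leader epoch is the proposed shape, not a
released API, and the offset/epoch values are placeholders:

    import java.util.Optional;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class SeekSketch {
        static void seekBothWays(KafkaConsumer<byte[], byte[]> consumer) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // Existing API: seek to a position only, no epoch information.
            consumer.seek(tp, 42L);
            // Proposed API: position plus the leader epoch of the last consumed
            // record, so truncation can be detected on the next fetch.
            consumer.seek(tp, new OffsetAndMetadata(42L, Optional.of(5), ""));
        }
    }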
60. The phrasing was a little confusing. Does
Hey Jason,
Thanks for the update. I have some comments below:
1) Since FencedLeaderEpochException indicates that the metadata in the
client is outdated, should it extend InvalidMetadataException?
2) It is mentioned that "To fix the problem with KIP-232, we will add the
leader epoch the
Hi, Jason,
Thanks for the reply. They all make sense. Just a couple of more minor
comments.
57. I was thinking that it will be useful to encourage people to use the
new seek() api to get better semantics. Deprecating the old seek api is one
way. I guess we could also just document it for now.
Hi Jun,
I spent a little more time looking at the usage in WorkerSinkTask. I think
actually the initialization of the positions in the assignment callback is
not strictly necessary. We keep a map of the current consumed offsets which
is updated as we consume the data. As far as I can tell, we
Hey Jun,
Thanks for the review. Responses below:
50. Yes, that is right. I clarified this in the KIP.
51. Yes, updated the KIP to mention.
52. Yeah, this was a reference to a previous iteration. I've fixed it.
53. I changed the API to use an `Optional` for the leader epoch
and added a note
Hi, Jason,
Thanks for the updated KIP. Well thought-through. Just a few minor comments
below.
50. For seek(TopicPartition partition, OffsetAndMetadata offset), I guess
under the cover, it will make an OffsetsForLeaderEpoch request to determine if
the seeked offset is still valid before fetching? If
Hey All,
I think I've addressed all pending review comments. If there is no additional
feedback, I'll plan to start a vote thread next week.
Thanks,
Jason
On Tue, Jul 31, 2018 at 9:46 AM, Dong Lin wrote:
> Hey Jason,
>
> Thanks for your reply. I will comment below.
>
> Regarding 1, we probably can not
Hey Jason,
Thanks for your reply. I will comment below.
Regarding 1, we probably can not simply rename both to `LeaderEpoch`
because we already have a LeaderEpoch field in OffsetsForLeaderEpoch.
Regarding 5, I don't feel strongly about this. I agree with the two benefits of
having two error codes: 1)
Hey Dong,
Thanks for the detailed review. Responses below:
1/2: Thanks for noticing the inconsistency. Would it be reasonable to
simply call it LeaderEpoch for both APIs?
3: I agree it should be a map. I will update.
4: Fair point. I think we should always be able to identify an offset.
Let's
Hey Jason,
Thanks for the update! I agree with the current proposal overall. I have
some minor comments related to naming etc.
1) I don't feel strongly about this and will just leave it here for discussion. Would it be
better to rename "CurrentLeaderEpoch" to "ExpectedLeaderEpoch" for the new
field in the
Thanks Anna, you are right on both points. I updated the KIP.
-Jason
On Thu, Jul 26, 2018 at 2:08 PM, Anna Povzner wrote:
> Hi Jason,
>
> Thanks for the update. I agree with the current proposal.
>
> Two minor comments:
> 1) In “API Changes” section, first paragraph says that “users can catch
Hi Jason,
Thanks for the update. I agree with the current proposal.
Two minor comments:
1) In “API Changes” section, first paragraph says that “users can catch the
more specific exception type and use the new `seekToNearest()` API defined
below.”. Since LogTruncationException “will include the
Hi All,
I have made some updates to the KIP. As many of you know, a side project of
mine has been specifying the Kafka replication protocol in TLA. You can
check out the code here if you are interested:
https://github.com/hachikuji/kafka-specification. In addition to uncovering
a couple unknown
Hi Jason,
The proposed API seems reasonable to me too. Could you please also update
the wiki page (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
with a section, say "Workflow", on how the proposed API will be used
together with others
Hi Jason,
I also like your proposal and agree that KafkaConsumer#seekToCommitted() is
more intuitive as a way to initialize both consumer's position and its
fetch state.
My understanding is that KafkaConsumer#seekToCommitted() is purely for clients
who store their offsets externally, right? And
Hey Jason,
It is a great summary. The solution sounds good. I might have minor
comments regarding the method name, but we can discuss those minor points
later after we reach consensus on the high-level API.
Thanks,
Dong
On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson wrote:
> Hey Anna and
Hey Anna and Dong,
Thanks a lot for the great discussion. I've been hanging back a bit because
honestly the best option hasn't seemed clear. I agree with Anna's general
observation that there is a distinction between the position of the
consumer and its fetch state up to that position. If you
Hey Anna,
Thanks much for the explanation. Approach 1 also sounds good to me. I think
findOffsets() is useful for users who don't use automatic offset reset
policy.
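For concreteness, "don't use automatic offset reset policy" corresponds to a
consumer configured roughly as below (bootstrap server, group id and
deserializers are just placeholders); out-of-range or truncated positions
then surface as exceptions the application must handle itself:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    class NoAutoResetSketch {
        static KafkaConsumer<byte[], byte[]> newConsumer() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
            // "none" disables automatic resets entirely.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
            return new KafkaConsumer<>(props,
                new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }
    }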
Just one more question. Since users who store offsets externally need to
provide leaderEpoch to findOffsets(...), do we need an
Hi Dong,
What I called “not covering all use cases” is what you call best-effort
(not guaranteeing some corner cases). I think we are on the same page here.
I wanted to be clear in the API whether the consumer seeks to a position
(offset) or to a record (offset, leader epoch). The only
Hey Anna,
Thanks for the comment. To answer your question, it seems that we can cover
all cases in this KIP. As stated in the "Consumer Handling" section, the KIP-101
based approach will be used to derive the truncation offset from the
2-tuple (offset, leaderEpoch). This approach is best effort and it is
Sorry, I hit "send" before finishing. Continuing...
2) Hiding most of the consumer's log truncation handling logic with minimal
exposure in the KafkaConsumer API. I was proposing this path.
Before answering your specific questions… I want to respond to your comment
“In general, maybe we should
Hi Dong,
Thanks for the follow up! I finally have much more clear understanding of
where you are coming from.
You are right. The success of findOffsets()/finding a point of
non-divergence depends on whether we have enough entries in the consumer's
leader epoch cache. However, I think this is a
Hey Anna,
Thanks much for your detailed explanation and example! It does help me
understand where our understandings differ.
So it seems that the solution based on findOffsets() currently focuses
mainly on the scenario that consumer has cached leaderEpoch -> offset
mapping whereas I was
Hi Dong,
Thanks for considering my suggestions.
Based on your comments, I realized that my suggestion was not complete with
regard to KafkaConsumer API vs. consumer-broker protocol. While I propose
to keep KafkaConsumer#seek() unchanged and take offset only, the underlying
consumer will send
Hey Anna,
Thanks much for the thoughtful reply. It makes sense to differentiate
between "seeking to a message" and "seeking to a position". I have two questions
here:
- For "seeking to a message" use-case, with the proposed approach user
needs to call findOffset(offset, leaderEpoch) followed by
Hi, Jason,
Thanks for the KIP. Looks good overall. Just a few minor comments below.
1. "As the consumer is fetching from a partition, it will keep a small
cache of the recent epochs that were fetched for each partition. " Do we
need to cache more than one leader epoch? Also, during consumer
Hi Jason and Dong,
I’ve been thinking about your suggestions and discussion regarding
position(), seek(), and new proposed API.
Here is my thought process why we should keep position() and seek() API
unchanged.
I think we should separate {offset, leader epoch} that uniquely identifies
a
Hey Jason,
Thanks much for your thoughtful explanation.
Yes the solution using findOffsets(offset, leaderEpoch) also works. The
advantage of this solution is that it adds only one API instead of two. The
concern is that its usage seems a bit more clumsy for advanced users. More
specifically,
Hi Dong,
Thanks, I've been thinking about your suggestions a bit. It is challenging
to make this work given the current APIs. One of the difficulties is that
we don't have an API to find the leader epoch for a given offset at the
moment. So if the user does a seek to offset 5, then we'll need a
Hey Jason,
Regarding seek(...), it seems that we want an API for users to initialize the
consumer with (offset, leaderEpoch) and that API should allow throwing
PartitionTruncationException. Suppose we agree on this, then
seekToNearest() is not sufficient because it will always swallow
Hey Dong,
Thanks for the feedback. The first three points are easy:
1. Yes, we should be consistent.
2. Yes, I will add this.
3. Yes, I think we should document the changes to the committed offset
schema. I meant to do this, but it slipped my mind.
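To sketch what the committed offset change would mean on the client side
(assuming OffsetAndMetadata gains an optional leader epoch as proposed; the
topic, offset and epoch values are placeholders):

    import java.util.Collections;
    import java.util.Optional;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class CommitWithEpochSketch {
        static void commit(KafkaConsumer<byte[], byte[]> consumer) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // Commit offset 100 along with the leader epoch (here 5) of the last
            // consumed record so the coordinator persists both.
            consumer.commitSync(Collections.singletonMap(
                tp, new OffsetAndMetadata(100L, Optional.of(5), "")));
        }
    }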
The latter questions are tougher. One option I
Regarding points 4) and 5) above, motivation for the alternative APIs is
that, if we decide that leaderEpoch is as important as offset in
identifying a message, then it may be reasonable to always specify it
wherever offset is currently required in the consumer API to identify a
message, e.g.
Hey Jason,
Thanks for the update. It looks pretty good. Just some minor comments below:
1) The KIP adds new error code "LOG_TRUNCATION" and new
exception TruncatedPartitionException. Can we make the name more
consistent, e.g. LogTruncationException?
2) Do we need to add
Sounds good to me.
On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson wrote:
> Hey Guozhang,
>
> That's fair. In fact, perhaps we do not need this API at all. We already
> have the new seek() in this KIP which can do the lookup based on epoch for
> this use case. I guess we should probably call
Hey Guozhang,
That's fair. In fact, perhaps we do not need this API at all. We already
have the new seek() in this KIP which can do the lookup based on epoch for
this use case. I guess we should probably call it seekToNearest() though to
make it clear that the final position may be different from
Hi Jason,
I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs,
since probably only very advanced users are aware of the leaderEpoch and
hence would care to use it anyway. It is more like an admin client
operation than a consumer client operation: if the motivation is to
Thanks for the feedback. I've updated the KIP. Specifically I removed the
"closest" reset option and the proposal to reset by timestamp when the
precise truncation point cannot be determined. Instead, I proposed that we
always reset using the nearest epoch when a reset policy is defined (either
@Dong
Those are fair points. Both approaches require some fuzziness to reset the
offset in these pathological scenarios and we cannot guarantee
at-least-once delivery either way unless we have the full history of leader
epochs that were consumed. The KIP-101 logic may actually be more accurate
Jason, thanks for the KIP. A few comments:
* I think Dong's question about whether to use timestamp-based approach
vs. start-offset-of-first-larger-epoch is valid; more specifically, with
the timestamp-based approach we may still be resetting to an offset falling into
the truncated interval, and
Hey Jason,
Thanks for the explanation.
Please correct me if this is wrong. The "unknown truncation offset"
scenario happens when consumer does not have the full leaderEpoch -> offset
mapping. In this case we can still use the KIP-101-based approach to
truncate offset to "start offset of the
The other thing I forgot to mention is that resetting the offset using the
leader epoch is only available with the latest message format. By
supporting reset by timestamp, users on the v1 format can still get some
benefit from this KIP.
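Just to make the fallback concrete, reset by timestamp can be done with the
existing offsetsForTimes() API; a rough sketch (topic and timestamp are
placeholders):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    class ResetByTimestampSketch {
        static void reset(KafkaConsumer<byte[], byte[]> consumer, long timestampMs) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // Find the earliest offset whose record timestamp is >= timestampMs.
            Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(Collections.singletonMap(tp, timestampMs));
            OffsetAndTimestamp ot = result.get(tp);
            if (ot != null) {
                consumer.seek(tp, ot.offset());
            }
        }
    }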
-Jason
On Tue, Jun 26, 2018 at 11:47 AM, Jason Gustafson
Hey Dong,
Thanks for the comments.
> - The KIP says that, with auto.offset.reset="closest", timestamp is used to
> find offset if truncation offset is unknown. It seems that if consumer
> knows the timestamp of the last message, then the consumer should also know
> the (offset, leaderEpoch) of the
Hey Jason,
Thanks for the KIP! It is pretty useful.
At high level the new set of reset policies may be a bit complicated and
confusing to users. I am wondering whether we can simplify it. A few
questions below:
- The KIP says that, with auto.offset.reset="closest", timestamp is used to
find
Hey All,
I wrote up a KIP to handle one more edge case in the replication protocol
and to support better handling of truncation in the consumer when unclean
leader election is enabled. Let me know what you think.