[ https://issues.apache.org/jira/browse/KAFKA-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864315#comment-17864315 ]

Tom Kalmijn edited comment on KAFKA-17093 at 7/9/24 5:48 PM:
-------------------------------------------------------------

Hi [~schofielaj], 

Great efforts. 

My service starts up in a 'loading' state, reading all messages of a 
(compacted) topic from the start (earliest). Once all current messages have 
been loaded, it should transition into its 'ready' state, but it must keep 
listening for new messages.

In pseudocode, this is what I am trying to accomplish:
{quote} 
{code:java}
var lastOffset = determine offset of last message
while (true) {
  read record
  if record.offset == lastOffset then
    transition from 'loading' into 'ready' state
}
{code}
 
{quote}
Now the challenge is: if lastOffset is further ahead than the offset of the 
last record at startup (for example because the final offsets are occupied by 
transaction markers), the service will never transition into 'ready' unless 
new messages come in. But new messages cannot come in unless the service is 
in the 'ready' state. Catch-22.
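
This is a minimal sketch comparing the check in the pseudocode with a position-based check. It assumes that the consumer's position advances past transaction markers, even though those markers are never delivered as records. The sketch simulates a single partition in plain Java, with no kafka-clients dependency and no broker. `ReadyDetection`, `Entry`, `Result` and `replay` are hypothetical names. In a real service, `endOffset` would presumably be taken once at startup from `consumer.endOffsets(...)`, and the position would come from `consumer.position(tp)` after each `poll`.

```java
import java.util.List;

// Hypothetical, self-contained simulation: no kafka-clients dependency, no broker.
public class ReadyDetection {

    /** One log entry; a control marker (transaction commit/abort) is never delivered to the application. */
    record Entry(long offset, boolean controlMarker) {}

    /** Whether each strategy would have switched from 'loading' to 'ready'. */
    record Result(boolean offsetMatchReady, boolean positionReady) {}

    /**
     * Replays one partition the way a read_committed consumer sees it.
     * offsetMatch: the pseudocode's check, ready once a delivered record has offset == endOffset - 1.
     * position:    ready once the position (next offset to fetch) reaches endOffset.
     */
    static Result replay(List<Entry> log, long endOffset) {
        long lastOffset = endOffset - 1;
        boolean offsetMatchReady = false;
        long position = 0;
        for (Entry e : log) {
            if (!e.controlMarker() && e.offset() == lastOffset) {
                offsetMatchReady = true; // only possible if the last offset holds a data record
            }
            // Assumption: the position moves past control markers even though they are filtered out.
            position = e.offset() + 1;
        }
        return new Result(offsetMatchReady, position >= endOffset);
    }

    public static void main(String[] args) {
        // Offsets 0-2: records of one committed transaction; offset 3: its commit marker.
        List<Entry> log = List.of(
                new Entry(0, false), new Entry(1, false), new Entry(2, false), new Entry(3, true));
        long endOffset = 4; // end offset seen at startup: one past the commit marker
        Result r = replay(log, endOffset);
        System.out.println("offset match ready: " + r.offsetMatchReady()); // false
        System.out.println("position ready: " + r.positionReady());        // true
    }
}
```

When the last entry is a commit marker, no delivered record ever has offset `endOffset - 1`, so the offset check never fires. The position check still does, as long as the assumption about markers holds.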

Also, I do have a working solution based on backtracking and polling, but it 
takes about 500 ms to determine each last offset this way. This delay is 
slightly problematic for me, since thousands of such loading processes run 
within a fairly short time frame.

If you tell me this is the only way to go, so be it.

But it does raise the question: why does seekToEnd distinguish between 
read_committed and read_uncommitted at all? read_committed consumers skip the 
control records anyway, don't they? Unless seekToEnd carries more meaning and 
is actually meant to tell me where the last message is.
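
One way to look at the distinction starts from the KafkaConsumer javadoc. Under read_committed, the end offset is the Last Stable Offset (LSO); otherwise it is the high watermark. As far as I understand it, the LSO only falls behind the high watermark while some transaction is still open. That would make the distinction about undecided transactions rather than about control records. The sketch below is a simplified model under that assumption:

* The log is fully replicated, so the high watermark is one past the last entry.
* `EndOffsets`, `Entry`, `highWatermark` and `lastStableOffset` are hypothetical names.
* The model is not the broker implementation.

```java
import java.util.List;

// Hypothetical, simplified model; not the broker implementation.
public class EndOffsets {

    /** One log entry. inOpenTransaction: written by a transaction that has no commit/abort marker yet. */
    record Entry(long offset, boolean inOpenTransaction) {}

    /** Simplification: the whole log is replicated, so the high watermark is one past the last entry. */
    static long highWatermark(List<Entry> log) {
        return log.isEmpty() ? 0 : log.get(log.size() - 1).offset() + 1;
    }

    /** LSO: first offset of the earliest still-open transaction, or the high watermark if none is open. */
    static long lastStableOffset(List<Entry> log) {
        long hw = highWatermark(log);
        for (Entry e : log) {
            if (e.inOpenTransaction()) {
                return Math.min(e.offset(), hw); // entries are in offset order, so this is the earliest
            }
        }
        return hw;
    }

    public static void main(String[] args) {
        // Data at 0-2 and its commit marker at 3: every transaction is decided.
        List<Entry> decided = List.of(
                new Entry(0, false), new Entry(1, false), new Entry(2, false), new Entry(3, false));
        // Same log plus an open transaction that has written offsets 4-5 but no marker yet.
        List<Entry> withOpenTxn = List.of(
                new Entry(0, false), new Entry(1, false), new Entry(2, false), new Entry(3, false),
                new Entry(4, true), new Entry(5, true));

        // read_uncommitted seeks to the high watermark, read_committed to the LSO.
        System.out.println(highWatermark(decided) + " " + lastStableOffset(decided));         // 4 4
        System.out.println(highWatermark(withOpenTxn) + " " + lastStableOffset(withOpenTxn)); // 6 4
    }
}
```

In this model, when every transaction has been decided, both isolation levels end up at the same offset, just past the commit marker. The two only diverge when a transaction is still open.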


> KafkaConsumer.seekToEnd should return LSO 
> ------------------------------------------
>
>                 Key: KAFKA-17093
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17093
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.6.1
>         Environment: Ubuntu, IntelliJ, Scala, "org.apache.kafka" % "kafka-clients" % "3.6.1"
>            Reporter: Tom Kalmijn
>            Assignee: Andrew Schofield
>            Priority: Major
>         Attachments: Kafka17093-v2.java, Kafka17093-v3.java, Kafka17093.java
>
>
>  
> Expected
> When using a transactional producer, KafkaConsumer.seekToEnd(...) on a 
> consumer configured with isolation level "read_committed" should return the 
> LSO. 
> Observed
> The offset returned is always the actual last offset of the partition, which 
> is not the LSO if the latest offsets are occupied by transaction markers.
> Also see this Slack thread:
> https://confluentcommunity.slack.com/archives/C499EFQS0/p1720088282557559



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
