[PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-01-17 Thread via GitHub


clolov opened a new pull request, #15213:
URL: https://github.com/apache/kafka/pull/15213

   ### Summary
   
   This is the first part of the implementation of [KIP-1005](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1005%3A+Expose+EarliestLocalOffset+and+TieredOffset).
   
   The purpose of this pull request is to have the broker start returning the correct offset when it receives -5 as a timestamp in a ListOffsets API request.
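
   For context, a sketch of what each ListOffsets timestamp sentinel is expected to resolve to once this lands. The helper class below is purely illustrative, not broker code; only the `ListOffsetsRequest` constants are real:

   ```java
   import org.apache.kafka.common.requests.ListOffsetsRequest;

   // Illustration only: which per-partition offset each sentinel maps to under KIP-1005.
   public class SentinelSemantics {
       // Hypothetical stand-in for the offsets the broker tracks per partition.
       record LogOffsets(long logStartOffset, long localLogStartOffset,
                         long highestOffsetInRemoteStorage, long logEndOffset) { }

       static long resolve(long targetTimestamp, LogOffsets log) {
           if (targetTimestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)        // -2
               return log.logStartOffset();
           if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)  // -4
               return log.localLogStartOffset();
           if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)   // -5, added in this PR
               return log.highestOffsetInRemoteStorage();                       // -1 if nothing is tiered yet
           return log.logEndOffset();                                           // LATEST_TIMESTAMP (-1)
       }
   }
   ```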





[PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-02 Thread via GitHub


brandboat opened a new pull request, #16783:
URL: https://github.com/apache/kafka/pull/16783

   This is a follow-up to [KAFKA-16154](https://issues.apache.org/jira/browse/KAFKA-16154).
   
   This PR adds support for EarliestLocalSpec and LatestTieredSpec in GetOffsetShell; the supported-version check is handled in https://github.com/apache/kafka/pull/16781 (see the usage sketch below).
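   
   A minimal usage sketch, assuming the `--time` values exercised by the new tests ("earliest-local"/-4 and "latest-tiered"/-5) and a placeholder bootstrap address:
   
   ```java
   import org.apache.kafka.tools.GetOffsetShell;

   public class ListTieredOffsetsExample {
       public static void main(String[] args) {
           // Earliest offset still served from the leader's local disk:
           GetOffsetShell.mainNoExit("--bootstrap-server", "localhost:9092",
                   "--topic-partitions", "topicRLS.*:0", "--time", "earliest-local");
           // Highest offset already copied to remote (tiered) storage:
           GetOffsetShell.mainNoExit("--bootstrap-server", "localhost:9092",
                   "--topic-partitions", "topicRLS.*:0", "--time", "latest-tiered");
       }
   }
   ```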
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-26 Thread via GitHub


showuon commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1965678417

   @clolov, do we have any update on the compilation error fix? 





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-28 Thread via GitHub


clolov commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1968689211

   Apologies, I have had a lot of work recently. I will aim to provide an 
update by the end of the day today.





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-28 Thread via GitHub


clolov commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1969562705

   Heya @showuon, I tried to reproduce the compilation failure of the unrelated 
test class locally, but have been unable to. I have rebased and kicked off a 
new build in case this was a temporary failure. If it occurs again I will 
continue looking into it.





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-28 Thread via GitHub


showuon commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1970152996

   Failed tests are unrelated.





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-28 Thread via GitHub


showuon merged PR #15213:
URL: https://github.com/apache/kafka/pull/15213





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-04-04 Thread via GitHub


junrao commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1552178574


##
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##
@@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest {
  */
 public static final long EARLIEST_LOCAL_TIMESTAMP = -4L;
 
+public static final long LATEST_TIERED_TIMESTAMP = -5L;

Review Comment:
   Typically, if we add a new targetTimestamp value, we will need to bump up 
the version of the ListOffsetsRequest. See 
https://github.com/apache/kafka/pull/10760/files. Otherwise, a client could be 
setting LATEST_TIERED_TIMESTAMP and assuming that the server supports it, but 
the server actually does not.
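
   To make the failure mode concrete, a sketch of the kind of client-side gate this implies, with assumed version numbers (the authoritative mapping belongs in `ListOffsetsRequest.Builder`, following the MAX_TIMESTAMP precedent linked above):

   ```java
   import org.apache.kafka.common.errors.UnsupportedVersionException;
   import org.apache.kafka.common.requests.ListOffsetsRequest;

   public class SentinelVersionGate {
       // Assumption: -5 would require a new ListOffsets version (9 here is a guess).
       static void ensureSupported(long targetTimestamp, short brokerMaxListOffsetsVersion) {
           if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP
                   && brokerMaxListOffsetsVersion < 9) {
               throw new UnsupportedVersionException(
                       "Broker does not understand LATEST_TIERED_TIMESTAMP (-5); "
                               + "it would treat it as an ordinary timestamp");
           }
       }
   }
   ```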






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-04-05 Thread via GitHub


clolov commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1553357069


##
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##
@@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest {
  */
 public static final long EARLIEST_LOCAL_TIMESTAMP = -4L;
 
+public static final long LATEST_TIERED_TIMESTAMP = -5L;

Review Comment:
   Yup, thanks a lot for bringing this up on the mailing list and here. I will 
open a pull request to fix this omission!






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-04-05 Thread via GitHub


junrao commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1553959430


##
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##
@@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest {
  */
 public static final long EARLIEST_LOCAL_TIMESTAMP = -4L;
 
+public static final long LATEST_TIERED_TIMESTAMP = -5L;

Review Comment:
   Thanks for following up, @clolov !






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-01-17 Thread via GitHub


clolov commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1455525955


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
     val curLocalLogStartOffset = localLogStartOffset()
 
-    val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
+    val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => {
       val epoch = cache.epochForOffset(curLocalLogStartOffset)
-      if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
+      if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]()
     })
 
-    val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-      Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-    else Optional.empty[Integer]()
-

Review Comment:
   I didn't really see a point in this check 
`earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset`. As 
far as I can tell the `cache.epochForOffset` already carries it out. Let me 
know in case I have misunderstood something.



##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -2126,6 +2126,94 @@ class UnifiedLogTest {
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
   }
 
+  @Test
+  def testFetchLatestTieredTimestampNoRemoteStorage(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
+
+    assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
+
+    val firstTimestamp = mockTime.milliseconds
+    val leaderEpoch = 0
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    val secondTimestamp = firstTimestamp + 1
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = secondTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
+  }
+
+  @Test
+  def testFetchLatestTieredTimestampWithRemoteStorage(): Unit = {

Review Comment:
   This test could be combined with 
`testFetchOffsetByTimestampFromRemoteStorage` as the only difference it has are 
lines 2167, 2193, 2203 and 2204. Let me know your thoughts!






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-01-19 Thread via GitHub


kamalcph commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1458560112


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
     val curLocalLogStartOffset = localLogStartOffset()
 
-    val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
+    val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => {
       val epoch = cache.epochForOffset(curLocalLogStartOffset)
-      if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
+      if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]()
     })
 
-    val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-      Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-    else Optional.empty[Integer]()
-
     Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt))
   } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
     Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, latestEpochAsOptional(leaderEpochCache)))
+  } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
+    if (remoteLogEnabled()) {
+      val curHighestRemoteOffset = highestOffsetInRemoteStorage()
+
+      val optEpoch: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => {
+        val epoch = cache.epochForOffset(curHighestRemoteOffset)
+        if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]()
+      })
+
+      Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, highestOffsetInRemoteStorage(), optEpoch))
+    } else {
+      Option.empty

Review Comment:
   If we return empty, then the exception 
[here](https://sourcegraph.com/github.com/apache/kafka@8950ed10a7a869488d7f73821f619608caa11819/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L1594) will be thrown back to the caller. Do we want to throw an error back to the caller, or return an empty response?
   
   ```
   Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)))
   ```



##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -2126,6 +2126,94 @@ class UnifiedLogTest {
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
   }
 
+  @Test
+  def testFetchLatestTieredTimestampNoRemoteStorage(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
+
+    assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
+
+    val firstTimestamp = mockTime.milliseconds
+    val leaderEpoch = 0
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    val secondTimestamp = firstTimestamp + 1
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = secondTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),

Review Comment:
   Why are we using the `first` timestamp again?



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
     val curLocalLogStartOffset = localLogStartOffset()
 
-    val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
+    val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => {
       val epoch = cache.epochForOffset(curLocalLogStartOffset)
-      if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
+      if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]()
     })
 
-    val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-      Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-    else Optional.empty[Integer]()
-

Review Comment:
   This is done for backward compatibility with older message formats. You can 
go through the comments in the previous block:
   
   > The first cached epoch usually corresponds to the log start offset, but we 
have to verify this since it may not be true following a message format version 
bump as the epoch will not be 

Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-01-19 Thread via GitHub


kamalcph commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1458606590


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
     val curLocalLogStartOffset = localLogStartOffset()
 
-    val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
+    val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => {
       val epoch = cache.epochForOffset(curLocalLogStartOffset)
-      if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
+      if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]()
     })
 
-    val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-      Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-    else Optional.empty[Integer]()
-

Review Comment:
   Subjective:
   
   Can we update the method as below, for readability/clarity?
   
   ```scala
   if (remoteLogEnabled()) {
     val curHighestRemoteOffset = highestOffsetInRemoteStorage()
     var result: Optional[Integer] = Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)
     if (leaderEpochCache.isDefined) {
       val epochOpt = leaderEpochCache.get.epochForOffset(curHighestRemoteOffset)
       if (epochOpt.isPresent) {
         result = Optional.of(epochOpt.getAsInt)
       }
     }
     Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, highestOffsetInRemoteStorage(), result))
   }
   ```
   
   Note: when `highestOffsetInRemoteStorage` is -1, there won't be a corresponding leaderEpoch; we have to return the `NO_PARTITION_LEADER_EPOCH` constant, which is -1.






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-01-23 Thread via GitHub


clolov commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1463164583


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
     val curLocalLogStartOffset = localLogStartOffset()
 
-    val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
+    val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => {
       val epoch = cache.epochForOffset(curLocalLogStartOffset)
-      if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
+      if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]()
     })
 
-    val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-      Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-    else Optional.empty[Integer]()
-
     Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt))
   } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
     Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, latestEpochAsOptional(leaderEpochCache)))
+  } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
+    if (remoteLogEnabled()) {
+      val curHighestRemoteOffset = highestOffsetInRemoteStorage()
+
+      val optEpoch: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => {
+        val epoch = cache.epochForOffset(curHighestRemoteOffset)
+        if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]()
+      })
+
+      Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, highestOffsetInRemoteStorage(), optEpoch))
+    } else {
+      Option.empty

Review Comment:
   Good catch! In the KIP I have specified that Kafka should return no offset 
in such situations. I shall aim to add an integration test from the point of 
view of the client in an upcoming pull request



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1280,7 +1282,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) &&
     targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP &&
     targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP &&
-    targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP)
+    targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP &&
+    targetTimestamp != ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)

Review Comment:
   I have removed them both, but I don't think it would have caused problems 
either way.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
     val curLocalLogStartOffset = localLogStartOffset()
 
-    val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
+    val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => {
       val epoch = cache.epochForOffset(curLocalLogStartOffset)
-      if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
+      if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]()
     })
 
-    val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-      Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-    else Optional.empty[Integer]()
-

Review Comment:
   Hopefully I have achieved both in the subsequent commit (reverted one of the 
changes and made both easier to read). Let me know if this isn't the case!



##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -2126,6 +2126,94 @@ class UnifiedLogTest {
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
   }
 
+  @Test
+  def testFetchLatestTieredTimestampNoRemoteStorage(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
+
+    assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
+
+    val firstTimestamp = mockTime.milliseconds
+    val leaderEpoch = 0
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    val secondTimestamp = firstTimestamp + 1
+    log.appendAsLeader(TestUtils.singletonRecords(

Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-01-23 Thread via GitHub


clolov commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1905886714

   Thanks a lot for the review @kamalcph! I have hopefully addressed everything 
😊 





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-01 Thread via GitHub


showuon commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1474112419


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1279,7 +1281,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) &&
     targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP &&
-    targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP &&

Review Comment:
   Ah, this is a bug fix, right? 



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1301,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
     val curLocalLogStartOffset = localLogStartOffset()
 
-    val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
-      val epoch = cache.epochForOffset(curLocalLogStartOffset)
-      if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
-    })
-
-    val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-      Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-    else Optional.empty[Integer]()
+    var epochResult: Optional[Integer] = Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)

Review Comment:
   Why do we change the return for the "no leaderEpoch" case from empty to -1? I 
had a check, and it seems this won't change anything, because the default value of 
leaderEpoch in `ListOffsetsPartitionResponse` is -1. Any thoughts on this change?
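
   A quick way to see the wire-level equivalence being described, assuming the generated `ListOffsetsResponseData` message class; a freshly constructed partition response already defaults its leader epoch to -1, so `Optional.empty()` and `Optional.of(-1)` end up identical on the wire:

   ```java
   import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;

   public class LeaderEpochDefaultCheck {
       public static void main(String[] args) {
           ListOffsetsPartitionResponse resp = new ListOffsetsPartitionResponse();
           System.out.println(resp.leaderEpoch()); // prints -1 (the schema default)
       }
   }
   ```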






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-01 Thread via GitHub


satishd commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1922694144

   I will take a look at this PR by Monday (5th).





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-07 Thread via GitHub


clolov commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1481322702


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1279,7 +1281,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) &&
     targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP &&
-    targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP &&

Review Comment:
   Correct, Kamal called this out






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-07 Thread via GitHub


clolov commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1481540437


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1301,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
     val curLocalLogStartOffset = localLogStartOffset()
 
-    val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
-      val epoch = cache.epochForOffset(curLocalLogStartOffset)
-      if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
-    })
-
-    val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-      Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-    else Optional.empty[Integer]()
+    var epochResult: Optional[Integer] = Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)

Review Comment:
   You are correct, this is a miss on my side from making this piece of code 
more readable - fixing it in the subsequent commit.
   
   The logic should be (see the sketch after this message):
   * For EARLIEST_LOCAL_TIMESTAMP - return empty unless there is a leader epoch
   * For LATEST_TIERED_TIMESTAMP - if the highest offset is -1, return -1; if there 
is a highest offset, return empty unless there is a leader epoch
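
   Restated as a sketch (Java for brevity; the real change is in `UnifiedLog.scala`, and this helper is hypothetical):

   ```java
   import java.util.Optional;
   import java.util.OptionalInt;

   public class EpochRules {
       static final int NO_PARTITION_LEADER_EPOCH = -1; // RecordBatch.NO_PARTITION_LEADER_EPOCH

       // EARLIEST_LOCAL_TIMESTAMP: empty unless the cache has an epoch for the offset.
       static Optional<Integer> forEarliestLocal(OptionalInt cachedEpoch) {
           return cachedEpoch.isPresent() ? Optional.of(cachedEpoch.getAsInt()) : Optional.empty();
       }

       // LATEST_TIERED_TIMESTAMP: epoch -1 when nothing is tiered yet (highest remote
       // offset is -1); otherwise empty unless the cache has an epoch for the offset.
       static Optional<Integer> forLatestTiered(long highestRemoteOffset, OptionalInt cachedEpoch) {
           if (highestRemoteOffset == -1L) return Optional.of(NO_PARTITION_LEADER_EPOCH);
           return cachedEpoch.isPresent() ? Optional.of(cachedEpoch.getAsInt()) : Optional.empty();
       }
   }
   ```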






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-07 Thread via GitHub


clolov commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1932149971

   Heya @showuon @kamalcph @satishd, I hope I have addressed the latest 
comments!





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-07 Thread via GitHub


satishd commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1481778242


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1301,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
     val curLocalLogStartOffset = localLogStartOffset()
 
-    val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
-      val epoch = cache.epochForOffset(curLocalLogStartOffset)
-      if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
-    })
-
-    val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-      Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-    else Optional.empty[Integer]()
+    var epochResult: Optional[Integer] = Optional.empty()
+    if (leaderEpochCache.isDefined) {
+      val epochOpt = leaderEpochCache.get.epochForOffset(curLocalLogStartOffset)
+      if (epochOpt.isPresent) epochResult = Optional.of(epochOpt.getAsInt)
+    }
 
-    Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt))
+    Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult))
   } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
     Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, latestEpochAsOptional(leaderEpochCache)))
+  } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
+    if (remoteLogEnabled()) {
+      val curHighestRemoteOffset = highestOffsetInRemoteStorage()
+
+      var epochResult: Optional[Integer] = if (curHighestRemoteOffset == -1) Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH) else Optional.empty()

Review Comment:
   Can we use `val` instead of `var` here?
   
   ```scala
   val epochResult: Optional[Integer] =
     if (leaderEpochCache.isDefined) {
       val epochOpt = leaderEpochCache.get.epochForOffset(curHighestRemoteOffset)
       if (epochOpt.isPresent) Optional.of(epochOpt.getAsInt) else Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)
     } else Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)
   ```



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1301,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
     val curLocalLogStartOffset = localLogStartOffset()
 
-    val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
-      val epoch = cache.epochForOffset(curLocalLogStartOffset)
-      if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
-    })
-
-    val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-      Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-    else Optional.empty[Integer]()
+    var epochResult: Optional[Integer] = Optional.empty()

Review Comment:
   Can we use `val` instead of `var` here?
   
   ```scala
   val epochResult: Optional[Integer] =
     if (leaderEpochCache.isDefined) {
       val epochOpt = leaderEpochCache.get.epochForOffset(curLocalLogStartOffset)
       if (epochOpt.isPresent) Optional.of(epochOpt.getAsInt) else Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)
     } else Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)
   ```






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-08 Thread via GitHub


clolov commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1482965204


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1301,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
     val curLocalLogStartOffset = localLogStartOffset()
 
-    val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
-      val epoch = cache.epochForOffset(curLocalLogStartOffset)
-      if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
-    })
-
-    val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-      Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-    else Optional.empty[Integer]()
+    var epochResult: Optional[Integer] = Optional.empty()

Review Comment:
   I believe the conditions for both this and the block below need to be slightly 
altered to accommodate 
https://github.com/apache/kafka/pull/15213#discussion_r1481540437. I have since 
made the change, so let me know whether it is okay with you 😊 






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-18 Thread via GitHub


showuon commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1951764274

   @clolov , there's a compilation error in jdk 8/scala 2.12. Could you help 
fix it? Maybe rebasing to the latest trunk could solve it?





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-02 Thread via GitHub


showuon commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702427458


##
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##
@@ -70,4 +72,21 @@ public static OffsetSpec maxTimestamp() {
         return new MaxTimestampSpec();
     }
 
+    /**
+     * Used to retrieve the offset with the local log start offset,
+     * log start offset is the offset of a log above which reads are guaranteed to be served
+     * from the disk of the leader broker, when Tiered Storage is not enabled, it behaves the same
+     * as the earliest timestamp

Review Comment:
   nit: Used to retrieve ~~the offset with~~ the local log start offset[.] 
   [Local] log start offset is the offset of a log above which reads are 
guaranteed to be served from the disk of the leader broker.
   
   Note: When tiered storage is not enabled, it behaves the same as retrieving 
the earliest timestamp offset.



##
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##
@@ -70,4 +72,21 @@ public static OffsetSpec maxTimestamp() {
         return new MaxTimestampSpec();
     }
 
+    /**
+     * Used to retrieve the offset with the local log start offset,
+     * log start offset is the offset of a log above which reads are guaranteed to be served
+     * from the disk of the leader broker, when Tiered Storage is not enabled, it behaves the same
+     * as the earliest timestamp
+     */
+    public static OffsetSpec earliestLocalSpec() {
+        return new EarliestLocalSpec();
+    }
+
+    /**
+     * Used to retrieve the offset with the highest offset of data stored in remote storage,
+     * and when Tiered Storage is not enabled, we won't return any offset (i.e. Unknown offset)

Review Comment:
   nit: 
   
   Used to retrieve ~~the offset with~~ the highest offset of data stored in 
remote storage[.]
   
   Note: When tiered storage is not enabled, we ~~won't return any offset 
(i.e.~~  [will return] unknown offset.
   
   
   






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-02 Thread via GitHub


brandboat commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702431351


##
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##
@@ -70,4 +72,21 @@ public static OffsetSpec maxTimestamp() {
         return new MaxTimestampSpec();
     }
 
+    /**
+     * Used to retrieve the offset with the local log start offset,
+     * log start offset is the offset of a log above which reads are guaranteed to be served
+     * from the disk of the leader broker, when Tiered Storage is not enabled, it behaves the same
+     * as the earliest timestamp
+     */
+    public static OffsetSpec earliestLocalSpec() {
+        return new EarliestLocalSpec();
+    }
+
+    /**
+     * Used to retrieve the offset with the highest offset of data stored in remote storage,
+     * and when Tiered Storage is not enabled, we won't return any offset (i.e. Unknown offset)

Review Comment:
   Thanks for the comments, already fixed them in the latest commit.






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-02 Thread via GitHub


showuon commented on PR #16783:
URL: https://github.com/apache/kafka/pull/16783#issuecomment-2266338009

   I mean, we can create a test called `listOffsetTest` under 
`org.apache.kafka.tiered.storage.integration` to test it.





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


brandboat commented on PR #16783:
URL: https://github.com/apache/kafka/pull/16783#issuecomment-2266642180

   >Overall LGTM! We need integration tests for it. We should add some test 
like DeleteTopicTest using ExpectListOffsetsAction. I expected it should work 
like this (I didn't try):
   
   Thanks @showuon, I enabled remote storage in GetOffsetShellTest. Do we 
still need the test you mentioned above?





Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


showuon commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702632108


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -70,31 +85,75 @@ private String getTopicName(int i) {
         return "topic" + i;
     }
 
+    private String getRemoteLogStorageEnabledTopicName(int i) {
+        return "topicRLS" + i;
+    }
+
     private void setUp() {
+        setupTopics(this::getTopicName, Collections.emptyMap());
+        sendProducerRecords(this::getTopicName);
+    }
+
+    private void setUpRemoteLogTopics() {
+        Map<String, String> rlsConfigs = new HashMap<>();
+        rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+        rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+        rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+        setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+        sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+    }
+
+    private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
         try (Admin admin = cluster.createAdminClient()) {
             List<NewTopic> topics = new ArrayList<>();
 
-            IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1)));
+            IntStream.range(0, topicCount + 1).forEach(i ->
+                    topics.add(new NewTopic(topicName.apply(i), i, (short) 1).configs(configs)));
 
             admin.createTopics(topics);
         }
+    }
 
+    private void sendProducerRecords(Function<Integer, String> topicName) {
         Properties props = new Properties();
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
             IntStream.range(0, topicCount + 1)
-                    .forEach(i -> IntStream.range(0, i * i)
-                            .forEach(msgCount -> {
-                                assertDoesNotThrow(() -> producer.send(
-                                        new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)).get());
-                            })
-                    );
+                    .forEach(i -> IntStream.range(0, i * i)
+                            .forEach(msgCount -> assertDoesNotThrow(() -> producer.send(
                                    new ProducerRecord<>(topicName.apply(i), msgCount % i, null, "val" + msgCount)).get())));
         }
     }
 
+    private static List<ClusterConfig> withRemoteStorage() {
+        Map<String, String> serverProperties = new HashMap<>();
+        serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, "1");

Review Comment:
   Could we set `remote.log.metadata.topic.num.partitions` to 1 (default is 50) 
to speed up the test?
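
   For reference, the suggested put, as it would slot into `withRemoteStorage()` above (assuming `REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP` is the constant backing `remote.log.metadata.topic.num.partitions`):

   ```java
   // Speeds up the test: the remote log metadata topic defaults to 50 partitions.
   serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX
           + TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, "1");
   ```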






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


showuon commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702631437


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -416,7 +514,7 @@ private List<Row> expectedOffsetsForTopic(int i) {
 
     private List<Row> executeAndParse(String... args) {
         String out = ToolsTestUtils.captureStandardOut(() -> GetOffsetShell.mainNoExit(addBootstrapServer(args)));
-
+        System.out.println(out);

Review Comment:
   should be removed?






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


brandboat commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702631822


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -416,7 +514,7 @@ private List<Row> expectedOffsetsForTopic(int i) {
 
     private List<Row> executeAndParse(String... args) {
         String out = ToolsTestUtils.captureStandardOut(() -> GetOffsetShell.mainNoExit(addBootstrapServer(args)));
-
+        System.out.println(out);

Review Comment:
   yep, just removed.






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


showuon commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702701568


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -70,31 +85,75 @@ private String getTopicName(int i) {
         return "topic" + i;
     }
 
+    private String getRemoteLogStorageEnabledTopicName(int i) {
+        return "topicRLS" + i;
+    }
+
     private void setUp() {
+        setupTopics(this::getTopicName, Collections.emptyMap());
+        sendProducerRecords(this::getTopicName);
+    }
+
+    private void setUpRemoteLogTopics() {

Review Comment:
   I'll add some comment for this method. Ex:
   ```
   In this method, we'll create 4 topics and produce records to the log like 
this:
   topicRLS1 -> 1 segment
   topicRLS2 -> 2 segments (1 local log segment + 1 segment in the remote 
storage)
   topicRLS3 -> 3 segments (1 local log segment + 2 segments in the remote 
storage)
   topicRLS4 -> 4 segments (1 local log segment + 3 segments in the remote 
storage)
   ```



##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -70,31 +85,75 @@ private String getTopicName(int i) {
         return "topic" + i;
     }
 
+    private String getRemoteLogStorageEnabledTopicName(int i) {
+        return "topicRLS" + i;
+    }
+
     private void setUp() {
+        setupTopics(this::getTopicName, Collections.emptyMap());
+        sendProducerRecords(this::getTopicName);
+    }
+
+    private void setUpRemoteLogTopics() {
+        Map<String, String> rlsConfigs = new HashMap<>();
+        rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+        rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+        rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+        setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+        sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+    }
+
+    private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
         try (Admin admin = cluster.createAdminClient()) {
             List<NewTopic> topics = new ArrayList<>();
 
-            IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1)));
+            IntStream.range(0, topicCount + 1).forEach(i ->
+                    topics.add(new NewTopic(topicName.apply(i), i, (short) 1).configs(configs)));
 
             admin.createTopics(topics);
         }
+    }
 
+    private void sendProducerRecords(Function<Integer, String> topicName) {
         Properties props = new Properties();
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
             IntStream.range(0, topicCount + 1)
-                    .forEach(i -> IntStream.range(0, i * i)
-                            .forEach(msgCount -> {
-                                assertDoesNotThrow(() -> producer.send(
-                                        new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)).get());
-                            })
-                    );
+                    .forEach(i -> IntStream.range(0, i * i)
+                            .forEach(msgCount -> assertDoesNotThrow(() -> producer.send(
                                    new ProducerRecord<>(topicName.apply(i), msgCount % i, null, "val" + msgCount)).get())));
         }
     }
 
+    private static List<ClusterConfig> withRemoteStorage() {
+        Map<String, String> serverProperties = new HashMap<>();
+        serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, "1");
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "1000");
+        serverProperties.put(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, "1000");
+        serverProperties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+
+        Map<String, String> zkProperties = new HashMap<>(serverProperties);
+        zkProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "PLAINTEXT");
+
+        Map<String, String> raftProperties = new HashMap<>(serverProperties);
+        raftProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "EXTERNAL");

Review Comment:
   Could we add a comment here to explain why we need 2 different settings?



##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -274,6 +333,45

Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


brandboat commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702847984


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -70,31 +85,75 @@ private String getTopicName(int i) {
         return "topic" + i;
     }
 
+    private String getRemoteLogStorageEnabledTopicName(int i) {
+        return "topicRLS" + i;
+    }
+
     private void setUp() {
+        setupTopics(this::getTopicName, Collections.emptyMap());
+        sendProducerRecords(this::getTopicName);
+    }
+
+    private void setUpRemoteLogTopics() {
+        Map<String, String> rlsConfigs = new HashMap<>();
+        rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+        rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+        rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+        setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+        sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+    }
+
+    private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
         try (Admin admin = cluster.createAdminClient()) {
             List<NewTopic> topics = new ArrayList<>();
 
-            IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1)));
+            IntStream.range(0, topicCount + 1).forEach(i ->
+                    topics.add(new NewTopic(topicName.apply(i), i, (short) 1).configs(configs)));
 
             admin.createTopics(topics);
         }
+    }
 
+    private void sendProducerRecords(Function<Integer, String> topicName) {
         Properties props = new Properties();
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
             IntStream.range(0, topicCount + 1)
-                    .forEach(i -> IntStream.range(0, i * i)
-                            .forEach(msgCount -> {
-                                assertDoesNotThrow(() -> producer.send(
-                                        new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)).get());
-                            })
-                    );
+                    .forEach(i -> IntStream.range(0, i * i)
+                            .forEach(msgCount -> assertDoesNotThrow(() -> producer.send(
                                    new ProducerRecord<>(topicName.apply(i), msgCount % i, null, "val" + msgCount)).get())));
         }
     }
 
+    private static List<ClusterConfig> withRemoteStorage() {
+        Map<String, String> serverProperties = new HashMap<>();
+        serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, "1");
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "1000");
+        serverProperties.put(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, "1000");
+        serverProperties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+
+        Map<String, String> zkProperties = new HashMap<>(serverProperties);
+        zkProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "PLAINTEXT");
+
+        Map<String, String> raftProperties = new HashMap<>(serverProperties);
+        raftProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "EXTERNAL");

Review Comment:
   Correct me if I'm wrong: after spending some time digging through the code, I 
found that the ZK default listener name is `PLAINTEXT`, set via 
https://github.com/apache/kafka/blob/96989e4b6483e2e24883de20ff2cbcd2d3b39dbf/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java#L312-L316
   while in Raft it is `EXTERNAL`, see 
https://github.com/apache/kafka/blob/1084d3b9c95aecccbe3c82e84ae4c8f406fc68e1/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L324-L332
   
   Another thing is that ClusterConfig#setListenerName only works under ZK mode; 
in Raft it is useless. Perhaps we should make it work under Raft mode too. 
Gentle ping @chia7712, any thoughts?






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


brandboat commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702848769


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -70,31 +85,75 @@ private String getTopicName(int i) {
         return "topic" + i;
     }
 
+    private String getRemoteLogStorageEnabledTopicName(int i) {
+        return "topicRLS" + i;
+    }
+
     private void setUp() {
+        setupTopics(this::getTopicName, Collections.emptyMap());
+        sendProducerRecords(this::getTopicName);
+    }
+
+    private void setUpRemoteLogTopics() {

Review Comment:
   Thank you for the detailed comment; this will certainly help others grasp 
the details of the test 😃






Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


brandboat commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702848944


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -274,6 +333,45 @@ public void testGetOffsetsByMaxTimestamp() {
         }
     }
 
+    @ClusterTemplate("withRemoteStorage")
+    public void testGetOffsetsByEarliestLocalSpec() throws InterruptedException {
+        setUpRemoteLogTopics();
+
+        for (String time : new String[] {"-4", "earliest-local"}) {
+            TestUtils.waitForCondition(() ->
+                    Arrays.asList(
+                            new Row("topicRLS1", 0, 0L),
+                            new Row("topicRLS2", 0, 1L),
+                            new Row("topicRLS3", 0, 2L),
+                            new Row("topicRLS4", 0, 3L))
+                    .equals(executeAndParse("--topic-partitions", "topicRLS.*:0", "--time", time)),
+                "testGetOffsetsByEarliestLocalSpec result not match");
+        }
+    }
+
+    @ClusterTemplate("withRemoteStorage")
+    public void testGetOffsetsByLatestTieredSpec() throws InterruptedException {
+        setUp();
+        setUpRemoteLogTopics();
+
+        for (String time : new String[] {"-5", "latest-tiered"}) {
+            // test topics disable remote log storage
+            // as remote log not enabled, broker return unknown offset for each topic partition and these
+            // unknown offsets are ignored by GetOffsetShell hence we have empty result here.
+            assertEquals(Collections.emptyList(),
+                    executeAndParse("--topic-partitions", "topic\\d+:0", "--time", time));
+
+            // test topics enable remote log storage
+            TestUtils.waitForCondition(() ->
+                    Arrays.asList(
+                            new Row("topicRLS2", 0, 0L),
+                            new Row("topicRLS3", 0, 1L),
+                            new Row("topicRLS4", 0, 2L))

Review Comment:
   Everything has been addressed in the latest commit, thank you
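
   For context on the empty-result assertion in the diff above, here is a minimal, 
hypothetical sketch of the filtering it relies on (the class and method names are 
illustrative, not the tool's actual code): partitions whose lookup returns the 
ListOffsets "unknown offset" sentinel (-1) are skipped, so querying latest-tiered 
on topics without remote storage prints nothing.
   ```java
   import java.util.Map;

   import org.apache.kafka.common.TopicPartition;

   final class OffsetPrinter {
       // -1 is the ListOffsets "unknown offset" sentinel; such partitions are
       // skipped, which is why the disabled-tiering query yields an empty result.
       static void printKnownOffsets(Map<TopicPartition, Long> offsets) {
           offsets.forEach((tp, offset) -> {
               if (offset >= 0) {
                   System.out.printf("%s:%d:%d%n", tp.topic(), tp.partition(), offset);
               }
           });
       }
   }
   ```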



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


brandboat commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702847984


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -70,31 +85,75 @@ private String getTopicName(int i) {
 return "topic" + i;
 }
 
+private String getRemoteLogStorageEnabledTopicName(int i) {
+return "topicRLS" + i;
+}
+
 private void setUp() {
+setupTopics(this::getTopicName, Collections.emptyMap());
+sendProducerRecords(this::getTopicName);
+}
+
+private void setUpRemoteLogTopics() {
+Map<String, String> rlsConfigs = new HashMap<>();
+rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+}
+
+private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
 try (Admin admin = cluster.createAdminClient()) {
 List<NewTopic> topics = new ArrayList<>();
 
-IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new 
NewTopic(getTopicName(i), i, (short) 1)));
+IntStream.range(0, topicCount + 1).forEach(i ->
+topics.add(new NewTopic(topicName.apply(i), i, (short) 
1).configs(configs)));
 
 admin.createTopics(topics);
 }
+}
 
+private void sendProducerRecords(Function<Integer, String> topicName) {
 Properties props = new Properties();
 props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 
 try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
 IntStream.range(0, topicCount + 1)
-.forEach(i -> IntStream.range(0, i * i)
-.forEach(msgCount -> {
-assertDoesNotThrow(() -> producer.send(
-new ProducerRecord<>(getTopicName(i), 
msgCount % i, null, "val" + msgCount)).get());
-})
-);
+.forEach(i -> IntStream.range(0, i * i)
+.forEach(msgCount -> assertDoesNotThrow(() -> 
producer.send(
+new ProducerRecord<>(topicName.apply(i), 
msgCount % i, null, "val" + msgCount)).get(;
 }
 }
 
+private static List<ClusterConfig> withRemoteStorage() {
+Map<String, String> serverProperties = new HashMap<>();
+
serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX
 + 
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP,
 "1");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 "true");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
 LocalTieredStorage.class.getName());
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
 "1000");
+serverProperties.put(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, 
"1000");
+
serverProperties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+
+Map<String, String> zkProperties = new HashMap<>(serverProperties);
+
zkProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
 "PLAINTEXT");
+
+Map<String, String> raftProperties = new HashMap<>(serverProperties);
+
raftProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
 "EXTERNAL");

Review Comment:
   Correct me if I'm wrong: after spending some time digging through the code, I 
found that in Type.ZK the default listener name is `PLAINTEXT` and is set 
via
   
https://github.com/apache/kafka/blob/96989e4b6483e2e24883de20ff2cbcd2d3b39dbf/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java#L312-L316
   while in Type.KRAFT/Type.CO_KRAFT it is `EXTERNAL`, see  
https://github.com/apache/kafka/blob/1084d3b9c95aecccbe3c82e84ae4c8f406fc68e1/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L324-L332
   
   Another thing is that ClusterConfig#setListenerName only works under ZK 
mode; in KRAFT it has no effect. Perhaps we should make it work under KRaft 
mode too. Gentle ping @chia7712, any thoughts?
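
   For illustration, a minimal sketch of the per-mode workaround this implies, 
assuming the test-kit defaults linked above; it is a fragment of the 
`withRemoteStorage()` factory in the diff, with static imports of the `Type` 
constants assumed, not the final code:
   ```java
   // One ClusterConfig per mode, pointing the remote log metadata manager at
   // that mode's default listener name (PLAINTEXT for ZK, EXTERNAL for KRaft).
   Map<String, String> zkProps = new HashMap<>(serverProperties);
   zkProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
           "PLAINTEXT");
   Map<String, String> raftProps = new HashMap<>(serverProperties);
   raftProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
           "EXTERNAL");
   return Arrays.asList(
           ClusterConfig.defaultBuilder()
                   .setTypes(Collections.singleton(ZK))
                   .setServerProperties(zkProps)
                   .build(),
           ClusterConfig.defaultBuilder()
                   .setTypes(new HashSet<>(Arrays.asList(KRAFT, CO_KRAFT)))
                   .setServerProperties(raftProps)
                   .build());
   ```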



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


chia7712 commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702865347


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -70,31 +85,75 @@ private String getTopicName(int i) {
 return "topic" + i;
 }
 
+private String getRemoteLogStorageEnabledTopicName(int i) {
+return "topicRLS" + i;
+}
+
 private void setUp() {
+setupTopics(this::getTopicName, Collections.emptyMap());
+sendProducerRecords(this::getTopicName);
+}
+
+private void setUpRemoteLogTopics() {
+Map<String, String> rlsConfigs = new HashMap<>();
+rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+}
+
+private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
 try (Admin admin = cluster.createAdminClient()) {
 List<NewTopic> topics = new ArrayList<>();
 
-IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new 
NewTopic(getTopicName(i), i, (short) 1)));
+IntStream.range(0, topicCount + 1).forEach(i ->
+topics.add(new NewTopic(topicName.apply(i), i, (short) 
1).configs(configs)));
 
 admin.createTopics(topics);
 }
+}
 
+private void sendProducerRecords(Function<Integer, String> topicName) {
 Properties props = new Properties();
 props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 
 try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
 IntStream.range(0, topicCount + 1)
-.forEach(i -> IntStream.range(0, i * i)
-.forEach(msgCount -> {
-assertDoesNotThrow(() -> producer.send(
-new ProducerRecord<>(getTopicName(i), 
msgCount % i, null, "val" + msgCount)).get());
-})
-);
+.forEach(i -> IntStream.range(0, i * i)
+.forEach(msgCount -> assertDoesNotThrow(() -> 
producer.send(
+new ProducerRecord<>(topicName.apply(i), 
msgCount % i, null, "val" + msgCount)).get(;
 }
 }
 
+private static List<ClusterConfig> withRemoteStorage() {
+Map<String, String> serverProperties = new HashMap<>();
+
serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX
 + 
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP,
 "1");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 "true");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
 LocalTieredStorage.class.getName());
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
 "1000");
+serverProperties.put(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, 
"1000");
+
serverProperties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+
+Map<String, String> zkProperties = new HashMap<>(serverProperties);
+
zkProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
 "PLAINTEXT");
+
+Map<String, String> raftProperties = new HashMap<>(serverProperties);
+
raftProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
 "EXTERNAL");

Review Comment:
   @brandboat you are right. Could you please open a jira for that? `KRAFT` 
should honor the `listener name` and `security protocol` from ClusterConfig.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


chia7712 commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702865629


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -70,31 +85,80 @@ private String getTopicName(int i) {
 return "topic" + i;
 }
 
+private String getRemoteLogStorageEnabledTopicName(int i) {
+return "topicRLS" + i;
+}
+
 private void setUp() {
+setupTopics(this::getTopicName, Collections.emptyMap());
+sendProducerRecords(this::getTopicName);
+}
+
+private void setUpRemoteLogTopics() {
+// In this method, we'll create 4 topics and produce records to the 
log like this:
+// topicRLS1 -> 1 segment
+// topicRLS2 -> 2 segments (1 local log segment + 1 segment in the 
remote storage)
+// topicRLS3 -> 3 segments (1 local log segment + 2 segments in the 
remote storage)
+// topicRLS4 -> 4 segments (1 local log segment + 3 segments in the 
remote storage)
+Map<String, String> rlsConfigs = new HashMap<>();
+rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+}
+
+private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
 try (Admin admin = cluster.createAdminClient()) {
 List<NewTopic> topics = new ArrayList<>();
 
-IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new 
NewTopic(getTopicName(i), i, (short) 1)));
+IntStream.range(0, topicCount + 1).forEach(i ->
+topics.add(new NewTopic(topicName.apply(i), i, (short) 
1).configs(configs)));
 
 admin.createTopics(topics);
 }
+}
 
+private void sendProducerRecords(Function<Integer, String> topicName) {
 Properties props = new Properties();
 props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 
 try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
 IntStream.range(0, topicCount + 1)
-.forEach(i -> IntStream.range(0, i * i)
-.forEach(msgCount -> {
-assertDoesNotThrow(() -> producer.send(
-new ProducerRecord<>(getTopicName(i), 
msgCount % i, null, "val" + msgCount)).get());
-})
-);
+.forEach(i -> IntStream.range(0, i * i)
+.forEach(msgCount -> assertDoesNotThrow(() -> 
producer.send(
+new ProducerRecord<>(topicName.apply(i), 
msgCount % i, null, "val" + msgCount)).get(;
 }
 }
 
+private static List<ClusterConfig> withRemoteStorage() {
+Map<String, String> serverProperties = new HashMap<>();
+
serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX
 + 
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP,
 "1");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 "true");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
 LocalTieredStorage.class.getName());
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
 "1000");
+
serverProperties.put(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP,
 "1");
+serverProperties.put(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, 
"1000");
+
serverProperties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
 "EXTERNAL");
+
+return Arrays.asList(

Review Comment:
   We can set the listener name for all types even though `setListenerName` is 
a no-op for KRaft/co-KRaft mode. That simplifies the code and is still valid.
   ```java
   return Collections.singletonList(
   ClusterConfig.defaultBuilder()
   .setTypes(new HashSet<>(Arrays.asList(ZK, KRAFT, 
CO_KRAFT)))
   .setServerProperties(serverProperties)
   .setListenerName("EXTERNAL")
   .build());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


FrankYang0529 commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702947419


##
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##
@@ -73,18 +73,20 @@ public static OffsetSpec maxTimestamp() {
 }
 
 /**
- * Used to retrieve the offset with the local log start offset,
- * log start offset is the offset of a log above which reads are 
guaranteed to be served
- * from the disk of the leader broker, when Tiered Storage is not enabled, 
it behaves the same
- * as the earliest timestamp
+ * Used to retrieve the local log start offset.
+ * Local log start offset is the offset of a log above which reads
+ * are guaranteed to be served from the disk of the leader broker.
+ * 
+ * Note: When tiered Storage is not enabled, it behaves the same as 
retrieving the earliest timestamp offset.
  */
 public static OffsetSpec earliestLocalSpec() {
 return new EarliestLocalSpec();
 }
 
 /**
- * Used to retrieve the offset with the highest offset of data stored in 
remote storage,
- * and when Tiered Storage is not enabled, we won't return any offset 
(i.e. Unknown offset)
+ * Used to retrieve the highest offset of data stored in remote storage.
+ * 
+ * Note: When tiered storage is not enabled, we will return unknown offset.
  */
 public static OffsetSpec latestTierSpec() {
 return new LatestTierSpec();

Review Comment:
   Thanks for finding this. Yes, we should follow the KIP. @brandboat, could 
you fix it in this PR? I'm afraid that if I create another PR, it may block your 
progress.
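
   For readers following along, a self-contained sketch of how a client could 
exercise these specs via the Admin API (the bootstrap address and topic name are 
placeholders; the method names follow this PR's diff, and the `Spec` suffix was 
still under discussion at this point):
   ```java
   import java.util.Collections;
   import java.util.Properties;

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.ListOffsetsResult;
   import org.apache.kafka.clients.admin.OffsetSpec;
   import org.apache.kafka.common.TopicPartition;

   public class ListTieredOffsets {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               TopicPartition tp = new TopicPartition("topicRLS1", 0);
               // Earliest offset still served from the leader's local disk:
               ListOffsetsResult local = admin.listOffsets(
                       Collections.singletonMap(tp, OffsetSpec.earliestLocalSpec()));
               System.out.println("earliest-local: " + local.partitionResult(tp).get().offset());
               // Highest offset already moved to remote storage (unknown when tiering is off):
               ListOffsetsResult tiered = admin.listOffsets(
                       Collections.singletonMap(tp, OffsetSpec.latestTierSpec()));
               System.out.println("latest-tiered: " + tiered.partitionResult(tp).get().offset());
           }
       }
   }
   ```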



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


brandboat commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1702999054


##
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##
@@ -73,18 +73,20 @@ public static OffsetSpec maxTimestamp() {
 }
 
 /**
- * Used to retrieve the offset with the local log start offset,
- * log start offset is the offset of a log above which reads are 
guaranteed to be served
- * from the disk of the leader broker, when Tiered Storage is not enabled, 
it behaves the same
- * as the earliest timestamp
+ * Used to retrieve the local log start offset.
+ * Local log start offset is the offset of a log above which reads
+ * are guaranteed to be served from the disk of the leader broker.
+ * 
+ * Note: When tiered Storage is not enabled, it behaves the same as 
retrieving the earliest timestamp offset.
  */
 public static OffsetSpec earliestLocalSpec() {

Review Comment:
   OK, I'll remove the postfix



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


brandboat commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1703000636


##
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##
@@ -73,18 +73,20 @@ public static OffsetSpec maxTimestamp() {
 }
 
 /**
- * Used to retrieve the offset with the local log start offset,
- * log start offset is the offset of a log above which reads are 
guaranteed to be served
- * from the disk of the leader broker, when Tiered Storage is not enabled, 
it behaves the same
- * as the earliest timestamp
+ * Used to retrieve the local log start offset.
+ * Local log start offset is the offset of a log above which reads
+ * are guaranteed to be served from the disk of the leader broker.
+ * 
+ * Note: When tiered Storage is not enabled, it behaves the same as 
retrieving the earliest timestamp offset.
  */
 public static OffsetSpec earliestLocalSpec() {
 return new EarliestLocalSpec();
 }
 
 /**
- * Used to retrieve the offset with the highest offset of data stored in 
remote storage,
- * and when Tiered Storage is not enabled, we won't return any offset 
(i.e. Unknown offset)
+ * Used to retrieve the highest offset of data stored in remote storage.
+ * 
+ * Note: When tiered storage is not enabled, we will return unknown offset.

Review Comment:
   This is included in 
https://github.com/apache/kafka/pull/16783/files/b6426e51d40e5b921f7d497bb0f1c3b2b88adf68#diff-8c7debdc8479f762a5a9585de62d3254c10768249bddd45391b49ba5683526e4R363-R367



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


brandboat commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1703000666


##
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##
@@ -73,18 +73,20 @@ public static OffsetSpec maxTimestamp() {
 }
 
 /**
- * Used to retrieve the offset with the local log start offset,
- * log start offset is the offset of a log above which reads are 
guaranteed to be served
- * from the disk of the leader broker, when Tiered Storage is not enabled, 
it behaves the same
- * as the earliest timestamp
+ * Used to retrieve the local log start offset.
+ * Local log start offset is the offset of a log above which reads
+ * are guaranteed to be served from the disk of the leader broker.
+ * 
+ * Note: When tiered Storage is not enabled, it behaves the same as 
retrieving the earliest timestamp offset.

Review Comment:
   Sure, will do, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


brandboat commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1703013614


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -70,31 +85,75 @@ private String getTopicName(int i) {
 return "topic" + i;
 }
 
+private String getRemoteLogStorageEnabledTopicName(int i) {
+return "topicRLS" + i;
+}
+
 private void setUp() {
+setupTopics(this::getTopicName, Collections.emptyMap());
+sendProducerRecords(this::getTopicName);
+}
+
+private void setUpRemoteLogTopics() {
+Map<String, String> rlsConfigs = new HashMap<>();
+rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+}
+
+private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
 try (Admin admin = cluster.createAdminClient()) {
 List<NewTopic> topics = new ArrayList<>();
 
-IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new 
NewTopic(getTopicName(i), i, (short) 1)));
+IntStream.range(0, topicCount + 1).forEach(i ->
+topics.add(new NewTopic(topicName.apply(i), i, (short) 
1).configs(configs)));
 
 admin.createTopics(topics);
 }
+}
 
+private void sendProducerRecords(Function<Integer, String> topicName) {
 Properties props = new Properties();
 props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 
 try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
 IntStream.range(0, topicCount + 1)
-.forEach(i -> IntStream.range(0, i * i)
-.forEach(msgCount -> {
-assertDoesNotThrow(() -> producer.send(
-new ProducerRecord<>(getTopicName(i), 
msgCount % i, null, "val" + msgCount)).get());
-})
-);
+.forEach(i -> IntStream.range(0, i * i)
+.forEach(msgCount -> assertDoesNotThrow(() -> 
producer.send(
+new ProducerRecord<>(topicName.apply(i), 
msgCount % i, null, "val" + msgCount)).get(;
 }
 }
 
+private static List<ClusterConfig> withRemoteStorage() {
+Map<String, String> serverProperties = new HashMap<>();
+
serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX
 + 
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP,
 "1");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 "true");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
 LocalTieredStorage.class.getName());
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
 "1000");
+serverProperties.put(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, 
"1000");
+
serverProperties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+
+Map<String, String> zkProperties = new HashMap<>(serverProperties);
+
zkProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
 "PLAINTEXT");
+
+Map<String, String> raftProperties = new HashMap<>(serverProperties);
+
raftProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
 "EXTERNAL");

Review Comment:
   Thanks for double-checking; filed 
https://issues.apache.org/jira/browse/KAFKA-17256



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


brandboat commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1703026420


##
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##
@@ -73,18 +73,20 @@ public static OffsetSpec maxTimestamp() {
 }
 
 /**
- * Used to retrieve the offset with the local log start offset,
- * log start offset is the offset of a log above which reads are 
guaranteed to be served
- * from the disk of the leader broker, when Tiered Storage is not enabled, 
it behaves the same
- * as the earliest timestamp
+ * Used to retrieve the local log start offset.
+ * Local log start offset is the offset of a log above which reads
+ * are guaranteed to be served from the disk of the leader broker.
+ * 
+ * Note: When tiered Storage is not enabled, it behaves the same as 
retrieving the earliest timestamp offset.
  */
 public static OffsetSpec earliestLocalSpec() {
 return new EarliestLocalSpec();
 }
 
 /**
- * Used to retrieve the offset with the highest offset of data stored in 
remote storage,
- * and when Tiered Storage is not enabled, we won't return any offset 
(i.e. Unknown offset)
+ * Used to retrieve the highest offset of data stored in remote storage.
+ * 
+ * Note: When tiered storage is not enabled, we will return unknown offset.
  */
 public static OffsetSpec latestTierSpec() {
 return new LatestTierSpec();

Review Comment:
   Everything has been addressed, thanks everyone.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-03 Thread via GitHub


FrankYang0529 commented on PR #16783:
URL: https://github.com/apache/kafka/pull/16783#issuecomment-2267326163

   @brandboat, I think we also need to update L296-297 in GetOffsetShell
   
   ```
   throw new TerseException("Malformed time argument " + listOffsetsTimestamp + 
". " +
   "Please use -1 or latest / -2 or earliest / -3 
or max-timestamp, or a specified long format timestamp");
   ```
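
   For example, the updated message might read as follows (a hedged sketch; the 
final wording is up to the PR, based on the -4/earliest-local and -5/latest-tiered 
values exercised by the new tests):
   ```java
   throw new TerseException("Malformed time argument " + listOffsetsTimestamp + ". " +
           "Please use -1 or latest / -2 or earliest / -3 or max-timestamp / " +
           "-4 or earliest-local / -5 or latest-tiered, or a specified long format timestamp");
   ```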


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-04 Thread via GitHub


showuon commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1703090979


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -70,31 +85,76 @@ private String getTopicName(int i) {
 return "topic" + i;
 }
 
+private String getRemoteLogStorageEnabledTopicName(int i) {
+return "topicRLS" + i;
+}
+
 private void setUp() {
+setupTopics(this::getTopicName, Collections.emptyMap());
+sendProducerRecords(this::getTopicName);
+}
+
+private void setUpRemoteLogTopics() {
+// In this method, we'll create 4 topics and produce records to the 
log like this:
+// topicRLS1 -> 1 segment
+// topicRLS2 -> 2 segments (1 local log segment + 1 segment in the 
remote storage)
+// topicRLS3 -> 3 segments (1 local log segment + 2 segments in the 
remote storage)
+// topicRLS4 -> 4 segments (1 local log segment + 3 segments in the 
remote storage)
+Map<String, String> rlsConfigs = new HashMap<>();
+rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+}
+
+private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
 try (Admin admin = cluster.createAdminClient()) {
 List<NewTopic> topics = new ArrayList<>();
 
-IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new 
NewTopic(getTopicName(i), i, (short) 1)));
+IntStream.range(0, topicCount + 1).forEach(i ->
+topics.add(new NewTopic(topicName.apply(i), i, (short) 
1).configs(configs)));
 
 admin.createTopics(topics);
 }
+}
 
+private void sendProducerRecords(Function<Integer, String> topicName) {
 Properties props = new Properties();
 props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 
 try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
 IntStream.range(0, topicCount + 1)
-.forEach(i -> IntStream.range(0, i * i)
-.forEach(msgCount -> {
-assertDoesNotThrow(() -> producer.send(
-new ProducerRecord<>(getTopicName(i), 
msgCount % i, null, "val" + msgCount)).get());
-})
-);
+.forEach(i -> IntStream.range(0, i * i)
+.forEach(msgCount -> assertDoesNotThrow(() -> 
producer.send(
+new ProducerRecord<>(topicName.apply(i), 
msgCount % i, null, "val" + msgCount)).get(;
 }
 }
 
+private static List<ClusterConfig> withRemoteStorage() {
+Map<String, String> serverProperties = new HashMap<>();
+
serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX
 + 
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP,
 "1");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 "true");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
 LocalTieredStorage.class.getName());
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
 "1000");
+
serverProperties.put(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP,
 "1");

Review Comment:
   I think we should add the 
`RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX` prefix 
when setting `REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP`, but I might be wrong. Could 
you confirm that the current change takes effect for the remote log metadata topic?
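
   Presumably something along these lines, mirroring how the replication-factor 
property is prefixed earlier in the diff (an assumption, not a verified fix):
   ```java
   // Prefix the partitions property the same way the replication-factor
   // property is prefixed above, so the metadata manager actually picks it up.
   serverProperties.put(
           RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX
                   + TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP,
           "1");
   ```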



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-04 Thread via GitHub


brandboat commented on code in PR #16783:
URL: https://github.com/apache/kafka/pull/16783#discussion_r1703116497


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -70,31 +85,76 @@ private String getTopicName(int i) {
 return "topic" + i;
 }
 
+private String getRemoteLogStorageEnabledTopicName(int i) {
+return "topicRLS" + i;
+}
+
 private void setUp() {
+setupTopics(this::getTopicName, Collections.emptyMap());
+sendProducerRecords(this::getTopicName);
+}
+
+private void setUpRemoteLogTopics() {
+// In this method, we'll create 4 topics and produce records to the 
log like this:
+// topicRLS1 -> 1 segment
+// topicRLS2 -> 2 segments (1 local log segment + 1 segment in the 
remote storage)
+// topicRLS3 -> 3 segments (1 local log segment + 2 segments in the 
remote storage)
+// topicRLS4 -> 4 segments (1 local log segment + 3 segments in the 
remote storage)
+Map<String, String> rlsConfigs = new HashMap<>();
+rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+}
+
+private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
 try (Admin admin = cluster.createAdminClient()) {
 List<NewTopic> topics = new ArrayList<>();
 
-IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new 
NewTopic(getTopicName(i), i, (short) 1)));
+IntStream.range(0, topicCount + 1).forEach(i ->
+topics.add(new NewTopic(topicName.apply(i), i, (short) 
1).configs(configs)));
 
 admin.createTopics(topics);
 }
+}
 
+private void sendProducerRecords(Function<Integer, String> topicName) {
 Properties props = new Properties();
 props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 
 try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
 IntStream.range(0, topicCount + 1)
-.forEach(i -> IntStream.range(0, i * i)
-.forEach(msgCount -> {
-assertDoesNotThrow(() -> producer.send(
-new ProducerRecord<>(getTopicName(i), 
msgCount % i, null, "val" + msgCount)).get());
-})
-);
+.forEach(i -> IntStream.range(0, i * i)
+.forEach(msgCount -> assertDoesNotThrow(() -> 
producer.send(
+new ProducerRecord<>(topicName.apply(i), 
msgCount % i, null, "val" + msgCount)).get(;
 }
 }
 
+private static List<ClusterConfig> withRemoteStorage() {
+Map<String, String> serverProperties = new HashMap<>();
+
serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX
 + 
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP,
 "1");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 "true");
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
 LocalTieredStorage.class.getName());
+
serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
 "1000");
+
serverProperties.put(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP,
 "1");

Review Comment:
   yeah... I missed that. Thank you



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-04 Thread via GitHub


brandboat commented on PR #16783:
URL: https://github.com/apache/kafka/pull/16783#issuecomment-2267568015

   The failed tests passed in my local env:
   ```sh
   ./gradlew cleanTest core:test --tests 
kafka.zk.ZkMigrationFailoverTest.testDriverSkipsEventsFromOlderEpoch tool:test 
--tests 
org.apache.kafka.tools.LeaderElectionCommandTest.testPreferredReplicaElection 
clients:test --tests 
org.apache.kafka.common.record.MemoryRecordsBuilderTest.testBuffersDereferencedOnClose
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-08-04 Thread via GitHub


showuon merged PR #16783:
URL: https://github.com/apache/kafka/pull/16783


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org