chia7712 commented on code in PR #16876:
URL: https://github.com/apache/kafka/pull/16876#discussion_r1721174887
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -336,9 +336,8 @@ private CompletableFuture<ListOffsetResult>
buildListOffsetRequestToNode(
Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>
targetTimes,
boolean requireTimestamps,
List<NetworkClientDelegate.UnsentRequest> unsentRequests) {
- ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
- .forConsumer(requireTimestamps, isolationLevel, false, false)
-
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
+ List<ListOffsetsRequestData.ListOffsetsTopic> topics =
ListOffsetsRequest.toListOffsetsTopics(targetTimes);
Review Comment:
If there is no strong reason to change it, could we keep the fluent pattern?
```java
ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
.forConsumer(requireTimestamps, isolationLevel)
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
```
##########
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala:
##########
@@ -288,8 +288,8 @@ class RequestQuotaTest extends BaseRequestTest {
.setPartitionIndex(tp.partition)
.setTimestamp(0L)
.setCurrentLeaderEpoch(15)).asJava)
- ListOffsetsRequest.Builder.forConsumer(false,
IsolationLevel.READ_UNCOMMITTED, false, false)
- .setTargetTimes(List(topic).asJava)
+ val targetTimes = List(topic).asJava
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:
##########
@@ -2365,9 +2366,10 @@ private ListOffsetsRequest createListOffsetRequest(short
version) {
.setPartitionIndex(0)
.setTimestamp(1000000L)
.setCurrentLeaderEpoch(5)));
+ List<ListOffsetsTopic> topics = singletonList(topic);
return ListOffsetsRequest.Builder
- .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false,
false)
- .setTargetTimes(singletonList(topic))
+ .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
+ .setTargetTimes(topics)
Review Comment:
ditto
##########
core/src/test/scala/unit/kafka/server/LogOffsetTest.scala:
##########
@@ -60,8 +60,9 @@ class LogOffsetTest extends BaseRequestTest {
@ValueSource(strings = Array("zk", "kraft"))
def testGetOffsetsForUnknownTopic(quorum: String): Unit = {
val topicPartition = new TopicPartition("foo", 0)
- val request = ListOffsetsRequest.Builder.forConsumer(false,
IsolationLevel.READ_UNCOMMITTED, false, false)
- .setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
+ val targetTimes = buildTargetTimes(topicPartition,
ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:
##########
@@ -2378,9 +2380,10 @@ private ListOffsetsRequest createListOffsetRequest(short
version) {
ListOffsetsTopic topic = new ListOffsetsTopic()
.setName("test")
.setPartitions(singletonList(partition));
+ List<ListOffsetsTopic> topics = singletonList(topic);
return ListOffsetsRequest.Builder
- .forConsumer(true, IsolationLevel.READ_COMMITTED, false,
false)
- .setTargetTimes(singletonList(topic))
+ .forConsumer(true, IsolationLevel.READ_COMMITTED)
+ .setTargetTimes(topics)
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:
##########
@@ -2354,9 +2354,10 @@ private ListOffsetsRequest createListOffsetRequest(short
version) {
.setTimestamp(1000000L)
.setMaxNumOffsets(10)
.setCurrentLeaderEpoch(5)));
+ List<ListOffsetsTopic> topics = singletonList(topic);
return ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED,
false, false)
- .setTargetTimes(singletonList(topic))
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ .setTargetTimes(topics)
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java:
##########
@@ -68,11 +76,11 @@ public void testGetErrorResponse() {
new ListOffsetsPartition()
.setPartitionIndex(0))));
ListOffsetsRequest request = ListOffsetsRequest.Builder
- .forConsumer(true, IsolationLevel.READ_COMMITTED, false,
false)
+ .forConsumer(true, IsolationLevel.READ_COMMITTED)
.setTargetTimes(topics)
.build(version);
ListOffsetsResponse response = (ListOffsetsResponse)
request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
-
Review Comment:
Please revert this unrelated change.
##########
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##########
@@ -102,6 +110,7 @@ public ListOffsetsRequest build(short version) {
public String toString() {
return data.toString();
}
+
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -287,15 +287,15 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
private def createListOffsetsRequest = {
- requests.ListOffsetsRequest.Builder.forConsumer(false,
IsolationLevel.READ_UNCOMMITTED, false, false).setTargetTimes(
- List(new ListOffsetsTopic()
- .setName(tp.topic)
- .setPartitions(List(new ListOffsetsPartition()
- .setPartitionIndex(tp.partition)
- .setTimestamp(0L)
- .setCurrentLeaderEpoch(27)).asJava)).asJava
- ).
- build()
+ val topics = List(new ListOffsetsTopic()
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]