Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lucasbru merged PR #15525: URL: https://github.com/apache/kafka/pull/15525 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org

lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1550191029

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
@@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {

Review Comment:
All good then, just wanted to make sure we're not losing it.

philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1550094782

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
@@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {

Review Comment:
This appears in the PR because I didn't rebase correctly. You've actually moved the test here: https://github.com/apache/kafka/blob/21479a31bdff0e15cfe7ee0a4e509232ed064b41/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala#L261

lianetm commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2034772501

Hey @philipnee, thanks for the updates, just one minor comment left above.

lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1549857291

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
@@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {

Review Comment:
Is this one being removed intentionally? The suggestion was only to move it to the `PlaintextConsumerCommitTest` file, where all tests related to committing offsets now live. It's OK with me if you think it's not worth keeping; I just want to make sure it's intentional.

philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2033505687

Hi @lianetm, thanks a lot for the reviews. I think I've addressed your comments; LMK if there's anything more. cc @lucasbru

philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548891748

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer2.close()
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {

Review Comment:
I think this is an error from the rebase, so it should be removed from the PR. Thanks for catching this.

philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548887021

## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
@@ -304,6 +304,60 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
     consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp)
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {

Review Comment:
Sorry, I wasn't looking carefully. Putting things back in their original place.

lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548614429

## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
@@ -304,6 +304,60 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
     consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp)
   }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {

Review Comment:
Oh, this one (and the one below) relate to partition offsets, not committed offsets, so I would say they need to stay in `PlaintextConsumerTest`, where you had them. (I was only suggesting moving `testSubscribeAndCommitSync` here, because it relates to committed offsets.)

lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548603219

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
Yes, agreed, that's the failure we noticed in the system tests. But conceptually we're creating a new `OffsetAndTimestampInternal` class that is the same as the existing `OffsetAndTimestamp`, with the only difference that the former does not throw on negative offsets or negative timestamps, right? So it makes sense to mention it in the class doc.

philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548599057

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
@@ -0,0 +1,81 @@
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
I think the problem is a negative timestamp in the response, causing `org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Invalid negative timestamp`. More specifically, this is the part that was complaining:
```
if (timestamp < 0)
    throw new IllegalArgumentException("Invalid negative timestamp");
```

lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548596033

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
@@ -0,0 +1,81 @@
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
It's actually both! He he, so let's maybe add _negative offsets and timestamps_.
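The distinction in this part of the thread can be made concrete with a minimal sketch that uses simplified, hypothetical stand-in classes; it is not the Kafka client code. `ValidatingOffsetAndTimestamp` plays the role of the public `OffsetAndTimestamp` and applies the negative-offset and negative-timestamp checks discussed above. `LenientOffsetAndTimestamp` plays the role of `OffsetAndTimestampInternal` and accepts a negative sentinel value from a response. The hypothetical `offsetsOnly` helper shows the idea in the PR title: a beginning/end-offsets path that only needs offsets never constructs the validating object, so a `-1` timestamp cannot throw.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the public value class: rejects negative values.
final class ValidatingOffsetAndTimestamp {
    final long offset;
    final long timestamp;
    final Optional<Integer> leaderEpoch;

    ValidatingOffsetAndTimestamp(long offset, long timestamp, Optional<Integer> leaderEpoch) {
        if (offset < 0)
            throw new IllegalArgumentException("Invalid negative offset");
        if (timestamp < 0)
            throw new IllegalArgumentException("Invalid negative timestamp");
        this.offset = offset;
        this.timestamp = timestamp;
        this.leaderEpoch = leaderEpoch;
    }
}

// Hypothetical stand-in for the internal class: same fields, no validation,
// so negative sentinel values from a response can be carried around safely.
final class LenientOffsetAndTimestamp {
    final long offset;
    final long timestamp;
    final Optional<Integer> leaderEpoch;

    LenientOffsetAndTimestamp(long offset, long timestamp, Optional<Integer> leaderEpoch) {
        this.offset = offset;
        this.timestamp = timestamp;
        this.leaderEpoch = leaderEpoch;
    }

    // Convert to the validating type only where a public result is actually needed.
    ValidatingOffsetAndTimestamp toValidating() {
        return new ValidatingOffsetAndTimestamp(offset, timestamp, leaderEpoch);
    }
}

final class OffsetResults {
    // Offsets-only path: extracts the offset without building the validating object.
    static <K> Map<K, Long> offsetsOnly(Map<K, LenientOffsetAndTimestamp> results) {
        Map<K, Long> offsets = new HashMap<>();
        results.forEach((key, value) -> offsets.put(key, value.offset));
        return offsets;
    }
}
```

The design choice sketched here is to validate only at the API boundary that promises non-negative values. Internal plumbing keeps whatever the broker returned.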

philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548593720

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer2.close()
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {
+    // This test ensure that the member ID is propagated from the group coordinator when the
+    // assignment is received into a subsequent offset commit
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment.size)
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    consumer.seek(tp, 0)
+
+    consumer.commitSync()
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    val numRecords = 1
+    (0 until numRecords).map { i =>
+      val timestamp = startingTimestamp + i.toLong
+      val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, s"key $i".getBytes, s"value $i".getBytes)
+      producer.send(record)
+      record
+    }
+    producer.flush()
+
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    val endOffsets = consumer.endOffsets(Set(tp).asJava)
+    assertEquals(numRecords, endOffsets.get(tp))
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testTimestampsToSearch(quorum: String, groupProtocol: String): Unit = {

Review Comment:
Maybe `testFetchOffsetsForTime`, which already implies searching at a given timestamp.

philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548593720

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testTimestampsToSearch(quorum: String, groupProtocol: String): Unit = {

Review Comment:
Maybe `testFindOffsetsForTime`, which already implies searching at a given timestamp.
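"Searching at a given timestamp" presumably refers to the consumer's `offsetsForTimes` API. For each partition, that API is documented to return the earliest offset whose timestamp is greater than or equal to the target timestamp, or `null` if no such record exists. The following is a minimal in-memory model of that contract, which such a test would be checking. `PartitionLogModel` and `offsetForTime` are hypothetical names, and the model makes no claim about how the broker actually answers the lookup.

```java
import java.util.List;

// Hypothetical in-memory model of a single partition: element i holds the
// timestamp of the record stored at offset i.
final class PartitionLogModel {

    // Returns the earliest offset whose timestamp is >= targetTimestamp,
    // or null when no record qualifies (the documented offsetsForTimes contract).
    static Long offsetForTime(List<Long> timestampsByOffset, long targetTimestamp) {
        for (int offset = 0; offset < timestampsByOffset.size(); offset++) {
            if (timestampsByOffset.get(offset) >= targetTimestamp) {
                return (long) offset;
            }
        }
        return null;
    }
}
```

Because the model scans in offset order, it returns the earliest qualifying offset even when record timestamps are not monotonic.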

lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548593248

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
@@ -1141,21 +1146,29 @@ private Map beginningOrEndOffset(Collection timestampToSearch = partitions
-                .stream()
-                .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
+            .stream()
+            .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
         Timer timer = time.timer(timeout);
         ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
-                timestampToSearch,
-                false,
-                timer);
-        Map offsetAndTimestampMap = applicationEventHandler.addAndGet(
-                listOffsetsEvent,
-                timer);
-        return offsetAndTimestampMap
-                .entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+            timestampToSearch,
+            timer,
+            false);
+
+        Map offsetAndTimestampMap;
+        if (timeout.isZero()) {
+            applicationEventHandler.add(listOffsetsEvent);

Review Comment:
Thanks for the explanation! Totally OK to tackle it with that separate Jira.

lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548591307

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
@@ -0,0 +1,81 @@
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
Uhm... what `OffsetAndTimestamp` does not allow is negative offsets [here](https://github.com/apache/kafka/blob/ee61bb721eecb0404929f125fe43392f3d024453/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java#L35), and that's the requirement this new one removes. Am I missing something?

philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548585054

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
@@ -240,20 +241,48 @@ Map getOffsetResetTimestamp() {
         return offsetResetTimestamps;
     }
-    static Map buildOffsetsForTimesResult(final Map timestampsToSearch,
-                                          final Map fetchedOffsets) {
-        HashMap offsetsByTimes = new HashMap<>(timestampsToSearch.size());
+    static Map buildListOffsetsResult(

Review Comment:
Good catch.

philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548581658

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
@@ -0,0 +1,81 @@
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
Timestamps, I assume.

lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548580872

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testTimestampsToSearch(quorum: String, groupProtocol: String): Unit = {

Review Comment:
Including the name of the function we're testing (`offsetsAndTimestamps`) would probably make the test name clearer... maybe something like `testOffsetsAndTimestampsTargetTimestamps`?

lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548576809

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer2.close()
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {

Review Comment:
We have split the consumer tests into separate files grouped by feature, and there is now a `PlaintextConsumerCommitTest`; I would expect this test to go there.

philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548574194

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
@@ -1141,21 +1146,29 @@ private Map beginningOrEndOffset(Collection timestampToSearch = partitions
+        if (timeout.isZero()) {
+            applicationEventHandler.add(listOffsetsEvent);

Review Comment:
Hi @lianetm, thanks for the comment. There's a ticket to align the behavior of the two APIs as you suggested; the plan is to do that in a separate PR: https://issues.apache.org/jira/browse/KAFKA-16433

Back to your first point: it is not immediately obvious why people would call these two APIs with a zero timeout. The only sensible thing it does is update the local high watermark, as you mentioned. I think this ambiguity is worth addressing after the 4.0 release, so for now I'll leave a comment as you requested.

lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548571273

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
@@ -240,20 +241,48 @@ Map getOffsetResetTimestamp() {
+    static Map buildListOffsetsResult(

Review Comment:
This generic `buildListOffsetsResult` is currently only used from `buildOffsetsForTimesResult`. Was the intention to also use it from `buildOffsetsForTimeInternalResult`?
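One way to picture the reuse asked about here is a single generic builder that both result types share, with each caller supplying a mapper. The following is a minimal sketch under stated assumptions, not the actual `OffsetFetcherUtils` code (whose generic types are stripped in the quoted diff). Partitions are keyed by a generic `K`, `FetchedOffset` is a hypothetical simplified stand-in for the parsed response data, and every requested partition without a fetched result maps to `null`, following the `offsetsForTimes` convention.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified stand-in for per-partition data parsed from a
// ListOffsets response.
final class FetchedOffset {
    final long offset;
    final long timestamp;

    FetchedOffset(long offset, long timestamp) {
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

final class ListOffsetsResults {
    // One generic builder: every requested partition appears in the result
    // (null when nothing was fetched for it), and the caller decides which
    // result type to build from each fetched entry.
    static <K, T> Map<K, T> buildListOffsetsResult(Map<K, Long> timestampsToSearch,
                                                   Map<K, FetchedOffset> fetchedOffsets,
                                                   Function<FetchedOffset, T> resultMapper) {
        Map<K, T> result = new HashMap<>(timestampsToSearch.size());
        for (K partition : timestampsToSearch.keySet())
            result.put(partition, null);
        fetchedOffsets.forEach((partition, data) -> result.put(partition, resultMapper.apply(data)));
        return result;
    }
}
```

With this shape, a `buildOffsetsForTimesResult`-style caller would pass a mapper that constructs the public type, and a `buildOffsetsForTimeInternalResult`-style caller would pass one that constructs the internal type. The null-filling loop then lives in one place.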
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lianetm commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1548554563 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; + +import java.util.Optional; + +/** + * Internal representation of {@link OffsetAndTimestamp}. Review Comment: I would add: Internal representation of {@link OffsetAndTimestamp} **that allows negative offsets**
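A minimal sketch of the distinction the suggested Javadoc would capture, assuming (as the comment implies) that the public type validates its inputs while the internal one does not. `PublicOffsetAndTimestamp` and `OffsetAndTimestampInternalSketch` are hypothetical stand-ins, not the classes in this PR.

```java
import java.util.Optional;

// Hypothetical stand-in for the public result type, which rejects negative values.
final class PublicOffsetAndTimestamp {
    final long offset;
    final long timestamp;
    final Optional<Integer> leaderEpoch;

    PublicOffsetAndTimestamp(long offset, long timestamp, Optional<Integer> leaderEpoch) {
        if (offset < 0)
            throw new IllegalArgumentException("Invalid negative offset");
        if (timestamp < 0)
            throw new IllegalArgumentException("Invalid negative timestamp");
        this.offset = offset;
        this.timestamp = timestamp;
        this.leaderEpoch = leaderEpoch;
    }
}

// Hypothetical internal representation: no validation, so negative offsets can pass through.
final class OffsetAndTimestampInternalSketch {
    private final long offset;
    private final long timestamp;
    private final Optional<Integer> leaderEpoch;

    OffsetAndTimestampInternalSketch(long offset, long timestamp, Optional<Integer> leaderEpoch) {
        this.offset = offset;
        this.timestamp = timestamp;
        this.leaderEpoch = leaderEpoch;
    }

    long offset() {
        return offset;
    }

    // Builds the public type only at the API boundary; fails fast if values are negative.
    PublicOffsetAndTimestamp toPublic() {
        return new PublicOffsetAndTimestamp(offset, timestamp, leaderEpoch);
    }
}
```

In this sketch only an `offsetsForTimes`-style path would call `toPublic()`; beginning/end-offset lookups could read `offset()` directly.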
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lianetm commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1548547539 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1141,21 +1146,29 @@ private Map beginningOrEndOffset(Collection timestampToSearch = partitions -.stream() -.collect(Collectors.toMap(Function.identity(), tp -> timestamp)); +.stream() +.collect(Collectors.toMap(Function.identity(), tp -> timestamp)); Timer timer = time.timer(timeout); ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent( -timestampToSearch, -false, -timer); -Map offsetAndTimestampMap = applicationEventHandler.addAndGet( -listOffsetsEvent, -timer); -return offsetAndTimestampMap -.entrySet() -.stream() -.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); +timestampToSearch, +timer, +false); + +Map offsetAndTimestampMap; +if (timeout.isZero()) { +applicationEventHandler.add(listOffsetsEvent); Review Comment: So if I get it right, we are intentionally leaving this? Generating an event to get offsets, when in the end we return right away without waiting for a response? I do get that the old consumer does it, and I could be missing the purpose of it, but it seems to me an unneeded request, even considering the side effect of the onSuccess handler. The handler just updates the positions to reuse the offsets it just retrieved, and it does make sense to reuse the result when we do need to make a request, but I wouldn't say we need to generate an unneeded event/request just for that when the user requested offsets with max-time-to-wait=0. In any case, if we prefer to keep this, I would suggest 2 things: 1. add a comment explaining why (the handler), because adding the event and returning looks like weird overhead; 2. to be consistent, also generate the event in the `offsetsForTimes` case before the early return (ln 1104).
In the case of the old consumer, it's common logic, so both paths, `offsetsForTimes` and `beginning/endOffsets`, do the same request+return.
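The two zero-timeout options under discussion can be sketched as follows. Everything here is hypothetical (`EventHandlerSketch`, `OffsetsEventSketch`, `ZeroTimeoutSketch`, `OffsetsTimeoutSketchException`) and only mirrors the shape of the diff above: with a zero timeout the event is still enqueued, so a completion handler could still run later, but the caller gets an empty map immediately; with a positive timeout the caller blocks and times out if no response arrives.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical event: the background side completes the future with partition -> offset.
final class OffsetsEventSketch {
    final Map<String, Long> timestampsToSearch;
    final CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();

    OffsetsEventSketch(Map<String, Long> timestampsToSearch) {
        this.timestampsToSearch = timestampsToSearch;
    }
}

// Hypothetical unchecked timeout, standing in for the consumer's timeout error.
final class OffsetsTimeoutSketchException extends RuntimeException {
    OffsetsTimeoutSketchException(String message) {
        super(message);
    }
}

final class EventHandlerSketch {
    // Stands in for the queue drained by a background thread.
    final List<OffsetsEventSketch> queued = new ArrayList<>();

    void add(OffsetsEventSketch event) {
        queued.add(event);
    }

    Map<String, Long> addAndGet(OffsetsEventSketch event, Duration timeout) {
        add(event);
        try {
            return event.future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new OffsetsTimeoutSketchException("No response within " + timeout.toMillis() + " ms");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }
}

final class ZeroTimeoutSketch {
    static Map<String, Long> beginningOrEndOffsets(EventHandlerSketch handler,
                                                   Map<String, Long> timestampsToSearch,
                                                   Duration timeout) {
        OffsetsEventSketch event = new OffsetsEventSketch(timestampsToSearch);
        if (timeout.isZero()) {
            // Fire and forget: the request still goes out, but nobody waits for it.
            handler.add(event);
            return Collections.emptyMap();
        }
        return handler.addAndGet(event, timeout);
    }
}
```

Dropping the `handler.add(event)` line in the zero-timeout branch gives the "no request at all" alternative; the trade-off is losing whatever the completion handler would have updated.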
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2026690869 @lucasbru - Thanks for taking the time to review this PR. This PR is ready for another pass.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1544067091 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -946,7 +956,7 @@ public void testOffsetsForTimesWithZeroTimeout() { @Test public void testWakeupCommitted() { consumer = newConsumer(); -final HashMap offsets = mockTopicPartitionOffset(); Review Comment: just cleaning up.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1544046037 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1141,21 +1141,27 @@ private Map beginningOrEndOffset(Collection timestampToSearch = partitions .stream() .collect(Collectors.toMap(Function.identity(), tp -> timestamp)); Timer timer = time.timer(timeout); ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent( timestampToSearch, -false, timer); -Map offsetAndTimestampMap = applicationEventHandler.addAndGet( + +// shortcut the request if the timeout is zero. +if (timeout.isZero()) { Review Comment: Scratch my previous comment: addAndGet actually doesn't handle this case, so we need to explicitly return an empty result. See the code change.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2025522451 hi @lucasbru - Let me address Lianet's comment in this PR and open a separate PR for the behavior inconsistency, as it requires some changes to the unit tests.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lucasbru commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2024835245 @philipnee Okay, thanks for creating the ticket. Not sure if it's blocker priority though. If it's a quick thing, you could address it in this PR. Are you going to implement Lianet's suggestion?
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2023293507 @lucasbru - If I'm not mistaken, the current implementations of both beginningOrEndOffsets and offsetsForTimes need to send out a request when given a ZERO duration. It seems both code paths invoke this logic:
```
// if timeout is set to zero, do not try to poll the network client at all
// and return empty immediately; otherwise try to get the results synchronously
// and throw timeout exception if it cannot complete in time
if (timer.timeoutMs() == 0L)
    return result;
```
But offsetsForTimes seems to short-circuit it here:
```
// If timeout is set to zero return empty immediately; otherwise try to get the results
// and throw timeout exception if it cannot complete in time.
if (timeout.toMillis() == 0L)
    return listOffsetsEvent.emptyResult();
return applicationEventHandler.addAndGet(listOffsetsEvent, timer);
```
I'll create a ticket to align the behavior of these two APIs in the new consumer.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lucasbru commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2022475971 > @lucasbru - Thanks again for reviewing the PR. Sorry about the misinterpretation on short circuting logic so here I updated the beginningOrEndOffsets API. It seems like the right thing to do here is to still send out the request but return it immediately for zero timeout (a bit strange because it does throw timeout when time runs out which seems inconsistent).

Yes, the behavior of the existing consumer is a bit curious, but it's not the only place where a zero duration is treated differently from 0.01s. Either way, we probably have to do it this way for compatibility. This part looks good to me now.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lucasbru commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1540879732 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java: ## @@ -25,22 +25,15 @@ import java.util.Map; /** - * Event for retrieving partition offsets by performing a + * Application Event for retrieving partition offsets by performing a * {@link org.apache.kafka.common.requests.ListOffsetsRequest ListOffsetsRequest}. - * This event is created with a map of {@link TopicPartition} and target timestamps to search - * offsets for. It is completed with the map of {@link TopicPartition} and - * {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than - * or equals to the target timestamp) */ -public class ListOffsetsEvent extends CompletableApplicationEvent> { - +public class ListOffsetsEvent extends CompletableApplicationEvent> { Review Comment: I'm personally not concerned about having two events, because they are very simple. The alternative is to have a common code-path that carries a `requiresTimestamp` boolean to differentiate behavior again, which isn't really any simpler. But I agree there is a certain amount of code duplication here that we could eliminate using your approach @lianetm , so I'm not against it.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1540114887 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java: ## @@ -25,22 +25,15 @@ import java.util.Map; /** - * Event for retrieving partition offsets by performing a + * Application Event for retrieving partition offsets by performing a * {@link org.apache.kafka.common.requests.ListOffsetsRequest ListOffsetsRequest}. - * This event is created with a map of {@link TopicPartition} and target timestamps to search - * offsets for. It is completed with the map of {@link TopicPartition} and - * {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than - * or equals to the target timestamp) */ -public class ListOffsetsEvent extends CompletableApplicationEvent> { - +public class ListOffsetsEvent extends CompletableApplicationEvent> { Review Comment: thanks, sounds like a good idea to me.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lianetm commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1539933946 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java: ## @@ -25,22 +25,15 @@ import java.util.Map; /** - * Event for retrieving partition offsets by performing a + * Application Event for retrieving partition offsets by performing a * {@link org.apache.kafka.common.requests.ListOffsetsRequest ListOffsetsRequest}. - * This event is created with a map of {@link TopicPartition} and target timestamps to search - * offsets for. It is completed with the map of {@link TopicPartition} and - * {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than - * or equals to the target timestamp) */ -public class ListOffsetsEvent extends CompletableApplicationEvent> { - +public class ListOffsetsEvent extends CompletableApplicationEvent> { Review Comment: Kind of a general comment looking for simplification: couldn't we just have a new internal class `OffsetAndTimestampInternal` (or something better named) that allows negatives and knows how to build an `OffsetAndTimestamp`? That seems to solve our problem without splitting `ListOffsets` into 2 events, with separate paths for beginning/endOffsets and offsetsForTimes, when in reality they have everything in common except the object we use to encapsulate the result (which is the same result). These newly split paths leak down to the OffsetsManager event, even though at the request/response level the manager is responsible for, everything is the same for both paths. With this approach the change would only be at the API level, on the consumer, where the result of the event would build the map with Longs for beginning/endOffsets, or the map with OffsetAndTimestamp for offsetsForTimes (the data is the same; we just need to change how we return it).
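The single-event approach proposed here can be sketched with one future and two API-level views. The names are hypothetical (`SharedOffsetResult`, `TimestampedOffset`, `SingleEventViews`), and a `String` key stands in for `TopicPartition`. The point is only that the request path stays shared while the result shape is decided with `thenApply` at the consumer API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical internal result shared by both APIs.
final class SharedOffsetResult {
    final long offset;
    final long timestamp;

    SharedOffsetResult(long offset, long timestamp) {
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

// Hypothetical public-facing value for the offsetsForTimes view.
final class TimestampedOffset {
    final long offset;
    final long timestamp;

    TimestampedOffset(long offset, long timestamp) {
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

final class SingleEventViews {
    // beginningOffsets/endOffsets view: keep only the offset.
    static CompletableFuture<Map<String, Long>> offsetsOnly(
            CompletableFuture<Map<String, SharedOffsetResult>> eventResult) {
        return eventResult.thenApply(SingleEventViews::toOffsets);
    }

    // offsetsForTimes view: keep offset and timestamp.
    static CompletableFuture<Map<String, TimestampedOffset>> withTimestamps(
            CompletableFuture<Map<String, SharedOffsetResult>> eventResult) {
        return eventResult.thenApply(SingleEventViews::toTimestamped);
    }

    private static Map<String, Long> toOffsets(Map<String, SharedOffsetResult> raw) {
        Map<String, Long> out = new HashMap<>();
        for (Map.Entry<String, SharedOffsetResult> e : raw.entrySet())
            out.put(e.getKey(), e.getValue().offset);
        return out;
    }

    private static Map<String, TimestampedOffset> toTimestamped(Map<String, SharedOffsetResult> raw) {
        Map<String, TimestampedOffset> out = new HashMap<>();
        for (Map.Entry<String, SharedOffsetResult> e : raw.entrySet())
            out.put(e.getKey(), new TimestampedOffset(e.getValue().offset, e.getValue().timestamp));
        return out;
    }
}
```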
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2020963772 @lucasbru - Thanks again for reviewing the PR. Sorry for misinterpreting the short-circuiting logic; I've updated the beginningOrEndOffsets API accordingly. It seems the right thing to do here is to still send out the request but return immediately for a zero timeout (a bit strange, because it does throw a timeout exception when time runs out, which seems inconsistent).
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1539524047 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1141,21 +1141,27 @@ private Map beginningOrEndOffset(Collection timestampToSearch = partitions .stream() .collect(Collectors.toMap(Function.identity(), tp -> timestamp)); Timer timer = time.timer(timeout); ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent( timestampToSearch, -false, timer); -Map offsetAndTimestampMap = applicationEventHandler.addAndGet( + +// shortcut the request if the timeout is zero. +if (timeout.isZero()) { Review Comment: Hey @lucasbru - It seems like one of the handlers also updates the subscription state upon completion. See the snippet below:
```
public void onSuccess(ListOffsetResult value) {
    synchronized (future) {
        result.fetchedOffsets.putAll(value.fetchedOffsets);
        remainingToSearch.keySet().retainAll(value.partitionsToRetry);
        offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, isolationLevel);
    }
}
```
I think addAndGet should be sufficient to handle such logic, so I'll revert this code.
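Why a request whose result is never awaited can still matter is easiest to see with a completion callback. In this minimal sketch (hypothetical `OffsetsCacheSketch` and `CallbackSketch`; it does not reproduce Kafka's actual subscription-state update), the callback is attached when the request is "sent", so it runs when the response completes even if the caller has already returned.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical local cache that a successful response updates as a side effect.
final class OffsetsCacheSketch {
    final Map<String, Long> cachedOffsets = new ConcurrentHashMap<>();
}

final class CallbackSketch {
    // "Sends" a request: returns the response future with the success callback already attached.
    static CompletableFuture<Map<String, Long>> sendListOffsets(OffsetsCacheSketch cache) {
        CompletableFuture<Map<String, Long>> response = new CompletableFuture<>();
        // Runs on the thread that completes the response, whether or not anyone waits on it.
        response.thenAccept(cache.cachedOffsets::putAll);
        return response;
    }
}
```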
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lucasbru commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1538901757 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1141,21 +1141,27 @@ private Map beginningOrEndOffset(Collection timestampToSearch = partitions .stream() .collect(Collectors.toMap(Function.identity(), tp -> timestamp)); Timer timer = time.timer(timeout); ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent( timestampToSearch, -false, timer); -Map offsetAndTimestampMap = applicationEventHandler.addAndGet( + +// shortcut the request if the timeout is zero. +if (timeout.isZero()) { Review Comment: I've tried to dig into this a bit. Short-circuiting is definitely the right thing to do, since otherwise we'll run into a timeout exception. The old consumer is a bit weird in that it fires the list offsets request but never returns a result. But I also couldn't find a cache that is influenced by the list offsets request, so what's the point of sending the request? Replicating the old consumer behavior would mean creating the event for the background thread, but not waiting for the result. We can consider changing the behavior here, but let's make sure we do it consciously.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lucasbru commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1538863396 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1141,21 +1141,27 @@ private Map beginningOrEndOffset(Collection timestampToSearch = partitions .stream() .collect(Collectors.toMap(Function.identity(), tp -> timestamp)); Timer timer = time.timer(timeout); ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent( timestampToSearch, -false, timer); -Map offsetAndTimestampMap = applicationEventHandler.addAndGet( + +// shortcut the request if the timeout is zero. +if (timeout.isZero()) { Review Comment: Sorry, only noticed this now, but the original consumer seems to send a list offset request even if the timeout is 0, and you are specifically introducing code to avoid that. Isn't that going against what Kirk is trying to achieve? cc @kirktrue
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2018886221 Hey @lucasbru - Thanks for taking the time to review this PR. Let me know if there's anything to add to the PR.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1538195311 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ## @@ -423,11 +432,11 @@ private CompletableFuture sendListOffsetsRequestsAndResetPositions( }); }); -if (unsentRequests.size() > 0) { +if (unsentRequests.isEmpty()) { Review Comment: @lucasbru - Switched the order as !__.isEmpty is rather difficult to read
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1537804695 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -411,8 +411,8 @@ public int memberEpoch() { public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response) { if (response.errorCode() != Errors.NONE.code()) { String errorMessage = String.format( -"Unexpected error in Heartbeat response. Expected no error, but received: %s", Review Comment: Good call. I think it was my editor's auto-correction. Reverting it.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lucasbru commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1537350070 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java: ## @@ -32,8 +32,7 @@ * {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than * or equals to the target timestamp) */ -public class ListOffsetsEvent extends CompletableApplicationEvent> { - +public class ListOffsetsEvent extends CompletableApplicationEvent> { Review Comment: How about using two separate events, `ListOffsets` and `ListOffsetsWithTimestamps`? We could drop the "requireTimestamps" boolean and the use of generics here, which I think would simplify the code. I'd expect that would also get rid of some `unchecked` warnings that you are currently suppressing. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -411,8 +411,8 @@ public int memberEpoch() { public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response) { if (response.errorCode() != Errors.NONE.code()) { String errorMessage = String.format( -"Unexpected error in Heartbeat response. Expected no error, but received: %s", Review Comment: nit: Whitespace change in a file you aren't otherwise changing; I'd avoid it.
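A minimal sketch of the two-event alternative, assuming a generic base class whose subclasses each fix a concrete result type. The names (`EventSketch`, `ListOffsetsEventSketch`, `ListOffsetsWithTimestampsEventSketch`, `OffsetWithTimestamp`, `EventProcessorSketch`) are hypothetical. Neither event needs a `requireTimestamps` flag, and the processor completes each future with a correctly typed value, so no unchecked casts are involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical base: each subclass fixes its own result type T.
abstract class EventSketch<T> {
    final Map<String, Long> timestampsToSearch;
    final CompletableFuture<T> future = new CompletableFuture<>();

    EventSketch(Map<String, Long> timestampsToSearch) {
        this.timestampsToSearch = timestampsToSearch;
    }
}

// Hypothetical value type for results that carry a timestamp.
final class OffsetWithTimestamp {
    final long offset;
    final long timestamp;

    OffsetWithTimestamp(long offset, long timestamp) {
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

// beginningOffsets/endOffsets: completes with offsets only.
final class ListOffsetsEventSketch extends EventSketch<Map<String, Long>> {
    ListOffsetsEventSketch(Map<String, Long> timestampsToSearch) {
        super(timestampsToSearch);
    }
}

// offsetsForTimes: completes with offsets and timestamps.
final class ListOffsetsWithTimestampsEventSketch extends EventSketch<Map<String, OffsetWithTimestamp>> {
    ListOffsetsWithTimestampsEventSketch(Map<String, Long> timestampsToSearch) {
        super(timestampsToSearch);
    }
}

final class EventProcessorSketch {
    // 'fetched' stands in for the shared response data; overload resolution picks the
    // completion for each event type at compile time, so no casts are needed.
    static void process(ListOffsetsEventSketch event, Map<String, OffsetWithTimestamp> fetched) {
        Map<String, Long> offsets = new HashMap<>();
        fetched.forEach((partition, value) -> offsets.put(partition, value.offset));
        event.future.complete(offsets);
    }

    static void process(ListOffsetsWithTimestampsEventSketch event, Map<String, OffsetWithTimestamp> fetched) {
        event.future.complete(new HashMap<>(fetched));
    }
}
```

The cost, as noted in the follow-up discussion, is some duplication between the two event classes and their processing paths.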
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2017309149 Hey @lucasbru - Would it be possible to ask you to review this PR? Thanks!
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1535039670 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java: ## @@ -480,11 +475,16 @@ public void testRequestFails_AuthenticationException() { // Response received with auth error NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0); ClientResponse clientResponse = -buildClientResponseWithAuthenticationException(unsentRequest); Review Comment: the function was only used once...
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1535038917 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java: ## @@ -131,13 +131,11 @@ public void testListOffsetsRequest_Success() throws ExecutionException, Interrup ListOffsetsRequest.EARLIEST_TIMESTAMP); mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); -CompletableFuture> result = requestManager.fetchOffsets( -timestampsToSearch, -false); +CompletableFuture> result = requestManager.beginningOrEndOffset(timestampsToSearch); assertEquals(1, requestManager.requestsToSend()); assertEquals(0, requestManager.requestsToRetry()); -Map expectedOffsets = Collections.singletonMap(TEST_PARTITION_1, new OffsetAndTimestamp(5L, 1L)); +Map expectedOffsets = Collections.singletonMap(TEST_PARTITION_1, 5L); Review Comment: Note: A lot of mock results are converted to Long because most tests have the `requireTimestamps` field marked false, which implies they are only invoking beginningOrEndOffsets.