Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2033505687

   Hi @lianetm - thanks very much for the reviews. I think I've addressed your 
comments. LMK if there's anything more. cc @lucasbru 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548891748


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
 consumer2.close()
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {

Review Comment:
   I think this is an error from the rebase, so it should be removed from the 
PR. Thanks for catching this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548887021


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,60 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   Sorry - I wasn't looking at it carefully. Putting things back in their 
original place.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16430) The group-metadata-manager thread is always in a loading state and occupies one CPU, unable to end.

2024-04-02 Thread Gao Fei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833389#comment-17833389
 ] 

Gao Fei commented on KAFKA-16430:
-

[~chia7712] What do you mean? Does "the newer kafka script" refer to using the 
new version of the kafka-consumer-groups.sh client script? The problem now, 
however, is on the Kafka broker server side.

> The group-metadata-manager thread is always in a loading state and occupies 
> one CPU, unable to end.
> ---
>
> Key: KAFKA-16430
> URL: https://issues.apache.org/jira/browse/KAFKA-16430
> Project: Kafka
>  Issue Type: Bug
>  Components: group-coordinator
>Affects Versions: 2.4.0
>Reporter: Gao Fei
>Priority: Blocker
>
> I deployed three broker instances and suddenly found that the client was 
> unable to consume data from certain topic partitions. I first tried to log in 
> to the broker corresponding to the group and used the following command to 
> view the consumer group:
> {code:java}
> ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --describe 
> --group mygroup{code}
> and found the following error:
> {code:java}
> Error: Executing consumer group command failed due to 
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.{code}
> I then discovered that the broker may be stuck in a loop, which is constantly 
> in a loading state. At the same time, I found through the top command that 
> the "group-metadata-manager-0" thread was constantly consuming 100% of the 
> CPU resources. This loop could not be broken, resulting in the inability to 
> consume topic partition data on that node. At this point, I suspected that 
> the issue may be related to the __consumer_offsets partition data file loaded 
> by this thread.
> Finally, after restarting the broker instance, everything was back to normal. 
> It's very strange that if there was an issue with the __consumer_offsets 
> partition data file, the broker should have failed to start. Why was it able 
> to automatically recover after a restart? And why did this continuous loop 
> loading of the __consumer_offsets partition data occur?
> We encountered this issue in our production environment using Kafka versions 
> 2.2.1 and 2.4.0, and I believe it may also affect other versions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16412) Uncreated topics is considered as created topics

2024-04-02 Thread gendong1 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833387#comment-17833387
 ] 

gendong1 commented on KAFKA-16412:
--

Sorry for the late reply. I will try to reproduce this problem on 3.6+.

> Uncreated topics is considered as created topics
> 
>
> Key: KAFKA-16412
> URL: https://issues.apache.org/jira/browse/KAFKA-16412
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.2
>Reporter: gendong1
>Priority: Major
> Attachments: AnotherClientOutput.txt, kafkaServer1.out, 
> kafkaServer2.out, kafkaServer3.out
>
>
> A client sends topic creation request to broker.
> Another client sends the same topic creation request to broker.
> The former request does not finish. However, the second client gets a 
> TopicExistsException.
>  
>  





Re: [PR] KAFKA-15801: improve Kafka broker/NetworkClient logging for connectiv… [kafka]

2024-04-02 Thread via GitHub


github-actions[bot] commented on PR #14799:
URL: https://github.com/apache/kafka/pull/14799#issuecomment-2033467022

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-16070: move setReadOnly to Headers [kafka]

2024-04-02 Thread via GitHub


github-actions[bot] commented on PR #15097:
URL: https://github.com/apache/kafka/pull/15097#issuecomment-2033466946

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-16068: Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors [kafka]

2024-04-02 Thread via GitHub


Joker-5 commented on code in PR #15642:
URL: https://github.com/apache/kafka/pull/15642#discussion_r1547690392


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java:
##
@@ -69,6 +69,9 @@ public static void setup() {
 Map<String, String> workerProps = new HashMap<>();
 workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID);
 
+// Work around a circular-dependency in TestPlugins.
+TestPlugins.pluginPath();

Review Comment:
   @vamossagar12 If we don't add this, methods that use `TestPlugins` will 
behave erroneously. In particular:
   ```
   [2024-04-02 19:16:25,977] ERROR Could not set up plugin test jars 
(org.apache.kafka.connect.runtime.isolation.TestPlugins:258)
   java.lang.NullPointerException
at 
org.apache.kafka.connect.runtime.isolation.TestPlugins$TestPlugin.values(TestPlugins.java:69)
at 
org.apache.kafka.connect.runtime.isolation.TestPlugins.<clinit>(TestPlugins.java:251)
at 
org.apache.kafka.connect.runtime.isolation.TestPlugins$TestPlugin.<clinit>(TestPlugins.java:128)
at 
org.apache.kafka.connect.integration.ConnectorValidationIntegrationTest.testConnectorHasConverterWithNoSuitableConstructor(ConnectorValidationIntegrationTest.java:337)
   ...
   ```
   I found the reason behind it:
   
   >TL;DR
   The whole error occurs because of a cycle: inner class (some fields depend 
on the outer class in its `<clinit>` method) -> outer class (a line in its 
static code block depends on the inner class in its `<clinit>` method) -> inner 
class
   
   1. There's a `private final Predicate<String> removeRuntimeClasses` field in 
the inner class 
`org.apache.kafka.connect.runtime.isolation.TestPlugins.TestPlugin`.
   2. Some enum constants, such as `BAD_PACKAGING_MISSING_SUPERCLASS`, set that 
field from `private static final Predicate<String> REMOVE_CLASS_FILTER = s -> 
s.contains("NonExistentInterface")` in the outer class 
(`org.apache.kafka.connect.runtime.isolation.TestPlugins`).
   3. When a test method such as 
`testConnectorHasConverterWithNoSuitableConstructor` runs, the JVM calls the 
`<clinit>` method to initialize the inner class 
(`org.apache.kafka.connect.runtime.isolation.TestPlugins.TestPlugin`).
   4. But some enum constants use the field (`removeRuntimeClasses`) that comes 
from the outer class (`private static final Predicate<String> 
REMOVE_CLASS_FILTER = s -> s.contains("NonExistentInterface")`), so at that 
point the JVM has to call the `<clinit>` method to initialize the outer class 
(`org.apache.kafka.connect.runtime.isolation.TestPlugins`).
   5. The outer class has a static code block that uses the inner class. Since 
the inner class has not finished initializing, the circular dependency occurs, 
which results in the error log above.
   ```java
   static {
   Throwable err = null;
   Map pluginJars = new HashMap<>();
   try {
   for (TestPlugin testPlugin : TestPlugin.values()) { // see this 
line
   if (pluginJars.containsKey(testPlugin.resourceDir())) {
   log.debug("Skipping recompilation of " + 
testPlugin.resourceDir());
   }
   pluginJars.put(testPlugin.resourceDir(), 
createPluginJar(testPlugin.resourceDir(), testPlugin.removeRuntimeClasses()));
   }
   } catch (Throwable e) {
   log.error("Could not set up plugin test jars", e);
   err = e;
   }
   PLUGIN_JARS = Collections.unmodifiableMap(pluginJars);
   INITIALIZATION_EXCEPTION = err;
   }
   ```
   
   So that's why I added the code.
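   The cycle above can be reproduced with a tiny self-contained demo. These are 
hypothetical classes, not the actual Kafka ones: an enum constant reads a 
static field of the outer class during its `<clinit>`, and the outer class's 
static block calls `values()` while the enum's compiler-generated constant 
array is still null, producing exactly the kind of NullPointerException shown 
in the stack trace.

   ```java
   import java.util.function.Predicate;

   // Hypothetical minimal reproduction of the <clinit> cycle -- NOT the
   // actual Kafka TestPlugins classes, just the same initialization order.
   public class InitCycleDemo {

       static class Outer {
           // Assigned before the static block below runs.
           static final Predicate<String> FILTER = s -> s.contains("NonExistentInterface");
           static final int COUNT;

           static {
               int n;
               try {
                   // If Inner's <clinit> is already in progress on this thread,
                   // its compiler-generated constant array is still null, so
                   // values() throws NullPointerException -- mirroring the
                   // "Could not set up plugin test jars" error above.
                   n = Inner.values().length;
               } catch (Throwable t) {
                   System.out.println("init failed: " + t.getClass().getSimpleName());
                   n = -1;
               }
               COUNT = n;
           }
       }

       enum Inner {
           A(Outer.FILTER); // reading Outer.FILTER triggers Outer's <clinit>

           final Predicate<String> filter;

           Inner(Predicate<String> filter) {
               this.filter = filter;
           }
       }

       // Touching Inner first (as the test method did) enters the cycle:
       // Inner.<clinit> -> Outer.<clinit> -> Inner.values() -> NPE (caught).
       static String run() {
           Predicate<String> p = Inner.A.filter;
           return "COUNT=" + Outer.COUNT + ", A.filter " + (p != null ? "set" : "null");
       }

       public static void main(String[] args) {
           System.out.println(run());
       }
   }
   ```

   Note the order dependence: initializing `Outer` first completes cleanly, 
while initializing `Inner` first fails inside `Outer`'s static block -- which 
is why calling `TestPlugins.pluginPath()` early works around the problem.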






[jira] [Assigned] (KAFKA-16466) QuorumController is swallowing some exception messages

2024-04-02 Thread Chia Chuan Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia Chuan Yu reassigned KAFKA-16466:
-

Assignee: Chia Chuan Yu

> QuorumController is swallowing some exception messages
> --
>
> Key: KAFKA-16466
> URL: https://issues.apache.org/jira/browse/KAFKA-16466
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.7.0
>Reporter: David Arthur
>Assignee: Chia Chuan Yu
>Priority: Major
>  Labels: good-first-issue
> Fix For: 3.8.0, 3.7.1
>
>
> In some cases in QuorumController, we throw exceptions from the control 
> manager methods. Unless these are explicitly caught and handled, they will 
> eventually bubble up to the ControllerReadEvent/ControllerWriteEvent and hit 
> the generic error handler.
> In the generic error handler of QuorumController, we examine the exception to 
> determine if it is a fault or not. In the case where it is not a fault, we 
> log the error like:
> {code:java}
>  log.info("{}: {}", name, failureMessage);
> {code}
> which results in messages like
> {code:java}
> [2024-04-02 16:08:38,078] INFO [QuorumController id=3000] registerBroker: 
> event failed with UnsupportedVersionException in 167 microseconds. 
> (org.apache.kafka.controller.QuorumController:544)
> {code}
> In this case, the exception actually has more details in its own message
> {code:java}
> Unable to register because the broker does not support version 8 of 
> metadata.version. It wants a version between 20 and 20, inclusive.
> {code}
> We should include the exception's message in the log output for non-fault 
> errors as it includes very useful debugging info.
> This was found while writing an integration test for KRaft migration where 
> the brokers and controllers have a mismatched MetadataVersion.
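As a hedged sketch of the proposed change (the class and method names below are 
illustrative, not the actual QuorumController code), the non-fault log line 
could simply append the exception's own message so details like the 
metadata.version mismatch are no longer swallowed:

```java
// Illustrative helper: build the non-fault failure log line including the
// exception's own message, as the ticket proposes. Names are hypothetical.
public class FailureLogFormat {

    static String format(String name, String failureMessage, Throwable t) {
        // Before: log.info("{}: {}", name, failureMessage) drops t.getMessage().
        String base = name + ": " + failureMessage;
        return t.getMessage() == null ? base : base + " Exception message: " + t.getMessage();
    }

    public static void main(String[] args) {
        Throwable t = new RuntimeException(
                "Unable to register because the broker does not support version 8 of metadata.version.");
        System.out.println(format("registerBroker",
                "event failed with UnsupportedVersionException in 167 microseconds.", t));
    }
}
```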





[jira] [Updated] (KAFKA-16467) Add README to docs folder

2024-04-02 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang updated KAFKA-16467:
--
Description: 
We don't have a guide in the project root folder or the docs folder showing how 
to run the website locally. It would be good to provide a way to run the 
documentation with the kafka-site repository.

 

Option 1: Add links to wiki page 
[https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes]
 and 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. 

Option 2: Show how to run the documentation within a container. For example: 
move `site-docs` from the kafka repository to the kafka-site repository and run 
`./start-preview.sh`.

  was:We don't have a guide in project root folder or docs folder to show how 
to run local website. It's good to provide a way to run document with 
kafka-site repository.


> Add README to docs folder
> -
>
> Key: KAFKA-16467
> URL: https://issues.apache.org/jira/browse/KAFKA-16467
> Project: Kafka
>  Issue Type: Improvement
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>
> We don't have a guide in the project root folder or the docs folder showing 
> how to run the website locally. It would be good to provide a way to run the 
> documentation with the kafka-site repository.
>  
> Option 1: Add links to wiki page 
> [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes]
>  and 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. 
> Option 2: Show how to run the documentation within a container. For example: 
> move `site-docs` from the kafka repository to the kafka-site repository and 
> run `./start-preview.sh`.





[jira] [Updated] (KAFKA-16467) Add README to docs folder

2024-04-02 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang updated KAFKA-16467:
--
Issue Type: Improvement  (was: Test)

> Add README to docs folder
> -
>
> Key: KAFKA-16467
> URL: https://issues.apache.org/jira/browse/KAFKA-16467
> Project: Kafka
>  Issue Type: Improvement
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>
> We don't have a guide in the project root folder or the docs folder showing 
> how to run the website locally. It would be good to provide a way to run the 
> documentation with the kafka-site repository.





[jira] [Created] (KAFKA-16467) Add README to docs folder

2024-04-02 Thread PoAn Yang (Jira)
PoAn Yang created KAFKA-16467:
-

 Summary: Add README to docs folder
 Key: KAFKA-16467
 URL: https://issues.apache.org/jira/browse/KAFKA-16467
 Project: Kafka
  Issue Type: Test
Reporter: PoAn Yang
Assignee: PoAn Yang


We don't have a guide in the project root folder or the docs folder showing how 
to run the website locally. It would be good to provide a way to run the 
documentation with the kafka-site repository.





Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-02 Thread via GitHub


showuon commented on PR #15557:
URL: https://github.com/apache/kafka/pull/15557#issuecomment-2033422766

   There are quite a few failed tests because of this change. Could you take a 
look?
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15557/4





[jira] [Commented] (KAFKA-15615) Improve handling of fetching during metadata updates

2024-04-02 Thread appchemist (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833359#comment-17833359
 ] 

appchemist commented on KAFKA-15615:


Hi [~kirktrue],

I'd like to take this issue.

Do you have any potential solutions in mind?

I've sent my approach as a PR.

> Improve handling of fetching during metadata updates
> 
>
> Key: KAFKA-15615
> URL: https://issues.apache.org/jira/browse/KAFKA-15615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, fetcher
> Fix For: 3.8.0
>
>
> [During a review of the new 
> fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], 
> [~junrao] found what appears to be an opportunity for optimization.
> When a fetch response receives an error about partition leadership, fencing, 
> etc. a metadata refresh is triggered. However, it takes time for that refresh 
> to occur, and in the interim, it appears that the consumer will blindly 
> attempt to fetch data for the partition again, in kind of a "definition of 
> insanity" type of way. Ideally, the consumer would have a way to temporarily 
> ignore those partitions, in a way somewhat like the "pausing" approach so 
> that they are skipped until the metadata refresh response is fully processed.
> This affects both the existing KafkaConsumer and the new 
> PrototypeAsyncConsumer.





[jira] [Commented] (KAFKA-13907) Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable

2024-04-02 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833354#comment-17833354
 ] 

Chia-Ping Tsai commented on KAFKA-13907:


[~soarez] I observed this issue today, and I agree with the discussion 
(https://github.com/apache/kafka/pull/12174#discussion_r875182105) that adding 
a timeout to `shutdown` for testing is a solution. Do you want to file the PR 
again?

> Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable
> --
>
> Key: KAFKA-13907
> URL: https://issues.apache.org/jira/browse/KAFKA-13907
> Project: Kafka
>  Issue Type: Bug
>Reporter: Deng Ziming
>Assignee: Igor Soarez
>Priority: Major
>  Labels: newbie
>
> ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable hangs 
> while waiting for controlled shutdown; there may be a bug related to it.
> Since this bug can be reproduced locally, it shouldn't be hard to investigate.





Re: [PR] MINOR: Improve logging in AssignmentsManager [kafka]

2024-04-02 Thread via GitHub


showuon commented on PR #15522:
URL: https://github.com/apache/kafka/pull/15522#issuecomment-2033354648

   
   
   ```

/home/jenkins/workspace/Kafka_kafka-pr_PR-15522/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java:354:
 error: method onAssignment in class AssignmentsManager cannot be applied to 
given types;
   [2024-04-02T21:59:04.001Z] manager.onAssignment(new 
TopicIdPartition(TOPIC_1, 0), dirs[i % 3], onComplete);
   [2024-04-02T21:59:04.001Z]^
   [2024-04-02T21:59:04.001Z]   required: TopicIdPartition,Uuid,String,Runnable
   [2024-04-02T21:59:04.001Z]   found: TopicIdPartition,Uuid,Runnable
   [2024-04-02T21:59:04.001Z]   reason: actual and formal argument lists differ 
in length
   ```
   
   Another compilation error: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15522/3/pipeline
   
   Please help fix it. Thanks.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548765182


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+  .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new 
TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+  // return the base offset for backward compatibility if there is no 
batches
+  .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
latestTimestampSegment.baseOffset(), lpc)))

Review Comment:
   Sure: 
https://github.com/apache/kafka/pull/15621/commits/8a7ed30692bd070fb4160a6cbc76a868484529c3
 returns None and adds a related test.






[jira] [Commented] (KAFKA-16262) Add IQv2 to Kafka Streams documentation

2024-04-02 Thread Suprem Vanam (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833347#comment-17833347
 ] 

Suprem Vanam commented on KAFKA-16262:
--

Hi [~mjsax], can I work on this ticket? It's my first ticket; could you please 
let me know how to get started?

 

I appreciate your help.

> Add IQv2 to Kafka Streams documentation
> ---
>
> Key: KAFKA-16262
> URL: https://issues.apache.org/jira/browse/KAFKA-16262
> Project: Kafka
>  Issue Type: Task
>  Components: docs, streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, newbie
>
> The new IQv2 API was added many release ago. While it is still not feature 
> complete, we should add it to the docs 
> ([https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html])
>  to make users aware of the new API so they can start to try it out, report 
> issue and provide feedback / feature requests.
> We might still state that IQv2 is not yet feature complete, but should change 
> the docs in a way to position it as the "new API", and include code examples.





[jira] [Commented] (KAFKA-9528) Improve accessibility on website

2024-04-02 Thread Suprem Vanam (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833346#comment-17833346
 ] 

Suprem Vanam commented on KAFKA-9528:
-

Hi [~Dionakra], looks like this issue is still open. Can I pick up this ticket 
and work on it?

 

If yes, could you please provide me links/resources to give me an idea of where 
to get started and proceed with the ticket?

> Improve accessibility on website
> 
>
> Key: KAFKA-9528
> URL: https://issues.apache.org/jira/browse/KAFKA-9528
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: David de los Santos Boix
>Priority: Minor
>  Labels: easyfix, newbie
> Attachments: axe-pro.png, axe.png, lighthouse.png
>
>
> If we analyse the website with some accessibility tools like Google Chrome 
> Audit for Accessibility, aXe or aXe Pro, the reports that the tools create 
> show several flaws regarding accessibility.
>  
> Therefore, the website should comply with all the accessibility standards and 
> best practices to ensure that anyone can access the content of Kafka 
> regardless of the capabilities of each individual.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548687661


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+  .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new 
TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+  // return the base offset for backward compatibility if there is no 
batches
+  .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
latestTimestampSegment.baseOffset(), lpc)))

Review Comment:
   Got it. If there is no timestamp index, we initialize 
`maxTimestampAndOffsetSoFar` to `TimestampOffset(RecordBatch.NO_TIMESTAMP, 
baseOffset())`. That's why it picks up the base offset. However, it doesn't 
seem intuitive for the user. Returning None seems better.
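   A hedged sketch of the behavior change under discussion, using simplified 
stand-in types rather than the real `UnifiedLog`/`TimestampAndOffset` classes: 
return an empty result instead of a sentinel built from the segment's base 
offset, and let the caller map "empty" to an unknown offset/timestamp.

   ```java
   import java.util.Comparator;
   import java.util.List;
   import java.util.Optional;

   // Simplified stand-ins, not the actual Kafka log classes.
   public class MaxTimestampLookup {

       static final long NO_TIMESTAMP = -1L; // stand-in for RecordBatch.NO_TIMESTAMP

       record TimestampAndOffset(long timestamp, long offset) {}

       // When nothing has a real timestamp (e.g. no timestamp index yet),
       // return Optional.empty() rather than (NO_TIMESTAMP, baseOffset).
       static Optional<TimestampAndOffset> offsetOfMaxTimestamp(List<TimestampAndOffset> batches) {
           return batches.stream()
                   .filter(b -> b.timestamp() != NO_TIMESTAMP)
                   .max(Comparator.comparingLong(TimestampAndOffset::timestamp));
       }

       public static void main(String[] args) {
           System.out.println(offsetOfMaxTimestamp(List.of(
                   new TimestampAndOffset(10L, 0L),
                   new TimestampAndOffset(30L, 5L),
                   new TimestampAndOffset(20L, 9L))));
           System.out.println(offsetOfMaxTimestamp(List.of())); // Optional.empty
       }
   }
   ```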






Re: [PR] KAFKA-16293: Test log directory failure in Kraft [kafka]

2024-04-02 Thread via GitHub


soarez commented on PR #15409:
URL: https://github.com/apache/kafka/pull/15409#issuecomment-2033175375

   @showuon could you have a look at this? 





Re: [PR] MINOR: Improve logging in AssignmentsManager [kafka]

2024-04-02 Thread via GitHub


soarez commented on code in PR #15522:
URL: https://github.com/apache/kafka/pull/15522#discussion_r1548655446


##
core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala:
##
@@ -458,7 +458,7 @@ class ReplicaAlterLogDirsThreadTest {
   
ArgumentCaptor.forClass(classOf[org.apache.kafka.server.common.TopicIdPartition])
 val logIdCaptureT1p0: ArgumentCaptor[Uuid] = 
ArgumentCaptor.forClass(classOf[Uuid])
 
-
verify(directoryEventHandler).handleAssignment(topicIdPartitionCaptureT1p0.capture(),
 logIdCaptureT1p0.capture(), any())
+
verify(directoryEventHandler).handleAssignment(topicIdPartitionCaptureT1p0.capture(),
 logIdCaptureT1p0.capture(), "test", any())

Review Comment:
   Thanks for pointing this out. Fixed the test and also resolved the conflicts.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548654826


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
     // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
     // constant time access while being safe to use with concurrent collections unlike `toArray`.
-    val segmentsCopy = logSegments.asScala.toBuffer
-    val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-    val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-    Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-      latestTimestampAndOffset.offset,
-      latestEpochAsOptional(leaderEpochCache)))
+    val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+    // cache the timestamp and offset
+    val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+    // lookup the position of batch to avoid extra I/O
+    val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+    val lpc = latestEpochAsOptional(leaderEpochCache)
+    Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+      .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+      .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+      // return the base offset for backward compatibility if there is no batches
+      .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, latestTimestampSegment.baseOffset(), lpc)))

Review Comment:
   If it is fine to change the behavior, we can just return None to build the response with an unknown offset/timestamp.
   
   
https://github.com/apache/kafka/blob/ee61bb721eecb0404929f125fe43392f3d024453/core/src/main/scala/kafka/server/KafkaApis.scala#L1146






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548644261


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
     // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
     // constant time access while being safe to use with concurrent collections unlike `toArray`.
-    val segmentsCopy = logSegments.asScala.toBuffer
-    val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-    val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-    Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-      latestTimestampAndOffset.offset,
-      latestEpochAsOptional(leaderEpochCache)))
+    val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+    // cache the timestamp and offset
+    val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+    // lookup the position of batch to avoid extra I/O
+    val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+    val lpc = latestEpochAsOptional(leaderEpochCache)
+    Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+      .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+      .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+      // return the base offset for backward compatibility if there is no batches
+      .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, latestTimestampSegment.baseOffset(), lpc)))

Review Comment:
   I am ok with returning -1. However, it seems that we returned the base offset before when we did not find the max timestamp (no batch exists). Hence, the main reason for returning the base offset is backward compatibility.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548633965


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
     // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
     // constant time access while being safe to use with concurrent collections unlike `toArray`.
-    val segmentsCopy = logSegments.asScala.toBuffer
-    val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-    val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-    Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-      latestTimestampAndOffset.offset,
-      latestEpochAsOptional(leaderEpochCache)))
+    val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+    // cache the timestamp and offset
+    val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+    // lookup the position of batch to avoid extra I/O
+    val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+    val lpc = latestEpochAsOptional(leaderEpochCache)
+    Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+      .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+      .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+      // return the base offset for backward compatibility if there is no batches
+      .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, latestTimestampSegment.baseOffset(), lpc)))

Review Comment:
   Hmm, still not sure about this. If we can't find the maxTimestamp, 
intuitively, it seems that we should return -1 for both timestamp and offset?
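The fallback choice can be sketched in isolation. This is a hypothetical, simplified model (the `Batch` type and method names here are invented, not the real Kafka `RecordBatch` API) of the lookup in the diff above, using the "-1 for both timestamp and offset" fallback suggested in this comment:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-in for a segment's record batches -- not
// the real Kafka RecordBatch API.
final class BatchSketch {
    static final class Batch {
        final long maxTimestamp;
        final long offsetOfMaxTimestamp;
        Batch(long maxTimestamp, long offsetOfMaxTimestamp) {
            this.maxTimestamp = maxTimestamp;
            this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
        }
    }

    // Mirrors the shape of the lookup in the diff above: find the batch whose
    // max timestamp equals the segment-level max and return its
    // offset-of-max-timestamp; otherwise fall back to -1 for both values.
    static long[] findMaxTimestampAndOffset(List<Batch> batches, long segmentMaxTimestamp) {
        Optional<Batch> match = batches.stream()
            .filter(b -> b.maxTimestamp == segmentMaxTimestamp)
            .findFirst();
        return match
            .map(b -> new long[] { b.maxTimestamp, b.offsetOfMaxTimestamp })
            .orElse(new long[] { -1L, -1L });  // nothing matched: unknown timestamp/offset
    }
}
```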






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548614429


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,60 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
     consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   oh this one (and the one below) are related to partition's offsets, not 
committed offsets, so I would say they need to stay in the PlaintextConsumer, 
where you had them (I was only suggesting to move the 
`testSubscribeAndCommitSync` here, because it relates to committed offsets)






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548603219


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
   yes, agree that's the failure we noticed on the sys tests, but conceptually we're creating a new `OffsetAndTimestampInternal` class that is the same as the existing `OffsetAndTimestamp`, with the only difference that the former does not throw on negative offsets or negative timestamps, right? So it makes sense to mention that in the class doc.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548599057


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
   I think the problem is a negative timestamp in the response causing `org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Invalid negative timestamp`. More specifically, this is the part that was complaining:
   ```
   if (timestamp < 0)
   throw new IllegalArgumentException("Invalid negative timestamp");
   ```
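   
   As an illustration of why a lenient internal holder sidesteps this, here is a self-contained sketch. The class names are hypothetical (not the actual `OffsetAndTimestamp` / `OffsetAndTimestampInternal` sources); it only mirrors the eager-validation vs. lazy-conversion distinction being discussed:
   
```java
// Hypothetical, simplified sketch -- not the actual Kafka classes.
// The public value class validates eagerly, so a -1 sentinel coming back
// in a broker response throws during construction.
final class StrictOffsetAndTimestamp {
    final long offset;
    final long timestamp;

    StrictOffsetAndTimestamp(long offset, long timestamp) {
        if (offset < 0)
            throw new IllegalArgumentException("Invalid negative offset");
        if (timestamp < 0)
            throw new IllegalArgumentException("Invalid negative timestamp");
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

// The internal holder skips validation, so code paths that only need the
// offset (beginningOffsets/endOffsets) never trip the timestamp check;
// conversion to the strict public type happens only where it is required.
final class LenientOffsetAndTimestamp {
    final long offset;
    final long timestamp;

    LenientOffsetAndTimestamp(long offset, long timestamp) {
        this.offset = offset;        // may hold a sentinel such as -1
        this.timestamp = timestamp;  // not validated here
    }

    StrictOffsetAndTimestamp toPublic() {
        return new StrictOffsetAndTimestamp(offset, timestamp);
    }
}
```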






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548596033


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
   it's actually both! he he, so let's maybe add _negative offsets and 
timestamps_






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548593720


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {
+    // This test ensure that the member ID is propagated from the group coordinator when the
+    // assignment is received into a subsequent offset commit
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment.size)
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    consumer.seek(tp, 0)
+
+    consumer.commitSync()
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    val numRecords = 1
+    (0 until numRecords).map { i =>
+      val timestamp = startingTimestamp + i.toLong
+      val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, s"key $i".getBytes, s"value $i".getBytes)
+      producer.send(record)
+      record
+    }
+    producer.flush()
+
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    val endOffsets = consumer.endOffsets(Set(tp).asJava)
+    assertEquals(numRecords, endOffsets.get(tp))
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testTimestampsToSearch(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   maybe `testFetchOffsetsForTime`, which already implies searching at given timestamps.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548593720


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {
+    // This test ensure that the member ID is propagated from the group coordinator when the
+    // assignment is received into a subsequent offset commit
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment.size)
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    consumer.seek(tp, 0)
+
+    consumer.commitSync()
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    val numRecords = 1
+    (0 until numRecords).map { i =>
+      val timestamp = startingTimestamp + i.toLong
+      val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, s"key $i".getBytes, s"value $i".getBytes)
+      producer.send(record)
+      record
+    }
+    producer.flush()
+
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    val endOffsets = consumer.endOffsets(Set(tp).asJava)
+    assertEquals(numRecords, endOffsets.get(tp))
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testTimestampsToSearch(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   maybe `testFindOffsetsForTime`, which already implies searching at given timestamps.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548593248


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1141,21 +1146,29 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions,
         Map<TopicPartition, Long> timestampToSearch = partitions
-                .stream()
-                .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
+                .stream()
+                .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
         Timer timer = time.timer(timeout);
         ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
-                timestampToSearch,
-                false,
-                timer);
-        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = applicationEventHandler.addAndGet(
-                listOffsetsEvent,
-                timer);
-        return offsetAndTimestampMap
-                .entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+                timestampToSearch,
+                timer,
+                false);
+
+        Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap;
+        if (timeout.isZero()) {
+            applicationEventHandler.add(listOffsetsEvent);

Review Comment:
   Thanks for the explanation! Totally ok to tackle it with that separate Jira. 






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548591307


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
   uhm... what `OffsetAndTimestamp` does not allow is negative offsets [here](https://github.com/apache/kafka/blob/ee61bb721eecb0404929f125fe43392f3d024453/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java#L35), and that's the requirement this new one is removing. Am I missing something?






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548585054


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##
@@ -240,20 +241,48 @@ Map<TopicPartition, Long> getOffsetResetTimestamp() {
         return offsetResetTimestamps;
     }
 
-    static Map<TopicPartition, OffsetAndTimestamp> buildOffsetsForTimesResult(final Map<TopicPartition, Long> timestampsToSearch,
-                                                                              final Map<TopicPartition, ListOffsetData> fetchedOffsets) {
-        HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(timestampsToSearch.size());
+    static <T> Map<TopicPartition, T> buildListOffsetsResult(

Review Comment:
   good catch.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548581658


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
   Timestamps I assume.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548580872


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {
+    // This test ensure that the member ID is propagated from the group coordinator when the
+    // assignment is received into a subsequent offset commit
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment.size)
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    consumer.seek(tp, 0)
+
+    consumer.commitSync()
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    val numRecords = 1
+    (0 until numRecords).map { i =>
+      val timestamp = startingTimestamp + i.toLong
+      val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, s"key $i".getBytes, s"value $i".getBytes)
+      producer.send(record)
+      record
+    }
+    producer.flush()
+
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    val endOffsets = consumer.endOffsets(Set(tp).asJava)
+    assertEquals(numRecords, endOffsets.get(tp))
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testTimestampsToSearch(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   Including the func name we're testing (`offsetsAndTimestamps`) would 
probably make the test name clearer... maybe something around 
`testOffsetsAndTimestampsTargetTimestamps`?






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548576809


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   We have split the consumer tests into separate files grouped by feature, and there is now one `PlaintextConsumerCommitTest`; I would expect this test to go there.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548574194


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1141,21 +1146,29 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions,
         Map<TopicPartition, Long> timestampToSearch = partitions
-                .stream()
-                .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
+                .stream()
+                .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
         Timer timer = time.timer(timeout);
         ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
-                timestampToSearch,
-                false,
-                timer);
-        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = applicationEventHandler.addAndGet(
-                listOffsetsEvent,
-                timer);
-        return offsetAndTimestampMap
-                .entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+                timestampToSearch,
+                timer,
+                false);
+
+        Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap;
+        if (timeout.isZero()) {
+            applicationEventHandler.add(listOffsetsEvent);

Review Comment:
   hi @lianetm - thanks for the comment. There's a ticket to align the behavior of the two APIs per your suggestions there. The plan is to do that in a separate PR: https://issues.apache.org/jira/browse/KAFKA-16433
   
   Back to your first comment, it is not immediately obvious why people would use these two APIs with a zero timeout. The only sensible thing it does is update the local high watermark, as you mentioned. I think it is worth addressing this ambiguity after the 4.0 release, so I'll leave a comment per your request.
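   
   The zero-timeout behavior being discussed can be sketched roughly as follows. All names here are illustrative stand-ins (not the real `AsyncKafkaConsumer` internals): the lookup event is still enqueued so the background thread can refresh its cached offsets, but the caller gets -1 sentinels back instead of blocking:
   
```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of the zero-timeout branch under discussion;
// beginningOrEndOffset/events are illustrative names, not the real internals.
final class ListOffsetsSketch {
    // stands in for the application event queue drained by the background thread
    static final List<Map<String, Long>> events = new ArrayList<>();

    static Map<String, Long> beginningOrEndOffset(Collection<String> partitions,
                                                  long targetTimestamp,
                                                  Duration timeout) {
        Map<String, Long> timestampToSearch = new HashMap<>();
        for (String tp : partitions)
            timestampToSearch.put(tp, targetTimestamp);
        // the lookup is always enqueued, so the background thread can still
        // refresh its cached offsets even when the caller will not wait
        events.add(timestampToSearch);
        if (timeout.isZero()) {
            Map<String, Long> sentinel = new HashMap<>();
            for (String tp : partitions)
                sentinel.put(tp, -1L);  // unknown offset, returned without blocking
            return sentinel;
        }
        // blocking path elided: would wait on the event's result up to `timeout`
        throw new UnsupportedOperationException("blocking path not sketched");
    }
}
```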






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548571273


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##
@@ -240,20 +241,48 @@ Map<TopicPartition, Long> getOffsetResetTimestamp() {
         return offsetResetTimestamps;
     }
 
-    static Map<TopicPartition, OffsetAndTimestamp> buildOffsetsForTimesResult(final Map<TopicPartition, Long> timestampsToSearch,
-                                                                              final Map<TopicPartition, ListOffsetData> fetchedOffsets) {
-        HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(timestampsToSearch.size());
+    static <T> Map<TopicPartition, T> buildListOffsetsResult(

Review Comment:
   This generic `buildListOffsetsResult` is currently only being used from `buildOffsetsForTimesResult`; was the intention to use it also from `buildOffsetsForTimeInternalResult`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16466) QuorumController is swallowing some exception messages

2024-04-02 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16466:
-
Description: 
In some cases in QuorumController, we throw exceptions from the control manager 
methods. Unless these are explicitly caught and handled, they will eventually 
bubble up to the ControllerReadEvent/ControllerWriteEvent and hit the generic 
error handler.

In the generic error handler of QuorumController, we examine the exception to 
determine if it is a fault or not. In the case where it is not a fault, we log 
the error like:
{code:java}
 log.info("{}: {}", name, failureMessage);
{code}
which results in messages like
{code:java}
[2024-04-02 16:08:38,078] INFO [QuorumController id=3000] registerBroker: event 
failed with UnsupportedVersionException in 167 microseconds. 
(org.apache.kafka.controller.QuorumController:544)
{code}
In this case, the exception actually has more details in its own message
{code:java}
Unable to register because the broker does not support version 8 of 
metadata.version. It wants a version between 20 and 20, inclusive.
{code}

We should include the exception's message in the log output for non-fault 
errors as it includes very useful debugging info.

This was found while writing an integration test for KRaft migration where the 
brokers and controllers have a mismatched MetadataVersion.
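
A minimal, self-contained sketch of the proposed change. The names `name` and `failureMessage` mirror the log snippet above; the helper and its formatting are illustrative assumptions, not the actual QuorumController code. The point is simply to append the exception's own message for non-fault errors so details are not swallowed:

```java
public class NonFaultLogSketch {
    // Hypothetical helper: build the log line including the exception's own
    // message, so details like the UnsupportedVersionException reason survive.
    static String format(String name, String failureMessage, Throwable t) {
        // Before: log.info("{}: {}", name, failureMessage);
        // After (proposed): also include t.getMessage() for non-fault errors.
        return name + ": " + failureMessage + " Exception message: " + t.getMessage();
    }

    public static void main(String[] args) {
        Throwable t = new IllegalStateException(
            "Unable to register because the broker does not support version 8 of metadata.version.");
        System.out.println(format("registerBroker",
            "event failed with UnsupportedVersionException in 167 microseconds.", t));
    }
}
```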

  was:
In some cases in QuorumController, we throw exceptions from the control manager 
methods. Unless these are explicitly caught and handled, they will eventually 
bubble up to the ControllerReadEvent/ControllerWriteEvent and hit the generic 
error handler.

In the generic error handler of QuorumController, we examine the exception to 
determine if it is a fault or not. In the case where it is not a fault, we log 
the error like:
{code:java}
 log.info("{}: {}", name, failureMessage);
{code}
which results in messages like
{code:java}
[2024-04-02 16:08:38,078] INFO [QuorumController id=3000] registerBroker: event 
failed with UnsupportedVersionException in 167 microseconds. 
(org.apache.kafka.controller.QuorumController:544)
{code}
In this case, the exception actually has more details in its own message
{code:java}
Unable to register because the broker does not support version 8 of 
metadata.version. It wants a version between 20 and 20, inclusive.
{code}

This was found while writing an integration test for KRaft migration where the 
brokers and controllers have a mismatched MetadataVersion.


> QuorumController is swallowing some exception messages
> --
>
> Key: KAFKA-16466
> URL: https://issues.apache.org/jira/browse/KAFKA-16466
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.7.0
>Reporter: David Arthur
>Priority: Major
>  Labels: good-first-issue
> Fix For: 3.8.0, 3.7.1
>
>
> In some cases in QuorumController, we throw exceptions from the control 
> manager methods. Unless these are explicitly caught and handled, they will 
> eventually bubble up to the ControllerReadEvent/ControllerWriteEvent and hit 
> the generic error handler.
> In the generic error handler of QuorumController, we examine the exception to 
> determine if it is a fault or not. In the case where it is not a fault, we 
> log the error like:
> {code:java}
>  log.info("{}: {}", name, failureMessage);
> {code}
> which results in messages like
> {code:java}
> [2024-04-02 16:08:38,078] INFO [QuorumController id=3000] registerBroker: 
> event failed with UnsupportedVersionException in 167 microseconds. 
> (org.apache.kafka.controller.QuorumController:544)
> {code}
> In this case, the exception actually has more details in its own message
> {code:java}
> Unable to register because the broker does not support version 8 of 
> metadata.version. It wants a version between 20 and 20, inclusive.
> {code}
> We should include the exception's message in the log output for non-fault 
> errors as it includes very useful debugging info.
> This was found while writing an integration test for KRaft migration where 
> the brokers and controllers have a mismatched MetadataVersion.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16427) KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER

2024-04-02 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833307#comment-17833307
 ] 

Kirk True commented on KAFKA-16427:
---

[~alyssahuang]—any chance you have a test case to trigger this, such as an 
integration or system test? Thanks!

> KafkaConsumer#position() does not respect timeout when group protocol is 
> CONSUMER
> -
>
> Key: KAFKA-16427
> URL: https://issues.apache.org/jira/browse/KAFKA-16427
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Alyssa Huang
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> When
> `long position(TopicPartition partition, final Duration timeout);`
> is called on an unknown topic partition (and auto creation is disabled), the 
> method fails to adhere to the timeout supplied.
> e.g. the following warning is logged continuously as metadata fetches are 
> retried 
> [2024-03-26 11:03:48,589] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Error while fetching metadata with correlation id 200 : 
> \{nonexistingTopic=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient:1313)
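
A hedged sketch of the behavior the bug calls for (names and structure are illustrative, not the actual consumer internals): retries of the metadata fetch must be bounded by the caller-supplied timeout rather than looping indefinitely on `UNKNOWN_TOPIC_OR_PARTITION`:

```java
import java.time.Duration;

public class BoundedRetrySketch {
    // Sketch: attempts keep failing for an unknown topic, so we retry only
    // until the deadline derived from the caller's timeout, then give up.
    static boolean positionResolved(Duration timeout) {
        long deadlineNs = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadlineNs) {
            // attempt a metadata fetch here; an unknown topic keeps failing,
            // so we loop until the deadline expires
        }
        return false; // timed out without resolving the partition
    }
}
```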



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16466) QuorumController is swallowing some exception messages

2024-04-02 Thread David Arthur (Jira)
David Arthur created KAFKA-16466:


 Summary: QuorumController is swallowing some exception messages
 Key: KAFKA-16466
 URL: https://issues.apache.org/jira/browse/KAFKA-16466
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 3.7.0
Reporter: David Arthur
 Fix For: 3.8.0, 3.7.1


In some cases in QuorumController, we throw exceptions from the control manager 
methods. Unless these are explicitly caught and handled, they will eventually 
bubble up to the ControllerReadEvent/ControllerWriteEvent and hit the generic 
error handler.

In the generic error handler of QuorumController, we examine the exception to 
determine if it is a fault or not. In the case where it is not a fault, we log 
the error like:
{code:java}
 log.info("{}: {}", name, failureMessage);
{code}
which results in messages like
{code:java}
[2024-04-02 16:08:38,078] INFO [QuorumController id=3000] registerBroker: event 
failed with UnsupportedVersionException in 167 microseconds. 
(org.apache.kafka.controller.QuorumController:544)
{code}
In this case, the exception actually has more details in its own message
{code:java}
Unable to register because the broker does not support version 8 of 
metadata.version. It wants a version between 20 and 20, inclusive.
{code}

This was found while writing an integration test for KRaft migration where the 
brokers and controllers have a mismatched MetadataVersion.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16465) New consumer does not invoke rebalance callbacks as expected in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16465:
-

Assignee: Lianet Magrans  (was: Kirk True)

> New consumer does not invoke rebalance callbacks as expected in 
> consumer_test.py system test
> 
>
> Key: KAFKA-16465
> URL: https://issues.apache.org/jira/browse/KAFKA-16465
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with the following error:
> {noformat}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_bounce.clean_shutdown=True.static_membership=False.bounce_mode=all.num_bounces=5.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   1 minute 29.511 seconds
> AssertionError('Total revoked count 0 does not match the expectation of 
> having 0 revokes as 0')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 254, in test_static_consumer_bounce
> (num_revokes_after_bounce, check_condition)
> AssertionError: Total revoked count 0 does not match the expectation of 
> having 0 revokes as 0
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16459) New consumer times out joining group in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16459:
-

Assignee: Lianet Magrans  (was: Kirk True)

> New consumer times out joining group in consumer_test.py system test
> 
>
> Key: KAFKA-16459
> URL: https://issues.apache.org/jira/browse/KAFKA-16459
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with two different errors related 
> to consumers joining the consumer group in a timely fashion.
> {quote}
> * Consumers failed to join in a reasonable amount of time
> * Timed out waiting for consumers to join, expected total X joined, but only 
> see Y joined from normal consumer group and Z from conflict consumer 
> group{quote}
> Affected tests:
>  * {{test_fencing_static_consumer}}
>  * {{test_static_consumer_bounce}}
>  * {{test_static_consumer_persisted_after_rejoin}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548554563


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
   I would add: Internal representation of {@link OffsetAndTimestamp} **that 
allows negative offsets**.
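
To make the reviewer's point concrete, here is a hedged sketch of the distinction (method bodies are illustrative assumptions; only the concept comes from the PR): the public `OffsetAndTimestamp` rejects negative offsets, while the internal variant must carry sentinel values such as -1 returned by a ListOffsets response:

```java
public class NegativeOffsetSketch {
    // Mirrors the public class's validation (illustrative).
    static long publicOffset(long offset) {
        if (offset < 0) throw new IllegalArgumentException("Invalid negative offset");
        return offset;
    }

    // The internal representation skips validation so sentinel values
    // (e.g. -1 from a ListOffsets response) can flow through unchanged.
    static long internalOffset(long offset) {
        return offset;
    }
}
```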



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548547539


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1141,21 +1146,29 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions,
 Map<TopicPartition, Long> timestampToSearch = partitions
-.stream()
-.collect(Collectors.toMap(Function.identity(), tp -> timestamp));
+.stream()
+.collect(Collectors.toMap(Function.identity(), tp -> timestamp));
 Timer timer = time.timer(timeout);
 ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
-timestampToSearch,
-false,
-timer);
-Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = applicationEventHandler.addAndGet(
-listOffsetsEvent,
-timer);
-return offsetAndTimestampMap
-.entrySet()
-.stream()
-.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+timestampToSearch,
+timer,
+false);
+
+Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap;
+if (timeout.isZero()) {
+applicationEventHandler.add(listOffsetsEvent);

Review Comment:
   So if I get it right, we are intentionally leaving this: generating an event 
to get offsets, when in the end we return right away without waiting for a 
response? I do get that the old consumer does it, and I could be missing the 
purpose, but it seems to me an unneeded request, even considering the side 
effect of the onSuccess handler. The handler just updates the positions to 
reuse the offsets it just retrieved, and it does make sense to reuse the result 
when we do need to make a request, but I wouldn't say we need to generate an 
unneeded event/request just for that when the user requested offsets with 
max-time-to-wait=0. 
   
   In any case, if we prefer to keep this, I would suggest two things:
   
   1. Add a comment explaining why (the handler), because it looks like a weird 
overhead to add the event and return.
   2. Be consistent and generate the event also in the case of 
`offsetsForTimes` before the early return (ln 1104). In the old consumer this 
is common logic, so both paths, `offsetsForTimes` and `beginning/endOffsets`, 
do the same request+return.
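
The pattern under discussion can be sketched in a self-contained way. The class and method names here are hypothetical stand-ins, not the real Kafka classes: with a zero timeout the event is still enqueued (fire-and-forget) so its side effects run on the background thread, but the caller returns immediately with an empty result instead of blocking on the future:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ZeroTimeoutSketch {
    // Hypothetical stand-in for the consumer's event plumbing.
    static class ListOffsetsEvent {
        final CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();
    }

    static void add(ListOffsetsEvent e) {
        // enqueue only: the background thread runs side effects
        // (e.g. updating positions) but the caller does not wait
    }

    static Map<String, Long> addAndGet(ListOffsetsEvent e) {
        add(e);
        e.future.complete(Map.of("tp-0", 42L)); // simulate background completion
        return e.future.join();
    }

    // The pattern under review: a zero timeout still enqueues the event for
    // its side effects, then returns an empty result immediately.
    static Map<String, Long> beginningOffsets(Duration timeout) {
        ListOffsetsEvent e = new ListOffsetsEvent();
        if (timeout.isZero()) {
            add(e);
            return Collections.emptyMap();
        }
        return addAndGet(e);
    }
}
```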



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548530489


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+  .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new 
TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+  .getOrElse(new TimestampAndOffset(-1, 0, lpc))) // always return 
something for backward compatibility

Review Comment:
   > why do we need to return offset=0 when we can't find the maxTimestamp?
   
   oh, my bad. we should return base offset instead of zero. 
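
The lookup-and-fallback logic being reviewed can be sketched independently of the log layer. The `Batch` record and method names here are illustrative, not the real `RecordBatch`/`UnifiedLog` API: scan batches from the index position, pick the offset of the batch whose max timestamp matches the cached one, and fall back to the base offset (not 0) when none matches:

```java
import java.util.List;

public class MaxTimestampSketch {
    // Illustrative batch record; not the real RecordBatch API.
    record Batch(long maxTimestamp, long offsetOfMaxTimestamp) {}

    // Simplified version of the pattern in the diff above: find the first
    // batch matching the cached max timestamp; otherwise return baseOffset,
    // per the review suggestion ("return base offset instead of zero").
    static long offsetForMaxTimestamp(List<Batch> batches, long cachedMaxTs, long baseOffset) {
        return batches.stream()
            .filter(b -> b.maxTimestamp() == cachedMaxTs)
            .map(Batch::offsetOfMaxTimestamp)
            .findFirst()
            .orElse(baseOffset);
    }
}
```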



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-02 Thread via GitHub


cmccabe commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1548528194


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -69,6 +69,36 @@ object KafkaRaftManager {
 
 lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers
+   * and it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @return An error wrapped as an Option, if an error occurred. None otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+// These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+if (config.processRoles.nonEmpty) {
+  Some(new RuntimeException("Not deleting metadata log dir since this node 
is in KRaft mode."))
+} else if (!config.migrationEnabled) {
+  Some(new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled."))
+} else {
+  val metadataDir = new File(config.metadataLogDir)
+  val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+  try {
+Utils.delete(metadataDir)

Review Comment:
   yes, we need to make sure not to delete the whole metadata directory  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Benchmark fibers [kafka]

2024-04-02 Thread via GitHub


bgprudhomme opened a new pull request, #15649:
URL: https://github.com/apache/kafka/pull/15649

   Refactor to fibers


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Benchmark fibers [kafka]

2024-04-02 Thread via GitHub


bgprudhomme closed pull request #15649: Benchmark fibers
URL: https://github.com/apache/kafka/pull/15649


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-02 Thread via GitHub


mumrah commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1548499611


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -69,6 +69,36 @@ object KafkaRaftManager {
 
 lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers
+   * and it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @return An error wrapped as an Option, if an error occurred. None otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+// These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+if (config.processRoles.nonEmpty) {
+  Some(new RuntimeException("Not deleting metadata log dir since this node 
is in KRaft mode."))
+} else if (!config.migrationEnabled) {
+  Some(new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled."))
+} else {
+  val metadataDir = new File(config.metadataLogDir)
+  val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+  try {
+Utils.delete(metadataDir)

Review Comment:
   Thanks @jsancio, I'll fix this and add some additional test cases



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-04-02 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833298#comment-17833298
 ] 

Kirk True commented on KAFKA-16389:
---

Can you mark this as "In progress", [~pnee]? Thanks!

> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
> num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
> {code}
> To reproduce, create a system test suite file named 
> {{test_valid_assignment.yml}} with these contents:
> {code:yaml}
> failures:
>   - 
> 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
> {code}
> Then set the the {{TC_PATHS}} environment variable to include that test suite 
> file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548487542


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+  .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new 
TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+  .getOrElse(new TimestampAndOffset(-1, 0, lpc))) // always return 
something for backward compatibility

Review Comment:
   Hmm, why do we need to return offset=0 when we can't find the maxTimestamp?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-02 Thread via GitHub


jsancio commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1548472966


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -69,6 +69,36 @@ object KafkaRaftManager {
 
 lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers
+   * and it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @return An error wrapped as an Option, if an error occurred. None otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+// These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+if (config.processRoles.nonEmpty) {
+  Some(new RuntimeException("Not deleting metadata log dir since this node 
is in KRaft mode."))
+} else if (!config.migrationEnabled) {
+  Some(new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled."))
+} else {
+  val metadataDir = new File(config.metadataLogDir)
+  val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+  try {
+Utils.delete(metadataDir)

Review Comment:
   This deletes the entire metadata log directory and not the 
`__cluster_metadata-0` topic partition in the metadata log dir. In some 
configuration the `metadata.log.dir` equals the `log.dir(s)`. In those 
configuration this will delete all of the topic partitions in the log directory.
   
   If the tests pass, it means we are missing a test that checks this 
doesn't happen.
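
A hypothetical guard illustrating the reviewer's concern (the path construction is an assumption, not the PR's final code): when `metadata.log.dir` equals one of the `log.dirs`, only the `__cluster_metadata-0` partition directory inside it should be deleted, never the whole log directory:

```java
import java.io.File;

public class MetadataDirSketch {
    // Resolve the cluster metadata partition directory inside the metadata
    // log dir; deletion would then target only this subdirectory, leaving
    // other topic partitions in a shared log dir untouched.
    static File clusterMetadataPartitionDir(String metadataLogDir) {
        return new File(metadataLogDir, "__cluster_metadata-0");
    }
}
```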



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16405) Mismatch assignment error when running consumer rolling upgrade system tests

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16405:
--
Fix Version/s: 3.8.0

> Mismatch assignment error when running consumer rolling upgrade system tests
> 
>
> Key: KAFKA-16405
> URL: https://issues.apache.org/jira/browse/KAFKA-16405
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> relevant to [https://github.com/apache/kafka/pull/15578]
>  
> We are seeing:
> {code:java}
> 
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.11.4
> session_id:   2024-03-21--001
> run time: 3 minutes 24.632 seconds
> tests run:7
> passed:   5
> flaky:0
> failed:   2
> ignored:  0
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   24.599 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   26.638 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 77, in rolling_update_test
> self._verify_range_assignment(consumer)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 38, in _verify_range_assignment
> assert assignment == set([
> AssertionError: Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
> status: PASS
> run time:   29.815 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
> status: PASS
> run time:   29.766 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   30.086 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   35.965 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   
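The `Mismatched assignment` failure quoted above means one consumer ended up with no partitions (the empty `frozenset()`) while another held all four. A minimal reproduction of the set-of-frozensets comparison that `_verify_range_assignment` performs (the expected range split of four partitions over two consumers is an assumption for this sketch, not the test's literal code):

```python
def verify_range_assignment(per_consumer_partitions):
    """Compare each consumer's partition set against the assumed range
    assignment of test_topic's four partitions over two consumers."""
    assignment = {frozenset(p) for p in per_consumer_partitions}
    expected = {
        frozenset({("test_topic", 0), ("test_topic", 1)}),
        frozenset({("test_topic", 2), ("test_topic", 3)}),
    }
    return assignment == expected

# The balanced case passes:
print(verify_range_assignment([
    [("test_topic", 0), ("test_topic", 1)],
    [("test_topic", 2), ("test_topic", 3)],
]))  # True

# The failure mode in the log: one consumer owns everything, the other nothing.
print(verify_range_assignment([
    [],
    [("test_topic", 0), ("test_topic", 1), ("test_topic", 2), ("test_topic", 3)],
]))  # False
```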

[jira] [Updated] (KAFKA-16465) New consumer does not invoke rebalance callbacks as expected in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16465:
--
Description: 
The {{consumer_test.py}} system test fails with the following error:

{noformat}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_bounce.clean_shutdown=True.static_membership=False.bounce_mode=all.num_bounces=5.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   1 minute 29.511 seconds


AssertionError('Total revoked count 0 does not match the expectation of 
having 0 revokes as 0')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 254, in test_static_consumer_bounce
(num_revokes_after_bounce, check_condition)
AssertionError: Total revoked count 0 does not match the expectation of having 
0 revokes as 0
{noformat}

  was:
The {{replication_replica_failure_test.py}} system test fails with the 
following error:

{noformat}
test_id:
kafkatest.tests.core.replication_replica_failure_test.ReplicationReplicaFailureTest.test_replication_with_replica_failure.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   1 minute 20.972 seconds


TimeoutError('Timed out after 30s while awaiting initial record delivery of 
5 records')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/replication_replica_failure_test.py",
 line 97, in test_replication_with_replica_failure
self.await_startup()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/end_to_end.py",
 line 125, in await_startup
(timeout_sec, min_records))
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: Timed out after 30s while awaiting initial record 
delivery of 5 records
{noformat}


> New consumer does not invoke rebalance callbacks as expected in 
> consumer_test.py system test
> 
>
> Key: KAFKA-16465
> URL: https://issues.apache.org/jira/browse/KAFKA-16465
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with the following error:
> {noformat}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_bounce.clean_shutdown=True.static_membership=False.bounce_mode=all.num_bounces=5.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   1 minute 29.511 seconds
> AssertionError('Total revoked count 0 does not match the expectation of 
> having 0 revokes as 0')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> 

[jira] [Created] (KAFKA-16465) New consumer does not invoke rebalance callbacks as expected in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)
Kirk True created KAFKA-16465:
-

 Summary: New consumer does not invoke rebalance callbacks as 
expected in consumer_test.py system test
 Key: KAFKA-16465
 URL: https://issues.apache.org/jira/browse/KAFKA-16465
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


The {{replication_replica_failure_test.py}} system test fails with the 
following error:

{noformat}
test_id:
kafkatest.tests.core.replication_replica_failure_test.ReplicationReplicaFailureTest.test_replication_with_replica_failure.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   1 minute 20.972 seconds


TimeoutError('Timed out after 30s while awaiting initial record delivery of 
5 records')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/replication_replica_failure_test.py",
 line 97, in test_replication_with_replica_failure
self.await_startup()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/end_to_end.py",
 line 125, in await_startup
(timeout_sec, min_records))
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: Timed out after 30s while awaiting initial record 
delivery of 5 records
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16464) New consumer fails with timeout in replication_replica_failure_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16464:
--
Description: 
The {{replication_replica_failure_test.py}} system test fails with the 
following error:

{noformat}
test_id:
kafkatest.tests.core.replication_replica_failure_test.ReplicationReplicaFailureTest.test_replication_with_replica_failure.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   1 minute 20.972 seconds


TimeoutError('Timed out after 30s while awaiting initial record delivery of 
5 records')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/replication_replica_failure_test.py",
 line 97, in test_replication_with_replica_failure
self.await_startup()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/end_to_end.py",
 line 125, in await_startup
(timeout_sec, min_records))
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: Timed out after 30s while awaiting initial record 
delivery of 5 records
{noformat}

  was:
The {{security_test.py}} system test fails with the following error:

{noformat}
test_id:
kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   1 minute 30.885 seconds


TimeoutError('')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/security_test.py",
 line 142, in test_client_ssl_endpoint_validation_failure
wait_until(lambda: self.producer_consumer_have_expected_error(error), 
timeout_sec=30)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError
{noformat}


> New consumer fails with timeout in replication_replica_failure_test.py system 
> test
> --
>
> Key: KAFKA-16464
> URL: https://issues.apache.org/jira/browse/KAFKA-16464
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{replication_replica_failure_test.py}} system test fails with the 
> following error:
> {noformat}
> test_id:
> kafkatest.tests.core.replication_replica_failure_test.ReplicationReplicaFailureTest.test_replication_with_replica_failure.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   1 minute 20.972 seconds
> TimeoutError('Timed out after 30s while awaiting initial record delivery 
> of 5 records')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = 

[jira] [Created] (KAFKA-16464) New consumer fails with timeout in replication_replica_failure_test.py system test

2024-04-02 Thread Kirk True (Jira)
Kirk True created KAFKA-16464:
-

 Summary: New consumer fails with timeout in 
replication_replica_failure_test.py system test
 Key: KAFKA-16464
 URL: https://issues.apache.org/jira/browse/KAFKA-16464
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


The {{security_test.py}} system test fails with the following error:

{noformat}
test_id:
kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   1 minute 30.885 seconds


TimeoutError('')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/security_test.py",
 line 142, in test_client_ssl_endpoint_validation_failure
wait_until(lambda: self.producer_consumer_have_expected_error(error), 
timeout_sec=30)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError
{noformat}





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548438900


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
       // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
       // constant time access while being safe to use with concurrent collections unlike `toArray`.
-      val segmentsCopy = logSegments.asScala.toBuffer
-      val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-      val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-      Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-        latestTimestampAndOffset.offset,
-        latestEpochAsOptional(leaderEpochCache)))
+      val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+      // cache the timestamp and offset
+      val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+      // lookup the position of batch to avoid extra I/O
+      val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+      val lpc = latestEpochAsOptional(leaderEpochCache)
+      Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+        .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+        .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+        .getOrElse(new TimestampAndOffset(-1, 0, lpc))) // always return something for backward compatibility

Review Comment:
   @showuon your previous comment is right (sorry, I can't find the comment, but I remember it). We need to return `offset=0` and `ts=-1` if there are no batches, for the sake of backward compatibility.
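
The backward-compatibility contract discussed here can be illustrated with a small stand-in: find the batch carrying the segment's max timestamp, and fall back to `ts=-1, offset=0` when nothing matches. This is a simplified Python sketch using hypothetical `Batch`/`TimestampAndOffset` records, not Kafka's actual classes.

```python
from typing import List, NamedTuple

class Batch(NamedTuple):
    # Simplified stand-in for a record batch; not Kafka's real API.
    max_timestamp: int
    offset_of_max_timestamp: int

class TimestampAndOffset(NamedTuple):
    timestamp: int
    offset: int

def lookup_max_timestamp(batches: List[Batch], max_timestamp_so_far: int) -> TimestampAndOffset:
    """Return the offset of the batch carrying the given max timestamp.

    When no batch matches (e.g. the log is empty), fall back to
    timestamp=-1, offset=0 so callers always receive a result, mirroring
    the backward-compatibility requirement discussed in the review."""
    for batch in batches:
        if batch.max_timestamp == max_timestamp_so_far:
            return TimestampAndOffset(batch.max_timestamp, batch.offset_of_max_timestamp)
    return TimestampAndOffset(-1, 0)  # backward-compatible fallback

print(lookup_max_timestamp([Batch(100, 5), Batch(250, 12)], 250))  # TimestampAndOffset(timestamp=250, offset=12)
print(lookup_max_timestamp([], 250))                               # TimestampAndOffset(timestamp=-1, offset=0)
```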






[jira] [Updated] (KAFKA-16405) Mismatch assignment error when running consumer rolling upgrade system tests

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16405:
--
Labels: kip-848-client-support system-tests  (was: kip-848-client-support)

> Mismatch assignment error when running consumer rolling upgrade system tests
> 
>
> Key: KAFKA-16405
> URL: https://issues.apache.org/jira/browse/KAFKA-16405
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
>
> relevant to [https://github.com/apache/kafka/pull/15578]
>  
> We are seeing:
> {code:java}
> 
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.11.4
> session_id:   2024-03-21--001
> run time: 3 minutes 24.632 seconds
> tests run:7
> passed:   5
> flaky:0
> failed:   2
> ignored:  0
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   24.599 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   26.638 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 77, in rolling_update_test
> self._verify_range_assignment(consumer)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 38, in _verify_range_assignment
> assert assignment == set([
> AssertionError: Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
> status: PASS
> run time:   29.815 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
> status: PASS
> run time:   29.766 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   30.086 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   35.965 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data 

[jira] [Updated] (KAFKA-16405) Mismatch assignment error when running consumer rolling upgrade system tests

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16405:
--
Component/s: clients

> Mismatch assignment error when running consumer rolling upgrade system tests
> 
>
> Key: KAFKA-16405
> URL: https://issues.apache.org/jira/browse/KAFKA-16405
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, system tests
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
>
> relevant to [https://github.com/apache/kafka/pull/15578]
>  
> We are seeing:
> {code:java}
> 
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.11.4
> session_id:   2024-03-21--001
> run time: 3 minutes 24.632 seconds
> tests run:7
> passed:   5
> flaky:0
> failed:   2
> ignored:  0
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   24.599 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   26.638 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 77, in rolling_update_test
> self._verify_range_assignment(consumer)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 38, in _verify_range_assignment
> assert assignment == set([
> AssertionError: Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
> status: PASS
> run time:   29.815 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
> status: PASS
> run time:   29.766 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   30.086 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   35.965 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> 

[jira] [Updated] (KAFKA-16405) Mismatch assignment error when running consumer rolling upgrade system tests

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16405:
--
Priority: Blocker  (was: Major)

> Mismatch assignment error when running consumer rolling upgrade system tests
> 
>
> Key: KAFKA-16405
> URL: https://issues.apache.org/jira/browse/KAFKA-16405
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support
>
> relevant to [https://github.com/apache/kafka/pull/15578]
>  
> We are seeing:
> {code:java}
> 
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.11.4
> session_id:   2024-03-21--001
> run time: 3 minutes 24.632 seconds
> tests run:7
> passed:   5
> flaky:0
> failed:   2
> ignored:  0
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   24.599 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   26.638 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 77, in rolling_update_test
> self._verify_range_assignment(consumer)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 38, in _verify_range_assignment
> assert assignment == set([
> AssertionError: Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
> status: PASS
> run time:   29.815 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
> status: PASS
> run time:   29.766 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   30.086 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   35.965 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> 

[jira] [Updated] (KAFKA-16405) Mismatch assignment error when running consumer rolling upgrade system tests

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16405:
--
Issue Type: Bug  (was: Task)

> Mismatch assignment error when running consumer rolling upgrade system tests
> 
>
> Key: KAFKA-16405
> URL: https://issues.apache.org/jira/browse/KAFKA-16405
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
>
> relevant to [https://github.com/apache/kafka/pull/15578]
>  
> We are seeing:
> {code:java}
> 
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.11.4
> session_id:   2024-03-21--001
> run time: 3 minutes 24.632 seconds
> tests run:7
> passed:   5
> flaky:0
> failed:   2
> ignored:  0
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   24.599 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   26.638 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 77, in rolling_update_test
> self._verify_range_assignment(consumer)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 38, in _verify_range_assignment
> assert assignment == set([
> AssertionError: Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
> status: PASS
> run time:   29.815 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
> status: PASS
> run time:   29.766 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   30.086 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   35.965 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> 

Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548427028


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,54 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset 
with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of 
the record.
- *
- * If create time is used, the offset will always be the offset of the 
record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 
since no timestamp info in records.
- *
- * @return The max timestamp and its offset
+ * There are three cases of finding max timestamp to return:
+ * 1) version 0: The max timestamp is NO_TIMESTAMP (-1)
+ * 2) LogAppendTime: All records have same timestamp, and so the max 
timestamp is equal to logAppendTime
+ * 3) CreateTime: The max timestamp of record
+ * 
+ * Let's talk about OffsetOfMaxTimestamp. There are some paths that we 
don't try to find the OffsetOfMaxTimestamp
+ * to avoid expensive records iteration. Those paths include follower 
append and index recovery. In order to
+ * avoid inconsistent time index, we let all paths find 
shallowOffsetOfMaxTimestamp instead of OffsetOfMaxTimestamp.
+ * 
+ * Let's define the shallowOffsetOfMaxTimestamp: It is last offset of the 
batch having max timestamp. If there are
+ * many batches having same max timestamp, we pick up the earliest batch.
+ * 
+ * There are five cases of finding shallowOffsetOfMaxTimestamp to return:
+ * 1) version 0: It is always the -1
+ * 2) LogAppendTime with single batch: It is the offset of last record
+ * 3) LogAppendTime with many single-record batches: Those single-record 
batches have same max timestamp, so the
+ *   base offset is equal 
with the last offset of earliest batch

Review Comment:
   so the base offset is equal with the last offset of earliest batch => so we 
return the base offset, which is equal to the last offset of earliest batch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
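The shallowOffsetOfMaxTimestamp semantics discussed in the review above (last offset of the batch carrying the max timestamp, earliest batch winning on ties, -1 when no batch has a timestamp) can be sketched as a minimal standalone illustration. This is not Kafka's actual MemoryRecordsBuilder code; the `Batch` record and the method name are hypothetical stand-ins:

```java
import java.util.List;

public class ShallowOffsetSketch {
    // Hypothetical stand-in for a record batch; not Kafka's real API.
    record Batch(long maxTimestamp, long lastOffset) {}

    // Returns the last offset of the earliest batch carrying the max
    // timestamp, or -1 when no batch has a timestamp (e.g. format v0,
    // where maxTimestamp is NO_TIMESTAMP = -1).
    static long shallowOffsetOfMaxTimestamp(List<Batch> batches) {
        long maxTs = -1L;   // NO_TIMESTAMP
        long offset = -1L;
        for (Batch b : batches) {
            // Strictly greater: on ties the earliest batch seen wins.
            if (b.maxTimestamp() > maxTs) {
                maxTs = b.maxTimestamp();
                offset = b.lastOffset();
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        List<Batch> batches = List.of(
                new Batch(100L, 4L),   // earliest batch with max timestamp
                new Batch(100L, 9L),   // tie: ignored, earliest wins
                new Batch(50L, 14L));
        System.out.println(shallowOffsetOfMaxTimestamp(batches)); // prints 4
    }
}
```

Note the deliberate use of a strict comparison: it is what makes the earliest of several equally-timestamped batches the one whose last offset is reported, matching the tie-breaking rule described in the javadoc under review.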



[PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-02 Thread via GitHub


mumrah opened a new pull request, #15648:
URL: https://github.com/apache/kafka/pull/15648

   This patch changes the behavior of the migrating ZK broker to always delete 
the local metadata log during startup. This deletion is done immediately before 
creating the RaftManager which will re-create the log directory and let the 
broker re-replicate the log from the active controller.
   
   The rationale for this change is to make it easier for operators to 
re-attempt a ZK to KRaft migration after having reverted back to ZK mode. If an 
operator has reverted back to ZK mode, there will be an invalid metadata log on 
the disk of each broker. In order to re-attempt the migration in the future, 
this log needs to be deleted. This can be pretty burdensome to the operator for 
large clusters, especially since the log deletion must be done while the broker 
is offline.
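   The startup-time deletion described above can be sketched as follows. This is a minimal illustration, not the patch's actual code: the helper name is hypothetical, and it assumes the metadata log lives in a `__cluster_metadata-0` directory under the configured log dir, which the broker would remove recursively before constructing its Raft layer:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class MetadataLogCleanup {
    // Recursively deletes the local metadata log directory, if present.
    // Paths are deleted deepest-first so each directory is empty by the
    // time it is removed. Returns true if anything was deleted.
    static boolean deleteLocalMetadataLog(Path logDir) throws IOException {
        Path metadataDir = logDir.resolve("__cluster_metadata-0");
        if (!Files.exists(metadataDir)) {
            return false; // fresh broker, or log already cleaned up
        }
        try (Stream<Path> paths = Files.walk(metadataDir)) {
            paths.sorted(Comparator.reverseOrder())
                 .forEach(p -> {
                     try {
                         Files.delete(p);
                     } catch (IOException e) {
                         throw new UncheckedIOException(e);
                     }
                 });
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path logDir = Files.createTempDirectory("kafka-logs");
        Files.createDirectories(logDir.resolve("__cluster_metadata-0"));
        System.out.println(deleteLocalMetadataLog(logDir)); // prints true
    }
}
```

   Deleting before the Raft layer is created is what makes the operation safe in this design: the directory is re-created empty and the broker re-replicates the log from the active controller, at the cost of a delay before it is unfenced.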


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548390558


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,54 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset 
with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of 
the record.
- *
- * If create time is used, the offset will always be the offset of the 
record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 
since no timestamp info in records.
- *
- * @return The max timestamp and its offset
+ * There are three cases of finding max timestamp to return:

Review Comment:
   hi @junrao I rewrote the whole comment to list all the cases. Please take a 
look at it, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16463) Automatically delete metadata log directory on ZK brokers

2024-04-02 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16463:
-
Description: 
Throughout the process of a ZK to KRaft migration, the operator has the choice 
to revert back to ZK mode. Once this is done, there will be a copy of the 
metadata log on each broker in the cluster.

In order to re-attempt the migration in the future, this metadata log needs to 
be deleted. This can be pretty burdensome to the operator for large clusters, 
especially since the log deletion must be done while the broker is offline.

To improve this, we can automatically delete any metadata log present during 
startup of a ZK broker. In general, it is always safe to remove the metadata 
log from a KRaft or migrating ZK broker. The main impact is that this will 
delay the time it takes for the broker to be unfenced by the controller since 
it has to re-replicate the log. In the case of hybrid mode ZK brokers, there 
will be a delay in them receiving their first UpdateMetadataRequest from the 
controller (for the same reason -- delay in getting unfenced).

The delayed startup should not affect the performance of the cluster, though it 
would increase the overall time required to do a rolling restart of the 
cluster. 

Once a broker restarts as KRaft, we will stop doing this automatic deletion. 

  was:
Throughout the process of a ZK to KRaft migration, the operator has the choice 
to revert back to ZK mode. Once this is done, there will be a copy of the 
metadata log on each broker in the cluster.

In order to re-attempt the migration in the future, this metadata log needs to 
be deleted. This can be pretty burdensome to the operator for large clusters. 

To improve this, we can automatically delete any metadata log present during 
startup of a ZK broker. This is safe to do because the ZK broker will just 
re-replicate the metadata log from the active controller. Once a broker 
restarts as KRaft, it will stop doing this automatic deletion. 


> Automatically delete metadata log directory on ZK brokers
> -
>
> Key: KAFKA-16463
> URL: https://issues.apache.org/jira/browse/KAFKA-16463
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Minor
> Fix For: 3.8.0
>
>
> Throughout the process of a ZK to KRaft migration, the operator has the 
> choice to revert back to ZK mode. Once this is done, there will be a copy of 
> the metadata log on each broker in the cluster.
> In order to re-attempt the migration in the future, this metadata log needs 
> to be deleted. This can be pretty burdensome to the operator for large 
> clusters, especially since the log deletion must be done while the broker is 
> offline.
> To improve this, we can automatically delete any metadata log present during 
> startup of a ZK broker. In general, it is always safe to remove the metadata 
> log from a KRaft or migrating ZK broker. The main impact is that this will 
> delay the time it takes for the broker to be unfenced by the controller since 
> it has to re-replicate the log. In the case of hybrid mode ZK brokers, there 
> will be a delay in them receiving their first UpdateMetadataRequest from the 
> controller (for the same reason -- delay in getting unfenced).
> The delayed startup should not affect the performance of the cluster, though 
> it would increase the overall time required to do a rolling restart of the 
> cluster. 
> Once a broker restarts as KRaft, we will stop doing this automatic deletion. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16463) Automatically delete metadata log directory on ZK brokers

2024-04-02 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16463:
-
Description: 
Throughout the process of a ZK to KRaft migration, the operator has the choice 
to revert back to ZK mode. Once this is done, there will be a copy of the 
metadata log on each broker in the cluster.

In order to re-attempt the migration in the future, this metadata log needs to 
be deleted. This can be pretty burdensome to the operator for large clusters. 

To improve this, we can automatically delete any metadata log present during 
startup of a ZK broker. This is safe to do because the ZK broker will just 
re-replicate the metadata log from the active controller. Once a broker 
restarts as KRaft, it will stop doing this automatic deletion. 

  was:
Throughout the process of a ZK to KRaft migration, the operator has the choice 
to revert back to ZK mode. Once this is done, there will be a copy of the 
metadata log on each broker in the cluster.

In order to re-attempt the migration in the future, this metadata log needs to 
be deleted. This can be pretty burdensome to the operator for large clusters. 

To improve this, we can automatically delete any metadata log present during 
startup of a ZK broker. This is safe to do because the ZK broker will just 
re-replicate the metadata log from the active controller.


> Automatically delete metadata log directory on ZK brokers
> -
>
> Key: KAFKA-16463
> URL: https://issues.apache.org/jira/browse/KAFKA-16463
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Minor
> Fix For: 3.8.0
>
>
> Throughout the process of a ZK to KRaft migration, the operator has the 
> choice to revert back to ZK mode. Once this is done, there will be a copy of 
> the metadata log on each broker in the cluster.
> In order to re-attempt the migration in the future, this metadata log needs 
> to be deleted. This can be pretty burdensome to the operator for large 
> clusters. 
> To improve this, we can automatically delete any metadata log present during 
> startup of a ZK broker. This is safe to do because the ZK broker will just 
> re-replicate the metadata log from the active controller. Once a broker 
> restarts as KRaft, it will stop doing this automatic deletion. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16463) Automatically delete metadata log directory on ZK brokers

2024-04-02 Thread David Arthur (Jira)
David Arthur created KAFKA-16463:


 Summary: Automatically delete metadata log directory on ZK brokers
 Key: KAFKA-16463
 URL: https://issues.apache.org/jira/browse/KAFKA-16463
 Project: Kafka
  Issue Type: Improvement
Reporter: David Arthur
Assignee: David Arthur
 Fix For: 3.8.0


Throughout the process of a ZK to KRaft migration, the operator has the choice 
to revert back to ZK mode. Once this is done, there will be a copy of the 
metadata log on each broker in the cluster.

In order to re-attempt the migration in the future, this metadata log needs to 
be deleted. This can be pretty burdensome to the operator for large clusters. 

To improve this, we can automatically delete any metadata log present during 
startup of a ZK broker. This is safe to do because the ZK broker will just 
re-replicate the metadata log from the active controller.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r154830


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,40 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset 
with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of 
the record.
- *
- * If create time is used, the offset will always be the offset of the 
record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 
since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader 
append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the 
batch having max timestamp.
+ * If there are many batches having same max timestamp, we pick up the 
earliest batch.
+ * 
+ * If the log append time is used, the offset will be the last offset 
unless no compression is used and
+ * the message format version is 0 or 1, in which case, it will be -1.

Review Comment:
   Still not very accurate. For message format version 0, the offset will be 
-1. For message format version 1, the offset will be the first offset.



##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,40 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset 
with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of 
the record.
- *
- * If create time is used, the offset will always be the offset of the 
record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 
since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader 
append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the 
batch having max timestamp.
+ * If there are many batches having same max timestamp, we pick up the 
earliest batch.
+ * 
+ * If the log append time is used, the offset will be the last offset 
unless no compression is used and
+ * the message format version is 0 or 1, in which case, it will be -1.
+ * 
+ * If create time is used, the offset will be the last offset unless no 
compression is used and the message
+ * format version is 0 or 1, in which case, it will be the offset of the 
record with the max timestamp.

Review Comment:
   For message format version 0, the offset will be -1. For message format 
version 1, the offset will be the offset of the record with the max timestamp.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16462) New consumer fails with timeout in security_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16462:
--
Description: 
The {{security_test.py}} system test fails with the following error:

{noformat}
test_id:
kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   1 minute 30.885 seconds


TimeoutError('')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/security_test.py",
 line 142, in test_client_ssl_endpoint_validation_failure
wait_until(lambda: self.producer_consumer_have_expected_error(error), 
timeout_sec=30)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError
{noformat}

  was:
The {{security_test.py}} system test fails with the following error:

{quote}
* Consumer failed to consume up to offsets
{quote}

Affected test:

* {{test_client_ssl_endpoint_validation_failure}}


> New consumer fails with timeout in security_test.py system test
> ---
>
> Key: KAFKA-16462
> URL: https://issues.apache.org/jira/browse/KAFKA-16462
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{security_test.py}} system test fails with the following error:
> {noformat}
> test_id:
> kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   1 minute 30.885 seconds
> TimeoutError('')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/security_test.py",
>  line 142, in test_client_ssl_endpoint_validation_failure
> wait_until(lambda: self.producer_consumer_have_expected_error(error), 
> timeout_sec=30)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16462) New consumer fails with timeout in security_test.py system test

2024-04-02 Thread Kirk True (Jira)
Kirk True created KAFKA-16462:
-

 Summary: New consumer fails with timeout in security_test.py 
system test
 Key: KAFKA-16462
 URL: https://issues.apache.org/jira/browse/KAFKA-16462
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


The {{security_test.py}} system test fails with the following error:

{quote}
* Consumer failed to consume up to offsets
{quote}

Affected test:

* {{test_client_ssl_endpoint_validation_failure}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16461) New consumer fails to consume records in security_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16461:
--
Description: 
The {{security_test.py}} system test fails with the following error:

{quote}
* Consumer failed to consume up to offsets
{quote}

Affected test:

* {{test_client_ssl_endpoint_validation_failure}}

  was:
The {{consumer_test.py}} system test fails with the following errors:

{quote}
* Timed out waiting for consumption
{quote}

Affected tests:

* {{test_broker_failure}}
* {{test_consumer_bounce}}
* {{test_static_consumer_bounce}}


> New consumer fails to consume records in security_test.py system test
> -
>
> Key: KAFKA-16461
> URL: https://issues.apache.org/jira/browse/KAFKA-16461
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{security_test.py}} system test fails with the following error:
> {quote}
> * Consumer failed to consume up to offsets
> {quote}
> Affected test:
> * {{test_client_ssl_endpoint_validation_failure}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16461) New consumer fails to consume records in security_test.py system test

2024-04-02 Thread Kirk True (Jira)
Kirk True created KAFKA-16461:
-

 Summary: New consumer fails to consume records in security_test.py 
system test
 Key: KAFKA-16461
 URL: https://issues.apache.org/jira/browse/KAFKA-16461
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


The {{consumer_test.py}} system test fails with the following errors:

{quote}
* Timed out waiting for consumption
{quote}

Affected tests:

* {{test_broker_failure}}
* {{test_consumer_bounce}}
* {{test_static_consumer_bounce}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16460) New consumer times out consuming records in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16460:
--
Description: 
The {{consumer_test.py}} system test fails with the following errors:

{quote}
* Timed out waiting for consumption
{quote}

Affected tests:

* {{test_broker_failure}}
* {{test_consumer_bounce}}
* {{test_static_consumer_bounce}}

  was:
The {{consumer_test.py}} system test fails with the following errors:

{quote}
* Timed out waiting for consumption
* {quote}

Affected tests:

* {{test_broker_failure}}
* {{test_consumer_bounce}}
* {{test_static_consumer_bounce}}


> New consumer times out consuming records in consumer_test.py system test
> 
>
> Key: KAFKA-16460
> URL: https://issues.apache.org/jira/browse/KAFKA-16460
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with the following errors:
> {quote}
> * Timed out waiting for consumption
> {quote}
> Affected tests:
> * {{test_broker_failure}}
> * {{test_consumer_bounce}}
> * {{test_static_consumer_bounce}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16460) New consumer times out consuming records in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16460:
--
Description: 
The {{consumer_test.py}} system test fails with the following errors:

{quote}
* Timed out waiting for consumption
* {quote}

Affected tests:

* {{test_broker_failure}}
* {{test_consumer_bounce}}
* {{test_static_consumer_bounce}}

  was:
The {{consumer_test.py}} system test fails with the following errors:

{quote}
* Timed out waiting for consumption
{quote}

Affected tests:

* {{test_broker_failure}}
* {{test_consumer_bounce}}
* {{test_static_consumer_bounce}}


> New consumer times out consuming records in consumer_test.py system test
> 
>
> Key: KAFKA-16460
> URL: https://issues.apache.org/jira/browse/KAFKA-16460
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with the following errors:
> {quote}
> * Timed out waiting for consumption
> * {quote}
> Affected tests:
> * {{test_broker_failure}}
> * {{test_consumer_bounce}}
> * {{test_static_consumer_bounce}}





[jira] [Updated] (KAFKA-16460) New consumer times out consuming records in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16460:
--
Description: 
The {{consumer_test.py}} system test fails with the following errors:

{quote}
* Timed out waiting for consumption
{quote}

Affected tests:

* {{test_broker_failure}}
* {{test_consumer_bounce}}
* {{test_static_consumer_bounce}}

  was:
The {{consumer_test.py}} system test fails with two different errors related to 
consumption in a timely fashion.

{quote}
* Timed out waiting for consumption
{quote}

Affected tests:
 * {{test_broker_failure}}
* {{test_consumer_bounce}}
* {{test_static_consumer_bounce}}


> New consumer times out consuming records in consumer_test.py system test
> 
>
> Key: KAFKA-16460
> URL: https://issues.apache.org/jira/browse/KAFKA-16460
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with the following errors:
> {quote}
> * Timed out waiting for consumption
> {quote}
> Affected tests:
> * {{test_broker_failure}}
> * {{test_consumer_bounce}}
> * {{test_static_consumer_bounce}}





[jira] [Updated] (KAFKA-16460) New consumer times out consuming records in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16460:
--
Description: 
The {{consumer_test.py}} system test fails with two different errors related to 
consumption in a timely fashion.

{quote}
* Timed out waiting for consumption
{quote}

Affected tests:
 * {{test_broker_failure}}
* {{test_consumer_bounce}}
* {{test_static_consumer_bounce}}

  was:
The {{consumer_test.py}} system test fails with two different errors related to 
consumers joining the consumer group in a timely fashion.
{quote} * Timed out waiting for consumption
 * Timed out waiting for consumers to join, expected total X joined, but only 
see Y joined fromnormal consumer group and Z from conflict consumer group{quote}
Affected tests:
 * {{test_fencing_static_consumer}}
 * {{test_static_consumer_bounce}}
 * {{test_static_consumer_persisted_after_rejoin}}


> New consumer times out consuming records in consumer_test.py system test
> 
>
> Key: KAFKA-16460
> URL: https://issues.apache.org/jira/browse/KAFKA-16460
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with two different errors related 
> to consumption in a timely fashion.
> {quote}
> * Timed out waiting for consumption
> {quote}
> Affected tests:
>  * {{test_broker_failure}}
> * {{test_consumer_bounce}}
> * {{test_static_consumer_bounce}}





[jira] [Updated] (KAFKA-16460) New consumer times out system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16460:
--
Description: 
The {{consumer_test.py}} system test fails with two different errors related to 
consumers joining the consumer group in a timely fashion.
{quote} * Timed out waiting for consumption
 * Timed out waiting for consumers to join, expected total X joined, but only 
see Y joined fromnormal consumer group and Z from conflict consumer group{quote}
Affected tests:
 * {{test_fencing_static_consumer}}
 * {{test_static_consumer_bounce}}
 * {{test_static_consumer_persisted_after_rejoin}}

  was:
The {{consumer_test.py}} system test fails with two different errors related to 
consumers joining the consumer group in a timely fashion.

{quote}
* Consumers failed to join in a reasonable amount of time
* Timed out waiting for consumers to join, expected total X joined, but only 
see Y joined fromnormal consumer group and Z from conflict consumer group{quote}

Affected tests:

 * {{test_fencing_static_consumer}}
 * {{test_static_consumer_bounce}}
 * {{test_static_consumer_persisted_after_rejoin}}


> New consumer times out system test
> --
>
> Key: KAFKA-16460
> URL: https://issues.apache.org/jira/browse/KAFKA-16460
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with two different errors related 
> to consumers joining the consumer group in a timely fashion.
> {quote} * Timed out waiting for consumption
>  * Timed out waiting for consumers to join, expected total X joined, but only 
> see Y joined fromnormal consumer group and Z from conflict consumer 
> group{quote}
> Affected tests:
>  * {{test_fencing_static_consumer}}
>  * {{test_static_consumer_bounce}}
>  * {{test_static_consumer_persisted_after_rejoin}}





[jira] [Assigned] (KAFKA-16459) New consumer times out join group in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16459:
-

Assignee: Kirk True  (was: Philip Nee)

> New consumer times out join group in consumer_test.py system test
> -
>
> Key: KAFKA-16459
> URL: https://issues.apache.org/jira/browse/KAFKA-16459
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with two different errors related 
> to consumers joining the consumer group in a timely fashion.
> {quote}
> * Consumers failed to join in a reasonable amount of time
> * Timed out waiting for consumers to join, expected total X joined, but only 
> see Y joined fromnormal consumer group and Z from conflict consumer 
> group{quote}
> Affected tests:
>  * {{test_fencing_static_consumer}}
>  * {{test_static_consumer_bounce}}
>  * {{test_static_consumer_persisted_after_rejoin}}





[jira] [Updated] (KAFKA-16459) New consumer times out joining group in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16459:
--
Summary: New consumer times out joining group in consumer_test.py system 
test  (was: New consumer times out join group in consumer_test.py system test)

> New consumer times out joining group in consumer_test.py system test
> 
>
> Key: KAFKA-16459
> URL: https://issues.apache.org/jira/browse/KAFKA-16459
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with two different errors related 
> to consumers joining the consumer group in a timely fashion.
> {quote}
> * Consumers failed to join in a reasonable amount of time
> * Timed out waiting for consumers to join, expected total X joined, but only 
> see Y joined fromnormal consumer group and Z from conflict consumer 
> group{quote}
> Affected tests:
>  * {{test_fencing_static_consumer}}
>  * {{test_static_consumer_bounce}}
>  * {{test_static_consumer_persisted_after_rejoin}}





[jira] [Updated] (KAFKA-16460) New consumer times out consuming records in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16460:
--
Summary: New consumer times out consuming records in consumer_test.py 
system test  (was: New consumer times out system test)

> New consumer times out consuming records in consumer_test.py system test
> 
>
> Key: KAFKA-16460
> URL: https://issues.apache.org/jira/browse/KAFKA-16460
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with two different errors related 
> to consumers joining the consumer group in a timely fashion.
> {quote} * Timed out waiting for consumption
>  * Timed out waiting for consumers to join, expected total X joined, but only 
> see Y joined fromnormal consumer group and Z from conflict consumer 
> group{quote}
> Affected tests:
>  * {{test_fencing_static_consumer}}
>  * {{test_static_consumer_bounce}}
>  * {{test_static_consumer_persisted_after_rejoin}}





[jira] [Created] (KAFKA-16460) New consumer times out system test

2024-04-02 Thread Kirk True (Jira)
Kirk True created KAFKA-16460:
-

 Summary: New consumer times out system test
 Key: KAFKA-16460
 URL: https://issues.apache.org/jira/browse/KAFKA-16460
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


The {{consumer_test.py}} system test fails with two different errors related to 
consumers joining the consumer group in a timely fashion.

{quote}
* Consumers failed to join in a reasonable amount of time
* Timed out waiting for consumers to join, expected total X joined, but only 
see Y joined fromnormal consumer group and Z from conflict consumer group{quote}

Affected tests:

 * {{test_fencing_static_consumer}}
 * {{test_static_consumer_bounce}}
 * {{test_static_consumer_persisted_after_rejoin}}





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2032614122

   @junrao thanks for additional reviews. I have addressed them except 
https://github.com/apache/kafka/pull/15621#discussion_r1548230600


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16459) New consumer times out join group in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16459:
--
Description: 
The {{consumer_test.py}} system test fails with two different errors related to 
consumers joining the consumer group in a timely fashion.

{quote}
* Consumers failed to join in a reasonable amount of time
* Timed out waiting for consumers to join, expected total X joined, but only 
see Y joined fromnormal consumer group and Z from conflict consumer group{quote}

Affected tests:

 * {{test_fencing_static_consumer}}
 * {{test_static_consumer_bounce}}
 * {{test_static_consumer_persisted_after_rejoin}}

  was:
The {{consumer_test.py}} system test fails with errors related to consumers 
joining the consumer group in a timely fashion.

Occurrences of {{Consumers failed to join in a reasonable amount of time}}:

 * {{test_fencing_static_consumer}}
 * {{test_static_consumer_bounce}}
 * {{test_static_consumer_persisted_after_rejoin}}

Occurrences of {{Timed out waiting for consumers to join, expected total X 
joined, but only see Y joined fromnormal consumer group and Z from conflict 
consumer group}}:

 * {{test_fencing_static_consumer}}


> New consumer times out join group in consumer_test.py system test
> -
>
> Key: KAFKA-16459
> URL: https://issues.apache.org/jira/browse/KAFKA-16459
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with two different errors related 
> to consumers joining the consumer group in a timely fashion.
> {quote}
> * Consumers failed to join in a reasonable amount of time
> * Timed out waiting for consumers to join, expected total X joined, but only 
> see Y joined fromnormal consumer group and Z from conflict consumer 
> group{quote}
> Affected tests:
>  * {{test_fencing_static_consumer}}
>  * {{test_static_consumer_bounce}}
>  * {{test_static_consumer_persisted_after_rejoin}}





[jira] [Updated] (KAFKA-16459) New consumer times out join group in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16459:
--
Description: 
The {{consumer_test.py}} system test fails with errors related to consumers 
joining the consumer group in a timely fashion.

Occurrences of {{Consumers failed to join in a reasonable amount of time}}:

 * {{test_fencing_static_consumer}}
 * {{test_static_consumer_bounce}}
 * {{test_static_consumer_persisted_after_rejoin}}

Occurrences of {{Timed out waiting for consumers to join, expected total X 
joined, but only see Y joined fromnormal consumer group and Z from conflict 
consumer group}}:

 * {{test_fencing_static_consumer}}

  was:
{{Consumers failed to join in a reasonable amount of time}} occurs in the 
following tests when {{use_new_coordinator}} is {{true}} and {{group_protocol}} 
is {{consumer}}:

* {{consumer_test.py}}, {{test_fencing_static_consumer}}
* {{consumer_test.py}}, {{test_static_consumer_bounce}}
* {{consumer_test.py}}, {{test_static_consumer_persisted_after_rejoin}}







> New consumer times out join group in consumer_test.py system test
> -
>
> Key: KAFKA-16459
> URL: https://issues.apache.org/jira/browse/KAFKA-16459
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with errors related to consumers 
> joining the consumer group in a timely fashion.
> Occurrences of {{Consumers failed to join in a reasonable amount of time}}:
>  * {{test_fencing_static_consumer}}
>  * {{test_static_consumer_bounce}}
>  * {{test_static_consumer_persisted_after_rejoin}}
> Occurrences of {{Timed out waiting for consumers to join, expected total X 
> joined, but only see Y joined fromnormal consumer group and Z from conflict 
> consumer group}}:
>  * {{test_fencing_static_consumer}}





[jira] [Updated] (KAFKA-16459) New consumer times out join group in consumer_test.py system test

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16459:
--
Summary: New consumer times out join group in consumer_test.py system test  
(was: New consumer fails to join group in system tests)

> New consumer times out join group in consumer_test.py system test
> -
>
> Key: KAFKA-16459
> URL: https://issues.apache.org/jira/browse/KAFKA-16459
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> {{Consumers failed to join in a reasonable amount of time}} occurs in the 
> following tests when {{use_new_coordinator}} is {{true}} and 
> {{group_protocol}} is {{consumer}}:
> * {{consumer_test.py}}, {{test_fencing_static_consumer}}
> * {{consumer_test.py}}, {{test_static_consumer_bounce}}
> * {{consumer_test.py}}, {{test_static_consumer_persisted_after_rejoin}}





[jira] [Updated] (KAFKA-16459) New consumer fails to join group in system tests

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16459:
--
Description: 
{{Consumers failed to join in a reasonable amount of time}} occurs in the 
following tests when {{use_new_coordinator}} is {{true}} and {{group_protocol}} 
is {{consumer}}:

* {{consumer_test.py}}, {{test_fencing_static_consumer}}
* {{consumer_test.py}}, {{test_static_consumer_bounce}}
* {{consumer_test.py}}, {{test_static_consumer_persisted_after_rejoin}}






  was:
System tests failures:

{noformat}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=all.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   1 minute 39.742 seconds


TimeoutError('Timed out waiting for consumers to join, expected total 3 
joined, but only see 0 joined fromnormal consumer group and 0 from conflict 
consumer group')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 366, in test_fencing_static_consumer
(len(consumer.nodes), len(consumer.joined_nodes()), 
len(conflict_consumer.joined_nodes()))
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: Timed out waiting for consumers to join, expected 
total 3 joined, but only see 0 joined fromnormal consumer group and 0 from 
conflict consumer group
{noformat}

{noformat}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=stable.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   3 minutes 7.629 seconds


TimeoutError('Consumers failed to join in a reasonable amount of time')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 352, in test_fencing_static_consumer
self.await_members(conflict_consumer, num_conflict_consumers)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/verifiable_consumer_test.py",
 line 85, in await_members
err_msg="Consumers failed to join in a reasonable amount of time")
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: Consumers failed to join in a reasonable amount 
of time
{noformat}
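The `TimeoutError`s in the tracebacks above come from ducktape's `wait_until` polling helper, which retries a condition until a deadline and then raises with the supplied message. A minimal sketch of that pattern (simplified; the `tick` callable and its counter are hypothetical, not part of the tests):

```python
import time

def wait_until(condition, timeout_sec, backoff_sec=0.1, err_msg=""):
    """Poll `condition` until it returns truthy or `timeout_sec` elapses.

    Simplified sketch of the ducktape utility seen in the tracebacks:
    on timeout it raises TimeoutError, chaining the last exception the
    condition raised (if any) for context.
    """
    deadline = time.monotonic() + timeout_sec
    last_exception = None
    while time.monotonic() < deadline:
        try:
            if condition():
                return
        except Exception as e:  # remember the most recent failure
            last_exception = e
        time.sleep(backoff_sec)
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception

# Example: wait until a (hypothetical) join counter reaches its target.
state = {"joined": 0}

def tick():
    state["joined"] += 1
    return state["joined"] >= 3

wait_until(tick, timeout_sec=5, backoff_sec=0.01,
           err_msg=lambda: "Consumers failed to join in a reasonable amount of time")
```

The tests fail when the condition (all consumers joined, or all records consumed) never becomes true before the deadline, producing the "Timed out waiting ..." messages quoted above.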

{noformat}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=stable.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   3 minutes 6.691 seconds


TimeoutError('Consumers failed to join in a reasonable amount of time')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return 

[jira] [Updated] (KAFKA-16459) New consumer fails to join group in system tests

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16459:
--
Summary: New consumer fails to join group in system tests  (was: 
consumer_test.py’s static membership tests fail with new consumer)

> New consumer fails to join group in system tests
> 
>
> Key: KAFKA-16459
> URL: https://issues.apache.org/jira/browse/KAFKA-16459
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> System tests failures:
> {noformat}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=all.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   1 minute 39.742 seconds
> TimeoutError('Timed out waiting for consumers to join, expected total 3 
> joined, but only see 0 joined fromnormal consumer group and 0 from conflict 
> consumer group')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 366, in test_fencing_static_consumer
> (len(consumer.nodes), len(consumer.joined_nodes()), 
> len(conflict_consumer.joined_nodes()))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Timed out waiting for consumers to join, 
> expected total 3 joined, but only see 0 joined fromnormal consumer group and 
> 0 from conflict consumer group
> {noformat}
> {noformat}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=stable.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   3 minutes 7.629 seconds
> TimeoutError('Consumers failed to join in a reasonable amount of time')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 352, in test_fencing_static_consumer
> self.await_members(conflict_consumer, num_conflict_consumers)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/verifiable_consumer_test.py",
>  line 85, in await_members
> err_msg="Consumers failed to join in a reasonable amount of time")
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Consumers failed to join in a reasonable amount 
> of time
> {noformat}
> {noformat}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=stable.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   3 minutes 6.691 seconds
> TimeoutError('Consumers failed 

[jira] [Updated] (KAFKA-16459) consumer_test.py’s static membership tests fail with new consumer

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16459:
--
Description: 
System tests failures:

{noformat}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=all.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   1 minute 39.742 seconds


TimeoutError('Timed out waiting for consumers to join, expected total 3 
joined, but only see 0 joined fromnormal consumer group and 0 from conflict 
consumer group')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 366, in test_fencing_static_consumer
(len(consumer.nodes), len(consumer.joined_nodes()), 
len(conflict_consumer.joined_nodes()))
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: Timed out waiting for consumers to join, expected 
total 3 joined, but only see 0 joined fromnormal consumer group and 0 from 
conflict consumer group
{noformat}

{noformat}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=stable.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   3 minutes 7.629 seconds


TimeoutError('Consumers failed to join in a reasonable amount of time')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 352, in test_fencing_static_consumer
self.await_members(conflict_consumer, num_conflict_consumers)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/verifiable_consumer_test.py",
 line 85, in await_members
err_msg="Consumers failed to join in a reasonable amount of time")
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: Consumers failed to join in a reasonable amount 
of time
{noformat}

{noformat}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=stable.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   3 minutes 6.691 seconds


TimeoutError('Consumers failed to join in a reasonable amount of time')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 

Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548258165


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -434,7 +442,8 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
 now,
 records,
 maxTimestamp,
-offsetOfMaxTimestamp,
+// there is only one batch in this path, so last offset can be viewed as shallowOffsetOfMaxTimestamp
+lastOffset,

Review Comment:
   If the magic is 0, we don't reach this path. Instead, we run 
`buildRecordsAndAssignOffsets` to handle the version 0. see 
https://github.com/apache/kafka/blob/ee61bb721eecb0404929f125fe43392f3d024453/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L343



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16459) consumer_test.py’s static membership tests fail with new consumer

2024-04-02 Thread Kirk True (Jira)
Kirk True created KAFKA-16459:
-

 Summary: consumer_test.py’s static membership tests fail with new consumer
 Key: KAFKA-16459
 URL: https://issues.apache.org/jira/browse/KAFKA-16459
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Philip Nee
 Fix For: 3.8.0


The following error is reported when running the {{test_valid_assignment}} test 
from {{consumer_test.py}}:

 {code}
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 584, in test_valid_assignment
    wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 58, in wait_until
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
{code}

To reproduce, create a system test suite file named {{test_valid_assignment.yml}} with these contents:

{code:yaml}
failures:
  - 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
{code}

Then set the {{TC_PATHS}} environment variable to include that test suite file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16459) consumer_test.py’s static membership tests fail with new consumer

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16459:
--
Description: (was: The following error is reported when running the 
{{test_valid_assignment}} test from {{consumer_test.py}}:

 {code}
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 584, in test_valid_assignment
    wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 58, in wait_until
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
{code}

To reproduce, create a system test suite file named {{test_valid_assignment.yml}} with these contents:

{code:yaml}
failures:
  - 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
{code}

Then set the {{TC_PATHS}} environment variable to include that test suite file.)

> consumer_test.py’s static membership tests fail with new consumer
> -
>
> Key: KAFKA-16459
> URL: https://issues.apache.org/jira/browse/KAFKA-16459
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-02 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548211287


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,40 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of the record.
- *
- * If create time is used, the offset will always be the offset of the record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch which having max timestamp.

Review Comment:
   which having => having



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -293,14 +293,29 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 
 if (timestampType == TimestampType.LOG_APPEND_TIME) {
 maxTimestamp = now;
-offsetOfMaxTimestamp = initialOffset;
+// those checks should be equal to MemoryRecordsBuilder#info
+switch (toMagic) {
+case RecordBatch.MAGIC_VALUE_V0:
+// value will be the default value: -1
+shallowOffsetOfMaxTimestamp = -1;

Review Comment:
   maxTimestamp should be NO_TIMESTAMP if magic is 0.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -434,7 +442,8 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
 now,
 records,
 maxTimestamp,
-offsetOfMaxTimestamp,
+// there is only one batch in this path, so last offset can be viewed as shallowOffsetOfMaxTimestamp
+lastOffset,

Review Comment:
   If magic is 0, we should set both maxTimestamp and 
shallowOffsetOfMaxTimestamp to -1.



##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -240,25 +240,40 @@ public MemoryRecords build() {
 return builtRecords;
 }
 
+
 /**
- * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of the record.
- *
- * If create time is used, the offset will always be the offset of the record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch which having max timestamp.
+ * If there are many batches having same max timestamp, we pick up the earliest batch.
+ * 
+ * If the log append time is used, the offset will be the last offset unless no compression is used and
+ * the message format version is 0 or 1, in which case, it will be the first offset.

Review Comment:
   For message format 0, offset is always -1. Ditto below.
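
   To make the semantics under review concrete, here is an illustrative Python sketch (not Kafka code; the dict-based batch representation is hypothetical) of the rule described in the quoted javadoc: the shallow offset of max timestamp is the last offset of the earliest batch carrying the max timestamp, and `(-1, -1)` for format-0 data that has no timestamps:

```python
NO_TIMESTAMP = -1

def shallow_offset_of_max_timestamp(batches):
    """Return (max_timestamp, shallow_offset): the last offset of the
    earliest batch carrying the max timestamp, or (-1, -1) when no batch
    has a timestamp (the message-format-0 case discussed above)."""
    max_ts, offset = NO_TIMESTAMP, -1
    for batch in batches:
        # strictly greater: on ties we keep the earliest batch, matching
        # "If there are many batches having same max timestamp, we pick
        # up the earliest batch" from the quoted diff
        if batch["max_timestamp"] > max_ts:
            max_ts = batch["max_timestamp"]
            offset = batch["last_offset"]
    return max_ts, offset
```

   With this shape, an empty batch list (or one whose batches all report `NO_TIMESTAMP`) naturally yields `(-1, -1)`, which is the behaviour the reviewers are asking for in the magic-0 paths.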






[jira] [Updated] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config

2024-04-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16272:
--
Fix Version/s: 4.0.0
   (was: 3.8.0)

> Update connect_distributed_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16272
> URL: https://issues.apache.org/jira/browse/KAFKA-16272
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Sagar Rao
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in {{connect_distributed_test.py}} 
> to support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.





[PR] KAFKA-15615: Improve handling of fetching during metadata updates [kafka]

2024-04-02 Thread via GitHub


appchemist opened a new pull request, #15647:
URL: https://github.com/apache/kafka/pull/15647

   - add 'AWAIT_UPDATE' state in FetchStates
 - This state makes the subscription unfetchable during metadata updates.
 - After a metadata update, the state becomes 'AWAIT_VALIDATION' or 'FETCHING' depending on the nodeApiVersion and metadata.
   - For KIP-951, if the fetch error response includes CurrentLeader information, the subscription remains fetchable as is.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
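
   The proposed transitions can be sketched as a toy state machine (illustrative Python only; the names approximate the Java FetchStates and are not this PR's actual code):

```python
from enum import Enum, auto

class FetchState(Enum):
    FETCHING = auto()
    AWAIT_VALIDATION = auto()
    AWAIT_UPDATE = auto()  # the new state proposed by this PR

def is_fetchable(state: FetchState) -> bool:
    # Only FETCHING allows the partition to be fetched; AWAIT_UPDATE parks
    # it until fresh metadata arrives.
    return state is FetchState.FETCHING

def on_metadata_update(leader_supports_epoch_validation: bool) -> FetchState:
    # After a metadata update, move to AWAIT_VALIDATION when the leader's
    # API version supports offset/epoch validation, otherwise go straight
    # back to FETCHING.
    if leader_supports_epoch_validation:
        return FetchState.AWAIT_VALIDATION
    return FetchState.FETCHING
```

   The KIP-951 carve-out in the description would simply bypass the AWAIT_UPDATE state when the fetch error response already carries CurrentLeader information.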
   





Re: [PR] [WIP] KAFKA-16383: fix flaky IdentityReplicationIntegrationTest .testReplicateFromLatest [kafka]

2024-04-02 Thread via GitHub


johnnychhsu commented on PR #15556:
URL: https://github.com/apache/kafka/pull/15556#issuecomment-2032431618

   @vamossagar12 thanks for sharing!
   And since IdentityReplicationIntegrationTest.testReplicateFromLatest passed in the build with the fix, I think this could be the root cause. @chia7712 @vamossagar12 wdyt?





Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]

2024-04-02 Thread via GitHub


chia7712 commented on PR #15639:
URL: https://github.com/apache/kafka/pull/15639#issuecomment-2032377734

   > We should prioritize https://issues.apache.org/jira/browse/KAFKA-6527 to re-enable this test, as it would have caught this regression.
   
   Sounds good to me. I just tested it locally, and you are right. Do you plan to take over KAFKA-6527?




