Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-03-20 Thread via GitHub


chia7712 commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1533239354


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -182,14 +187,19 @@ public void testTopicPartitionsArg() {
         setUp();
 
         List offsets = executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3");
-        List expected = Arrays.asList(
-            new Row("__consumer_offsets", 3, 0L),
+        ArrayList expected = new ArrayList<>(
+            Arrays.asList(
             new Row("topic1", 0, 1L),
             new Row("topic2", 1, 2L),
             new Row("topic3", 2, 3L),
             new Row("topic4", 2, 4L)
+            )
         );
 
+        if (!cluster.isKRaftTest()) {

Review Comment:
   I feel this test should still verify the match pattern `__.*:3`, so in KRaft 
mode we could do a read first to create the internal topics.

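The `__.*:3` entry only selects something when an internal topic such as `__consumer_offsets` already exists, which is why dropping it from the KRaft expectation hides part of what the test covers. Below is a simplified, hypothetical model of matching a `--topic-partitions` spec made of `<topic regex>:<partition>` entries (it is not GetOffsetShell's parser, which also accepts partition ranges); the same spec selects one row fewer when the internal topic is missing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical, simplified filter: every comma-separated entry is
// "<topic regex>:<partition>", and the regex must match the whole topic name.
class TopicPartitionFilterSketch {

    static List<String> select(String spec, Map<String, Integer> partitionCounts) {
        List<String> selected = new ArrayList<>();
        // Walk topics in name order so the result is deterministic.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionCounts).entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                if (matches(spec, topic.getKey(), p)) {
                    selected.add(topic.getKey() + ":" + p);
                }
            }
        }
        return selected;
    }

    private static boolean matches(String spec, String topic, int partition) {
        for (String entry : spec.split(",")) {
            // The partition follows the last ':', so regexes like topic(3|4) are kept intact.
            int sep = entry.lastIndexOf(':');
            String topicRegex = entry.substring(0, sep);
            int wanted = Integer.parseInt(entry.substring(sep + 1));
            if (partition == wanted && Pattern.matches(topicRegex, topic)) {
                return true;
            }
        }
        return false;
    }
}
```

With `__consumer_offsets` present the spec from the test yields five rows; without it, the `__.*:3` entry silently matches nothing.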


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-03-20 Thread via GitHub


chia7712 commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1533209761


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -104,6 +104,11 @@ public Row(String name, int partition, Long timestamp) {
             this.timestamp = timestamp;
         }
 
+        @Override
+        public String toString() {
+            return "Row[name:" + name + ",partition:" + partition + ",timestamp:" + timestamp;

Review Comment:
   Is it used for debugging?
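Even if `toString` is only meant for debugging, it pays off in assertions: JUnit's `assertEquals` compares the lists with `equals` and, on failure, prints both sides using `toString`. A minimal sketch, assuming `Row` holds only the three fields visible in the diff (the name `RowSketch` is hypothetical); note that it also closes the bracket that the diffed `toString` leaves open.

```java
import java.util.Objects;

// Hypothetical stand-in for the test's Row: name, partition and timestamp
// mirror the constructor shown in the hunk header above.
class RowSketch {
    private final String name;
    private final int partition;
    private final Long timestamp;

    RowSketch(String name, int partition, Long timestamp) {
        this.name = name;
        this.partition = partition;
        this.timestamp = timestamp;
    }

    // Used by assertEquals(List, List) to decide whether rows match.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof RowSketch)) return false;
        RowSketch r = (RowSketch) o;
        return partition == r.partition
                && Objects.equals(name, r.name)
                && Objects.equals(timestamp, r.timestamp);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, partition, timestamp);
    }

    // Printed in the "expected: <...> but was: <...>" failure message.
    @Override
    public String toString() {
        return "Row[name:" + name + ",partition:" + partition + ",timestamp:" + timestamp + "]";
    }
}
```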



##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -338,6 +348,10 @@ private void assertExitCodeIsOne(String... args) {
     }
 
     private List expectedOffsetsWithInternal() {
+        if (cluster.isKRaftTest()) {

Review Comment:
   I'd prefer to have callers use `expectedTestTopicOffsets` instead of adding 
an if-else here. 



##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -182,14 +187,19 @@ public void testTopicPartitionsArg() {
         setUp();
 
         List offsets = executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3");
-        List expected = Arrays.asList(
-            new Row("__consumer_offsets", 3, 0L),
+        ArrayList expected = new ArrayList<>(

Review Comment:
   It seems we can keep using `List`, right?






Re: [PR] MINOR: Use --no-daemon when building with Jenkins [kafka]

2024-03-20 Thread via GitHub


github-actions[bot] commented on PR #15057:
URL: https://github.com/apache/kafka/pull/15057#issuecomment-2011157584

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-16391: Cleanup .lock file after server is down [kafka]

2024-03-20 Thread via GitHub


showuon commented on code in PR #15568:
URL: https://github.com/apache/kafka/pull/15568#discussion_r1533195347


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -1303,6 +1303,27 @@ class LogManagerTest {
 createLeaderAndIsrRequestForStrayDetection(present),
 onDisk.map(mockLog(_))).toSet)
   }
+
+  @Test
+  def testLock(): Unit = {
+    val tmpLogDir = TestUtils.tempDir()
+    val tmpLogManager = createLogManager(Seq(tmpLogDir))
+    tmpLogManager.startup(Set.empty)
+
+    // ${tmpLogDir}.lock is acquired by tmpLogManager
+    val fileLock = new FileLock(new File(tmpLogDir, ".lock"))

Review Comment:
   We should use `LogManager.LockFileName` instead.



##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -1303,6 +1303,27 @@ class LogManagerTest {
 createLeaderAndIsrRequestForStrayDetection(present),
 onDisk.map(mockLog(_))).toSet)
   }
+
+  @Test
+  def testLock(): Unit = {
+    val tmpLogDir = TestUtils.tempDir()
+    val tmpLogManager = createLogManager(Seq(tmpLogDir))
+    tmpLogManager.startup(Set.empty)
+
+    // ${tmpLogDir}.lock is acquired by tmpLogManager
+    val fileLock = new FileLock(new File(tmpLogDir, ".lock"))
+    try {
+      assertFalse(fileLock.tryLock())
+    } finally {
+      fileLock.destroy()
+    }
+
+    // ${tmpLogDir}.lock is removed after shutdown
+    tmpLogManager.shutdown()
+    val f = new File(tmpLogDir, ".lock")
+    assertFalse(f.exists())
+    Utils.delete(tmpLogDir)

Review Comment:
   We should also wrap `tmpLogManager.shutdown()` and the `tmpLogDir` deletion 
in a `finally` block to make sure resources are released.

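The try/finally shape being requested can be sketched in plain Java with `java.nio`. This is a hypothetical stand-in, not `LogManager`: opening and locking the `.lock` file plays the role of `startup`, closing the channel and deleting the file plays the role of `shutdown`, and the `finally` block removes the temporary directory even if an assertion fails.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical sketch: the "log manager" is reduced to a channel that locks
// <dir>/.lock; only the cleanup structure is the point.
class LockCleanupTestSketch {

    /** Returns true if the lock file was gone after the simulated shutdown. */
    static boolean run() {
        try {
            return runChecked();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static boolean runChecked() throws IOException {
        Path logDir = Files.createTempDirectory("log-dir");
        Path lockFile = logDir.resolve(".lock");
        FileChannel owner = null;
        try {
            // Stand-in for startup(): create and lock the .lock file.
            owner = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            owner.lock();

            // While the lock is held, a second attempt must not succeed.
            try (FileChannel other = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
                if (other.tryLock() != null) {
                    throw new AssertionError("lock should still be held");
                }
            } catch (OverlappingFileLockException expected) {
                // Within one JVM, contention is reported as this exception.
            }

            // Stand-in for shutdown(): closing the channel releases the lock.
            owner.close();
            owner = null;
            Files.delete(lockFile);
            return !Files.exists(lockFile);
        } finally {
            // Runs even if an assertion above fails, so nothing leaks.
            if (owner != null) {
                owner.close();
            }
            deleteRecursively(logDir);
        }
    }

    private static void deleteRecursively(Path dir) throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        }
    }
}
```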





Re: [PR] KAFKA-16391: Cleanup .lock file after server is down [kafka]

2024-03-20 Thread via GitHub


FrankYang0529 commented on code in PR #15568:
URL: https://github.com/apache/kafka/pull/15568#discussion_r1533196551


##
core/src/main/scala/kafka/utils/FileLock.scala:
##
@@ -65,8 +65,14 @@ class FileLock(val file: File) extends Logging {
   def unlock(): Unit = {
     this synchronized {
       trace(s"Releasing lock on ${file.getAbsolutePath}")
-      if (flock != null)
+      if (flock != null) {
         flock.release()
+        if (file.delete()) {
+          trace(s"Deleted ${file.getAbsolutePath}")
+        } else {
+          warn(s"Could not delete ${file.getAbsolutePath}")
+        }

Review Comment:
   Agree! Deleting the file in `destroy` makes sense to me. I will update it.






Re: [PR] MINOR: Tuple2 replaced with Map.Entry [kafka]

2024-03-20 Thread via GitHub


chia7712 commented on PR #15560:
URL: https://github.com/apache/kafka/pull/15560#issuecomment-201902

   @nizhikov could you please rebase the code to trigger QA again? One of the 
builds was shut down :(





[jira] [Commented] (KAFKA-16397) Use ByteBufferOutputStream to avoid array copy

2024-03-20 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829359#comment-17829359
 ] 

Chia-Ping Tsai commented on KAFKA-16397:


[~apoorvmittal10] FYI, and please let me know if you don't have free cycles :)

> Use ByteBufferOutputStream to avoid array copy
> --
>
> Key: KAFKA-16397
> URL: https://issues.apache.org/jira/browse/KAFKA-16397
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Priority: Minor
>
> from https://github.com/apache/kafka/pull/15148#discussion_r1531889679
> source code:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java#L216
> we can use ByteBufferOutputStream to collect the uncompressed data, and then 
> return the inner buffer directly instead of copying the full array.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16396) Producer use the same Key(string) result in multiple paritition

2024-03-20 Thread ZhenChun Pan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZhenChun Pan updated KAFKA-16396:
-
Affects Version/s: 3.5.0
   (was: 3.8.0)

> Producer use the same Key(string) result in multiple paritition
> ---
>
> Key: KAFKA-16396
> URL: https://issues.apache.org/jira/browse/KAFKA-16396
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.0
>Reporter: ZhenChun Pan
>Priority: Minor
>
> To keep the records ordered, we use the same key for them: 
> event#Collector-2021-01-01-001#Probe-0001#1067267613#1002. In some situations 
> (possibly when the Kafka service was restarted while producing), we found 
> records with the same key sent to partitions 0 and 4.
> offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474
> offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370





[jira] [Created] (KAFKA-16397) Use ByteBufferOutputStream to avoid array copy

2024-03-20 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16397:
--

 Summary: Use ByteBufferOutputStream to avoid array copy
 Key: KAFKA-16397
 URL: https://issues.apache.org/jira/browse/KAFKA-16397
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai


from https://github.com/apache/kafka/pull/15148#discussion_r1531889679

source code:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java#L216

we can use ByteBufferOutputStream to collect the uncompressed data, and then 
return the inner buffer directly instead of copying the full array.

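A minimal sketch of the idea, assuming GZIP (from `java.util.zip`) in place of the negotiated `CompressionType`, and a small hypothetical `GrowableBufferOutputStream` in place of Kafka's `ByteBufferOutputStream`, whose `buffer()` accessor the `compress` path in PR #15148 already uses. Decompressed bytes go straight into a growable `ByteBuffer`, which is flipped and returned, so there is no final `toByteArray()` copy as in the current `decompress`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical stand-in for a ByteBuffer-backed output stream: writes into a
// heap ByteBuffer, grows it on demand, and exposes the buffer as-is.
class GrowableBufferOutputStream extends OutputStream {
    private ByteBuffer buffer;

    GrowableBufferOutputStream(int initialCapacity) {
        buffer = ByteBuffer.allocate(Math.max(initialCapacity, 16));
    }

    @Override
    public void write(int b) {
        ensureRemaining(1);
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] bytes, int off, int len) {
        ensureRemaining(len);
        buffer.put(bytes, off, len);
    }

    ByteBuffer buffer() {
        return buffer;
    }

    private void ensureRemaining(int needed) {
        if (buffer.remaining() < needed) {
            // Growth copies what was written so far; the final result is not copied again.
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(buffer.capacity() * 2, buffer.position() + needed));
            buffer.flip();
            bigger.put(buffer);
            buffer = bigger;
        }
    }
}

class DecompressSketch {
    // GZIP stands in for whichever compression type was negotiated.
    static ByteBuffer decompress(byte[] compressed) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
             GrowableBufferOutputStream out = new GrowableBufferOutputStream(compressed.length * 2)) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            ByteBuffer result = out.buffer();
            result.flip(); // readable range becomes [0, bytes written)
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helper to produce test input.
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

The trade-off is that the returned buffer may have spare capacity beyond its limit, which stays allocated for as long as the caller holds on to the buffer.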





[jira] [Updated] (KAFKA-16396) Producer use the same Key(string) result in multiple paritition

2024-03-20 Thread ZhenChun Pan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZhenChun Pan updated KAFKA-16396:
-
Description: 
To keep the records ordered, we use the same key for them: 
event#Collector-2021-01-01-001#Probe-0001#1067267613#1002. In some situations 
(possibly when the Kafka service was restarted while producing), we found 
records with the same key sent to partitions 0 and 4.

offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474

offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370

  was:
In orede to make  record  in order,we use the same 
key:event#Collector-2021-01-01-001#Probe-0001#1067267613#1002。In some 
situation, we find the same key records sent to  partition 0 and 4. 

offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, 
partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474

offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, 
partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370

 

 

 


> Producer use the same Key(string) result in multiple paritition
> ---
>
> Key: KAFKA-16396
> URL: https://issues.apache.org/jira/browse/KAFKA-16396
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.8.0
>Reporter: ZhenChun Pan
>Priority: Minor
>
> To keep the records ordered, we use the same key for them: 
> event#Collector-2021-01-01-001#Probe-0001#1067267613#1002. In some situations 
> (possibly when the Kafka service was restarted while producing), we found 
> records with the same key sent to partitions 0 and 4.
> offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474
> offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370





Re: [PR] KAFKA-16391: Cleanup .lock file after server is down [kafka]

2024-03-20 Thread via GitHub


showuon commented on code in PR #15568:
URL: https://github.com/apache/kafka/pull/15568#discussion_r1533191601


##
core/src/main/scala/kafka/utils/FileLock.scala:
##
@@ -65,8 +65,14 @@ class FileLock(val file: File) extends Logging {
   def unlock(): Unit = {
     this synchronized {
       trace(s"Releasing lock on ${file.getAbsolutePath}")
-      if (flock != null)
+      if (flock != null) {
         flock.release()
+        if (file.delete()) {
+          trace(s"Deleted ${file.getAbsolutePath}")
+        } else {
+          warn(s"Could not delete ${file.getAbsolutePath}")
+        }

Review Comment:
   IMO, it is confusing to delete the file in the `unlock` method. If we need to 
unlock and then lock again, it'll fail now, right? Maybe delete it in `destroy` instead?

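The split suggested here can be sketched as follows: `unlock()` only releases the lock, so the same object can lock again, while `destroy()` is the terminal step that also closes the channel and deletes the file. This is a hypothetical Java analogue written for illustration (`LockFileSketch` is not `kafka.utils.FileLock`).

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of unlock() vs destroy() responsibilities.
class LockFileSketch {
    private final File file;
    private final FileChannel channel;
    private FileLock flock;

    LockFileSketch(File file) {
        this.file = file;
        try {
            this.channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE,
                    StandardOpenOption.READ, StandardOpenOption.WRITE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    synchronized boolean tryLock() {
        try {
            flock = channel.tryLock();
            return flock != null;
        } catch (OverlappingFileLockException e) {
            return false; // already held within this JVM
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Releases the lock but keeps the file, so tryLock() can be called again.
    synchronized void unlock() {
        try {
            if (flock != null) {
                flock.release();
                flock = null;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Terminal cleanup: release, close the channel, then delete the file.
    synchronized boolean destroy() {
        unlock();
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return file.delete();
    }
}
```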





Re: [PR] MINOR : Removed the depreciated information about Zk to Kraft migration. [kafka]

2024-03-20 Thread via GitHub


chia7712 commented on PR #15552:
URL: https://github.com/apache/kafka/pull/15552#issuecomment-2011101530

   @chiacyu thanks for your contribution!





[jira] [Created] (KAFKA-16396) Producer use the same Key(string) result in multiple paritition

2024-03-20 Thread ZhenChun Pan (Jira)
ZhenChun Pan created KAFKA-16396:


 Summary: Producer use the same Key(string) result in multiple paritition
 Key: KAFKA-16396
 URL: https://issues.apache.org/jira/browse/KAFKA-16396
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.8.0
Reporter: ZhenChun Pan


To keep the records ordered, we use the same key for them: 
event#Collector-2021-01-01-001#Probe-0001#1067267613#1002. In some situations, 
we found records with the same key sent to partitions 0 and 4.

offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474

offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370


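For context on what could make one key land on two partitions: for keyed records, Kafka's built-in partitioning hashes the serialized key with murmur2, clears the sign bit, and takes the result modulo the partition count. The sketch below re-implements that calculation for illustration only (`KeyedPartitionSketch` is hypothetical, and UTF-8 is assumed as the key encoding, which is `StringSerializer`'s default).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical re-implementation of keyed partition selection, for illustration.
class KeyedPartitionSketch {

    // 32-bit murmur2 hash over the key bytes.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Non-negative hash modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

Because the result depends only on the key bytes and the partition count, one way to narrow this report down is to check whether the topic's partition count changed around the restart, or whether some producer uses a custom partitioner or sets the partition explicitly.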




[jira] [Updated] (KAFKA-16396) Producer use the same Key(string) result in multiple paritition

2024-03-20 Thread ZhenChun Pan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZhenChun Pan updated KAFKA-16396:
-
Description: 
To keep the records ordered, we use the same key for them: 
event#Collector-2021-01-01-001#Probe-0001#1067267613#1002. In some situations, 
we found records with the same key sent to partitions 0 and 4.

offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474

offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370

  was:
In orede to make  record  in orde,we use the same 
key:event#Collector-2021-01-01-001#Probe-0001#1067267613#1002。In some 
situation, we find the same key records sent to  partition 0 and 4. 

offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, 
partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474

offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, 
partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370

 

 

 


> Producer use the same Key(string) result in multiple paritition
> ---
>
> Key: KAFKA-16396
> URL: https://issues.apache.org/jira/browse/KAFKA-16396
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.8.0
>Reporter: ZhenChun Pan
>Priority: Minor
>
> To keep the records ordered, we use the same key for them: 
> event#Collector-2021-01-01-001#Probe-0001#1067267613#1002. In some situations, 
> we found records with the same key sent to partitions 0 and 4.
> offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474
> offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370





Re: [PR] MINOR : Removed the depreciated information about Zk to Kraft migration. [kafka]

2024-03-20 Thread via GitHub


chia7712 merged PR #15552:
URL: https://github.com/apache/kafka/pull/15552





Re: [PR] MINOR: Add retry mechanism to EOS example [kafka]

2024-03-20 Thread via GitHub


showuon commented on PR #15561:
URL: https://github.com/apache/kafka/pull/15561#issuecomment-2011075907

   @gaoran10, could you take a look at this?





Re: [PR] MINOR: Add retry mechanism to EOS example [kafka]

2024-03-20 Thread via GitHub


showuon commented on code in PR #15561:
URL: https://github.com/apache/kafka/pull/15561#discussion_r1533168785


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -152,17 +160,19 @@ public void run() {
 consumer.seekToEnd(emptyList());
 consumer.commitSync();

Review Comment:
   Should we reset the `retries` to 0 here?



##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -145,6 +145,7 @@ public KafkaConsumer createKafkaConsumer() {
         }
         // sets the reset offset policy in case of invalid or no offset
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);

Review Comment:
   Why should we add this?



##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer consumer) {
 }).sum();
 }
 
+    private int retry(int retries, KafkaConsumer consumer, ConsumerRecords records) {

Review Comment:
   Since we won't always retry, maybe rename it to `maybeRetry`? 



##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer consumer) {
 }).sum();
 }
 
+    private int retry(int retries, KafkaConsumer consumer, ConsumerRecords records) {
+        retries++;
+        if (retries > 0 && retries <= MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+        } else if (retries > MAX_RETRIES) {
+            // continue: skip bad records
+            // in addition to logging, you may want to send these records to a DLQ for further processing
+            records.forEach(record -> {
+                Utils.printErr("Skipping record after %d retries: %s", MAX_RETRIES, record.value());
+                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
+                consumer.commitSync();
+            });
+            retries = 0;
+        }

Review Comment:
   I think we should also have an `else` branch that prints an error message 
for the `retries < 0` case.



##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer consumer) {
 }).sum();
 }
 
+    private int retry(int retries, KafkaConsumer consumer, ConsumerRecords records) {
+        retries++;
+        if (retries > 0 && retries <= MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+        } else if (retries > MAX_RETRIES) {
+            // continue: skip bad records
+            // in addition to logging, you may want to send these records to a DLQ for further processing
+            records.forEach(record -> {
+                Utils.printErr("Skipping record after %d retries: %s", MAX_RETRIES, record.value());
+                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
+                consumer.commitSync();

Review Comment:
   Is this necessary? I thought we'd move to the next offset even without this 
seek, no?



##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer consumer) {
 }).sum();
 }
 
+    private int retry(int retries, KafkaConsumer consumer, ConsumerRecords records) {

Review Comment:
   Also, since this is an example for "general users", it'd be better to add 
some comments above this method: what it does, why we need it, what happens 
when the maximum number of retries is exceeded, etc. 

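Taken together, the suggestions above (the `maybeRetry` name, a method comment aimed at general users, and explicit handling of `retries < 0`) could look like the Kafka-free sketch below. `RetryPolicySketch`, `Decision`, `Action` and the `MAX_RETRIES` value are hypothetical; the caller would still perform the actual seeks. This version fails fast on a negative count, which is one alternative to only printing an error.

```java
// Hypothetical, Kafka-free sketch of the retry bookkeeping discussed above.
class RetryPolicySketch {

    enum Action { REWIND_TO_COMMITTED, SKIP_BATCH }

    static final int MAX_RETRIES = 3; // hypothetical limit

    /** What the caller should do next, plus the counter to carry forward. */
    static final class Decision {
        final Action action;
        final int retries;

        Decision(Action action, int retries) {
            this.action = action;
            this.retries = retries;
        }
    }

    /**
     * Decides how to handle a failed (aborted) transaction.
     *
     * <p>Within {@code MAX_RETRIES} attempts the caller should rewind the
     * consumer to the last committed offsets, so the same records are
     * processed again. Once the limit is exceeded the caller should skip the
     * batch (for example, log it or send it to a DLQ), and the counter starts
     * over at 0.
     *
     * @param retries failures already recorded for the current batch
     * @throws IllegalArgumentException if {@code retries} is negative, which
     *         indicates broken bookkeeping in the caller
     */
    static Decision maybeRetry(int retries) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0, got " + retries);
        }
        int attempt = retries + 1;
        if (attempt <= MAX_RETRIES) {
            return new Decision(Action.REWIND_TO_COMMITTED, attempt);
        }
        return new Decision(Action.SKIP_BATCH, 0);
    }
}
```

After a successful commit the caller would reset its own counter to 0, which is the point raised earlier in this review about resetting `retries`.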



[PR] KAFKA-16391: Cleanup .lock file after server is down [kafka]

2024-03-20 Thread via GitHub


FrankYang0529 opened a new pull request, #15568:
URL: https://github.com/apache/kafka/pull/15568

   Currently, the server adds a `.lock` file to each log directory. The file 
is no longer needed after the server shuts down.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15949: Unify metadata.version format in log and error message [kafka]

2024-03-20 Thread via GitHub


showuon commented on PR #15505:
URL: https://github.com/apache/kafka/pull/15505#issuecomment-2011051818

   Retriggering the CI  build: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15505/4/





Re: [PR] KAFKA-16318 : add javafoc for kafka metric [kafka]

2024-03-20 Thread via GitHub


chia7712 commented on code in PR #15483:
URL: https://github.com/apache/kafka/pull/15483#discussion_r1533141970


##
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java:
##
@@ -40,15 +48,29 @@ public KafkaMetric(Object lock, MetricName metricName, MetricValueProvider va
         this.time = time;
     }
 
+    /**
+     * Get the configuration of this metric.
+     * This is supposed to be used by server only.

Review Comment:
   @mimaison it seems to me those methods are used by the server to "write" 
something (for example, quotas), so users should treat the metrics as read-only 
objects. Please correct me if I misunderstand the purpose.






[jira] [Updated] (KAFKA-16390) consume_bench_test.py failed using AsyncKafkaConsumer

2024-03-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16390:
--
Summary: consume_bench_test.py failed using AsyncKafkaConsumer  (was: consumer_bench_test.py failed using AsyncKafkaConsumer)

> consume_bench_test.py failed using AsyncKafkaConsumer
> -
>
> Key: KAFKA-16390
> URL: https://issues.apache.org/jira/browse/KAFKA-16390
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, system tests
>Reporter: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
>
> Ran the system test based on KAFKA-16273.
> The following tests failed using the consumer group protocol:
> {code:java}
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
>  {code}
> Because of
> {code:java}
>  TimeoutError('consume_workload failed to finish in the expected amount of time.')
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
>     data = self.run_test()
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", line 146, in test_single_partition
>     consume_workload.wait_for_done(timeout_sec=180)
>   File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 352, in wait_for_done
>     wait_until(lambda: self.done(),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
> ducktape.errors.TimeoutError: consume_workload failed to finish in the expected amount of time. {code}





[PR] KAFKA-16275: Update transactions_test.py to support KIP-848’s group protocol config [kafka]

2024-03-20 Thread via GitHub


kirktrue opened a new pull request, #15567:
URL: https://github.com/apache/kafka/pull/15567

   Added a new optional `group_protocol` parameter to the test methods, then 
passed it down to the methods involved.
   
   Unfortunately, because the new consumer can only be used with the new 
coordinator, this required a new `@matrix` block instead of adding 
`group_protocol=["classic", "consumer"]` to the existing blocks.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: Revert to Gradle 8.5 [DO NOT MERGE YET] [kafka]

2024-03-20 Thread via GitHub


pasharik commented on PR #15553:
URL: https://github.com/apache/kafka/pull/15553#issuecomment-2010846230

   @gaurav-narula I've tried zinc `1.9.6` with gradle `8.5` and `8.6`, but I get 
the same issue, with all tests being recompiled.
   
   Perhaps one radical solution to this incremental compilation issue with 
Scala would be to rewrite the entire `AclCommandTest` in Java? :smile: I'm 
looking to add KRaft support for this test, so as I understand it, there will 
be a lot of changes in it anyway. I see there are already some Java tests in 
the `core` module.





Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-03-20 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1532971164


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,44 @@ public static boolean validateRequiredResourceLabels(Map metadat
     }
 
     public static CompressionType preferredCompressionType(List acceptedCompressionTypes) {
-        // TODO: Support compression in client telemetry.
+        if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+            // Broker is providing the compression types in order of preference. Grab the
+            // first one.
+            return acceptedCompressionTypes.get(0);
+        }
         return CompressionType.NONE;
     }
 
     public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
-        // TODO: Support compression in client telemetry.
-        if (compressionType == CompressionType.NONE) {
-            return ByteBuffer.wrap(raw);
-        } else {
-            throw new UnsupportedOperationException("Compression is not supported");
+        try {
+            try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
+                try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(raw);
+                    out.flush();
+                }
+                compressedOut.buffer().flip();
+                return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+            }
+        } catch (IOException e) {
+            throw new KafkaException("Failed to compress metrics data", e);
+        }
+    }
+
+    public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
+        ByteBuffer data = ByteBuffer.wrap(metrics);
+        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+
+            byte[] bytes = new byte[data.capacity() * 2];
+            int nRead;
+            while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+                out.write(bytes, 0, nRead);
+            }
+
+            out.flush();
+            return ByteBuffer.wrap(out.toByteArray());

Review Comment:
   @chia7712 Sounds good. Could you please create an improvement Jira for me? 
I'll address that.
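The `compress`/`decompress` pair in this hunk is a stream round trip: wrap an output stream with the codec, write the raw bytes, close the codec stream so it flushes its trailer, then read the compressed bytes back through an input wrapper until EOF. A minimal, self-contained sketch of the same pattern, under stated assumptions: it uses the JDK's `GZIPOutputStream`/`GZIPInputStream` in place of Kafka's `CompressionType.wrapForOutput`/`wrapForInput`, throws `UncheckedIOException` instead of `KafkaException`, and the `Codec` enum and `preferred` helper are hypothetical, mirroring only the "first entry in the broker's preference list, else NONE" rule.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TelemetryCompressionSketch {

    // Hypothetical stand-in for Kafka's CompressionType; only NONE and GZIP are modelled.
    enum Codec { NONE, GZIP }

    // The broker lists codecs in preference order: take the first, else fall back to NONE.
    static Codec preferred(List<Codec> accepted) {
        return (accepted != null && !accepted.isEmpty()) ? accepted.get(0) : Codec.NONE;
    }

    static ByteBuffer compress(byte[] raw, Codec codec) {
        if (codec == Codec.NONE) {
            return ByteBuffer.wrap(raw);
        }
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // try-with-resources closes the GZIP stream, writing its trailer before the sink is read.
        try (OutputStream out = new GZIPOutputStream(sink)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to compress metrics data", e);
        }
        return ByteBuffer.wrap(sink.toByteArray());
    }

    static ByteBuffer decompress(byte[] data, Codec codec) {
        if (codec == Codec.NONE) {
            return ByteBuffer.wrap(data);
        }
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            // Fixed-size chunk buffer; read() returns -1 at end of stream.
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk, 0, chunk.length)) != -1) {
                out.write(chunk, 0, n);
            }
            return ByteBuffer.wrap(out.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to decompress metrics data", e);
        }
    }

    public static void main(String[] args) {
        Codec codec = preferred(List.of(Codec.GZIP, Codec.NONE));
        byte[] raw = "metric=1;metric=2;metric=3".getBytes(StandardCharsets.UTF_8);
        ByteBuffer compressed = compress(raw, codec);
        byte[] packed = new byte[compressed.remaining()];
        compressed.get(packed);
        ByteBuffer restored = decompress(packed, codec);
        byte[] roundTrip = new byte[restored.remaining()];
        restored.get(roundTrip);
        System.out.println(codec + " " + new String(roundTrip, StandardCharsets.UTF_8));
    }
}
```

The chunk size only changes how many `read()` calls are made, not the result, so a fixed buffer is used here instead of one sized from the input's capacity.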






[jira] [Resolved] (KAFKA-16395) Producer should refresh metadata on a socket request timeout

2024-03-20 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao resolved KAFKA-16395.
---
Resolution: Not A Bug

> Producer should refresh metadata on a socket request timeout
> 
>
> Key: KAFKA-16395
> URL: https://issues.apache.org/jira/browse/KAFKA-16395
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Mao
>Assignee: David Mao
>Priority: Critical
>
> I noticed in a set of producer logs that on a broker outage, we saw the 
> following sequence of logs:
> Got error produce response with correlation id 1661616 on topic-partition 
> topic-0, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT. Error 
> Message: Disconnected from node 0 due to timeout
> Got error produce response with correlation id 1662093 on topic-partition 
> topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER
> Received invalid metadata error in produce request on partition topic-0 due 
> to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
> intended only for the leader, this error indicates that the broker is not the 
> current leader. For requests intended for any replica, this error indicates 
> that the broker is not a replica of the topic partition.. Going to request 
> metadata update now
> This implies we did not request metadata between our produce request 
> attempts. This is a regression introduced by 
> https://issues.apache.org/jira/browse/KAFKA-14317.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16395) Producer should refresh metadata on a socket request timeout

2024-03-20 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829311#comment-17829311
 ] 

David Mao commented on KAFKA-16395:
---

On a closer reading, I believe I misunderstood the code in KAFKA-14317, and this 
is probably not a bug. Closing the JIRA.






Re: [PR] KAFKA-16395: Producer should refresh metadata when a request is cancelled due to request timeout [kafka]

2024-03-20 Thread via GitHub


splett2 closed pull request #15565: KAFKA-16395: Producer should refresh 
metadata when a request is cancelled due to request timeout
URL: https://github.com/apache/kafka/pull/15565





Re: [PR] KAFKA-16359: Corrected manifest file for kafka-clients [kafka]

2024-03-20 Thread via GitHub


apoorvmittal10 commented on PR #15532:
URL: https://github.com/apache/kafka/pull/15532#issuecomment-2010700271

   @ijuma Build passes with unrelated test failures.





Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-20 Thread via GitHub


jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532907721


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", tp, offset);
-context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-} else {
-log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the front of the
+// queue only if the previous value was -1L. If it was not, it means that
+// there is already an event waiting to process the last value.

Review Comment:
   Do we have a test to confirm this behavior?
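The behaviour in question: `getAndSet(offset)` returns the previous pending value, so only the update that finds `NO_OFFSET` enqueues an event, and that event later swaps the slot back to `NO_OFFSET` and applies whatever offset is newest at that moment. A minimal, single-threaded sketch of that coalescing idea, with the kind of check a test could make; `HighWatermarkCoalescer`, its `ArrayDeque` of `Runnable`s and the `applied` list are hypothetical stand-ins for `CoordinatorRuntime`, its event processor and `updateLastCommittedOffset`, not the PR's actual test.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class HighWatermarkCoalescer {
    static final long NO_OFFSET = -1L;

    // Latest high watermark not yet applied, or NO_OFFSET when no event is pending.
    final AtomicLong lastHighWatermark = new AtomicLong(NO_OFFSET);
    // Hypothetical stand-in for the coordinator's event queue.
    final Deque<Runnable> queue = new ArrayDeque<>();
    // Offsets actually applied, in order (stands in for updateLastCommittedOffset).
    final List<Long> applied = new ArrayList<>();

    void onHighWatermarkUpdated(long offset) {
        // Only the update that finds the slot empty schedules an event; later updates
        // just overwrite the pending value, which the queued event will pick up.
        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
            queue.addFirst(() -> {
                long newHighWatermark = lastHighWatermark.getAndSet(NO_OFFSET);
                // Defensive check; with at most one pending event the slot should be set.
                if (newHighWatermark != NO_OFFSET) {
                    applied.add(newHighWatermark);
                }
            });
        }
    }

    void drain() {
        Runnable event;
        while ((event = queue.pollFirst()) != null) {
            event.run();
        }
    }

    public static void main(String[] args) {
        HighWatermarkCoalescer c = new HighWatermarkCoalescer();
        c.onHighWatermarkUpdated(10L); // h1: slot was empty, so an event is enqueued
        c.onHighWatermarkUpdated(20L); // h2: slot held 10, so no new event
        System.out.println("queued events: " + c.queue.size());
        c.drain();
        System.out.println("applied: " + c.applied);
        c.onHighWatermarkUpdated(30L); // slot is empty again, so a new event is enqueued
        c.drain();
        System.out.println("applied: " + c.applied);
    }
}
```

Running `main` should print `queued events: 1`, then `applied: [20]`, then `applied: [20, 30]`: two quick updates collapse into one event that applies only the newer value.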






Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-20 Thread via GitHub


jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532903183


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", tp, offset);
-context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-} else {
-log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the front of the
+// queue only if the previous value was -1L. If it was not, it means that
+// there is already an event waiting to process the last value.

Review Comment:
   > Hum... My understanding is that the code will actually set lastHighWatermark from NO_OFFSET to h1 and push the event in this case.
   
   Thanks for the correction. You're right, I misunderstood the process. Makes sense!






Re: [PR] KAFKA-16395: Producer should refresh metadata when a request is cancelled due to request timeout [kafka]

2024-03-20 Thread via GitHub


splett2 commented on PR #15565:
URL: https://github.com/apache/kafka/pull/15565#issuecomment-2010669973

   I took a closer look and I am pretty sure that my repro is incorrect. I 
think the network client was behaving correctly based on my reading of the code 
from KAFKA-14317, but perhaps it wasn't logging anything when metadata refreshes 
were being requested.





[PR] KRaft upgrade tests should only use latest stable mv [kafka]

2024-03-20 Thread via GitHub


ahuang98 opened a new pull request, #15566:
URL: https://github.com/apache/kafka/pull/15566

   This should help us avoid testing metadata versions (MVs) before the corresponding release 
exists.
   In this case, we go back from testing 3.8 to 3.7, since 3.7 is the current stable 
version.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16206 Fix unnecessary topic config deletion during ZK migration [kafka]

2024-03-20 Thread via GitHub


ahuang98 commented on PR #14206:
URL: https://github.com/apache/kafka/pull/14206#issuecomment-2010532534

   @mimaison could you help merge this if this looks good to you? :) 





[jira] [Assigned] (KAFKA-16276) Update transactions_test.py to support KIP-848’s group protocol config

2024-03-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16276:
-

Assignee: Kirk True

> Update transactions_test.py to support KIP-848’s group protocol config
> --
>
> Key: KAFKA-16276
> URL: https://issues.apache.org/jira/browse/KAFKA-16276
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{transactions_test.py}} to 
> support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrices.
> See KAFKA-16231 as an example of how the test parameters can be changed.
> The wrinkle here is that {{transactions_test.py}} was not able to run as-is. 
> That might deprioritize this until whatever is causing that is resolved.





Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-20 Thread via GitHub


dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532741513


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", tp, offset);
-context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-} else {
-log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the front of the
+// queue only if the previous value was -1L. If it was not, it means that
+// there is already an event waiting to process the last value.
+processor.enqueueFirst(new CoordinatorInternalEvent("HighWatermarkUpdate", tp, () -> {
+long newHighWatermark = lastHighWatermark.getAndSet(NO_OFFSET);
+
+CoordinatorContext context = coordinators.get(tp);

Review Comment:
   In order to have better logging.
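The "higher priority" in the PR title comes from calling `processor.enqueueFirst(...)` instead of appending the high watermark event. A minimal sketch of the ordering effect, assuming a single-threaded processor backed by a `java.util.ArrayDeque`; `EventQueueSketch`, `enqueueLast` and the event names are hypothetical and are not the coordinator's actual event processor API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class EventQueueSketch {
    private final Deque<String> events = new ArrayDeque<>();

    // Regular events go to the tail and run in FIFO order.
    void enqueueLast(String event) {
        events.addLast(event);
    }

    // Internal events such as a high watermark update jump ahead of everything queued.
    void enqueueFirst(String event) {
        events.addFirst(event);
    }

    // Drains the queue and returns the events in the order they would run.
    List<String> drainInOrder() {
        List<String> order = new ArrayList<>();
        String event;
        while ((event = events.pollFirst()) != null) {
            order.add(event);
        }
        return order;
    }

    public static void main(String[] args) {
        EventQueueSketch q = new EventQueueSketch();
        q.enqueueLast("CommitOffset#1");
        q.enqueueLast("CommitOffset#2");
        q.enqueueFirst("HighWatermarkUpdate");
        System.out.println(q.drainInOrder());
    }
}
```

This prints `[HighWatermarkUpdate, CommitOffset#1, CommitOffset#2]`. Two `enqueueFirst` calls would run in reverse order relative to each other; in the hunk, the `lastHighWatermark` check appears to keep at most one such event pending, so that ordering question does not come up for high watermark updates.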






Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-20 Thread via GitHub


dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532739687


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", tp, offset);
-context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-} else {
-log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the front of the
+// queue only if the previous value was -1L. If it was not, it means that
+// there is already an event waiting to process the last value.

Review Comment:
   > The first HWM advancement to h1 will set lastHighWatermark to NO_OFFSET 
and enqueueFirst() HWM update event.
   
   Hum... My understanding is that the code will actually set lastHighWatermark 
from NO_OFFSET to h1 and push the event in this case.
   
   > Before the first event runs, let's say the HWM advances to h2. This will see that lastHighWatermark is NO_OFFSET and will skip enqueueFirst().
   
   It will update lastHighWatermark to h2 and, as the previous value is not 
NO_OFFSET, it does not push the event this time.
   
   > I wonder if we can:
   > * keep track of highest HWM updated
   > * only enqueueFirst if the offset to update is greater than highest HWM 
recorded
   
   Isn't it more or less what my change does? It does not enforce that the new HWM is greater than the previous one, but that should not happen anyway.






Re: [PR] KAFKA-16353: Offline protocol migration integration tests [kafka]

2024-03-20 Thread via GitHub


dajac commented on code in PR #15492:
URL: https://github.com/apache/kafka/pull/15492#discussion_r1532711510


##
core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala:
##
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.message.ListGroupsResponseData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.coordinator.group.Group
+import org.apache.kafka.coordinator.group.classic.ClassicGroupState
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"),

Review Comment:
   nit: It may be better to use `new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer")` now.



##
core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala:
##
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.message.ListGroupsResponseData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.coordinator.group.Group
+import org.apache.kafka.coordinator.group.classic.ClassicGroupState
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
+  ))
+  def testOfflineUpgrade(): Unit = {
+// Create the __consumer_offsets topic because it won't be created automatically
+// in this test, as the test does not use the FindCoordinator API.
+createOffsetsTopic()
+
+// Create the topic.
+createTopic(
+  topic = "foo",
+  numPartitions = 3
+)
+
+// Create a classic group by joining a member.
+val groupId = "grp"
+val (memberId, _) = joinDynamicConsumerGroupWithOldProtocol(groupId)
+

Re: [PR] KAFKA-16395: Producer should refresh metadata when a request is cancelled due to request timeout [kafka]

2024-03-20 Thread via GitHub


ijuma commented on code in PR #15565:
URL: https://github.com/apache/kafka/pull/15565#discussion_r1532671434


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -586,7 +586,7 @@ private void handleProduceResponse(ClientResponse response, Map

[PR] KAFKA-16395: Producer should refresh metadata when a request is cancelled due to request timeout [kafka]

2024-03-20 Thread via GitHub


splett2 opened a new pull request, #15565:
URL: https://github.com/apache/kafka/pull/15565

   ### What
   On a client-side triggered request timeout (the client did not receive a 
response from the server in a timely manner), the client needs to refresh 
metadata. This requires the error in the partition response passed to `completeBatch` 
to map to an exception that is an `instanceof InvalidMetadataException`.
   This is slightly different from `Errors.REQUEST_TIMED_OUT`, which the broker returns 
when the HWM fails to advance within the configured request timeout 
threshold, so we do not want to make `REQUEST_TIMED_OUT` an invalid metadata 
exception.
   
   I couldn't find a better fit than NETWORK_EXCEPTION, so hopefully the 
improved logging from KAFKA-14317 still reasonably distinguishes between 
request timeouts and sudden connection interrupts.
   
   ### Testing
   Added a unit test in `SenderTest` using 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16395) Producer should refresh metadata on a socket request timeout

2024-03-20 Thread David Mao (Jira)
David Mao created KAFKA-16395:
-

 Summary: Producer should refresh metadata on a socket request timeout
 Key: KAFKA-16395
 URL: https://issues.apache.org/jira/browse/KAFKA-16395
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao
Assignee: David Mao


I noticed in a set of producer logs that on a broker outage, we saw the 
following sequence of logs:

Got error produce response with correlation id 1661616 on topic-partition 
topic-0, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT. Error 
Message: Disconnected from node 0 due to timeout

Got error produce response with correlation id 1662093 on topic-partition 
topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER

Received invalid metadata error in produce request on partition topic-0 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.. Going to request 
metadata update now

This implies we did not request metadata between our produce request attempts. 
This is a regression introduced by 
https://issues.apache.org/jira/browse/KAFKA-14317.







[jira] [Assigned] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-03-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16389:
-

Assignee: Kirk True  (was: Lianet Magrans)

> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
>     data = self.run_test()
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 584, in test_valid_assignment
>     wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
> ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
> {code}
> To reproduce, create a system test suite file named 
> {{test_valid_assignment.yml}} with these contents:
> {code:yaml}
> failures:
>   - 
> 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
> {code}
> Then set the {{TC_PATHS}} environment variable to include that test suite 
> file.





[jira] [Commented] (KAFKA-15899) Move kafka.security package from core to server module

2024-03-20 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829267#comment-17829267
 ] 

Nikolay Izhikov commented on KAFKA-15899:
-

Hello [~omnia_h_ibrahim]

Can I assign this ticket to myself?

> Move kafka.security package from core to server module
> --
>
> Key: KAFKA-15899
> URL: https://issues.apache.org/jira/browse/KAFKA-15899
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Omnia Ibrahim
>Priority: Major
>






[jira] [Resolved] (KAFKA-16352) Transaction may get get stuck in PrepareCommit or PrepareAbort state

2024-03-20 Thread Artem Livshits (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Artem Livshits resolved KAFKA-16352.

Fix Version/s: 3.8.0
 Reviewer: Justine Olshan
   Resolution: Fixed

> Transaction may get get stuck in PrepareCommit or PrepareAbort state
> 
>
> Key: KAFKA-16352
> URL: https://issues.apache.org/jira/browse/KAFKA-16352
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Artem Livshits
>Assignee: Artem Livshits
>Priority: Major
> Fix For: 3.8.0
>
>
> A transaction took a long time to complete; trying to restart the producer 
> led to CONCURRENT_TRANSACTIONS errors. Investigation showed that 
> the transaction had been stuck in PrepareCommit for a few days 
> (the investigation took place on Feb 27 2024). Transaction 
> state:
> {{Type   |Name                  |Value}}
> {{-}}
> {{ref    |transactionalId       |xxx-yyy}}
> {{long   |producerId            |299364}}
> {{ref    |state                 |kafka.coordinator.transaction.PrepareCommit$ 
> @ 0x44fe22760}}
> {{long   |txnStartTimestamp     |1708619624810  Thu Feb 22 2024 16:33:44.810 
> GMT+}}
> {{long   |txnLastUpdateTimestamp|1708619625335  Thu Feb 22 2024 16:33:45.335 
> GMT+}}
> {{-}}
> The partition list was empty and transactionsWithPendingMarkers didn't 
> contain the reference to the transactional state. The log contained the 
> following relevant messages:
> {{22 Feb 2024 @ 16:33:45.623 UTC [Transaction State Manager 1]: Completed 
> loading transaction metadata from __transaction_state-3 for coordinator epoch 
> 611}}
> (this is the partition that contains the transactional id). After the data 
> is loaded, it sends out markers, etc.
> Then there is this message:
> {{22 Feb 2024 @ 16:33:45.696 UTC [Transaction Marker Request Completion 
> Handler 4]: Transaction coordinator epoch for xxx-yyy has changed from 610 to 
> 611; cancel sending transaction markers TxnMarkerEntry{producerId=299364, 
> producerEpoch=1005, coordinatorEpoch=610, result=COMMIT, 
> partitions=[foo-bar]} to the brokers}}
> This message is logged just before the state is removed from 
> transactionsWithPendingMarkers, but the state apparently contained the entry 
> that was created by the load operation. So the sequence of events probably 
> looked like the following:
>  # partition load completed
>  # commit markers were sent for transactional id xxx-yyy; entry in 
> transactionsWithPendingMarkers was created
>  # zombie reply from the previous epoch completed, removed entry from 
> transactionsWithPendingMarkers
>  # commit markers properly completed, but couldn't transition to 
> CommitComplete state because transactionsWithPendingMarkers didn't have the 
> proper entry, so it got stuck there until the broker was restarted
> Looking at the code there are a few cases that could lead to similar race 
> conditions.  The fix is to keep track of the PendingCompleteTxn value that 
> was used when sending the marker, so that we can only remove the state that 
> was created when the marker was sent and not accidentally remove the state 
> someone else created.
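The fix described in the ticket can be sketched with the conditional `remove(key, value)` of `java.util.concurrent.ConcurrentMap`, which deletes an entry only while the key is still mapped to the given value. This is a minimal model under stated assumptions: `PendingCompleteTxn` here is a hypothetical stand-in that uses identity equality, and the class and method names are illustrative, not the coordinator's actual code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of transactionsWithPendingMarkers;
// not the real transaction coordinator code.
class PendingMarkersSketch {
    // Stand-in for PendingCompleteTxn. There is no equals() override, so two
    // instances are equal only if they are the same object (identity).
    static final class PendingCompleteTxn {
        final String transactionalId;
        final int coordinatorEpoch;

        PendingCompleteTxn(String transactionalId, int coordinatorEpoch) {
            this.transactionalId = transactionalId;
            this.coordinatorEpoch = coordinatorEpoch;
        }
    }

    private final ConcurrentMap<String, PendingCompleteTxn> transactionsWithPendingMarkers =
        new ConcurrentHashMap<>();

    // Called when markers are sent; the sender keeps the returned value.
    PendingCompleteTxn addPending(String transactionalId, int coordinatorEpoch) {
        PendingCompleteTxn pending = new PendingCompleteTxn(transactionalId, coordinatorEpoch);
        transactionsWithPendingMarkers.put(transactionalId, pending);
        return pending;
    }

    // Racy variant: removes whatever entry currently exists for the id,
    // even one created later by a partition load in a newer epoch.
    void removeByIdOnly(String transactionalId) {
        transactionsWithPendingMarkers.remove(transactionalId);
    }

    // Fixed variant: removes the entry only if it is still the exact value this
    // sender created; returns false if someone else has replaced it.
    boolean removeIfSame(PendingCompleteTxn pending) {
        return transactionsWithPendingMarkers.remove(pending.transactionalId, pending);
    }

    boolean hasPending(String transactionalId) {
        return transactionsWithPendingMarkers.containsKey(transactionalId);
    }
}
```

In this model, a zombie completion from epoch 610 that calls `removeIfSame` with its own value leaves the epoch-611 entry in place, whereas `removeByIdOnly` would delete it and reproduce the stuck-in-PrepareCommit situation.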



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change

2024-03-20 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16394:
---

 Summary: ForeignKey LEFT join propagates null value on foreignKey 
change
 Key: KAFKA-16394
 URL: https://issues.apache.org/jira/browse/KAFKA-16394
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari
 Attachments: ForeignJoinTest.scala, JsonSerde.scala

We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_,

where _LeftRecord_ is:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We do a simple foreign-key join on left-topic's foreignKey field, which returns 
the value from right-topic.

 

+*Scenario1: change foreignKey*+

Input the following
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2") 

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}
 

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}
 

A spurious null is propagated to the join result when the foreign key changes.

 

+*Scenario 2: Delete PrimaryKey*+

Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null) {code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null) {code}
 

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null) {code}
An additional null is propagated to the join result.
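To make the expected semantics of both scenarios concrete, here is a toy in-memory model of what the reporter expects a LEFT foreign-key join to emit: exactly one result per left-side update, with no intermediate null on a foreign-key change and a single null on deletion. It is a hypothetical sketch, not Kafka Streams code; `LeftRecord` mirrors the Scala case class above, and right-side updates are not modeled as join triggers because both scenarios populate the right topic first.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the *expected* LEFT foreign-key join output from the ticket.
// Not Kafka Streams code; names are illustrative.
class ExpectedFkLeftJoin {
    // Mirrors the Scala case class LeftRecord(foreignKey, name).
    record LeftRecord(String foreignKey, String name) {}

    private final Map<String, String> right = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    void pipeRight(String foreignKey, String value) {
        right.put(foreignKey, value);
    }

    // One result per left-side update: the joined right value (null if the
    // foreign key is unknown), or null for a tombstone. A foreign-key change
    // does not emit an extra null before the new result.
    void pipeLeft(String key, LeftRecord value) {
        String joined = (value == null) ? null : right.get(value.foreignKey());
        output.add(key + "=" + joined);
    }

    List<String> output() {
        return output;
    }
}
```

Feeding scenario 1 into this model yields `pk1=1, pk1=2`, and scenario 2 yields `pk1=1, pk1=null`, matching the "Expected result" blocks rather than the actual 3.7.0 output.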

 

This bug doesn't exist in versions 3.6.0 and below.

 

I believe the issue comes from this line: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]

where the deletion is propagated in both of the scenarios above.

 

Attaching the topology I used.





[PR] KAFKA-16343: Add unit tests of foreignKeyJoin classes [kafka]

2024-03-20 Thread via GitHub


AyoubOm opened a new pull request, #15564:
URL: https://github.com/apache/kafka/pull/15564

   Added unit tests for two of the processors involved in the foreign-key join: 
`SubscriptionSendProcessorSupplier` and `ForeignTableJoinProcessorSupplier`.
   Renamed ForeignTableJoinProcessorSupplierTest to 
SubscriptionJoinProcessorSupplierTest, since SubscriptionJoinProcessorSupplier is 
the processor that this test class actually tests. 





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-20 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1532500641


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -254,33 +262,41 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-val context = new RegistrationTestContext(configProperties)
-val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+val ctx = new RegistrationTestContext(configProperties)
+val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
 val controllerNode = new Node(3000, "localhost", 8021)
-context.controllerNodeProvider.node.set(controllerNode)
-manager.start(() => context.highestMetadataOffset.get(),
-  context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+ctx.controllerNodeProvider.node.set(controllerNode)
+
+manager.start(() => ctx.highestMetadataOffset.get(),
+  ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
   Collections.emptyMap(), OptionalLong.of(10L))
-TestUtils.retry(6) {
-  assertEquals(1, context.mockChannelManager.unsentQueue.size)
-  assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-}
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1000)), 
controllerNode)
-TestUtils.retry(1) {
-  context.poll()
-  assertEquals(1000L, manager.brokerEpoch)
-}
 
+def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
+def nextHeartbeatRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+def nextRegistrationRequest(epoch: Long) =
+  doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+// Broker registers and response sets epoch to 1000L
+assertEquals(10L, 
nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+nextHeartbeatRequest() // poll for next request as way to synchronize with 
the new value into brokerEpoch
+assertEquals(1000L, manager.brokerEpoch)
+
+// Trigger JBOD MV update
 manager.handleKraftJBODMetadataVersionUpdate()
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1200)), 
controllerNode)
-TestUtils.retry(6) {
-  context.time.sleep(100)
-  context.poll()
-  manager.eventQueue.wakeup()
-  assertEquals(1200, manager.brokerEpoch)
-}
+
+// Depending on scheduling, the next request could either be 
BrokerRegistration or BrokerHeartbeat.

Review Comment:
   Before calling `manager.handleKraftJBODMetadataVersionUpdate()`, there 
should be one `CommunicationEvent` with a delay of 100 in `eventQueue`. Until we 
call `poll`, this `CommunicationEvent` remains in `eventQueue`. Calling 
`manager.handleKraftJBODMetadataVersionUpdate()` adds a 
`KraftJBODMetadataVersionUpdateEvent` with no delay to `eventQueue`. That event 
is then processed and replaces the pending `CommunicationEvent` with one that has 
a delay of 0. That `CommunicationEvent` is then processed, which causes a 
`BrokerRegistration` request to be sent. So it seems that the next request 
should always be `BrokerRegistration` when we call `poll`?






Re: [PR] KAFKA-15062: Adding ppc64le build stage [kafka]

2024-03-20 Thread via GitHub


mimaison commented on PR #13817:
URL: https://github.com/apache/kafka/pull/13817#issuecomment-2010007861

   I don't think we can modify the jobs in Jenkins directly (at least, I don't 
know how). The jobs are created automatically from the Jenkinsfile in this 
repository. I see that some other Apache projects have multiple Jenkins job 
files; it would be good to understand how that works.





Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-20 Thread via GitHub


mimaison commented on code in PR #15558:
URL: https://github.com/apache/kafka/pull/15558#discussion_r1532418821


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -630,8 +631,16 @@ Map describeTopicConfigs(Set 
topics)
 Set resources = topics.stream()
 .map(x -> new ConfigResource(ConfigResource.Type.TOPIC, x))
 .collect(Collectors.toSet());
-return 
sourceAdminClient.describeConfigs(resources).all().get().entrySet().stream()
-.collect(Collectors.toMap(x -> x.getKey().name(), 
Entry::getValue));
+try {

Review Comment:
   This only improves this single call; there are a bunch of other admin calls 
run by the Schedulers that should also be updated. Given the number of 
calls, it might make sense to have a helper method that wraps them to avoid 
too much duplication.
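A helper along the lines suggested here might look like the following sketch. It is hypothetical: `adminCall` and its messages are illustrative names, and `AuthorizationException` is a local stand-in for Kafka's `org.apache.kafka.common.errors.AuthorizationException` so the example compiles without Kafka on the classpath. The idea is to run one blocking admin call, such as a `KafkaFuture.get()`, and turn an authorization failure into an error that names the operation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

// Hypothetical helper; not the code from the PR.
class AdminCallHelper {
    // Local stand-in for Kafka's AuthorizationException hierarchy.
    static class AuthorizationException extends RuntimeException {
        AuthorizationException(String message) { super(message); }
        AuthorizationException(String message, Throwable cause) { super(message, cause); }
    }

    // Runs one blocking admin call and reports which operation failed.
    static <T> T adminCall(Callable<T> call, String description) {
        try {
            return call.call();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof AuthorizationException) {
                throw new AuthorizationException(
                    "Not authorized to " + description + "; check the ACLs for this operation", cause);
            }
            throw new RuntimeException("Failed to " + description, cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while trying to " + description, e);
        } catch (RuntimeException e) {
            throw e; // already unchecked; propagate unchanged
        } catch (Exception e) {
            throw new RuntimeException("Failed to " + description, e);
        }
    }
}
```

Each call site could then read, for example, `adminCall(() -> sourceAdminClient.describeConfigs(resources).all().get(), "describe topic configs")`, keeping the error handling in one place.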



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##
@@ -224,6 +229,39 @@ public void testNoBrokerAclAuthorizer() throws Exception {
 verifyNoInteractions(targetAdmin);
 }
 
+@Test
+public void testMissingDescribeConfigsAcl() throws Exception {
+Admin sourceAdmin = mock(Admin.class);
+Admin targetAdmin = mock(Admin.class);
+MirrorSourceConnector connector = new 
MirrorSourceConnector(sourceAdmin, targetAdmin);
+Field configField = connector.getClass().getDeclaredField("config");

Review Comment:
   Instead of using reflection, I wonder if it would be better to adjust one of 
the existing constructors used for testing.






Re: [PR] KAFKA-16318 : add javafoc for kafka metric [kafka]

2024-03-20 Thread via GitHub


mimaison commented on code in PR #15483:
URL: https://github.com/apache/kafka/pull/15483#discussion_r1532394640


##
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java:
##
@@ -40,15 +48,29 @@ public KafkaMetric(Object lock, MetricName metricName, 
MetricValueProvider va
 this.time = time;
 }
 
+/**
+ * Get the configuration of this metric.
+ * This is supposed to be used by server only.

Review Comment:
   What do you mean by `This is supposed to be used by server only.`?
   Same question for the other method below.






Re: [PR] KAFKA-16232: kafka hangs forever in the starting process if the authorizer future is not returned [kafka]

2024-03-20 Thread via GitHub


brandboat commented on PR #15549:
URL: https://github.com/apache/kafka/pull/15549#issuecomment-2009924617

   Gentle ping @showuon, would you mind taking a look? Many thanks!





[PR] KAFKA-16388: add production-ready test of 2.7, 3.3 - 3.6 release to MetadataVersionTest.testFromVersionString [kafka]

2024-03-20 Thread via GitHub


brandboat opened a new pull request, #15563:
URL: https://github.com/apache/kafka/pull/15563

   Add tests for the 2.7 and 3.3 to 3.6 releases to MetadataVersionTest.testFromVersionString.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions [kafka]

2024-03-20 Thread via GitHub


C0urante merged PR #15562:
URL: https://github.com/apache/kafka/pull/15562





Re: [PR] KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions [kafka]

2024-03-20 Thread via GitHub


C0urante commented on PR #15562:
URL: https://github.com/apache/kafka/pull/15562#issuecomment-2009913326

   Tests pass locally and this change is trivial; going to merge...





Re: [PR] KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions [kafka]

2024-03-20 Thread via GitHub


C0urante commented on code in PR #15562:
URL: https://github.com/apache/kafka/pull/15562#discussion_r1532337351


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java:
##
@@ -134,6 +134,16 @@ public void testProcessPartitionKeyValidList() {
 }
 }
 
+@Test
+public void testProcessPartitionKeyNullOffset() {

Review Comment:
   Good catch 






[jira] [Updated] (KAFKA-16073) Kafka Tiered Storage: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-03-20 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16073:
--
Fix Version/s: 3.6.3
   (was: 3.6.2)

> Kafka Tiered Storage: Consumer Fetch Error Due to Delayed localLogStartOffset 
> Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.8.0, 3.7.1, 3.6.3
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, which affects consumer fetch operations. 
> When segments are deleted from the log's in-memory state, 
> {{localLogStartOffset}} isn't updated promptly. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks whether a consumer's fetch 
> offset is less than {{localLogStartOffset}}. If it is not, Kafka sends an 
> {{OffsetOutOfRangeException}} to the consumer, which is wrong whenever 
> {{localLogStartOffset}} is stale.
> In a specific concurrent scenario, consider sequential offsets 
> {{offset1 < offset2 < offset3}}. A client requests data at {{offset2}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated {{localLogStartOffset}} from {{offset1}} to {{offset3}}. 
> Consequently, when the fetch offset ({{offset2}}) is evaluated against 
> the stale {{offset1}} in {{ReplicaManager.handleOffsetOutOfRangeError}}, 
> it incorrectly triggers an {{OffsetOutOfRangeException}}. This issue 
> arises from the out-of-sync update of {{localLogStartOffset}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.
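The race in this ticket comes down to which value of `localLogStartOffset` the out-of-range check sees. The sketch below is a hypothetical, simplified model of only that comparison as the ticket describes it; the method name and signature are illustrative, not `ReplicaManager`'s real API, and any other conditions the real check applies are omitted.

```java
// Hypothetical, simplified model of the check described in the ticket;
// not ReplicaManager's real signature or full logic.
class OffsetOutOfRangeSketch {
    enum Outcome { READ_FROM_REMOTE, OFFSET_OUT_OF_RANGE }

    // A fetch offset below localLogStartOffset is treated as living in tiered
    // (remote) storage; otherwise the error is returned to the consumer.
    static Outcome handleOffsetOutOfRange(long fetchOffset, long localLogStartOffset) {
        return fetchOffset < localLogStartOffset
            ? Outcome.READ_FROM_REMOTE
            : Outcome.OFFSET_OUT_OF_RANGE;
    }

    // Compares the decision made with the stale value (offset1) against the
    // decision made once the deletion is reflected (offset3).
    static Outcome[] staleVersusUpdated(long offset1, long offset2, long offset3) {
        return new Outcome[] {
            handleOffsetOutOfRange(offset2, offset1), // stale: deletion not yet visible
            handleOffsetOutOfRange(offset2, offset3)  // updated: offset2 is now remote
        };
    }
}
```

With, say, `offset1 = 100`, `offset2 = 150`, `offset3 = 200`, the stale value yields `OFFSET_OUT_OF_RANGE` while the updated one yields `READ_FROM_REMOTE`, which is the discrepancy the ticket describes.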





[jira] [Created] (KAFKA-16393) SslTransportLayer doesn't implement write(ByteBuffer[], int, int) correctly

2024-03-20 Thread Haruki Okada (Jira)
Haruki Okada created KAFKA-16393:


 Summary: SslTransportLayer doesn't implement write(ByteBuffer[], 
int, int) correctly
 Key: KAFKA-16393
 URL: https://issues.apache.org/jira/browse/KAFKA-16393
 Project: Kafka
  Issue Type: Improvement
Reporter: Haruki Okada


As of Kafka 3.7.0, SslTransportLayer.write(ByteBuffer[], int, int) is 
implemented as follows:

{code:java}
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException 
{
...
int i = offset;
while (i < length) {
if (srcs[i].hasRemaining() || hasPendingWrites()) {

{code}

The loop index starts at `offset` and ends at `length`.
However, this isn't correct, because the end index should be `offset + length`.

Suppose we have an array of 5 ByteBuffers and call this method with 
offset = 3 and length = 1.

With the current code, `write(srcs, 3, 1)` doesn't attempt any write, because 
the loop condition `3 < 1` is immediately false.

For now, this method seems to be called only with offset = 0 and 
length = srcs.length in the Kafka code base, so it isn't causing any problems. 
However, we should fix it, because it could introduce a subtle bug if the 
method is called with different arguments in the future.
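The off-by-bound issue can be reproduced without any SSL machinery. The sketch below only sums the remaining bytes of the buffers each loop would visit, comparing the bound from the excerpt with the half-open range `[offset, offset + length)` that matches the convention of `java.nio.channels.GatheringByteChannel.write(ByteBuffer[], int, int)`, where `length` is the number of buffers to access. The class and method names are illustrative.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Illustrative comparison of the two loop bounds; performs no actual writes.
class GatherWriteBounds {
    // Bound as in the excerpt above: stops at `length`, ignoring `offset`.
    static long bytesVisitedBuggy(ByteBuffer[] srcs, int offset, int length) {
        long total = 0;
        for (int i = offset; i < length; i++) {
            total += srcs[i].remaining();
        }
        return total;
    }

    // Corrected bound: visits exactly `length` buffers starting at `offset`.
    static long bytesVisitedFixed(ByteBuffer[] srcs, int offset, int length) {
        Objects.checkFromIndexSize(offset, length, srcs.length); // reject out-of-range arguments
        long total = 0;
        for (int i = offset; i < offset + length; i++) {
            total += srcs[i].remaining();
        }
        return total;
    }
}
```

With five 10-byte buffers, the two versions agree for `(0, 5)` (50 bytes each), but for `(3, 1)` the buggy bound visits nothing while the corrected one visits the 10 bytes of `srcs[3]`.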





[jira] [Comment Edited] (KAFKA-16372) max.block.ms behavior inconsistency with javadoc and the config description

2024-03-20 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828960#comment-17828960
 ] 

Haruki Okada edited comment on KAFKA-16372 at 3/20/24 2:16 PM:
---

[~showuon] Agreed.
One concern is that, IMO, many developers expect this "exception thrown on buffer 
full after max.block.ms" behavior: it's stated in the Javadoc, and because we 
rarely hit a buffer-full situation, no one noticed this discrepancy.

Even some well-known open-source projects have exception-handling code that 
doesn't actually work because of this (e.g. 
[logback-kafka-appender|https://github.com/danielwegener/logback-kafka-appender/blob/master/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java#L29]).

I wonder whether just fixing the Javadoc and the Kafka documentation is enough, or whether we should 
give a heads-up about this somewhere (e.g. on the Kafka user mailing list).

I would like to hear the committers' opinions.

Anyway, in the meantime, let me start fixing the docs.


was (Author: ocadaruma):
[~showuon] Agreed.
One concern is that, IMO, many developers expect this "exception thrown on buffer 
full after max.block.ms" behavior: it's stated in the Javadoc, and because we 
rarely hit a buffer-full situation, no one noticed this discrepancy.

Even some well-known open-source projects have exception-handling code that 
doesn't actually work because of this (e.g. 
[logback-kafka-appender|https://github.com/danielwegener/logback-kafka-appender/blob/master/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java#L29]).

I wonder whether just fixing the Javadoc and the Kafka documentation is enough, or whether we should 
include a heads-up about this somewhere (e.g. on the Kafka user mailing list).

I would like to hear the committers' opinions.

Anyway, in the meantime, let me start fixing the docs.

> max.block.ms behavior inconsistency with javadoc and the config description
> ---
>
> Key: KAFKA-16372
> URL: https://issues.apache.org/jira/browse/KAFKA-16372
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Haruki Okada
>Priority: Minor
>
> As of Kafka 3.7.0, the javadoc of 
> [KafkaProducer.send|https://github.com/apache/kafka/blob/3.7.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L956]
>  states that it throws TimeoutException when max.block.ms is exceeded on 
> buffer allocation or initial metadata fetch.
> This is also stated in the [buffer.memory config 
> description|https://kafka.apache.org/37/documentation.html#producerconfigs_buffer.memory].
> However, I found that this is not true: TimeoutException extends 
> ApiException, and KafkaProducer.doSend catches ApiException and [wraps it as 
> FutureFailure|https://github.com/apache/kafka/blob/3.7.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1075-L1086]
>  instead of throwing it.
> I wonder whether this is a bug or a documentation error.
> This discrepancy seems to have existed since 0.9.0.0, when max.block.ms was introduced.
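The mechanism described in the ticket (catch the exception inside `send`, return it as a failed future) can be illustrated with plain JDK types. This sketch is hypothetical: `ApiException`, `TimeoutException`, and `send` here are local stand-ins that mimic, but are not, Kafka's classes and `KafkaProducer.doSend`. It shows why a caller that only wraps `send()` in try/catch never sees the timeout; the error surfaces as the cause of an `ExecutionException` from `Future.get()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-ins; not Kafka's ApiException, TimeoutException or doSend.
class FutureWrappedTimeout {
    static class ApiException extends RuntimeException {
        ApiException(String message) { super(message); }
    }

    static class TimeoutException extends ApiException {
        TimeoutException(String message) { super(message); }
    }

    // Mirrors the pattern in the ticket: an ApiException raised while waiting
    // for buffer memory is caught and returned as a failed future.
    static Future<Long> send(boolean bufferFull) {
        try {
            if (bufferFull) {
                throw new TimeoutException("buffer full for longer than max.block.ms");
            }
            return CompletableFuture.completedFuture(42L);
        } catch (ApiException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    // Reports where the timeout becomes visible to the caller.
    static String outcome(boolean bufferFull) {
        Future<Long> future;
        try {
            future = send(bufferFull);
        } catch (TimeoutException e) {
            return "thrown by send()"; // never reached in this model
        }
        try {
            return "offset " + future.get();
        } catch (ExecutionException e) {
            return "failed future: " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

In this model, `outcome(true)` reports the `TimeoutException` only through the failed future, which is why exception-handling code around the `send` call itself does not fire.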





[jira] [Assigned] (KAFKA-16372) max.block.ms behavior inconsistency with javadoc and the config description

2024-03-20 Thread Haruki Okada (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haruki Okada reassigned KAFKA-16372:


Assignee: Haruki Okada






[jira] [Comment Edited] (KAFKA-16372) max.block.ms behavior inconsistency with javadoc and the config description

2024-03-20 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828960#comment-17828960
 ] 

Haruki Okada edited comment on KAFKA-16372 at 3/20/24 2:15 PM:
---

[~showuon] Agreed.
One concern is that, IMO, many developers expect this "exception thrown on buffer 
full after max.block.ms" behavior: it's stated in the Javadoc, and because we 
rarely hit a buffer-full situation, no one noticed this discrepancy.

Even some well-known open-source projects have exception-handling code that 
doesn't actually work because of this (e.g. 
[logback-kafka-appender|https://github.com/danielwegener/logback-kafka-appender/blob/master/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java#L29]).

I wonder whether just fixing the Javadoc and the Kafka documentation is enough, or whether we should 
include a heads-up about this somewhere (e.g. on the Kafka user mailing list).

I would like to hear the committers' opinions.

Anyway, in the meantime, let me start fixing the docs.


was (Author: ocadaruma):
[~showuon] Agreed.
One concern is that, IMO, many developers expect this "exception thrown on buffer 
full after max.block.ms" behavior: it's stated in the Javadoc, and because we 
rarely hit a buffer-full situation, no one noticed this discrepancy.

Even some well-known open-source projects have exception-handling code that 
doesn't actually work because of this (e.g. 
[logback-kafka-append|https://github.com/danielwegener/logback-kafka-appender/blob/master/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java#L29]).

I wonder whether just fixing the Javadoc and the Kafka documentation is enough, or whether we should 
include a heads-up about this somewhere (e.g. on the Kafka user mailing list).

I would like to hear the committers' opinions.

Anyway, in the meantime, let me start fixing the docs.






[jira] [Commented] (KAFKA-16372) max.block.ms behavior inconsistency with javadoc and the config description

2024-03-20 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828960#comment-17828960
 ] 

Haruki Okada commented on KAFKA-16372:
--

[~showuon] Agreed.
One concern is that, IMO, many developers expect this "exception thrown on buffer 
full after max.block.ms" behavior: it's stated in the Javadoc, and because we 
rarely hit a buffer-full situation, no one noticed this discrepancy.

Even some well-known open-source projects have exception-handling code that 
doesn't actually work because of this (e.g. 
[logback-kafka-append|https://github.com/danielwegener/logback-kafka-appender/blob/master/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java#L29]).

I wonder whether just fixing the Javadoc and the Kafka documentation is enough, or whether we should 
include a heads-up about this somewhere (e.g. on the Kafka user mailing list).

I would like to hear the committers' opinions.

Anyway, in the meantime, let me start fixing the docs.






Re: [PR] [WIP] Splitting consumer tests [kafka]

2024-03-20 Thread via GitHub


lianetm closed pull request #15535: [WIP] Splitting consumer tests
URL: https://github.com/apache/kafka/pull/15535





Re: [PR] MINOR : Removed the depreciated information about Zk to Kraft migration. [kafka]

2024-03-20 Thread via GitHub


chiacyu commented on code in PR #15552:
URL: https://github.com/apache/kafka/pull/15552#discussion_r1532121819


##
docs/ops.html:
##
@@ -3797,14 +3797,6 @@ Modifying certain dynamic configurations on the standalone KRaft 
controller
   
 
-  ZooKeeper to KRaft 
Migration

Review Comment:
   Sure, that makes sense. Thanks for the reminder.






Re: [PR] KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions [kafka]

2024-03-20 Thread via GitHub


yashmayya commented on code in PR #15562:
URL: https://github.com/apache/kafka/pull/15562#discussion_r1532113435


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java:
##
@@ -134,6 +134,16 @@ public void testProcessPartitionKeyValidList() {
 }
 }
 
+@Test
+public void testProcessPartitionKeyNullOffset() {

Review Comment:
   I think this should be `testProcessPartitionKeyNullPartition` instead?






Re: [PR] KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions [kafka]

2024-03-20 Thread via GitHub


C0urante commented on PR #15562:
URL: https://github.com/apache/kafka/pull/15562#issuecomment-2009601190

   @yashmayya @gharris1727 would either of you have a quick moment for this 
small patch?





[PR] KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions [kafka]

2024-03-20 Thread via GitHub


C0urante opened a new pull request, #15562:
URL: https://github.com/apache/kafka/pull/15562

   [Jira](https://issues.apache.org/jira/browse/KAFKA-16392)
   
   This is a pretty lightweight change; we wrap the warning log message for 
unrecognized source partition types in a null guard.
   
   One test is added to verify that the fix ensures no log messages are emitted
in the affected scenario.
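This is a minimal sketch of the null-guard idea described above. It is not the actual `OffsetUtils` code. It assumes a hypothetical `validatePartitionKey` helper that returns its warnings instead of logging them. It also assumes a partition key is the two-element list `[connectorName, partition]`. Under those assumptions, a null partition produces no warning, while a non-null partition that is not a `Map` still does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class PartitionKeyGuardSketch {

    /**
     * Hypothetical helper: inspects the second element of a partition key list
     * ([connectorName, partition]) and records a warning only when the partition
     * is present but is not a Map. A null partition is treated as valid.
     */
    static List<String> validatePartitionKey(List<Object> partitionKey) {
        List<String> warnings = new ArrayList<>();
        Object partition = partitionKey.get(1);
        // The null guard: some connectors legitimately emit null partitions.
        if (partition != null && !(partition instanceof Map)) {
            warnings.add("Ignoring offset partition key with an unexpected format for the second element "
                    + "in the partition key list. Expected type: java.util.Map, actual type: "
                    + partition.getClass().getName());
        }
        return warnings;
    }

    public static void main(String[] args) {
        // Arrays.asList accepts null elements, unlike List.of.
        System.out.println(validatePartitionKey(Arrays.asList("my-connector", null)));          // []
        System.out.println(validatePartitionKey(Arrays.asList("my-connector", Map.of("a", 1)))); // []
        System.out.println(validatePartitionKey(Arrays.asList("my-connector", "oops")).size());  // 1
    }
}
```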
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



[jira] [Created] (KAFKA-16392) Spurious log warnings: "Ignoring offset partition key with an unexpected format for the second element in the partition key list. Expected type: java.util.Map, actual ty

2024-03-20 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-16392:
-

 Summary: Spurious log warnings: "Ignoring offset partition key 
with an unexpected format for the second element in the partition key list. 
Expected type: java.util.Map, actual type: null"
 Key: KAFKA-16392
 URL: https://issues.apache.org/jira/browse/KAFKA-16392
 Project: Kafka
  Issue Type: Bug
  Components: connect
Affects Versions: 3.6.1, 3.7.0, 3.5.2, 3.5.1, 3.6.0, 3.5.0, 3.8.0
Reporter: Chris Egerton
Assignee: Chris Egerton


Some source connectors choose not to specify source offsets with the records 
they emit (or rather, to provide null partitions/offsets). When these 
partitions are parsed by a Kafka Connect worker, this currently leads to a 
spurious warning log message.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15951) MissingSourceTopicException should include topic names

2024-03-20 Thread sanghyeok An (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828944#comment-17828944
 ] 

sanghyeok An commented on KAFKA-15951:
--

Gentle ping, [~mjsax].

When you have free time, could you check the comments? :)

> MissingSourceTopicException should include topic names
> --
>
> Key: KAFKA-15951
> URL: https://issues.apache.org/jira/browse/KAFKA-15951
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> As the title says, we don't include topic names in all cases, which makes it
> hard for users to identify the root cause.
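This is a minimal sketch of what including topic names could look like. It assumes a hypothetical `verifySourceTopics` helper. A plain `IllegalStateException` stands in for Kafka Streams' `MissingSourceTopicException`, and the message wording is illustrative only.

```java
import java.util.Set;
import java.util.TreeSet;

public class MissingTopicsMessageSketch {

    /** Hypothetical helper: returns the required topics that do not exist, sorted for stable output. */
    static Set<String> missingTopics(Set<String> required, Set<String> existing) {
        Set<String> missing = new TreeSet<>(required);
        missing.removeAll(existing);
        return missing;
    }

    /** Throws with the missing topic names in the message so users can see the root cause directly. */
    static void verifySourceTopics(Set<String> required, Set<String> existing) {
        Set<String> missing = missingTopics(required, existing);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Missing source topics: " + missing);
        }
    }

    public static void main(String[] args) {
        try {
            verifySourceTopics(Set.of("orders", "payments", "users"), Set.of("users"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Missing source topics: [orders, payments]
        }
    }
}
```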




[PR] MINOR: Add retry mechanism to EOS example [kafka]

2024-03-20 Thread via GitHub


fvaleri opened a new pull request, #15561:
URL: https://github.com/apache/kafka/pull/15561

   In the initial EOS example, retry logic was implemented within the
`resetToLastCommittedPositions` method. During refactoring, this logic was
removed because a poison pill prevented the example from reaching the final
phase of consuming from the output topic.
   
   In this change, I suggest adding it back, but with a retry limit defined as
`MAX_RETRIES`. Once this limit is reached, the problematic batch will be logged
and skipped, allowing the processor to move on and process the remaining records.
If some records are skipped, the example will still hit the hard timeout (2
minutes), but after consuming all processed records.
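This is a minimal, Kafka-free sketch of the bounded-retry idea. It is not the PR's implementation. A hypothetical `attempt` predicate stands in for one begin/send/commit transaction cycle. A failure stands in for an aborted transaction followed by a rewind to the last committed position.

With `MAX_RETRIES = 5`, a record that keeps failing is attempted six times (the first attempt plus five retries) and then skipped. That is consistent with the "Skipping record after 5 retries" lines in the example output posted on this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class BoundedRetrySketch {

    // Hypothetical limit, mirroring the MAX_RETRIES constant proposed in the PR.
    static final int MAX_RETRIES = 5;

    /**
     * Tries each record once, then retries a failing record up to MAX_RETRIES more times.
     * A record that still fails is logged and skipped so the remaining records can be processed.
     * Returns the records that were processed successfully.
     */
    static List<String> processAll(List<String> records, Predicate<String> attempt) {
        List<String> processed = new ArrayList<>();
        for (String record : records) {
            int retries = 0;
            while (true) {
                if (attempt.test(record)) {
                    // stand-in for a committed transaction
                    processed.add(record);
                    break;
                }
                // stand-in for abortTransaction() plus a rewind to the last committed position
                if (retries == MAX_RETRIES) {
                    System.out.printf("Skipping record after %d retries: %s%n", MAX_RETRIES, record);
                    break;
                }
                retries++;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> input = List.of("test0", "test1", "test2", "test3");
        // "test3" always fails, acting as a poison pill.
        List<String> processed = processAll(input, r -> !r.equals("test3"));
        System.out.println("Processed " + processed.size() + " records");
    }
}
```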
   



Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-20 Thread via GitHub


fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1531939832


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
 @Override
 public void run() {
-// Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-producer.initTransactions();
-
-final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-@Override
-public void onPartitionsRevoked(Collection partitions) {
-printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-}
-
-@Override
-public void onPartitionsAssigned(Collection partitions) {
-printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-messageRemaining.set(messagesRemaining(consumer));
-}
-});
-
-int messageProcessed = 0;
-while (messageRemaining.get() > 0) {
-try {
-ConsumerRecords records = consumer.poll(Duration.ofMillis(200));
-if (records.count() > 0) {
-// Begin a new transaction session.
-producer.beginTransaction();
-for (ConsumerRecord record : records) {
-// Process the record and send to downstream.
-ProducerRecord customizedRecord = transform(record);
-producer.send(customizedRecord);
+int processedRecords = 0;
+long remainingRecords = Long.MAX_VALUE;
+// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+int transactionTimeoutMs = 10_000;
+// consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+boolean readCommitted = true;
+try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+ KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+ "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+// called first and once to fence zombies and abort any pending transaction
+producer.initTransactions();
+
+consumer.subscribe(singleton(inputTopic), this);
+
+Utils.printOut("Processing new records");
+while (!closed && remainingRecords > 0) {
+try {
+ConsumerRecords records = consumer.poll(ofMillis(200));
+if (!records.isEmpty()) {
+// begin a new transaction session
+producer.beginTransaction();
+
+for (ConsumerRecord record : records) {
+// process the record and send downstream
+ProducerRecord newRecord =
+new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+producer.send(newRecord);
+}
+
+// checkpoint the progress by sending offsets to group coordinator broker
+// note that this API is only available for broker >= 2.5
+producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+// commit the transaction including offsets
+producer.commitTransaction();
+processedRecords += records.count();
 }
+} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+ | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+// we can't recover from these exceptions
+Utils.printErr(e.getMessage());
+shutdown();
+} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+// invalid or no offset found without auto.reset.policy
+Utils.printOut("Invalid or no offset found, using latest");
+consumer.seekToEnd(emptyList());
+consumer.commitSync();
+} catch (KafkaException e) {
+// abort the transaction and try to continue
+Utils.printOut("Aborting transaction: %s", e);
+

Re: [PR] KAFKA-16381: add lock for KafkaMetric config getter [kafka]

2024-03-20 Thread via GitHub


vamossagar12 commented on PR #15550:
URL: https://github.com/apache/kafka/pull/15550#issuecomment-2009204011

   > So you mean only if the config(MetricConfig config) works, then we need to 
return the updated config, right?
   
   Yes, that's correct. Any other thread should see the updated state of the
shared variables in the synchronised block only after the lock is released. And,
come to think of it, that is already being achieved without the synchronised
block in the `config()` method.
   
   The changes you have added provide the same guarantees as what exists
today, but at the expense of adding a lock. If we want to make the value of the
`config` object visible immediately to other threads, we could consider making
it `volatile`, but I am not sure we really need it. WDYT?
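This is a minimal sketch of the `volatile` option mentioned in the comment. It uses a hypothetical `ConfigHolderSketch` class rather than Kafka's actual `KafkaMetric`. The setter still synchronizes, which matters only if the update ever becomes a compound operation. The getter relies on the `volatile` read: under the Java Memory Model, a volatile write happens-before every later read of that field, so the getter sees the latest completed write without taking the lock.

```java
public class ConfigHolderSketch {

    private final Object lock = new Object();
    // volatile: a write by one thread happens-before any later read of this field by another thread
    private volatile String config;

    ConfigHolderSketch(String initialConfig) {
        this.config = initialConfig;
    }

    /** Setter keeps the lock so that any future compound update stays atomic. */
    void config(String newConfig) {
        synchronized (lock) {
            this.config = newConfig;
        }
    }

    /** Lock-free getter: the volatile read observes the latest completed write. */
    String config() {
        return config;
    }

    public static void main(String[] args) throws InterruptedException {
        ConfigHolderSketch holder = new ConfigHolderSketch("quota=1");
        Thread reader = new Thread(() -> {
            // Without volatile (and without a lock on the read), this loop could spin on a stale value.
            while (!"quota=2".equals(holder.config())) {
                Thread.onSpinWait();
            }
            System.out.println("reader saw " + holder.config());
        });
        reader.start();
        holder.config("quota=2");
        reader.join();
    }
}
```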



Re: [PR] KAFKA-15949: Unify metadata.version format in log and error message [kafka]

2024-03-20 Thread via GitHub


FrankYang0529 commented on code in PR #15505:
URL: https://github.com/apache/kafka/pull/15505#discussion_r1531943054


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -139,7 +139,7 @@ object StorageTool extends Logging {
   action(storeTrue())
 formatParser.addArgument("--release-version", "-r").
   action(store()).
-  help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.LATEST_PRODUCTION.version()}")
+  help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION.version()}")

Review Comment:
   Yes, it looks like we only use `.version()` in a few messages, so I removed it.
Thank you.




Re: [PR] KAFKA-16318 : add javafoc for kafka metric [kafka]

2024-03-20 Thread via GitHub


chia7712 commented on PR #15483:
URL: https://github.com/apache/kafka/pull/15483#issuecomment-2009231142

   The failed tests are listed below; they pass on my local machine.
   ```script
   ./gradlew cleanTest \
     :tools:test \
       --tests ListConsumerGroupTest.testListConsumerGroupsWithTypesClassicProtocol \
       --tests DescribeConsumerGroupTest.testDescribeStateWithMultiPartitionTopicAndMultipleConsumers \
     :storage:test \
       --tests TransactionsWithTieredStoreTest.testFencingOnAddPartitions \
       --tests TransactionsWithTieredStoreTest.testCommitTransactionTimeout \
     :connect:runtime:test \
       --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsets \
       --tests org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector \
       --tests org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testPotentialDeadlockWhenProducingToOffsetsTopic \
       --tests org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector \
       --tests org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testTasksFailOnInabilityToFence \
       --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted \
     :metadata:test \
       --tests QuorumControllerTest.testBootstrapZkMigrationRecord \
     :trogdor:test \
       --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated \
     :server:test \
       --tests ClientMetricsManagerTest.testCacheEviction \
     :core:test \
       --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testProduceConsumeTopicAutoCreateTopicCreateAcl \
       --tests AuthorizerIntegrationTest.shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL \
       --tests AuthorizerIntegrationTest.testAuthorizeByResourceTypeMultipleAddAndRemove \
       --tests AuthorizerIntegrationTest.testConsumeWithTopicWrite \
       --tests ControllerRegistrationManagerTest.testWrongIncarnationId \
       --tests ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric \
       --tests LogDirFailureTest.testIOExceptionDuringLogRoll \
       --tests LogDirFailureTest.testIOExceptionDuringCheckpoint \
     :clients:test \
       --tests EagerConsumerCoordinatorTest.testOutdatedCoordinatorAssignment
   ```



Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-03-20 Thread via GitHub


chia7712 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1531889679


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,44 @@ public static boolean validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);
+}
 return CompressionType.NONE;
 }
 
 public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
-// TODO: Support compression in client telemetry.
-if (compressionType == CompressionType.NONE) {
-return ByteBuffer.wrap(raw);
-} else {
-throw new UnsupportedOperationException("Compression is not supported");
+try {
+try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
+try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+out.write(raw);
+out.flush();
+}
+compressedOut.buffer().flip();
+return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+}
+} catch (IOException e) {
+throw new KafkaException("Failed to compress metrics data", e);
+}
+}
+
+public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
+ByteBuffer data = ByteBuffer.wrap(metrics);
+try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+
+byte[] bytes = new byte[data.capacity() * 2];
+int nRead;
+while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+out.write(bytes, 0, nRead);
+}
+
+out.flush();
+return ByteBuffer.wrap(out.toByteArray());

Review Comment:
   Hi @apoorvmittal10, I have a question: is it worth replacing
`ByteArrayOutputStream` with `ByteBufferOutputStream`? We could avoid the
array copy by taking the buffer from `ByteBufferOutputStream` directly.
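This is a JDK-only sketch of the copy-avoidance idea. `ByteArrayOutputStream.toByteArray()` always returns a fresh copy. A subclass, however, can wrap the protected `buf` and `count` fields in a `ByteBuffer` directly. That gives the same effect the comment suggests getting from Kafka's `ByteBufferOutputStream`. The `ExposedByteArrayOutputStream` class is hypothetical, and GZIP from `java.util.zip` stands in for Kafka's `CompressionType.wrapForInput`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ZeroCopyDecompressSketch {

    /** Hypothetical subclass exposing the internal array without the copy made by toByteArray(). */
    static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
        ExposedByteArrayOutputStream(int initialSize) {
            super(initialSize);
        }

        ByteBuffer asByteBuffer() {
            // buf and count are protected fields of ByteArrayOutputStream; no copy is made here.
            // The buffer's limit (count), not its capacity, marks the end of the data.
            return ByteBuffer.wrap(buf, 0, count);
        }
    }

    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(raw);
        }
        return bytes.toByteArray();
    }

    static ByteBuffer decompress(byte[] compressed) throws IOException {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream(compressed.length * 2)) {
            byte[] chunk = new byte[4096];
            int nRead;
            while ((nRead = in.read(chunk, 0, chunk.length)) != -1) {
                out.write(chunk, 0, nRead);
            }
            return out.asByteBuffer();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = "metrics-payload".repeat(10).getBytes(StandardCharsets.UTF_8);
        ByteBuffer result = decompress(gzip(raw));
        System.out.println(result.remaining() == raw.length); // true
    }
}
```

Closing a `ByteArrayOutputStream` has no effect, so returning the wrapped buffer from inside the try-with-resources block is safe.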




Re: [PR] MINOR: Add retry mechanism to EOS example [kafka]

2024-03-20 Thread via GitHub


fvaleri commented on PR #15561:
URL: https://github.com/apache/kafka/pull/15561#issuecomment-2009370898

   To make it easier to review, I'm adding the output of the new example 
behavior with 1 partition, 1 processor, max.poll.records = 1, and forcing 
random exceptions.
   
   ```sh
   $ examples/bin/exactly-once-demo.sh 1 1 10
   main - Deleted topics: [input-topic, output-topic]
   main - Waiting for topics metadata cleanup
   main - Created topics: [input-topic, output-topic]
   producer - Sample: record(0, test0), partition(input-topic-0), offset(0)
   producer - Sample: record(1, test1), partition(input-topic-0), offset(1)
   producer - Sample: record(2, test2), partition(input-topic-0), offset(2)
   producer - Sample: record(3, test3), partition(input-topic-0), offset(3)
   producer - Sample: record(4, test4), partition(input-topic-0), offset(4)
   producer - Sample: record(5, test5), partition(input-topic-0), offset(5)
   producer - Sample: record(6, test6), partition(input-topic-0), offset(6)
   producer - Sample: record(7, test7), partition(input-topic-0), offset(7)
   producer - Sample: record(8, test8), partition(input-topic-0), offset(8)
   producer - Sample: record(9, test9), partition(input-topic-0), offset(9)
   producer - Sent 10 records
   processor-0 - Processing new records
   processor-0 - Assigned partitions: [input-topic-0]
   >>> partition 0, offset 0
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 10
   >>> partition 0, offset 0
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 10
   >>> partition 0, offset 0
   processor-0 - Remaining records: 9
   >>> partition 0, offset 1
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 9
   >>> partition 0, offset 1
   processor-0 - Remaining records: 8
   >>> partition 0, offset 2
   processor-0 - Remaining records: 7
   >>> partition 0, offset 3
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 7
   >>> partition 0, offset 3
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 7
   >>> partition 0, offset 3
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 7
   >>> partition 0, offset 3
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 7
   >>> partition 0, offset 3
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 7
   >>> partition 0, offset 3
   processor-0 - Aborting transaction: Boom!
   processor-0 - Skipping record after 5 retries: test3-ok
   processor-0 - Remaining records: 6
   >>> partition 0, offset 4
   processor-0 - Remaining records: 5
   >>> partition 0, offset 5
   processor-0 - Remaining records: 4
   >>> partition 0, offset 6
   processor-0 - Remaining records: 3
   >>> partition 0, offset 7
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 3
   >>> partition 0, offset 7
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 3
   >>> partition 0, offset 7
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 3
   >>> partition 0, offset 7
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 3
   >>> partition 0, offset 7
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 3
   >>> partition 0, offset 7
   processor-0 - Aborting transaction: Boom!
   processor-0 - Skipping record after 5 retries: test7-ok
   processor-0 - Remaining records: 2
   >>> partition 0, offset 8
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 2
   >>> partition 0, offset 8
   processor-0 - Aborting transaction: Boom!
   processor-0 - Remaining records: 2
   >>> partition 0, offset 8
   processor-0 - Remaining records: 1
   >>> partition 0, offset 9
   processor-0 - Remaining records: 0
   processor-0 - Revoked partitions: [input-topic-0]
   processor-0 - Processed 8 records
   consumer - Subscribed to output-topic
   consumer - Assigned partitions: [output-topic-0]
   consumer - Sample: record(0, test0-ok), partition(output-topic-0), offset(3)
   consumer - Sample: record(1, test1-ok), partition(output-topic-0), offset(7)
   consumer - Sample: record(2, test2-ok), partition(output-topic-0), offset(9)
   consumer - Sample: record(4, test4-ok), partition(output-topic-0), offset(23)
   consumer - Sample: record(5, test5-ok), partition(output-topic-0), offset(25)
   consumer - Sample: record(6, test6-ok), partition(output-topic-0), offset(27)
   consumer - Sample: record(8, test8-ok), partition(output-topic-0), offset(45)
   consumer - Sample: record(9, test9-ok), partition(output-topic-0), offset(47)
   main - Timeout after 2 minutes waiting for output read
   consumer - Revoked partitions: [output-topic-0]
   consumer - Fetched 8 records
   ```



[jira] [Commented] (KAFKA-15736) KRaft support in PlaintextConsumerTest

2024-03-20 Thread Rory (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828892#comment-17828892
 ] 

Rory commented on KAFKA-15736:
--

Hi [~sameert], unless I am mistaken, KRaft support is already in place for this
class.

Can this ticket be closed?

Thanks

> KRaft support in PlaintextConsumerTest
> --
>
> Key: KAFKA-15736
> URL: https://issues.apache.org/jira/browse/KAFKA-15736
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in PlaintextConsumerTest in 
> core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala need to 
> be updated to support KRaft
> 49 : def testHeaders(): Unit = {
> 136 : def testDeprecatedPollBlocksForAssignment(): Unit = {
> 144 : def testHeadersSerializerDeserializer(): Unit = {
> 153 : def testMaxPollRecords(): Unit = {
> 169 : def testMaxPollIntervalMs(): Unit = {
> 194 : def testMaxPollIntervalMsDelayInRevocation(): Unit = {
> 234 : def testMaxPollIntervalMsDelayInAssignment(): Unit = {
> 258 : def testAutoCommitOnClose(): Unit = {
> 281 : def testAutoCommitOnCloseAfterWakeup(): Unit = {
> 308 : def testAutoOffsetReset(): Unit = {
> 319 : def testGroupConsumption(): Unit = {
> 339 : def testPatternSubscription(): Unit = {
> 396 : def testSubsequentPatternSubscription(): Unit = {
> 447 : def testPatternUnsubscription(): Unit = {
> 473 : def testCommitMetadata(): Unit = {
> 494 : def testAsyncCommit(): Unit = {
> 513 : def testExpandingTopicSubscriptions(): Unit = {
> 527 : def testShrinkingTopicSubscriptions(): Unit = {
> 541 : def testPartitionsFor(): Unit = {
> 551 : def testPartitionsForAutoCreate(): Unit = {
> 560 : def testPartitionsForInvalidTopic(): Unit = {
> 566 : def testSeek(): Unit = {
> 621 : def testPositionAndCommit(): Unit = {
> 653 : def testPartitionPauseAndResume(): Unit = {
> 671 : def testFetchInvalidOffset(): Unit = {
> 696 : def testFetchOutOfRangeOffsetResetConfigEarliest(): Unit = {
> 717 : def testFetchOutOfRangeOffsetResetConfigLatest(): Unit = {
> 743 : def testFetchRecordLargerThanFetchMaxBytes(): Unit = {
> 772 : def testFetchHonoursFetchSizeIfLargeRecordNotFirst(): Unit = {
> 804 : def testFetchHonoursMaxPartitionFetchBytesIfLargeRecordNotFirst(): Unit = {
> 811 : def testFetchRecordLargerThanMaxPartitionFetchBytes(): Unit = {
> 819 : def testLowMaxFetchSizeForRequestAndPartition(): Unit = {
> 867 : def testRoundRobinAssignment(): Unit = {
> 903 : def testMultiConsumerRoundRobinAssignor(): Unit = {
> 940 : def testMultiConsumerStickyAssignor(): Unit = {
> 986 : def testMultiConsumerDefaultAssignor(): Unit = {
> 1024 : def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
> 1109 : def testMultiConsumerDefaultAssignorAndVerifyAssignment(): Unit = {
> 1141 : def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
> 1146 : def testMultiConsumerSessionTimeoutOnClose(): Unit = {
> 1151 : def testInterceptors(): Unit = {
> 1210 : def testAutoCommitIntercept(): Unit = {
> 1260 : def testInterceptorsWithWrongKeyValue(): Unit = {
> 1286 : def testConsumeMessagesWithCreateTime(): Unit = {
> 1303 : def testConsumeMessagesWithLogAppendTime(): Unit = {
> 1331 : def testListTopics(): Unit = {
> 1351 : def testUnsubscribeTopic(): Unit = {
> 1367 : def testPauseStateNotPreservedByRebalance(): Unit = {
> 1388 : def testCommitSpecifiedOffsets(): Unit = {
> 1415 : def testAutoCommitOnRebalance(): Unit = {
> 1454 : def testPerPartitionLeadMetricsCleanUpWithSubscribe(): Unit = {
> 1493 : def testPerPartitionLagMetricsCleanUpWithSubscribe(): Unit = {
> 1533 : def testPerPartitionLeadMetricsCleanUpWithAssign(): Unit = {
> 1562 : def testPerPartitionLagMetricsCleanUpWithAssign(): Unit = {
> 1593 : def testPerPartitionLagMetricsWhenReadCommitted(): Unit = {
> 1616 : def testPerPartitionLeadWithMaxPollRecords(): Unit = {
> 1638 : def testPerPartitionLagWithMaxPollRecords(): Unit = {
> 1661 : def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
> 1809 : def testConsumingWithNullGroupId(): Unit = {
> 1874 : def testConsumingWithEmptyGroupId(): Unit = {
> 1923 : def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = {
> Scanned 1951 lines. Found 0 KRaft tests out of 61 tests




Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-20 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1531683586


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -197,11 +197,17 @@ class BrokerLifecycleManagerTest {
 result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
-while (!future.isDone || context.mockClient.hasInFlightRequests) {
-  context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
+while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+  // If the manager is idling until scheduled events we need to advance the clock
+  if (manager.eventQueue.scheduledAfterIdling()
+.filter(!_.getClass.getSimpleName.endsWith("TimeoutEvent")) // avoid triggering timeout events

Review Comment:
   That's a good idea. I'm making that change.




Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-20 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1531683014


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-val context = new RegistrationTestContext(configProperties)
-val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+val ctx = new RegistrationTestContext(configProperties)
+val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
 val controllerNode = new Node(3000, "localhost", 8021)
-context.controllerNodeProvider.node.set(controllerNode)
-manager.start(() => context.highestMetadataOffset.get(),
-  context.mockChannelManager, context.clusterId, context.advertisedListeners,
+ctx.controllerNodeProvider.node.set(controllerNode)
+
+manager.start(() => ctx.highestMetadataOffset.get(),
+  ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
   Collections.emptyMap(), OptionalLong.of(10L))
-TestUtils.retry(6) {
-  assertEquals(1, context.mockChannelManager.unsentQueue.size)
-  assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-}
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
-TestUtils.retry(1) {
-  context.poll()
-  assertEquals(1000L, manager.brokerEpoch)
-}
 
+def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, manager, prepareResponse[T](ctx, response))
+def nextRequest() = doPoll[AbstractRequest](new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+def nextRegistrationRequest(epoch: Long) =
+  doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+// Broker registers and response sets epoch to 1000L
+assertEquals(10L, nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+nextRequest() // poll for next request as way to synchronize with the new value into brokerEpoch
+assertEquals(1000L, manager.brokerEpoch)
+
+// Trigger JBOD MV update
 manager.handleKraftJBODMetadataVersionUpdate()
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode)
-TestUtils.retry(6) {
-  context.time.sleep(100)
-  context.poll()
-  manager.eventQueue.wakeup()
-  assertEquals(1200, manager.brokerEpoch)
-}
+
+// We may have to accept some heartbeats before the new registration is sent
+while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()

Review Comment:
   No, I don't think so. `prepareResponse` delegates to `MockClient`, which
expects a predetermined request-response order. It supports a `RequestMatcher`,
which `prepareResponse` uses to extract the request, but it does not support
preparing a conditional response. Supporting conditional prepared responses in
`MockClient` would need a larger change.
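This is a hypothetical sketch of what "conditional prepared responses" could mean, for illustration only. Each prepared response carries a predicate. An incoming request is answered by the first pending entry whose predicate accepts it, rather than strictly by queue order. `ConditionalResponder` and its methods are invented for this example and are not part of Kafka's `MockClient`. Requests and responses are plain strings.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Optional;
import java.util.function.Predicate;

public class ConditionalResponder {

    private static final class Prepared {
        final Predicate<String> matcher;
        final String response;

        Prepared(Predicate<String> matcher, String response) {
            this.matcher = matcher;
            this.response = response;
        }
    }

    private final LinkedList<Prepared> prepared = new LinkedList<>();

    /** Registers a response that is only used for a request the matcher accepts. */
    void prepareResponse(Predicate<String> matcher, String response) {
        prepared.add(new Prepared(matcher, response));
    }

    /** Answers with the first matching prepared response and consumes it; empty if none match. */
    Optional<String> respond(String request) {
        Iterator<Prepared> it = prepared.iterator();
        while (it.hasNext()) {
            Prepared p = it.next();
            if (p.matcher.test(request)) {
                it.remove();
                return Optional.of(p.response);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ConditionalResponder responder = new ConditionalResponder();
        responder.prepareResponse(r -> r.startsWith("Registration"), "RegistrationResponse(epoch=1200)");
        responder.prepareResponse(r -> r.startsWith("Heartbeat"), "HeartbeatResponse");
        // A heartbeat arriving first no longer consumes the registration response.
        System.out.println(responder.respond("HeartbeatRequest"));    // Optional[HeartbeatResponse]
        System.out.println(responder.respond("RegistrationRequest")); // Optional[RegistrationResponse(epoch=1200)]
    }
}
```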




Re: [PR] MINOR: Tuple2 replaced with Map.Entry [kafka]

2024-03-20 Thread via GitHub


nizhikov commented on PR #15560:
URL: https://github.com/apache/kafka/pull/15560#issuecomment-2009033688

   Done.



Re: [PR] KAFKA-14589 Tuple2 replaced with Map.Entry [kafka]

2024-03-20 Thread via GitHub


chia7712 commented on PR #15560:
URL: https://github.com/apache/kafka/pull/15560#issuecomment-2009028276

   @nizhikov This is unrelated to KAFKA-14589, so please use "MINOR:" instead 
of "KAFKA-14589" :)



Re: [PR] KAFKA-14589 Tuple2 replaced with Map.Entry [kafka]

2024-03-20 Thread via GitHub


nizhikov commented on PR #15560:
URL: https://github.com/apache/kafka/pull/15560#issuecomment-2009012349

   This PR is a follow-up to #14471



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-20 Thread via GitHub


nizhikov commented on PR #14471:
URL: https://github.com/apache/kafka/pull/14471#issuecomment-2009012039

   @chia7712 Please take a look: https://github.com/apache/kafka/pull/15560



[PR] KAFKA-14589 Tuple2 replaced with Map.Entry [kafka]

2024-03-20 Thread via GitHub


nizhikov opened a new pull request, #15560:
URL: https://github.com/apache/kafka/pull/15560

   `Tuple2` replaced with the JDK's `Map.Entry` and `SimpleImmutableEntry`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



[jira] [Commented] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached

2024-03-20 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828654#comment-17828654
 ] 

Kuan Po Tseng commented on KAFKA-16385:
---

{quote}
 [~brandboat], is it clear what you should do for this ticket?
Please let us know if you have any questions.
{quote}

Thanks, I'm still poking around the source code, but it sounds like we should 
document the behavior mentioned in this JIRA ticket.
If I have any questions, I'll consult you all again. Huge thanks!

> Segment is rolled before segment.ms or segment.bytes breached
> -
>
> Key: KAFKA-16385
> URL: https://issues.apache.org/jira/browse/KAFKA-16385
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.1, 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>
> Steps to reproduce:
> 0. Start up a broker with `log.retention.check.interval.ms=1000` to speed up 
> the test.
> 1. Create a topic with the configs: segment.ms=7days, segment.bytes=1GB, 
> retention.ms=1sec.
> 2. Send a record "aaa" to the topic.
> 3. Wait for 1 second.
> Will this segment be rolled? I thought not.
> But in my test, it was rolled:
> {code:java}
> [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
> (kafka.log.LocalLog)
> [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
> producer snapshot at offset 1 with 1 producer ids in 1 ms. 
> (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
> lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
> log retention time 1000ms breach based on the largest record timestamp in the 
> segment (kafka.log.UnifiedLog)
> {code}
> The segment is rolled because the 1000ms log retention time was breached, which is 
> unexpected.
> I also tested v3.5.1, and it has the same issue.
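The log above shows the roll at offset 1 immediately followed by deletion of the segment at base offset 0 for a retention breach. One plausible reading, stated here as an assumption rather than a description of `UnifiedLog`, is that a retention pass that would delete every segment first rolls a new, empty active segment so the log is never left without one. The toy model below sketches that rule; `RetentionRollSketch`, `Segment` and `retentionPass` are hypothetical names, and it ignores size-based retention, the high watermark and remote storage.

```java
import java.util.ArrayList;
import java.util.List;

public class RetentionRollSketch {

    // Hypothetical, simplified segment: only the fields this check looks at.
    record Segment(long baseOffset, long largestRecordTimestamp) {}

    // Outcome of one retention pass: surviving segments and whether a roll happened.
    record Result(List<Segment> remaining, boolean rolled) {}

    // Deletes the oldest segments whose newest record is older than retentionMs.
    // Assumption: the log must never be left empty, so when every segment is
    // deletable, a new empty segment starting at logEndOffset is rolled.
    static Result retentionPass(List<Segment> segments, long logEndOffset,
                                long nowMs, long retentionMs) {
        int toDelete = 0;
        while (toDelete < segments.size()
                && nowMs - segments.get(toDelete).largestRecordTimestamp() > retentionMs) {
            toDelete++;
        }
        List<Segment> remaining = new ArrayList<>(segments.subList(toDelete, segments.size()));
        boolean rolled = false;
        if (toDelete > 0 && remaining.isEmpty()) {
            // -1 marks "no records yet" in this toy model.
            remaining.add(new Segment(logEndOffset, -1L));
            rolled = true;
        }
        return new Result(remaining, rolled);
    }

    public static void main(String[] args) {
        // One segment holding the single record "aaa" at offset 0; the timestamp
        // comes from the log above, the check time (1.8 s later) is hypothetical.
        long largestTs = 1_710_832_992_125L;
        List<Segment> log = List.of(new Segment(0L, largestTs));
        Result r = retentionPass(log, 1L, largestTs + 1_800L, 1_000L);
        // prints "rolled=true, new base offset=1"
        System.out.println("rolled=" + r.rolled()
                + ", new base offset=" + r.remaining().get(0).baseOffset());
    }
}
```

Under this model, segment.ms and segment.bytes never come into play: the roll is a side effect of retention wanting to delete the only segment.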



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15989) Upgrade existing generic group to consumer group

2024-03-20 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-15989.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Upgrade existing generic group to consumer group
> 
>
> Key: KAFKA-15989
> URL: https://issues.apache.org/jira/browse/KAFKA-15989
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Emanuele Sabellico
>Assignee: David Jacot
>Priority: Minor
> Fix For: 3.8.0
>
>
> It should be possible to upgrade an existing generic group to a new consumer 
> group, in case it was using either the previous generic protocol or manual 
> partition assignment and commit.



[jira] [Resolved] (KAFKA-15763) Group Coordinator should not deliver new assignment before previous one is acknowledged

2024-03-20 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-15763.
-
Resolution: Won't Fix

We went with another approach.

> Group Coordinator should not deliver new assignment before previous one is 
> acknowledged
> ---
>
> Key: KAFKA-15763
> URL: https://issues.apache.org/jira/browse/KAFKA-15763
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> In the initial implementation of the new consumer group protocol, the group 
> coordinator waits to receive an acknowledgement from the consumer only when 
> there are partitions to be revoked. In the case of newly assigned partitions, 
> a new assignment can be delivered at any time (e.g. in two subsequent 
> heartbeats).
> While implementing the state machine on the client side, we found out that 
> this caused confusion because the protocol does not treat revocation and 
> assignment in the same way. We also found out that changing the assignment 
> before the previous one is fully processed by the member makes the client-side 
> logic more complicated than it should be, because the consumer can't 
> process any new assignment until it has completed the previous one.
> In the end, it is better to change the server side to not deliver a new 
> assignment before the current one is acknowledged by the consumer.



[jira] [Resolved] (KAFKA-16313) Offline group protocol migration

2024-03-20 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16313.
-
Fix Version/s: 3.8.0
 Assignee: Dongnuo Lyu
   Resolution: Fixed

> Offline group protocol migration
> 
>
> Key: KAFKA-16313
> URL: https://issues.apache.org/jira/browse/KAFKA-16313
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
> Fix For: 3.8.0
>
>




Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]

2024-03-20 Thread via GitHub


dajac merged PR #15546:
URL: https://github.com/apache/kafka/pull/15546



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-20 Thread via GitHub


chia7712 commented on PR #14471:
URL: https://github.com/apache/kafka/pull/14471#issuecomment-2008956850

   > The only issue I can see is naming.
   Map.Entry has getKey and getValue for the first and second values in the pair, 
which, to me, implies some kind of relationship between the values.
   Tuple2, by contrast, just stores two values (v1, v2) that sit together in some piece 
of code.
   Should we go with getKey and getValue for tuple values? Is that clear naming? 
WDYT?
   
   My point was that we can leverage the Java API instead of creating a new one :)



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-20 Thread via GitHub


nizhikov commented on PR #14471:
URL: https://github.com/apache/kafka/pull/14471#issuecomment-2008956417

   @chia7712 
   @jolshan 
   @mimaison 
   @tledkov 
   
   Guys, thank you for your help with this PR! 



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-20 Thread via GitHub


chia7712 merged PR #14471:
URL: https://github.com/apache/kafka/pull/14471



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-20 Thread via GitHub


nizhikov commented on PR #14471:
URL: https://github.com/apache/kafka/pull/14471#issuecomment-2008931321

   @chia7712 I tried to perform Tuple2 -> Map.Entry substitution.
   
   The only issue I can see is naming.
   `Map.Entry` has `getKey` and `getValue` for the first and second values in the pair, 
which, to me, implies some kind of relationship between the values.
   `Tuple2`, by contrast, just stores two values (v1, v2) that sit together in some piece 
of code.
   
   Should we go with `getKey` and `getValue` for tuple values? Is that clear naming? 
WDYT?
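   For readers following the naming question, here is a minimal sketch of a `Map.Entry` standing in for a two-value tuple, using the JDK's `AbstractMap.SimpleImmutableEntry` mentioned in the PR description. `EntryAsPairSketch` and `groupAndMemberCount` are hypothetical names, not code from #14471 or #15560.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

public class EntryAsPairSketch {

    // Hypothetical helper returning a (groupId, memberCount) pair where a
    // custom Tuple2<String, Integer> might otherwise have been used.
    static Map.Entry<String, Integer> groupAndMemberCount(String groupId, int members) {
        // JDK class: immutable, setValue() throws UnsupportedOperationException.
        return new SimpleImmutableEntry<>(groupId, members);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> pair = groupAndMemberCount("my-group", 3);
        // getKey()/getValue() take the place of Tuple2's v1/v2.
        // prints "my-group has 3 members"
        System.out.println(pair.getKey() + " has " + pair.getValue() + " members");
        // Entries with equal keys and values are equal across implementations.
        // prints "true"
        System.out.println(pair.equals(Map.entry("my-group", 3)));
    }
}
```

   `Map.entry` (Java 9+) is a shorter alternative, but it rejects `null` keys and values, while `SimpleImmutableEntry` accepts them; both are immutable.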



Re: [PR] KAFKA-15853: Move KafkaConfig properties definition out of core [kafka]

2024-03-20 Thread via GitHub


nizhikov commented on PR #15501:
URL: https://github.com/apache/kafka/pull/15501#issuecomment-2008920265

   @OmniaGM 
   
   > Started doing this, but will continue after Kafka Summit London.
   
   Great! Don't hesitate to ask for assistance if you need any help.



Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]

2024-03-20 Thread via GitHub


showuon commented on PR #15481:
URL: https://github.com/apache/kafka/pull/15481#issuecomment-2008904600

   Backported to 3.7 and 3.6. @omkreddy, FYI.



[jira] [Resolved] (KAFKA-16222) KRaft Migration: Incorrect default user-principal quota after migration

2024-03-20 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16222.
---
Resolution: Fixed

> KRaft Migration: Incorrect default user-principal quota after migration
> ---
>
> Key: KAFKA-16222
> URL: https://issues.apache.org/jira/browse/KAFKA-16222
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, migration
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Dominik
>Assignee: PoAn Yang
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> We observed that our default user quota does not seem to be migrated correctly.
> Before Migration:
> bin/kafka-configs.sh --describe --all --entity-type users
> Quota configs for the *default user-principal* are 
> consumer_byte_rate=100.0, producer_byte_rate=100.0
> Quota configs for user-principal 'myuser@prod' 
> are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8
> After Migration:
> bin/kafka-configs.sh --describe --all --entity-type users
> Quota configs for *user-principal ''* are consumer_byte_rate=100.0, 
> producer_byte_rate=100.0
> Quota configs for user-principal 'myuser%40prod' 
> are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8
>  
> Additional finding: our user names contain an "@", which also led to an 
> incorrect state after migration.
>  
>  
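The `%40` in the post-migration output looks like a percent-encoded `@`. A minimal sketch, assuming the names were percent-encoded on the ZooKeeper side and simply not decoded during migration, shows the round trip with the JDK's `URLEncoder`/`URLDecoder`. `PrincipalNameSketch` is a hypothetical name, and this is not Kafka's own sanitization code: plain `URLEncoder` turns a space into `+`, so it is not necessarily byte-for-byte what the broker stores.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class PrincipalNameSketch {

    // Assumption: stored names are percent-encoded, as the "%40" above suggests.
    static String encode(String name) {
        return URLEncoder.encode(name, StandardCharsets.UTF_8);
    }

    // Decoding restores the original principal name.
    static String decode(String stored) {
        return URLDecoder.decode(stored, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String stored = encode("myuser@prod");
        System.out.println(stored);         // prints "myuser%40prod"
        System.out.println(decode(stored)); // prints "myuser@prod"
    }
}
```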



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-20 Thread via GitHub


chia7712 commented on PR #14471:
URL: https://github.com/apache/kafka/pull/14471#issuecomment-2008792236

   @nizhikov thanks for all your efforts! Please file a PR to address the follow-up, 
thanks!



Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]

2024-03-20 Thread via GitHub


showuon merged PR #15481:
URL: https://github.com/apache/kafka/pull/15481



Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]

2024-03-20 Thread via GitHub


showuon commented on PR #15481:
URL: https://github.com/apache/kafka/pull/15481#issuecomment-2008777656

   Failed tests are unrelated.



[jira] [Commented] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached

2024-03-20 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828620#comment-17828620
 ] 

Luke Chen commented on KAFKA-16385:
---

Thanks for the response, Jun, and thanks for the summary, Chia-Ping.

[~brandboat], is it clear what you should do for this ticket?

Please let us know if you have any questions.




Re: [PR] MINOR: Update upgrade docs to refer 3.6.2 version [kafka]

2024-03-20 Thread via GitHub


omkreddy closed pull request #15554: MINOR: Update upgrade docs to refer 3.6.2 
version
URL: https://github.com/apache/kafka/pull/15554

