Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1533239354

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -182,14 +187,19 @@ public void testTopicPartitionsArg() {
     setUp();
     List offsets = executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3");
-    List expected = Arrays.asList(
-            new Row("__consumer_offsets", 3, 0L),
+    ArrayList expected = new ArrayList<>(
+            Arrays.asList(
                 new Row("topic1", 0, 1L),
                 new Row("topic2", 1, 2L),
                 new Row("topic3", 2, 3L),
                 new Row("topic4", 2, 4L)
+            )
     );
+    if (!cluster.isKRaftTest()) {

Review Comment:
I feel this test needs to verify the match pattern `__.*:3`, so in KRaft mode we could do a read to get the internal topics created.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
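The pattern under discussion can be illustrated with a small, self-contained sketch of how a `--topic-partitions` value such as `topic1:0,topic2:1,topic(3|4):2,__.*:3` might select partitions from the topics that currently exist. The class and method names, the split-on-last-colon parsing, and the uniform partition count per topic are assumptions for illustration, not `GetOffsetShell`'s actual parser. It shows why the review cares about cluster state: a regex like `__.*` can only select an internal topic that has already been created.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class TopicPartitionFilterSketch {

    // One "topicRegex:partition" entry of a hypothetical --topic-partitions spec.
    static final class Filter {
        final Pattern topic;
        final int partition;

        Filter(Pattern topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        boolean matches(String topicName, int p) {
            return partition == p && topic.matcher(topicName).matches();
        }
    }

    // Split "a:0,b:1" into filters; the partition is whatever follows the last ':'.
    static List<Filter> parse(String spec) {
        List<Filter> filters = new ArrayList<>();
        for (String entry : spec.split(",")) {
            int idx = entry.lastIndexOf(':');
            filters.add(new Filter(Pattern.compile(entry.substring(0, idx)),
                    Integer.parseInt(entry.substring(idx + 1))));
        }
        return filters;
    }

    // Return "topic:partition" for every existing partition selected by the spec.
    // Assumes, for simplicity, that every topic has the same number of partitions.
    static List<String> select(String spec, List<String> topics, int partitionsPerTopic) {
        List<Filter> filters = parse(spec);
        List<String> selected = new ArrayList<>();
        for (String t : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                for (Filter f : filters) {
                    if (f.matches(t, p)) {
                        selected.add(t + ":" + p);
                        break;
                    }
                }
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        String spec = "topic1:0,topic2:1,topic(3|4):2,__.*:3";
        List<String> userTopics = List.of("topic1", "topic2", "topic3", "topic4");
        List<String> withInternal = new ArrayList<>(userTopics);
        withInternal.add("__consumer_offsets");
        // Without the internal topic, __.*:3 silently selects nothing.
        System.out.println(select(spec, userTopics, 5));
        System.out.println(select(spec, withInternal, 5));
    }
}
```

Running `main` prints `[topic1:0, topic2:1, topic3:2, topic4:2]` and then the same list followed by `__consumer_offsets:3`.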
Re: [PR] Kafka-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1533209761

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -104,6 +104,11 @@ public Row(String name, int partition, Long timestamp) {
         this.timestamp = timestamp;
     }
+    @Override
+    public String toString() {
+        return "Row[name:" + name + ",partition:" + partition + ",timestamp:" + timestamp;

Review Comment:
Is it used for debugging?

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -338,6 +348,10 @@ private void assertExitCodeIsOne(String... args) {
     }
     private List expectedOffsetsWithInternal() {
+        if (cluster.isKRaftTest()) {

Review Comment:
I prefer to make callers use `expectedTestTopicOffsets` instead of adding an if-else here.

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -182,14 +187,19 @@ public void testTopicPartitionsArg() {
     setUp();
     List offsets = executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3");
-    List expected = Arrays.asList(
-            new Row("__consumer_offsets", 3, 0L),
+    ArrayList expected = new ArrayList<>(

Review Comment:
It seems we can keep using `List`, right?
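On the `List` vs `ArrayList` point: the variable can stay declared as `List` while the object is a mutable `ArrayList`; the copy exists only because `Arrays.asList` returns a fixed-size list whose `add` throws `UnsupportedOperationException`. A minimal sketch, with strings standing in for the test's `Row` objects and a hypothetical `kraft` flag in place of `cluster.isKRaftTest()`. The body of the `if` is a guess, since the diff is cut off right after that line.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExpectedRowsSketch {

    // Builds the expected rows; the internal-topic row is added only when the
    // hypothetical kraft flag is false (a guess at the truncated if-block).
    static List<String> expectedRows(boolean kraft) {
        // Declared as List, backed by a mutable ArrayList copy.
        List<String> expected = new ArrayList<>(Arrays.asList(
                "topic1:0", "topic2:1", "topic3:2", "topic4:2"));
        if (!kraft) {
            expected.add(0, "__consumer_offsets:3");
        }
        return expected;
    }

    // Arrays.asList on its own is fixed-size: structural changes are rejected.
    static boolean fixedSizeRejectsAdd() {
        List<String> fixed = Arrays.asList("topic1:0");
        try {
            fixed.add("topic2:1");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(expectedRows(true));
        System.out.println(expectedRows(false));
        System.out.println(fixedSizeRejectsAdd());
    }
}
```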
Re: [PR] MINOR: Use --no-daemon when building with Jenkins [kafka]
github-actions[bot] commented on PR #15057:
URL: https://github.com/apache/kafka/pull/15057#issuecomment-2011157584

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-16391: Cleanup .lock file after server is down [kafka]
showuon commented on code in PR #15568:
URL: https://github.com/apache/kafka/pull/15568#discussion_r1533195347

## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ##
@@ -1303,6 +1303,27 @@ class LogManagerTest {
       createLeaderAndIsrRequestForStrayDetection(present),
       onDisk.map(mockLog(_))).toSet)
   }
+
+  @Test
+  def testLock(): Unit = {
+    val tmpLogDir = TestUtils.tempDir()
+    val tmpLogManager = createLogManager(Seq(tmpLogDir))
+    tmpLogManager.startup(Set.empty)
+
+    // ${tmpLogDir}.lock is acquired by tmpLogManager
+    val fileLock = new FileLock(new File(tmpLogDir, ".lock"))

Review Comment:
We should use `LogManager.LockFileName` instead.

## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ##
@@ -1303,6 +1303,27 @@ class LogManagerTest {
       createLeaderAndIsrRequestForStrayDetection(present),
       onDisk.map(mockLog(_))).toSet)
   }
+
+  @Test
+  def testLock(): Unit = {
+    val tmpLogDir = TestUtils.tempDir()
+    val tmpLogManager = createLogManager(Seq(tmpLogDir))
+    tmpLogManager.startup(Set.empty)
+
+    // ${tmpLogDir}.lock is acquired by tmpLogManager
+    val fileLock = new FileLock(new File(tmpLogDir, ".lock"))
+    try {
+      assertFalse(fileLock.tryLock())
+    } finally {
+      fileLock.destroy()
+    }
+
+    // ${tmpLogDir}.lock is removed after shutdown
+    tmpLogManager.shutdown()
+    val f = new File(tmpLogDir, ".lock")
+    assertFalse(f.exists())
+    Utils.delete(tmpLogDir)

Review Comment:
We should also wrap `tmpLogManager.shutdown()` and the `tmpLogDir` deletion in the `finally` block to make sure resources are released.
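The `finally` suggestion can be sketched with plain `java.nio` locks instead of Kafka's `FileLock` and `LogManager` (an assumption made to keep the example self-contained). The names `LockTestCleanupSketch`, `canLockAgain`, and `runTest` are hypothetical. Everything the test acquires (the channel holding the lock, the `.lock` file, and the temp directory) is released in `finally`, so a failing check cannot leak it. Within a single JVM, `FileChannel.tryLock` on an already-locked region throws `OverlappingFileLockException` rather than returning `null`, so the sketch treats both outcomes as "not acquired".

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockTestCleanupSketch {

    // True if the lock file could be locked again through a second channel.
    static boolean canLockAgain(Path lockFile) throws IOException {
        try (FileChannel other = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
            FileLock second = other.tryLock();   // null if another process holds it
            if (second == null) {
                return false;
            }
            second.release();
            return true;
        } catch (OverlappingFileLockException e) {
            return false;                        // held elsewhere in this JVM
        }
    }

    // The "test": the lock must not be acquirable twice, and cleanup always runs.
    static boolean runTest() throws IOException {
        Path dir = Files.createTempDirectory("log-dir");
        Path lockFile = dir.resolve(".lock");   // stands in for LogManager.LockFileName
        FileChannel owner = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            owner.lock();                        // plays the role of the running LogManager
            return !canLockAgain(lockFile);
        } finally {
            // Equivalent of shutting down the log manager and deleting tmpLogDir.
            owner.close();                       // closing the channel releases the lock
            Files.deleteIfExists(lockFile);
            Files.deleteIfExists(dir);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(runTest());
    }
}
```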
Re: [PR] KAFKA-16391: Cleanup .lock file after server is down [kafka]
FrankYang0529 commented on code in PR #15568:
URL: https://github.com/apache/kafka/pull/15568#discussion_r1533196551

## core/src/main/scala/kafka/utils/FileLock.scala: ##
@@ -65,8 +65,14 @@ class FileLock(val file: File) extends Logging {
   def unlock(): Unit = {
     this synchronized {
       trace(s"Releasing lock on ${file.getAbsolutePath}")
-      if (flock != null)
+      if (flock != null) {
         flock.release()
+        if (file.delete()) {
+          trace(s"Deleted ${file.getAbsolutePath}")
+        } else {
+          warn(s"Could not delete ${file.getAbsolutePath}")
+        }

Review Comment:
Agreed! Deleting the file in `destroy` makes sense to me. I will update it.
Re: [PR] MINOR: Tuple2 replaced with Map.Entry [kafka]
chia7712 commented on PR #15560:
URL: https://github.com/apache/kafka/pull/15560#issuecomment-201902

@nizhikov could you please rebase the code to trigger QA again? One of the builds was shut down :(
[jira] [Commented] (KAFKA-16397) Use ByteBufferOutputStream to avoid array copy
[ https://issues.apache.org/jira/browse/KAFKA-16397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829359#comment-17829359 ]

Chia-Ping Tsai commented on KAFKA-16397:
----------------------------------------

[~apoorvmittal10] FYI, and please let me know if you don't have any free cycles :)

> Use ByteBufferOutputStream to avoid array copy
> ----------------------------------------------
>
>                 Key: KAFKA-16397
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16397
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Priority: Minor
>
> from https://github.com/apache/kafka/pull/15148#discussion_r1531889679
> source code:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java#L216
> we can use ByteBufferOutputStream to collect the uncompressed data, and then
> return the inner buffer directly instead of copying full array.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-16396) Producer use the same Key(string) result in multiple paritition
[ https://issues.apache.org/jira/browse/KAFKA-16396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ZhenChun Pan updated KAFKA-16396:
---------------------------------
    Affects Version/s: 3.5.0
                           (was: 3.8.0)

> Producer use the same Key(string) result in multiple paritition
> ---------------------------------------------------------------
>
>                 Key: KAFKA-16396
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16396
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.5.0
>            Reporter: ZhenChun Pan
>            Priority: Minor
>
> In orede to make record in order,we use the same
> key:event#Collector-2021-01-01-001#Probe-0001#1067267613#1002。In some
> situation(maybe the Kafka service has been restarted during producing), we
> find the same key records sent to partition 0 and 4.
> offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002,
> partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474
> offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002,
> partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370
>
>
>
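Background on keyed records: with a non-null key, no explicit partition, and the default partitioner settings, the Java producer computes `toPositive(murmur2(keyBytes)) % numPartitions`. One key therefore maps to one partition only while the partition count, the key serializer, and the partitioner all stay the same. The sketch below is a JDK-only port of that murmur2 variant, reproduced for illustration. Check it against `org.apache.kafka.common.utils.Utils.murmur2` before relying on exact partition numbers. It does not diagnose this report, whose cause the thread has not established.

```java
import java.nio.charset.StandardCharsets;

public class KeyPartitionSketch {

    // Port of the 32-bit murmur2 variant (seed 0x9747b28c) used for key hashing.
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the 1 to 3 trailing bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for a UTF-8 string key: clear the sign bit, then take the modulus.
    static int partitionForKey(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "event#Collector-2021-01-01-001#Probe-0001#1067267613#1002";
        // Same key bytes and same partition count: always the same partition.
        System.out.println(partitionForKey(key, 8) == partitionForKey(key, 8));
        // The result depends on numPartitions, so it can move if partitions are added.
        for (int n = 5; n <= 8; n++) {
            System.out.println(n + " partitions -> " + partitionForKey(key, n));
        }
    }
}
```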
[jira] [Created] (KAFKA-16397) Use ByteBufferOutputStream to avoid array copy
Chia-Ping Tsai created KAFKA-16397:
--------------------------------------

             Summary: Use ByteBufferOutputStream to avoid array copy
                 Key: KAFKA-16397
                 URL: https://issues.apache.org/jira/browse/KAFKA-16397
             Project: Kafka
          Issue Type: Improvement
            Reporter: Chia-Ping Tsai


from https://github.com/apache/kafka/pull/15148#discussion_r1531889679

source code: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java#L216

we can use ByteBufferOutputStream to collect the uncompressed data, and then return the inner buffer directly instead of copying full array.
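The issue's idea is to hand back the stream's own buffer instead of a copy. Kafka's `ByteBufferOutputStream` is not reproduced here. As a JDK-only analogue (an assumption, not the class the issue names), a `ByteArrayOutputStream` subclass can wrap its protected `buf` and `count` fields in a `ByteBuffer`, whereas `toByteArray()` always allocates and copies. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class NoCopyBufferSketch {

    // ByteArrayOutputStream keeps its data in the protected fields buf/count,
    // so a subclass can wrap them without the copy that toByteArray() makes.
    public static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
        public ExposedByteArrayOutputStream(int initialSize) {
            super(initialSize);
        }

        // View of the bytes written so far: position 0, limit count, no array copy.
        public ByteBuffer view() {
            return ByteBuffer.wrap(buf, 0, count);
        }

        // True if the given buffer shares storage with the current internal array.
        public boolean backedByInternalArray(ByteBuffer b) {
            return b.hasArray() && b.array() == buf;
        }
    }

    public static void main(String[] args) {
        ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream(16);
        byte[] payload = "uncompressed metrics".getBytes(StandardCharsets.UTF_8);
        out.write(payload, 0, payload.length);   // grows past 16 bytes internally
        ByteBuffer view = out.view();
        System.out.println(view.remaining());                 // 20
        System.out.println(out.backedByInternalArray(view));  // true
        System.out.println(new String(view.array(), view.position(), view.remaining(),
                StandardCharsets.UTF_8));
    }
}
```

The trade-off is aliasing: the view shares storage with the stream, so it should only be used once writing is finished, because a later write may grow the stream into a new array.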
[jira] [Updated] (KAFKA-16396) Producer use the same Key(string) result in multiple paritition
[ https://issues.apache.org/jira/browse/KAFKA-16396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ZhenChun Pan updated KAFKA-16396:
---------------------------------
    Description: 
In orede to make record in order,we use the same key:event#Collector-2021-01-01-001#Probe-0001#1067267613#1002。In some situation(maybe the Kafka service has been restarted during producing), we find the same key records sent to partition 0 and 4.

offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474

offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370

  was:
In orede to make record in order,we use the same key:event#Collector-2021-01-01-001#Probe-0001#1067267613#1002。In some situation, we find the same key records sent to partition 0 and 4.

offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474

offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370

> Producer use the same Key(string) result in multiple paritition
> ---------------------------------------------------------------
>
>                 Key: KAFKA-16396
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16396
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.8.0
>            Reporter: ZhenChun Pan
>            Priority: Minor
>
> In orede to make record in order,we use the same
> key:event#Collector-2021-01-01-001#Probe-0001#1067267613#1002。In some
> situation(maybe the Kafka service has been restarted during producing), we
> find the same key records sent to partition 0 and 4.
> offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002,
> partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474
> offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002,
> partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370
>
>
>
Re: [PR] KAFKA-16391: Cleanup .lock file after server is down [kafka]
showuon commented on code in PR #15568:
URL: https://github.com/apache/kafka/pull/15568#discussion_r1533191601

## core/src/main/scala/kafka/utils/FileLock.scala: ##
@@ -65,8 +65,14 @@ class FileLock(val file: File) extends Logging {
   def unlock(): Unit = {
     this synchronized {
       trace(s"Releasing lock on ${file.getAbsolutePath}")
-      if (flock != null)
+      if (flock != null) {
         flock.release()
+        if (file.delete()) {
+          trace(s"Deleted ${file.getAbsolutePath}")
+        } else {
+          warn(s"Could not delete ${file.getAbsolutePath}")
+        }

Review Comment:
IMO, it is confusing to delete the file in the `unlock` method. If we need to unlock and then lock again, it will fail now, right? Maybe delete it in `destroy` instead?
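The split the reviewers converged on can be sketched with `java.nio`: `unlock()` only releases the lock so it can be taken again, and `destroy()` is the final cleanup that also deletes the file. `SketchFileLock` is a hypothetical class modelled on the method names used in this thread (`tryLock`, `unlock`, `destroy`), not the code of Kafka's `kafka.utils.FileLock`.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SketchFileLock {
    private final Path file;
    private final FileChannel channel;
    private FileLock flock;

    SketchFileLock(Path file) throws IOException {
        this.file = file;
        this.channel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    // Non-blocking acquire. An overlapping lock already held in this JVM
    // surfaces as OverlappingFileLockException and is reported as false.
    synchronized boolean tryLock() throws IOException {
        try {
            flock = channel.tryLock();
            return flock != null;
        } catch (OverlappingFileLockException e) {
            return false;
        }
    }

    // Releases the lock but keeps the file and channel, so tryLock() works again.
    synchronized void unlock() throws IOException {
        if (flock != null) {
            flock.release();
            flock = null;
        }
    }

    // Final cleanup: release, close the channel, then delete the lock file.
    synchronized void destroy() throws IOException {
        unlock();
        channel.close();
        Files.deleteIfExists(file);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("lock-sketch");
        Path lockFile = dir.resolve(".lock");
        SketchFileLock lock = new SketchFileLock(lockFile);
        System.out.println(lock.tryLock());          // true
        lock.unlock();
        System.out.println(lock.tryLock());          // true: unlock kept the file usable
        lock.destroy();
        System.out.println(Files.exists(lockFile));  // false: destroy removed it
        Files.delete(dir);
    }
}
```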
Re: [PR] MINOR : Removed the depreciated information about Zk to Kraft migration. [kafka]
chia7712 commented on PR #15552:
URL: https://github.com/apache/kafka/pull/15552#issuecomment-2011101530

@chiacyu thanks for your contribution!
[jira] [Created] (KAFKA-16396) Producer use the same Key(string) result in multiple paritition
ZhenChun Pan created KAFKA-16396:
------------------------------------

             Summary: Producer use the same Key(string) result in multiple paritition
                 Key: KAFKA-16396
                 URL: https://issues.apache.org/jira/browse/KAFKA-16396
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.8.0
            Reporter: ZhenChun Pan


In orede to make record in orde,we use the same key:event#Collector-2021-01-01-001#Probe-0001#1067267613#1002。In some situation, we find the same key records sent to partition 0 and 4.

offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474

offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370
[jira] [Updated] (KAFKA-16396) Producer use the same Key(string) result in multiple paritition
[ https://issues.apache.org/jira/browse/KAFKA-16396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ZhenChun Pan updated KAFKA-16396:
---------------------------------
    Description: 
In orede to make record in order,we use the same key:event#Collector-2021-01-01-001#Probe-0001#1067267613#1002。In some situation, we find the same key records sent to partition 0 and 4.

offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474

offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370

  was:
In orede to make record in orde,we use the same key:event#Collector-2021-01-01-001#Probe-0001#1067267613#1002。In some situation, we find the same key records sent to partition 0 and 4.

offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474

offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002, partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370

> Producer use the same Key(string) result in multiple paritition
> ---------------------------------------------------------------
>
>                 Key: KAFKA-16396
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16396
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.8.0
>            Reporter: ZhenChun Pan
>            Priority: Minor
>
> In orede to make record in order,we use the same
> key:event#Collector-2021-01-01-001#Probe-0001#1067267613#1002。In some
> situation, we find the same key records sent to partition 0 and 4.
> offset: 7422, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002,
> partition: 0, topic: IES.tunnel.event.1002, timestamp: 1710483530474
> offset: 7433, key: event#Collector-2021-01-01-001#Probe-0001#1067267613#1002,
> partition: 4, topic: IES.tunnel.event.1002, timestamp: 1710483516370
>
>
>
Re: [PR] MINOR : Removed the depreciated information about Zk to Kraft migration. [kafka]
chia7712 merged PR #15552:
URL: https://github.com/apache/kafka/pull/15552
Re: [PR] MINOR: Add retry mechanism to EOS example [kafka]
showuon commented on PR #15561:
URL: https://github.com/apache/kafka/pull/15561#issuecomment-2011075907

@gaoran10, call for review.
Re: [PR] MINOR: Add retry mechanism to EOS example [kafka]
showuon commented on code in PR #15561:
URL: https://github.com/apache/kafka/pull/15561#discussion_r1533168785

## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ##
@@ -152,17 +160,19 @@ public void run() {
                 consumer.seekToEnd(emptyList());
                 consumer.commitSync();

Review Comment:
Should we reset `retries` to 0 here?

## examples/src/main/java/kafka/examples/Consumer.java: ##
@@ -145,6 +145,7 @@ public KafkaConsumer createKafkaConsumer() {
         }
         // sets the reset offset policy in case of invalid or no offset
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);

Review Comment:
Why do we need to add this?

## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ##
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer consumer) {
         }).sum();
     }

+    private int retry(int retries, KafkaConsumer consumer, ConsumerRecords records) {

Review Comment:
Since we will not always retry, maybe rename it to `maybeRetry`?

## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ##
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer consumer) {
         }).sum();
     }

+    private int retry(int retries, KafkaConsumer consumer, ConsumerRecords records) {
+        retries++;
+        if (retries > 0 && retries <= MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+        } else if (retries > MAX_RETRIES) {
+            // continue: skip bad records
+            // in addition to logging, you may want to send these records to a DLQ for further processing
+            records.forEach(record -> {
+                Utils.printErr("Skipping record after %d retries: %s", MAX_RETRIES, record.value());
+                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
+                consumer.commitSync();
+            });
+            retries = 0;
+        }

Review Comment:
I think we should also have an `else` branch that prints an error message for the `retries < 0` case.

## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ##
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer consumer) {
         }).sum();
     }

+    private int retry(int retries, KafkaConsumer consumer, ConsumerRecords records) {
+        retries++;
+        if (retries > 0 && retries <= MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+        } else if (retries > MAX_RETRIES) {
+            // continue: skip bad records
+            // in addition to logging, you may want to send these records to a DLQ for further processing
+            records.forEach(record -> {
+                Utils.printErr("Skipping record after %d retries: %s", MAX_RETRIES, record.value());
+                consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
+                consumer.commitSync();

Review Comment:
Is this necessary? I thought we would move to the next offset even without this seek, no?

## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ##
@@ -215,6 +225,33 @@ private long getRemainingRecords(KafkaConsumer consumer) {
         }).sum();
     }

+    private int retry(int retries, KafkaConsumer consumer, ConsumerRecords records) {

Review Comment:
Also, since this is an example for "general users", it would be better to add a comment above this method explaining what it does, why it is needed, and what happens once the maximum number of retries is exceeded, etc.
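The review points (rename to `maybeRetry`, reset the counter once a batch is skipped or committed, and add an `else` branch for an unexpected negative counter) can be sketched without a broker. The `RetryActions` interface, its two methods, and `MAX_RETRIES = 3` are hypothetical stand-ins for the seek-to-committed-offset and skip-and-commit logic in the diff. This is a sketch of the control flow only, not the example's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class MaybeRetrySketch {

    static final int MAX_RETRIES = 3;

    // Hypothetical hooks standing in for the consumer calls in the diff.
    interface RetryActions {
        void rewindToCommitted();   // seek each assigned partition to its committed offset
        void skipBatch();           // log or DLQ the records, seek past them, commit
    }

    /**
     * Called after a failed transaction; returns the new retry count.
     * The batch is rewound and retried up to MAX_RETRIES times, then skipped,
     * and the count goes back to 0. A negative input is reported as a bug.
     */
    static int maybeRetry(int retries, RetryActions actions) {
        if (retries < 0) {
            System.err.println("Unexpected negative retry count: " + retries);
            return 0;
        }
        retries++;
        if (retries <= MAX_RETRIES) {
            actions.rewindToCommitted();
            return retries;
        }
        actions.skipBatch();
        return 0;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        RetryActions actions = new RetryActions() {
            public void rewindToCommitted() { log.add("rewind"); }
            public void skipBatch() { log.add("skip"); }
        };
        int retries = 0;
        // Simulate a batch that fails on every attempt.
        for (int attempt = 0; attempt < 4; attempt++) {
            retries = maybeRetry(retries, actions);
        }
        System.out.println(log);      // [rewind, rewind, rewind, skip]
        System.out.println(retries);  // 0
    }
}
```

After a successful commit, the caller would also set its counter back to 0, which is the reset the first review comment asks about.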
[PR] KAFKA-16391: Cleanup .lock file after server is down [kafka]
FrankYang0529 opened a new pull request, #15568:
URL: https://github.com/apache/kafka/pull/15568

Currently, the server adds a `.lock` file to each log folder. The file serves no purpose once the server is down.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15949: Unify metadata.version format in log and error message [kafka]
showuon commented on PR #15505:
URL: https://github.com/apache/kafka/pull/15505#issuecomment-2011051818

Retriggering the CI build: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15505/4/
Re: [PR] KAFKA-16318 : add javafoc for kafka metric [kafka]
chia7712 commented on code in PR #15483:
URL: https://github.com/apache/kafka/pull/15483#discussion_r1533141970

## clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java: ##
@@ -40,15 +48,29 @@ public KafkaMetric(Object lock, MetricName metricName, MetricValueProvider va
         this.time = time;
     }

+    /**
+     * Get the configuration of this metric.
+     * This is supposed to be used by server only.

Review Comment:
@mimaison it seems to me those methods are used by the server to "write" something (for example, a quota), so users should treat the metrics as read-only objects. Please correct me if I misunderstand the purpose.
[jira] [Updated] (KAFKA-16390) consume_bench_test.py failed using AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-16390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16390:
------------------------------
    Summary: consume_bench_test.py failed using AsyncKafkaConsumer  (was: consumer_bench_test.py failed using AsyncKafkaConsumer)

> consume_bench_test.py failed using AsyncKafkaConsumer
> -----------------------------------------------------
>
>                 Key: KAFKA-16390
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16390
>             Project: Kafka
>          Issue Type: Task
>          Components: consumer, system tests
>            Reporter: Philip Nee
>            Priority: Major
>              Labels: kip-848-client-support
>
> Ran the system test based on KAFKA-16273
> The following tests failed using the consumer group protocol
> {code:java}
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> {code}
> Because of
> {code:java}
> TimeoutError('consume_workload failed to finish in the expected amount of
> time.')
> Traceback (most recent call last):
>   File
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py",
> line 186, in _do_run
>     data = self.run_test()
>   File
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py",
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py",
> line 146, in test_single_partition
>     consume_workload.wait_for_done(timeout_sec=180)
>   File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line
> 352, in wait_for_done
>     wait_until(lambda: self.done(),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line
> 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from
> last_exception
> ducktape.errors.TimeoutError: consume_workload failed to finish in the
> expected amount of time.
> {code}
[PR] KAFKA-16275: Update transactions_test.py to support KIP-848’s group protocol config [kafka]
kirktrue opened a new pull request, #15567:
URL: https://github.com/apache/kafka/pull/15567

Added a new optional `group_protocol` parameter to the test methods, then passed it down to the methods involved.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding `group_protocol=["classic", "consumer"]` to the existing blocks.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
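The reason given for a separate `@matrix` block can be illustrated by expanding parameter lists. A ducktape `@matrix` generates the full cross-product of its lists. So if an existing block also runs with the old coordinator, adding `group_protocol=["classic", "consumer"]` to it would produce an unsupported old-coordinator plus new-consumer case. A minimal Java sketch follows; the `Combo` class and its `isSupported` rule are assumptions based on the PR description, and the parameter names follow the test IDs quoted in KAFKA-16390.

```java
import java.util.ArrayList;
import java.util.List;

public class MatrixExpansionSketch {

    // One generated test case: a single value chosen from each parameter list.
    static final class Combo {
        final boolean useNewCoordinator;
        final String groupProtocol;

        Combo(boolean useNewCoordinator, String groupProtocol) {
            this.useNewCoordinator = useNewCoordinator;
            this.groupProtocol = groupProtocol;
        }

        // Assumed rule from the PR text: the new consumer needs the new coordinator.
        boolean isSupported() {
            return useNewCoordinator || !groupProtocol.equals("consumer");
        }

        @Override
        public String toString() {
            return "use_new_coordinator=" + useNewCoordinator + ", group_protocol=" + groupProtocol;
        }
    }

    // Full cross-product of the two lists, the way a matrix block expands.
    static List<Combo> expand(List<Boolean> useNewCoordinator, List<String> groupProtocol) {
        List<Combo> combos = new ArrayList<>();
        for (boolean coordinator : useNewCoordinator) {
            for (String protocol : groupProtocol) {
                combos.add(new Combo(coordinator, protocol));
            }
        }
        return combos;
    }

    public static void main(String[] args) {
        // One block covering both coordinators and both protocols.
        for (Combo c : expand(List.of(false, true), List.of("classic", "consumer"))) {
            System.out.println(c + (c.isSupported() ? "" : "  <- unsupported"));
        }
        // Existing block unchanged, plus a separate block for the new consumer.
        List<Combo> split = new ArrayList<>(expand(List.of(false, true), List.of("classic")));
        split.addAll(expand(List.of(true), List.of("consumer")));
        System.out.println(split.stream().allMatch(Combo::isSupported));   // true
    }
}
```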
Re: [PR] MINOR: Revert to Gradle 8.5 [DO NOT MERGE YET] [kafka]
pasharik commented on PR #15553:
URL: https://github.com/apache/kafka/pull/15553#issuecomment-2010846230

@gaurav-narula I've tried zinc `1.9.6` with Gradle `8.5` and `8.6`, but I get the same issue, with all tests being recompiled.

Perhaps one radical solution to this incremental-compilation issue with Scala could be rewriting the entire `AclCommandTest` in Java? :smile: I'm looking to add KRaft support for this test, so, as I understand it, there will be a lot of changes in it anyway. I see there are already some Java tests in the `core` module.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1532971164

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ##
@@ -175,16 +182,44 @@ public static boolean validateRequiredResourceLabels(Map metadat
     }

     public static CompressionType preferredCompressionType(List acceptedCompressionTypes) {
-        // TODO: Support compression in client telemetry.
+        if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+            // Broker is providing the compression types in order of preference. Grab the
+            // first one.
+            return acceptedCompressionTypes.get(0);
+        }
         return CompressionType.NONE;
     }

     public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
-        // TODO: Support compression in client telemetry.
-        if (compressionType == CompressionType.NONE) {
-            return ByteBuffer.wrap(raw);
-        } else {
-            throw new UnsupportedOperationException("Compression is not supported");
+        try {
+            try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
+                try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(raw);
+                    out.flush();
+                }
+                compressedOut.buffer().flip();
+                return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+            }
+        } catch (IOException e) {
+            throw new KafkaException("Failed to compress metrics data", e);
+        }
+    }
+
+    public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
+        ByteBuffer data = ByteBuffer.wrap(metrics);
+        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+
+            byte[] bytes = new byte[data.capacity() * 2];
+            int nRead;
+            while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+                out.write(bytes, 0, nRead);
+            }
+
+            out.flush();
+            return ByteBuffer.wrap(out.toByteArray());

Review Comment:
@chia7712 Sounds good. Could you please create an improvement Jira for me? I'll address it.
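A JDK-only sketch of the same compress and decompress round trip, with `GZIPOutputStream`/`GZIPInputStream` swapped in for Kafka's `CompressionType.wrapForOutput`/`wrapForInput` (an assumption made to keep it self-contained). `preferred` mirrors the diff's rule of taking the broker's first, most-preferred type, with a hypothetical `"none"` fallback in place of `CompressionType.NONE`. The read loop uses a fixed-size chunk; any non-zero buffer size works with it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TelemetryCompressionSketch {

    // Take the broker's first (most preferred) type, else fall back to "none".
    static String preferred(List<String> accepted) {
        return (accepted != null && !accepted.isEmpty()) ? accepted.get(0) : "none";
    }

    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream(512);
        try (OutputStream out = new GZIPOutputStream(compressed)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to compress metrics data", e);
        }
        // Closing the GZIP stream wrote its trailer, so the bytes are complete.
        return compressed.toByteArray();
    }

    static ByteBuffer decompress(byte[] metrics) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(metrics));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[4096];   // fixed, non-zero chunk size
            int nRead;
            while ((nRead = in.read(chunk, 0, chunk.length)) != -1) {
                out.write(chunk, 0, nRead);
            }
            return ByteBuffer.wrap(out.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to decompress metrics data", e);
        }
    }

    public static void main(String[] args) {
        byte[] raw = "metric-a 1\nmetric-b 2\n".repeat(100).getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(raw);
        ByteBuffer unpacked = decompress(packed);
        System.out.println(raw.length + " -> " + packed.length + " bytes");
        System.out.println(unpacked.remaining() == raw.length);   // true
        System.out.println(preferred(List.of("zstd", "lz4")));     // zstd
    }
}
```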
[jira] [Resolved] (KAFKA-16395) Producer should refresh metadata on a socket request timeout
[ https://issues.apache.org/jira/browse/KAFKA-16395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Mao resolved KAFKA-16395.
-------------------------------
    Resolution: Not A Bug

> Producer should refresh metadata on a socket request timeout
> ------------------------------------------------------------
>
>                 Key: KAFKA-16395
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16395
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: David Mao
>            Assignee: David Mao
>            Priority: Critical
>
> I noticed in a set of producer logs that on a broker outage, we saw the
> following sequence of logs:
> Got error produce response with correlation id 1661616 on topic-partition
> topic-0, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT. Error
> Message: Disconnected from node 0 due to timeout
> Got error produce response with correlation id 1662093 on topic-partition
> topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER
> Received invalid metadata error in produce request on partition topic-0 due
> to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests
> intended only for the leader, this error indicates that the broker is not the
> current leader. For requests intended for any replica, this error indicates
> that the broker is not a replica of the topic partition.. Going to request
> metadata update now
> this implies we did not request metadata between our produce request
> attempts. This is a regression introduced by
> https://issues.apache.org/jira/browse/KAFKA-14317.
[jira] [Commented] (KAFKA-16395) Producer should refresh metadata on a socket request timeout
[ https://issues.apache.org/jira/browse/KAFKA-16395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829311#comment-17829311 ]

David Mao commented on KAFKA-16395:
---

On a closer reading, I believe I misunderstood the code in KAFKA-14317, and this is probably not a bug. Closing the JIRA.
Re: [PR] KAFKA-16395: Producer should refresh metadata when a request is cancelled due to request timeout [kafka]
splett2 closed pull request #15565: KAFKA-16395: Producer should refresh metadata when a request is cancelled due to request timeout
URL: https://github.com/apache/kafka/pull/15565
Re: [PR] KAFKA-16359: Corrected manifest file for kafka-clients [kafka]
apoorvmittal10 commented on PR #15532:
URL: https://github.com/apache/kafka/pull/15532#issuecomment-2010700271

@ijuma The build passes; the remaining test failures are unrelated.
Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]
jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532907721

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
         long offset
     ) {
         log.debug("High watermark of {} incremented to {}.", tp, offset);
-        scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-            CoordinatorContext context = coordinators.get(tp);
-            if (context != null) {
-                context.lock.lock();
-                try {
-                    if (context.state == CoordinatorState.ACTIVE) {
-                        // The updated high watermark can be applied to the coordinator only if the coordinator
-                        // exists and is in the active state.
-                        log.debug("Updating high watermark of {} to {}.", tp, offset);
-                        context.coordinator.updateLastCommittedOffset(offset);
-                        context.deferredEventQueue.completeUpTo(offset);
-                        coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                    } else {
-                        log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-                            tp, offset);
+        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+            // An event to apply the new high watermark is pushed to the front of the
+            // queue only if the previous value was -1L. If it was not, it means that
+            // there is already an event waiting to process the last value.

Review Comment:
Do we have a test to confirm this behavior?
Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]
jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532903183

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
         long offset
     ) {
         log.debug("High watermark of {} incremented to {}.", tp, offset);
-        scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-            CoordinatorContext context = coordinators.get(tp);
-            if (context != null) {
-                context.lock.lock();
-                try {
-                    if (context.state == CoordinatorState.ACTIVE) {
-                        // The updated high watermark can be applied to the coordinator only if the coordinator
-                        // exists and is in the active state.
-                        log.debug("Updating high watermark of {} to {}.", tp, offset);
-                        context.coordinator.updateLastCommittedOffset(offset);
-                        context.deferredEventQueue.completeUpTo(offset);
-                        coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                    } else {
-                        log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-                            tp, offset);
+        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+            // An event to apply the new high watermark is pushed to the front of the
+            // queue only if the previous value was -1L. If it was not, it means that
+            // there is already an event waiting to process the last value.

Review Comment:
> Hum... My understanding is that the code will actually set lastHighWatermark from NO_OFFSET to h1 and push the event in this case.

Thanks for the correction. You're right, I misunderstood the process. Makes sense!
Re: [PR] KAFKA-16395: Producer should refresh metadata when a request is cancelled due to request timeout [kafka]
splett2 commented on PR #15565:
URL: https://github.com/apache/kafka/pull/15565#issuecomment-2010669973

I took a closer look, and I am fairly sure my repro is incorrect. Based on my reading of the code from KAFKA-14317, I think the network client was behaving correctly, but perhaps it wasn't logging when metadata refreshes were being requested.
[PR] KRaft upgrade tests should only use latest stable mv [kafka]
ahuang98 opened a new pull request, #15566:
URL: https://github.com/apache/kafka/pull/15566

This should help us avoid testing metadata versions (MVs) before the corresponding release exists. In this case, we go back from testing 3.8 to testing 3.7, since 3.7 is the current stable version.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16206 Fix unnecessary topic config deletion during ZK migration [kafka]
ahuang98 commented on PR #14206:
URL: https://github.com/apache/kafka/pull/14206#issuecomment-2010532534

@mimaison could you help merge this if this looks good to you? :)
[jira] [Assigned] (KAFKA-16276) Update transactions_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-16276:
-
Assignee: Kirk True

> Update transactions_test.py to support KIP-848’s group protocol config
> --
>
> Key: KAFKA-16276
> URL: https://issues.apache.org/jira/browse/KAFKA-16276
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> This task is to update the test method(s) in {{transactions_test.py}} to
> support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
> The wrinkle here is that {{transactions_test.py}} was not able to run as-is.
> That might deprioritize this until whatever is causing that is resolved.
Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]
dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532741513

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
         long offset
     ) {
         log.debug("High watermark of {} incremented to {}.", tp, offset);
-        scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-            CoordinatorContext context = coordinators.get(tp);
-            if (context != null) {
-                context.lock.lock();
-                try {
-                    if (context.state == CoordinatorState.ACTIVE) {
-                        // The updated high watermark can be applied to the coordinator only if the coordinator
-                        // exists and is in the active state.
-                        log.debug("Updating high watermark of {} to {}.", tp, offset);
-                        context.coordinator.updateLastCommittedOffset(offset);
-                        context.deferredEventQueue.completeUpTo(offset);
-                        coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                    } else {
-                        log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-                            tp, offset);
+        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+            // An event to apply the new high watermark is pushed to the front of the
+            // queue only if the previous value was -1L. If it was not, it means that
+            // there is already an event waiting to process the last value.
+            processor.enqueueFirst(new CoordinatorInternalEvent("HighWatermarkUpdate", tp, () -> {
+                long newHighWatermark = lastHighWatermark.getAndSet(NO_OFFSET);
+
+                CoordinatorContext context = coordinators.get(tp);

Review Comment:
In order to have better logging.
Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]
dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532739687

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
         long offset
     ) {
         log.debug("High watermark of {} incremented to {}.", tp, offset);
-        scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-            CoordinatorContext context = coordinators.get(tp);
-            if (context != null) {
-                context.lock.lock();
-                try {
-                    if (context.state == CoordinatorState.ACTIVE) {
-                        // The updated high watermark can be applied to the coordinator only if the coordinator
-                        // exists and is in the active state.
-                        log.debug("Updating high watermark of {} to {}.", tp, offset);
-                        context.coordinator.updateLastCommittedOffset(offset);
-                        context.deferredEventQueue.completeUpTo(offset);
-                        coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                    } else {
-                        log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-                            tp, offset);
+        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+            // An event to apply the new high watermark is pushed to the front of the
+            // queue only if the previous value was -1L. If it was not, it means that
+            // there is already an event waiting to process the last value.

Review Comment:
> The first HWM advancement to h1 will set lastHighWatermark to NO_OFFSET and enqueueFirst() HWM update event.

Hum... My understanding is that the code will actually set lastHighWatermark from NO_OFFSET to h1 and push the event in this case.

> Before the first event runs, let's say the HWM advances to h2. this will see that lastHighWatermark is NO_OFFSET and will skip enqueueFirst().

It will update lastHighWatermark to h2 and, as the previous value is not NO_OFFSET, it does not push the event this time.

> I wonder if we can:
> * keep track of highest HWM updated
> * only enqueueFirst if the offset to update is greater than highest HWM recorded

Isn't that more or less what my change does? It does not enforce that the HWM is greater than the previous one, though, but that should not happen.
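This thread describes a coalescing scheme. An `AtomicLong` slot holds the latest high watermark, with `NO_OFFSET = -1L` meaning no update event is pending. Only the caller that finds the slot empty pushes an event to the front of the queue. The model below is a minimal, single-threaded sketch of that scheme; it is not `CoordinatorRuntime`'s code. `HighWatermarkCoalescer`, `runNext`, `pending` and `applied` are hypothetical names, and an `ArrayDeque` stands in for the runtime's event processor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical single-threaded model of coalesced high-watermark (HWM) updates. */
public class HighWatermarkCoalescer {
    static final long NO_OFFSET = -1L;

    // Latest HWM not yet applied; NO_OFFSET means no update event is pending.
    private final AtomicLong lastHighWatermark = new AtomicLong(NO_OFFSET);
    // Stand-in for the event processor's queue; addFirst plays the role of enqueueFirst.
    private final Deque<Runnable> queue = new ArrayDeque<>();
    // HWMs actually applied, in order, so the behavior can be inspected.
    final List<Long> applied = new ArrayList<>();

    void onHighWatermarkUpdated(long offset) {
        // Only the caller that finds the slot empty (NO_OFFSET) enqueues an event;
        // later callers just overwrite the pending value with a newer one.
        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
            queue.addFirst(() -> {
                // Take the newest value and empty the slot so the next update enqueues again.
                long newHighWatermark = lastHighWatermark.getAndSet(NO_OFFSET);
                applied.add(newHighWatermark);
            });
        }
    }

    /** Runs the event at the head of the queue (call only when pending() > 0). */
    void runNext() {
        queue.pollFirst().run();
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        HighWatermarkCoalescer c = new HighWatermarkCoalescer();
        c.onHighWatermarkUpdated(10L);   // slot NO_OFFSET -> 10: event enqueued
        c.onHighWatermarkUpdated(20L);   // slot 10 -> 20: event already pending, nothing enqueued
        System.out.println(c.pending()); // 1
        c.runNext();                     // applies 20 and resets the slot to NO_OFFSET
        System.out.println(c.applied);   // [20]
    }
}
```

The sketch shows the point dajac makes. The second update (h2) does not enqueue a second event, and the single pending event applies the newest value. A check like this is also the kind of behavioral test jeffkbkim asked about. The real runtime is multi-threaded, so its queue would need to be thread-safe, unlike the `ArrayDeque` here.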
Re: [PR] KAFKA-16353: Offline protocol migration integration tests [kafka]
dajac commented on code in PR #15492:
URL: https://github.com/apache/kafka/pull/15492#discussion_r1532711510

## core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala: ##
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.message.ListGroupsResponseData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.coordinator.group.Group
+import org.apache.kafka.coordinator.group.classic.ClassicGroupState
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"),

Review Comment:
nit: It may be better to use `new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer")` now.

## core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala: ##
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.message.ListGroupsResponseData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.coordinator.group.Group
+import org.apache.kafka.coordinator.group.classic.ClassicGroupState
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
+  ))
+  def testOfflineUpgrade(): Unit = {
+    // Creates the __consumer_offsets topics because it won't be created automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Create a classic group by joining a member.
+    val groupId = "grp"
+    val (memberId, _) = joinDynamicConsumerGroupWithOldProtocol(groupId)
+
Re: [PR] KAFKA-16395: Producer should refresh metadata when a request is cancelled due to request timeout [kafka]
ijuma commented on code in PR #15565:
URL: https://github.com/apache/kafka/pull/15565#discussion_r1532671434

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##
@@ -586,7 +586,7 @@ private void handleProduceResponse(ClientResponse response, Map
[PR] KAFKA-16395: Producer should refresh metadata when a request is cancelled due to request timeout [kafka]
splett2 opened a new pull request, #15565:
URL: https://github.com/apache/kafka/pull/15565

### What
On a client-side request timeout (the client did not receive a response from the server in a timely manner), the client needs to refresh metadata. This requires the error in the partition response passed to `completeBatch` to map to an exception that is an `instanceof InvalidMetadataException`.

This is slightly different from `Errors.REQUEST_TIMED_OUT`, since the latter is returned when the HWM fails to advance within the configured request timeout threshold, so we do not want to make `REQUEST_TIMED_OUT` an invalid metadata exception. I couldn't find a better fit than `NETWORK_EXCEPTION`, so hopefully the improved logging from KAFKA-14317 still reasonably distinguishes between request timeouts and sudden connection interrupts.

### Testing
Added a unit test in `SenderTest` using

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
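The PR's reasoning depends on which errors count as invalid metadata. In Kafka's client library, `NetworkException` extends `InvalidMetadataException`, while the `TimeoutException` behind `REQUEST_TIMED_OUT` does not. The minimal sketch below models only that shape. The classes `RetriableError`, `InvalidMetadataError`, `NetworkError` and `RequestTimedOutError` are hypothetical stand-ins, as are the methods `shouldRefreshMetadata` and `errorForClientSideTimeout`. This is not the `Sender` code, and the PR itself was later closed.

```java
/**
 * Hypothetical model of the producer's "refresh metadata?" decision.
 * The nested classes only mirror the shape of Kafka's error hierarchy; they are not the real types.
 */
public class MetadataRefreshSketch {
    static class RetriableError extends RuntimeException {}

    // Mirrors InvalidMetadataException: retriable, and signals that metadata may be stale.
    static class InvalidMetadataError extends RetriableError {}

    // Mirrors NetworkException, which in Kafka extends InvalidMetadataException.
    static class NetworkError extends InvalidMetadataError {}

    // Mirrors the broker-side REQUEST_TIMED_OUT error: retriable, but not a metadata problem.
    static class RequestTimedOutError extends RetriableError {}

    /** Only invalid-metadata errors trigger a metadata refresh in this model. */
    static boolean shouldRefreshMetadata(RuntimeException error) {
        return error instanceof InvalidMetadataError;
    }

    /** Hypothetical mapping for a request the client cancelled after its own timeout. */
    static RuntimeException errorForClientSideTimeout() {
        return new NetworkError();
    }

    public static void main(String[] args) {
        System.out.println(shouldRefreshMetadata(errorForClientSideTimeout())); // true
        System.out.println(shouldRefreshMetadata(new RequestTimedOutError()));  // false
    }
}
```

Mapping a client-side timeout to the network-style error makes `instanceof` succeed without changing how a broker's `REQUEST_TIMED_OUT` is treated. That is the trade-off the PR description weighs.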
[jira] [Created] (KAFKA-16395) Producer should refresh metadata on a socket request timeout
David Mao created KAFKA-16395:
-
Summary: Producer should refresh metadata on a socket request timeout
Key: KAFKA-16395
URL: https://issues.apache.org/jira/browse/KAFKA-16395
Project: Kafka
Issue Type: Bug
Reporter: David Mao
Assignee: David Mao

I noticed in a set of producer logs that on a broker outage, we saw the following sequence of logs:

Got error produce response with correlation id 1661616 on topic-partition topic-0, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT. Error Message: Disconnected from node 0 due to timeout
Got error produce response with correlation id 1662093 on topic-partition topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER
Received invalid metadata error in produce request on partition topic-0 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now

This implies we did not request metadata between our produce request attempts. This is a regression introduced by https://issues.apache.org/jira/browse/KAFKA-14317.
[jira] [Assigned] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-16389:
-
Assignee: Kirk True (was: Lianet Magrans)

> consumer_test.py’s test_valid_assignment fails with new consumer
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch
>
> The following error is reported when running the {{test_valid_assignment}}
> test from {{consumer_test.py}}:
> {code}
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
>     data = self.run_test()
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 584, in test_valid_assignment
>     wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
> ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
> {code}
> To reproduce, create a system test suite file named
> {{test_valid_assignment.yml}} with these contents:
> {code:yaml}
> failures:
>   - 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
> {code}
> Then set the {{TC_PATHS}} environment variable to include that test suite
> file.
[jira] [Commented] (KAFKA-15899) Move kafka.security package from core to server module
[ https://issues.apache.org/jira/browse/KAFKA-15899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829267#comment-17829267 ]

Nikolay Izhikov commented on KAFKA-15899:
-

Hello [~omnia_h_ibrahim], can I assign this ticket to myself?

> Move kafka.security package from core to server module
> --
>
> Key: KAFKA-15899
> URL: https://issues.apache.org/jira/browse/KAFKA-15899
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Ismael Juma
> Assignee: Omnia Ibrahim
> Priority: Major
>
[jira] [Resolved] (KAFKA-16352) Transaction may get stuck in PrepareCommit or PrepareAbort state
[ https://issues.apache.org/jira/browse/KAFKA-16352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Artem Livshits resolved KAFKA-16352.
Fix Version/s: 3.8.0
Reviewer: Justine Olshan
Resolution: Fixed

> Transaction may get stuck in PrepareCommit or PrepareAbort state
>
> Key: KAFKA-16352
> URL: https://issues.apache.org/jira/browse/KAFKA-16352
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Artem Livshits
> Assignee: Artem Livshits
> Priority: Major
> Fix For: 3.8.0
>
> A transaction took a long time to complete, and trying to restart the producer
> led to CONCURRENT_TRANSACTIONS errors. Investigation showed that
> the transaction had been stuck in PrepareCommit for a few days
> (the investigation took place on Feb 27 2024). Transaction state:
> {{Type |Name |Value}}
> {{-}}
> {{ref |transactionalId |xxx-yyy}}
> {{long |producerId |299364}}
> {{ref |state |kafka.coordinator.transaction.PrepareCommit$ @ 0x44fe22760}}
> {{long |txnStartTimestamp |1708619624810 Thu Feb 22 2024 16:33:44.810 GMT+}}
> {{long |txnLastUpdateTimestamp|1708619625335 Thu Feb 22 2024 16:33:45.335 GMT+}}
> {{-}}
> The partition list was empty, and transactionsWithPendingMarkers didn't
> contain a reference to the transactional state. The log contained the
> following relevant messages:
> {{22 Feb 2024 @ 16:33:45.623 UTC [Transaction State Manager 1]: Completed loading transaction metadata from __transaction_state-3 for coordinator epoch 611}}
> (this is the partition that contains the transactional id). After the data
> is loaded, the coordinator sends out markers, etc.
> Then there is this message:
> {{22 Feb 2024 @ 16:33:45.696 UTC [Transaction Marker Request Completion Handler 4]: Transaction coordinator epoch for xxx-yyy has changed from 610 to 611; cancel sending transaction markers TxnMarkerEntry{producerId=299364, producerEpoch=1005, coordinatorEpoch=610, result=COMMIT, partitions=[foo-bar]} to the brokers}}
> This message is logged just before the state is removed from
> transactionsWithPendingMarkers, but the state apparently contained the entry
> that was created by the load operation. So the sequence of events probably
> looked like the following:
> # partition load completed
> # commit markers were sent for transactional id xxx-yyy; an entry in transactionsWithPendingMarkers was created
> # a zombie reply from the previous epoch completed and removed the entry from transactionsWithPendingMarkers
> # commit markers completed properly, but the transaction couldn't transition to the CommitComplete state because transactionsWithPendingMarkers didn't have the proper entry, so it stayed stuck until the broker was restarted
> Looking at the code, there are a few cases that could lead to similar race
> conditions. The fix is to keep track of the PendingCompleteTxn value that
> was used when sending the marker, so that we only remove the state that
> was created when the marker was sent and never accidentally remove state
> that someone else created.
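The fix described above can be sketched with `ConcurrentMap.remove(key, value)`: remember the `PendingCompleteTxn` used when sending markers, and remove only that entry. That call removes an entry only if the key is still mapped to the given value. This is a minimal sketch under stated assumptions: the issue does not say which call the actual patch uses. The `PendingCompleteTxn` class here is a hypothetical stand-in, and so are `markersSent` and `markersCompleted`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical model of "only remove the pending-marker entry you created". */
public class ConditionalRemoveSketch {

    /** Stand-in for the coordinator's pending-completion record (not Kafka's class). */
    static final class PendingCompleteTxn {
        final String transactionalId;
        final int coordinatorEpoch;

        PendingCompleteTxn(String transactionalId, int coordinatorEpoch) {
            this.transactionalId = transactionalId;
            this.coordinatorEpoch = coordinatorEpoch;
        }
        // No equals override: identity equality, so only the exact stored object matches.
    }

    static final ConcurrentMap<String, PendingCompleteTxn> transactionsWithPendingMarkers =
        new ConcurrentHashMap<>();

    /** Records the entry when markers are sent and returns it so the sender can keep it. */
    static PendingCompleteTxn markersSent(String transactionalId, int coordinatorEpoch) {
        PendingCompleteTxn pending = new PendingCompleteTxn(transactionalId, coordinatorEpoch);
        transactionsWithPendingMarkers.put(transactionalId, pending);
        return pending;
    }

    /** Removes the entry only if it is still the one this sender created. */
    static boolean markersCompleted(PendingCompleteTxn mine) {
        return transactionsWithPendingMarkers.remove(mine.transactionalId, mine);
    }

    public static void main(String[] args) {
        PendingCompleteTxn stale = new PendingCompleteTxn("xxx-yyy", 610); // zombie from the old epoch
        PendingCompleteTxn current = markersSent("xxx-yyy", 611);          // created after the load
        System.out.println(markersCompleted(stale));   // false: cannot remove epoch 611's entry
        System.out.println(markersCompleted(current)); // true
    }
}
```

With an unconditional `remove(key)`, the zombie reply in step 3 of the issue's sequence would delete the entry created in step 2. The conditional form makes that removal a no-op.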
[jira] [Created] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change
Ayoub Omari created KAFKA-16394:
---
Summary: ForeignKey LEFT join propagates null value on foreignKey change
Key: KAFKA-16394
URL: https://issues.apache.org/jira/browse/KAFKA-16394
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari
Attachments: ForeignJoinTest.scala, JsonSerde.scala

We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, String]_, where _LeftRecord_ is:
{code:scala}
case class LeftRecord(foreignKey: String, name: String)
{code}
We do a simple foreign-key join on left-topic's foreignKey field, which returns the value in right-topic.

+*Scenario 1: Change foreignKey*+

Input the following:
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}
*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2)
{code}
*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2)
{code}
A null is propagated to the join result when the foreign key changes.

+*Scenario 2: Delete PrimaryKey*+

Input:
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null)
{code}
*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
{code}
*+Actual result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null)
{code}
An additional null is propagated to the join result.

This bug doesn't exist in versions 3.6.0 and below.

I believe the issue comes from the line [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134], where we propagate the deletion in both scenarios above.

Attaching the topology I used.
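The toy reference model below states the expected output of both scenarios precisely. It is not Kafka Streams, and it models only what the issue's inputs exercise. Right-side values are assumed to arrive before the left updates, so right-side changes never trigger output. Each left update emits exactly one join result, and a null left value is a primary-key delete. The names `FkLeftJoinOracle`, `pipeRight` and `pipeLeft` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory oracle for the expected left foreign-key join output in the issue. */
public class FkLeftJoinOracle {

    /** Mirrors the issue's LeftRecord(foreignKey, name) case class. */
    static final class LeftRecord {
        final String foreignKey;
        final String name;

        LeftRecord(String foreignKey, String name) {
            this.foreignKey = foreignKey;
            this.name = name;
        }
    }

    // Current right-table contents, keyed by foreign key.
    private final Map<String, String> right = new HashMap<>();
    // Join results in emission order, rendered like the issue's KeyValue(...) lines.
    final List<String> output = new ArrayList<>();

    void pipeRight(String fk, String value) {
        right.put(fk, value); // in this model, right-side updates emit nothing
    }

    /** Emits exactly one result per left update; a null leftValue is a tombstone. */
    void pipeLeft(String pk, LeftRecord leftValue) {
        // Left join: a delete, or a foreign key with no right-side match, yields null.
        String joined = (leftValue == null) ? null : right.get(leftValue.foreignKey);
        output.add("KeyValue(" + pk + ", " + joined + ")");
    }

    public static void main(String[] args) {
        FkLeftJoinOracle scenario1 = new FkLeftJoinOracle();
        scenario1.pipeRight("fk1", "1");
        scenario1.pipeRight("fk2", "2");
        scenario1.pipeLeft("pk1", new LeftRecord("fk1", "pk1"));
        scenario1.pipeLeft("pk1", new LeftRecord("fk2", "pk1"));
        System.out.println(scenario1.output); // [KeyValue(pk1, 1), KeyValue(pk1, 2)]
    }
}
```

Comparing a topology's output against such an oracle would flag the extra `KeyValue(pk1, null)` in both scenarios.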
[PR] KAFKA-16343: Add unit tests of foreignKeyJoin classes [kafka]
AyoubOm opened a new pull request, #15564:
URL: https://github.com/apache/kafka/pull/15564

Added unit tests for two processors used in the foreign-key join: `SubscriptionSendProcessorSupplier` and `ForeignTableJoinProcessorSupplier`.

Renamed `ForeignTableJoinProcessorSupplierTest` to `SubscriptionJoinProcessorSupplierTest`, since that is the processor the test class actually tests.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1532500641 ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -254,33 +262,41 @@ class BrokerLifecycleManagerTest { @Test def testKraftJBODMetadataVersionUpdateEvent(): Unit = { -val context = new RegistrationTestContext(configProperties) -val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ"))) +val ctx = new RegistrationTestContext(configProperties) +val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ"))) val controllerNode = new Node(3000, "localhost", 8021) -context.controllerNodeProvider.node.set(controllerNode) -manager.start(() => context.highestMetadataOffset.get(), - context.mockChannelManager, context.clusterId, context.advertisedListeners, +ctx.controllerNodeProvider.node.set(controllerNode) + +manager.start(() => ctx.highestMetadataOffset.get(), + ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners, Collections.emptyMap(), OptionalLong.of(10L)) -TestUtils.retry(6) { - assertEquals(1, context.mockChannelManager.unsentQueue.size) - assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch()) -} -context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse( - new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode) -TestUtils.retry(1) { - context.poll() - assertEquals(1000L, manager.brokerEpoch) -} +def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, manager, prepareResponse[T](ctx, response)) +def nextHeartbeatRequest() = doPoll[AbstractRequest](new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) +def nextRegistrationRequest(epoch: Long) = + 
  doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(epoch))) + +// Broker registers and response sets epoch to 1000L +assertEquals(10L, nextRegistrationRequest(1000L).data().previousBrokerEpoch()) + +nextHeartbeatRequest() // poll for next request as way to synchronize with the new value into brokerEpoch +assertEquals(1000L, manager.brokerEpoch) + +// Trigger JBOD MV update manager.handleKraftJBODMetadataVersionUpdate() -context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse( - new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode) -TestUtils.retry(6) { - context.time.sleep(100) - context.poll() - manager.eventQueue.wakeup() - assertEquals(1200, manager.brokerEpoch) -} + +// Depending on scheduling, the next request could either be BrokerRegistration or BrokerHeartbeat. Review Comment: Before calling `manager.handleKraftJBODMetadataVersionUpdate()`, there should be 1 `CommunicationEvent` with a delay of 100 in `eventQueue`. Until we call `poll`, this `CommunicationEvent` will remain in `eventQueue`. Calling `manager.handleKraftJBODMetadataVersionUpdate()` causes a `KraftJBODMetadataVersionUpdateEvent` with no delay to be added to `eventQueue`. The `KraftJBODMetadataVersionUpdateEvent` will then be processed and replace the `CommunicationEvent` with a delay of 0. The `CommunicationEvent` will then be processed, which causes a `BrokerRegistration` request to be sent. So, it seems that the next request should always be `BrokerRegistration` when we call `poll`?
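The ordering argument in the review comment can be checked against a toy model. The sketch below is hypothetical and heavily simplified. It is not `KafkaEventQueue`, and it uses an abstract clock. It relies on one assumption, which the comment describes: rescheduling the `CommunicationEvent` under the same tag replaces the pending one. The `ToyDeferredQueue` class and the tag strings are invented for illustration. The model only records the request that the replaced `CommunicationEvent` would send.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, NOT Kafka's KafkaEventQueue: a deferred event
// scheduled under a tag replaces any pending event with the same tag.
public class ToyDeferredQueue {
    private final Map<String, Long> deadlines = new HashMap<>();
    private final Map<String, Runnable> actions = new HashMap<>();

    // Schedule, or replace, the event registered under `tag`.
    public void scheduleDeferred(String tag, long deadline, Runnable action) {
        deadlines.put(tag, deadline);
        actions.put(tag, action);
    }

    // Repeatedly run the earliest event whose deadline is <= now.
    public void runUntil(long now) {
        while (true) {
            String next = null;
            for (Map.Entry<String, Long> e : deadlines.entrySet()) {
                if (e.getValue() <= now && (next == null || e.getValue() < deadlines.get(next))) {
                    next = e.getKey();
                }
            }
            if (next == null) {
                return;
            }
            deadlines.remove(next);
            actions.remove(next).run(); // may schedule further events
        }
    }

    // Replays the sequence from the review comment; returns the requests "sent".
    public static List<String> scenario() {
        ToyDeferredQueue queue = new ToyDeferredQueue();
        List<String> sent = new ArrayList<>();
        Runnable communication = () -> sent.add("BrokerRegistration");
        // 1. A CommunicationEvent is pending with a delay of 100.
        queue.scheduleDeferred("communication", 100, communication);
        // 2. The JBOD MV update event has no delay; when it runs it
        //    reschedules the CommunicationEvent with a delay of 0.
        queue.scheduleDeferred("jbod-mv-update", 0,
            () -> queue.scheduleDeferred("communication", 0, communication));
        // 3. Polling at time 0 runs the update, then the replaced CommunicationEvent.
        queue.runUntil(0);
        return sent;
    }
}
```

In this model, `scenario()` sends exactly one request at time 0, and the original deadline-100 entry never runs because it was replaced. Whether the real queue behaves the same way is the question the comment raises.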
Re: [PR] KAFKA-15062: Adding ppc64le build stage [kafka]
mimaison commented on PR #13817: URL: https://github.com/apache/kafka/pull/13817#issuecomment-2010007861 I don't think we can modify the jobs in Jenkins directly (at least, I don't know how). The jobs are created automatically from the Jenkinsfile in this repository. I see that some other Apache projects have multiple Jenkins job files; it would be good to understand how that works.
Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]
mimaison commented on code in PR #15558: URL: https://github.com/apache/kafka/pull/15558#discussion_r1532418821 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -630,8 +631,16 @@ Map describeTopicConfigs(Set topics) Set resources = topics.stream() .map(x -> new ConfigResource(ConfigResource.Type.TOPIC, x)) .collect(Collectors.toSet()); -return sourceAdminClient.describeConfigs(resources).all().get().entrySet().stream() -.collect(Collectors.toMap(x -> x.getKey().name(), Entry::getValue)); +try { Review Comment: This only improves this single call; there are a bunch of other admin calls run by the Schedulers that should also be updated. Considering the number of calls, it might make sense to have a helper method to wrap the calls and avoid too much duplication. ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ## @@ -224,6 +229,39 @@ public void testNoBrokerAclAuthorizer() throws Exception { verifyNoInteractions(targetAdmin); } +@Test +public void testMissingDescribeConfigsAcl() throws Exception { +Admin sourceAdmin = mock(Admin.class); +Admin targetAdmin = mock(Admin.class); +MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin); +Field configField = connector.getClass().getDeclaredField("config"); Review Comment: Instead of using reflection, I wonder if it would be better to adjust one of the existing constructors used for testing.
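A helper along the lines suggested above might look like the following sketch. Everything here is hypothetical: `AdminCalls`, `adminCall` and `AdminCallException` are invented names. The sketch uses only `java.util.concurrent` types so that it runs without Kafka on the classpath. A real implementation would probably pick a different exception type and could special-case authorization errors.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

// Hypothetical helper (not MirrorMaker code): wraps an admin call so every
// failure is reported with a description of the operation that failed.
public final class AdminCalls {
    private AdminCalls() {}

    // Unchecked wrapper so call sites do not need their own try/catch blocks.
    public static final class AdminCallException extends RuntimeException {
        public AdminCallException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    public static <T> T adminCall(Callable<T> call, Supplier<String> description) {
        try {
            return call.call();
        } catch (ExecutionException e) {
            // Admin futures wrap the real error (e.g. an authorization failure).
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new AdminCallException("Failed to " + description.get() + ": " + cause.getMessage(), cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AdminCallException("Interrupted while trying to " + description.get(), e);
        } catch (Exception e) {
            throw new AdminCallException("Failed to " + description.get() + ": " + e.getMessage(), e);
        }
    }
}
```

A call site could then read `adminCall(() -> sourceAdminClient.describeConfigs(resources).all().get(), () -> "describe configs for topics " + topics)`. The duplication then collapses into one place where the error message can name the missing permission.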
Re: [PR] KAFKA-16318: add javadoc for kafka metric [kafka]
mimaison commented on code in PR #15483: URL: https://github.com/apache/kafka/pull/15483#discussion_r1532394640 ## clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java: ## @@ -40,15 +48,29 @@ public KafkaMetric(Object lock, MetricName metricName, MetricValueProvider va this.time = time; } +/** + * Get the configuration of this metric. + * This is supposed to be used by server only. Review Comment: What do you mean by `This is supposed to be used by server only.`? Same for the other method below
Re: [PR] KAFKA-16232: kafka hangs forever in the starting process if the authorizer future is not returned [kafka]
brandboat commented on PR #15549: URL: https://github.com/apache/kafka/pull/15549#issuecomment-2009924617 Gentle ping @showuon, would you mind taking a look? Many thanks!
[PR] KAFKA-16388: add production-ready test of 2.7, 3.3 - 3.6 release to MetadataVersionTest.testFromVersionString [kafka]
brandboat opened a new pull request, #15563: URL: https://github.com/apache/kafka/pull/15563 Add tests for the 2.7 and 3.3 to 3.6 releases to MetadataVersionTest.testFromVersionString. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions [kafka]
C0urante merged PR #15562: URL: https://github.com/apache/kafka/pull/15562
Re: [PR] KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions [kafka]
C0urante commented on PR #15562: URL: https://github.com/apache/kafka/pull/15562#issuecomment-2009913326 Tests pass locally and this change is trivial; going to merge...
Re: [PR] KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions [kafka]
C0urante commented on code in PR #15562: URL: https://github.com/apache/kafka/pull/15562#discussion_r1532337351 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java: ## @@ -134,6 +134,16 @@ public void testProcessPartitionKeyValidList() { } } +@Test +public void testProcessPartitionKeyNullOffset() { Review Comment: Good catch
[jira] [Updated] (KAFKA-16073) Kafka Tiered Storage: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-16073: -- Fix Version/s: 3.6.3 (was: 3.6.2) > Kafka Tiered Storage: Consumer Fetch Error Due to Delayed localLogStartOffset > Update During Segment Deletion > > > Key: KAFKA-16073 > URL: https://issues.apache.org/jira/browse/KAFKA-16073 > Project: Kafka > Issue Type: Bug > Components: core, Tiered-Storage >Affects Versions: 3.6.1 >Reporter: hzh0425 >Assignee: hzh0425 >Priority: Major > Labels: KIP-405, kip-405, tiered-storage > Fix For: 3.8.0, 3.7.1, 3.6.3 > > > The identified bug in Apache Kafka's tiered storage feature involves a > delayed update of {{localLogStartOffset}} in the > {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. > When segments are deleted from the log's memory state, the > {{localLogStartOffset}} isn't promptly updated. Concurrently, > {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch > offset is less than the {{localLogStartOffset}}. If it's greater, Kafka > erroneously sends an {{OffsetOutOfRangeException}} to the consumer. > In a specific concurrent scenario, imagine sequential offsets: > {{offset1 < offset2 < offset3}}. A client requests data at {{offset2}}. While a > background deletion process removes segments from memory, it hasn't yet > updated the {{localLogStartOffset}} from {{offset1}} to {{offset3}}. > Consequently, when the fetch offset ({{offset2}}) is evaluated against > the stale {{offset1}} in {{ReplicaManager.handleOffsetOutOfRangeError}}, > it incorrectly triggers an {{OffsetOutOfRangeException}}. This issue > arises from the out-of-sync update of {{localLogStartOffset}}, leading to > incorrect handling of consumer fetch requests and potential data access > errors.
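The race becomes concrete with a simplified model of the check described above. This is a hypothetical sketch, not `ReplicaManager` code. It ignores whether remote storage is enabled and every other condition the real handler considers. The offsets used below (10, 20 and 30 for offset1, offset2 and offset3) are illustrative values only.

```java
// Hypothetical, simplified model of the decision described in KAFKA-16073;
// not the actual ReplicaManager.handleOffsetOutOfRangeError logic.
public final class OffsetOutOfRangeModel {
    private OffsetOutOfRangeModel() {}

    public enum Action { READ_FROM_REMOTE, OFFSET_OUT_OF_RANGE }

    // Below the local log start offset: the data should come from tiered storage.
    // Otherwise: the fetch is answered with an OffsetOutOfRangeException.
    public static Action onOutOfRange(long fetchOffset, long localLogStartOffset) {
        return fetchOffset < localLogStartOffset ? Action.READ_FROM_REMOTE : Action.OFFSET_OUT_OF_RANGE;
    }
}
```

Suppose the segments are already gone but `localLogStartOffset` is still the stale 10. Then `onOutOfRange(20, 10)` yields `OFFSET_OUT_OF_RANGE`. Once the value has advanced to 30, `onOutOfRange(20, 30)` yields `READ_FROM_REMOTE`, which is what the consumer needed.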
[jira] [Created] (KAFKA-16393) SslTransportLayer doesn't implement write(ByteBuffer[], int, int) correctly
Haruki Okada created KAFKA-16393: Summary: SslTransportLayer doesn't implement write(ByteBuffer[], int, int) correctly Key: KAFKA-16393 URL: https://issues.apache.org/jira/browse/KAFKA-16393 Project: Kafka Issue Type: Improvement Reporter: Haruki Okada As of Kafka 3.7.0, SslTransportLayer.write(ByteBuffer[], int, int) is implemented as follows: {code:java} public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { ... int i = offset; while (i < length) { if (srcs[i].hasRemaining() || hasPendingWrites()) { {code} The loop index starts at `offset` and ends at `length`. However, this isn't correct, because the end index should be `offset + length`. Say we have an array of 5 ByteBuffers and call this method with offset = 3, length = 1. In the current code, `write(srcs, 3, 1)` doesn't attempt any write, because the loop condition (`3 < 1`) is immediately false. For now, this method seems to be called only with offset = 0, length = srcs.length in the Kafka code base, so it isn't causing any problems. Still, we should fix it, because it could introduce a subtle bug if the method is ever called with different arguments in the future.
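For reference, the sketch below shows the loop bounds the issue asks for: iterating from `srcs[offset]` up to, but excluding, `srcs[offset + length]`. It is not the `SslTransportLayer` code. The socket and the TLS wrapping are replaced by a plain destination `ByteBuffer` named `sink`, an assumption made so the example runs standalone. The argument check mirrors the contract documented for `java.nio.channels.GatheringByteChannel.write(ByteBuffer[], int, int)`.

```java
import java.nio.ByteBuffer;

// Sketch of correct bounds for write(ByteBuffer[] srcs, int offset, int length).
// `sink` is a hypothetical stand-in for the socket; no TLS wrapping is done.
public final class GatheringWriteSketch {
    private GatheringWriteSketch() {}

    public static long write(ByteBuffer[] srcs, int offset, int length, ByteBuffer sink) {
        // Same preconditions as GatheringByteChannel.write(ByteBuffer[], int, int).
        if (offset < 0 || length < 0 || offset > srcs.length - length) {
            throw new IndexOutOfBoundsException();
        }
        long written = 0;
        int end = offset + length; // exclusive end index; comparing against `length` is the bug
        for (int i = offset; i < end; i++) {
            while (srcs[i].hasRemaining() && sink.hasRemaining()) {
                sink.put(srcs[i].get());
                written++;
            }
        }
        return written;
    }
}
```

For the example in the issue (five buffers, `offset = 3`, `length = 1`), this drains only `srcs[3]`. A loop bounded by `i < length` would not execute at all.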
[jira] [Comment Edited] (KAFKA-16372) max.block.ms behavior inconsistency with javadoc and the config description
[ https://issues.apache.org/jira/browse/KAFKA-16372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828960#comment-17828960 ] Haruki Okada edited comment on KAFKA-16372 at 3/20/24 2:16 PM: --- [~showuon] Agreed. One concern: IMO, many developers expect the "exception thrown on buffer full after max.block.ms" behavior. It is stated in the Javadoc, and because we rarely hit a buffer-full situation, no one noticed this discrepancy. Even some well-known open-source projects have exception-handling code that doesn't actually work because of this (e.g. [logback-kafka-appender|https://github.com/danielwegener/logback-kafka-appender/blob/master/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java#L29]). I wonder whether just fixing the Javadoc and the Kafka documentation is enough, or whether we should also post a heads-up about this somewhere (e.g. on the Kafka user mailing list). I would like to hear the committers' opinions. Meanwhile, let me start fixing the docs.
> max.block.ms behavior inconsistency with javadoc and the config description > --- > > Key: KAFKA-16372 > URL: https://issues.apache.org/jira/browse/KAFKA-16372 > Project: Kafka > Issue Type: Bug > Components: producer >Reporter: Haruki Okada >Priority: Minor > > As of Kafka 3.7.0, the javadoc of > [KafkaProducer.send|https://github.com/apache/kafka/blob/3.7.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L956] > states that it throws TimeoutException when max.block.ms is exceeded on > buffer allocation or initial metadata fetch. > This is also stated in the [buffer.memory config > description|https://kafka.apache.org/37/documentation.html#producerconfigs_buffer.memory]. > However, I found that this is not true, because TimeoutException extends > ApiException, and KafkaProducer.doSend catches ApiException and [wraps it as > FutureFailure|https://github.com/apache/kafka/blob/3.7.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1075-L1086] > instead of throwing it. > I wonder whether this is a bug or a documentation error. > This discrepancy seems to have existed since 0.9.0.0, when max.block.ms was introduced.
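The discrepancy can be illustrated without Kafka on the classpath. The toy model below is hypothetical. `send` stands in for `KafkaProducer.send`, and `java.util.concurrent.TimeoutException` stands in for Kafka's `org.apache.kafka.common.errors.TimeoutException`. It mimics the behavior reported above: the error is delivered through the returned future instead of being thrown, so a `try`/`catch` around the `send` call alone never sees it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

// Toy model (hypothetical) of a send() that reports failures via its future.
public final class FailedFutureModel {
    private FailedFutureModel() {}

    public static Future<String> send(boolean bufferFull) {
        if (bufferFull) {
            // Stand-in for "could not allocate buffer within max.block.ms":
            // returned as a failed future rather than thrown to the caller.
            return CompletableFuture.failedFuture(new TimeoutException("buffer full after max.block.ms"));
        }
        return CompletableFuture.completedFuture("record-metadata");
    }

    // The failure only becomes visible when the future is inspected.
    public static String outcome(Future<String> future) {
        try {
            return "ok: " + future.get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

Against a real producer, the equivalent is to call `get()` on the returned future or to check the exception passed to the send `Callback`.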
[jira] [Assigned] (KAFKA-16372) max.block.ms behavior inconsistency with javadoc and the config description
[ https://issues.apache.org/jira/browse/KAFKA-16372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haruki Okada reassigned KAFKA-16372: Assignee: Haruki Okada
[jira] [Comment Edited] (KAFKA-16372) max.block.ms behavior inconsistency with javadoc and the config description
[ https://issues.apache.org/jira/browse/KAFKA-16372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828960#comment-17828960 ] Haruki Okada edited comment on KAFKA-16372 at 3/20/24 2:15 PM
[jira] [Commented] (KAFKA-16372) max.block.ms behavior inconsistency with javadoc and the config description
[ https://issues.apache.org/jira/browse/KAFKA-16372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828960#comment-17828960 ] Haruki Okada commented on KAFKA-16372
Re: [PR] [WIP] Splitting consumer tests [kafka]
lianetm closed pull request #15535: [WIP] Splitting consumer tests URL: https://github.com/apache/kafka/pull/15535
Re: [PR] MINOR: Removed the deprecated information about ZK to KRaft migration. [kafka]
chiacyu commented on code in PR #15552: URL: https://github.com/apache/kafka/pull/15552#discussion_r1532121819 ## docs/ops.html: ## @@ -3797,14 +3797,6 @@ Modifying certain dynamic configurations on the standalone KRaft controller - ZooKeeper to KRaft Migration Review Comment: Sure, that makes sense. Thanks for the reminder.
Re: [PR] KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions [kafka]
yashmayya commented on code in PR #15562: URL: https://github.com/apache/kafka/pull/15562#discussion_r1532113435 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java: ## @@ -134,6 +134,16 @@ public void testProcessPartitionKeyValidList() { } } +@Test +public void testProcessPartitionKeyNullOffset() { Review Comment: I think this should be `testProcessPartitionKeyNullPartition` instead?
Re: [PR] KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions [kafka]
C0urante commented on PR #15562: URL: https://github.com/apache/kafka/pull/15562#issuecomment-2009601190 @yashmayya @gharris1727 would either of you have a quick moment for this small patch?
[PR] KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions [kafka]
C0urante opened a new pull request, #15562: URL: https://github.com/apache/kafka/pull/15562 [Jira](https://issues.apache.org/jira/browse/KAFKA-16392) This is a pretty lightweight change; we wrap the warning log message for unrecognized source partition types in a null guard. One test is added to verify that no log messages are emitted in the affected scenario. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
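The shape of the fix described above might look like the sketch below. It is hypothetical and not the actual `OffsetUtils` code. The method name and the `warnings` list (standing in for the logger) are invented. The sketch also assumes the key is a two-element list `[connectorName, partition]`. The point is that a `null` partition is skipped before the type check, so only genuinely unexpected types produce the warning.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a null guard ahead of a type-check warning.
public final class PartitionKeySketch {
    private PartitionKeySketch() {}

    // `key` is assumed to be [connectorName, partition]; `warnings` stands in for log.warn.
    public static Map<?, ?> partitionOf(List<?> key, List<String> warnings) {
        Object partition = key.get(1);
        if (partition == null) {
            return null; // null partitions are legitimate: skip without logging
        }
        if (!(partition instanceof Map)) {
            warnings.add("Ignoring offset partition key with an unexpected format for the second element"
                + " in the partition key list. Expected type: java.util.Map, actual type: "
                + partition.getClass().getName());
            return null;
        }
        return (Map<?, ?>) partition;
    }
}
```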
[jira] [Created] (KAFKA-16392) Spurious log warnings: "Ignoring offset partition key with an unexpected format for the second element in the partition key list. Expected type: java.util.Map, actual ty
Chris Egerton created KAFKA-16392: - Summary: Spurious log warnings: "Ignoring offset partition key with an unexpected format for the second element in the partition key list. Expected type: java.util.Map, actual type: null" Key: KAFKA-16392 URL: https://issues.apache.org/jira/browse/KAFKA-16392 Project: Kafka Issue Type: Bug Components: connect Affects Versions: 3.6.1, 3.7.0, 3.5.2, 3.5.1, 3.6.0, 3.5.0, 3.8.0 Reporter: Chris Egerton Assignee: Chris Egerton Some source connectors choose not to specify source offsets with the records they emit (or rather, to provide null partitions/offsets). When these partitions are parsed by a Kafka Connect worker, this currently leads to a spurious warning log message.
[jira] [Commented] (KAFKA-15951) MissingSourceTopicException should include topic names
[ https://issues.apache.org/jira/browse/KAFKA-15951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828944#comment-17828944 ] sanghyeok An commented on KAFKA-15951: -- Gentle ping, [~mjsax]. When you have free time, could you check the comments? :) > MissingSourceTopicException should include topic names > -- > > Key: KAFKA-15951 > URL: https://issues.apache.org/jira/browse/KAFKA-15951 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > As the title says, we don't include topic names in all cases, which makes it > hard for users to identify the root cause.
[PR] MINOR: Add retry mechanism to EOS example [kafka]
fvaleri opened a new pull request, #15561: URL: https://github.com/apache/kafka/pull/15561 In the initial EOS example, retry logic was implemented within the `resetToLastCommittedPositions` method. During refactoring, this logic was removed because a poison pill prevented the example from reaching the final phase of consuming from the output topic. In this change, I suggest adding it back, but with a retry limit defined as `MAX_RETRIES`. Once this limit is reached, the problematic batch will be logged and skipped, allowing the processor to move on and process the remaining records. If some records are skipped, the example will still hit the hard timeout (2 minutes), but only after consuming all processed records.
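The retry-then-skip policy described above can be sketched independently of Kafka. This is a hypothetical illustration, not the code in the PR. Batches are plain strings, and `tryProcess` stands in for one transactional attempt; it returns `false` where the example would abort and rewind to the last committed positions. The `skipped` list stands in for logging, and the value of `MAX_RETRIES` is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of bounded retries: a batch that keeps failing is
// skipped after MAX_RETRIES attempts so later batches can still be processed.
public final class RetryThenSkip {
    private RetryThenSkip() {}

    public static final int MAX_RETRIES = 3; // arbitrary illustrative value

    public static List<String> processAll(List<String> batches, Predicate<String> tryProcess,
                                          List<String> skipped) {
        List<String> processed = new ArrayList<>();
        for (String batch : batches) {
            boolean done = false;
            for (int attempt = 1; attempt <= MAX_RETRIES && !done; attempt++) {
                // false = attempt failed (abort the transaction, rewind, retry)
                done = tryProcess.test(batch);
            }
            if (done) {
                processed.add(batch);
            } else {
                skipped.add(batch); // "log and skip" the poison pill
            }
        }
        return processed;
    }
}
```

With a poison batch between two good ones, both good batches are processed. The poison batch is attempted `MAX_RETRIES` times before being skipped.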
Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]
fvaleri commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1531939832 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +try (KafkaProducer producer = new Producer("processor-producer", bootstrapServers, outputTopic, +true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer(); + KafkaConsumer consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, + "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) { +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +// we can't recover from these exceptions 
+Utils.printErr(e.getMessage()); +shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); +consumer.commitSync(); +} catch (KafkaException e) { +// abort the transaction and try to continue +Utils.printOut("Aborting transaction: %s", e); +
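Retrying an aborted batch only works if the consumer is rewound first, because its fetch position has already moved past the records whose transaction was rolled back. The stdlib-only sketch below models that rewind step, with plain maps standing in for a `KafkaConsumer`: partitions with a committed offset go back to it, and partitions without one go back to their beginning offset. `PositionRewinder` and `resetToLastCommitted` are hypothetical names; with the real client, the corresponding calls would be `committed(Set<TopicPartition>)`, `seek(TopicPartition, long)` and `seekToBeginning(Collection<TopicPartition>)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, stdlib-only model of rewinding a consumer after an aborted transaction.
class PositionRewinder {
    // Returns the position to seek to for each assigned partition: the last
    // committed offset if one exists, otherwise the partition's beginning offset.
    static Map<Integer, Long> resetToLastCommitted(Set<Integer> assignment,
                                                   Map<Integer, Long> committed,
                                                   Map<Integer, Long> beginningOffsets) {
        Map<Integer, Long> positions = new HashMap<>();
        for (Integer partition : assignment) {
            Long offset = committed.get(partition);
            // No committed offset yet: fall back to the earliest offset (0 if unknown).
            positions.put(partition, offset != null ? offset : beginningOffsets.getOrDefault(partition, 0L));
        }
        return positions;
    }
}
```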
Re: [PR] KAFKA-16381: add lock for KafkaMetric config getter [kafka]
vamossagar12 commented on PR #15550: URL: https://github.com/apache/kafka/pull/15550#issuecomment-2009204011 > So you mean only if the config(MetricConfig config) works, then we need to return the updated config, right? Yes, that's correct. Any other thread should see the updated state of the shared variables in the synchronised block only after the lock is released. And come to think of it, that is already being achieved without the synchronised block in the `config()` method. The changes you have added provide the same guarantees as what exists today, but at the expense of adding a lock. If we want to make the value of the `config` object visible immediately to other threads, we could consider making it `volatile`, but I am not sure we really need it. WDYT?
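For reference, the `volatile` alternative mentioned above could look like the sketch below. Under the Java Memory Model, a write to a `volatile` field happens-before every subsequent read of that field, so a reader sees the most recently published reference without taking a lock. `MetricConfigHolder` and its `String` config are simplified, hypothetical stand-ins for `KafkaMetric` and `MetricConfig`.

```java
// Hypothetical stand-in for KafkaMetric: a lock-free getter backed by a volatile field.
class MetricConfigHolder {
    // volatile: every read observes the latest write, with no lock on the read path
    private volatile String config;

    MetricConfigHolder(String initial) {
        this.config = initial;
    }

    // Unsynchronized getter; safe because the whole reference is replaced atomically.
    String config() {
        return config;
    }

    // Writers may keep a lock if an update ever involves more than one field.
    synchronized void config(String newConfig) {
        this.config = newConfig;
    }
}
```

This only helps if the config object is not mutated after it is published; `volatile` makes the new reference visible, not later changes to the object it points to.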
Re: [PR] KAFKA-15949: Unify metadata.version format in log and error message [kafka]
FrankYang0529 commented on code in PR #15505: URL: https://github.com/apache/kafka/pull/15505#discussion_r1531943054 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -139,7 +139,7 @@ object StorageTool extends Logging { action(storeTrue()) formatParser.addArgument("--release-version", "-r"). action(store()). - help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.LATEST_PRODUCTION.version()}") + help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION.version()}") Review Comment: Yes, it looks like we only use `.version()` in a few messages, so I removed it. Thank you.
Re: [PR] KAFKA-16318: add javadoc for kafka metric [kafka]
chia7712 commented on PR #15483: URL: https://github.com/apache/kafka/pull/15483#issuecomment-2009231142 The failed tests are shown below, and they pass on my local machine:
```script
./gradlew cleanTest \
  :tools:test --tests ListConsumerGroupTest.testListConsumerGroupsWithTypesClassicProtocol \
  --tests DescribeConsumerGroupTest.testDescribeStateWithMultiPartitionTopicAndMultipleConsumers \
  :storage:test --tests TransactionsWithTieredStoreTest.testFencingOnAddPartitions \
  --tests TransactionsWithTieredStoreTest.testCommitTransactionTimeout \
  :connect:runtime:test --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsets \
  --tests org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector \
  --tests org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testPotentialDeadlockWhenProducingToOffsetsTopic \
  --tests org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector \
  --tests org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testTasksFailOnInabilityToFence \
  --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted \
  :metadata:test --tests QuorumControllerTest.testBootstrapZkMigrationRecord \
  :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated \
  :server:test --tests ClientMetricsManagerTest.testCacheEviction \
  :core:test --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testProduceConsumeTopicAutoCreateTopicCreateAcl \
  --tests AuthorizerIntegrationTest.shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL \
  --tests AuthorizerIntegrationTest.testAuthorizeByResourceTypeMultipleAddAndRemove \
  --tests AuthorizerIntegrationTest.testConsumeWithTopicWrite \
  --tests ControllerRegistrationManagerTest.testWrongIncarnationId \
  --tests ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric \
  --tests LogDirFailureTest.testIOExceptionDuringLogRoll \
  --tests LogDirFailureTest.testIOExceptionDuringCheckpoint \
  :clients:test --tests EagerConsumerCoordinatorTest.testOutdatedCoordinatorAssignment
```
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
chia7712 commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1531889679 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,44 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); +} return CompressionType.NONE; } public static ByteBuffer compress(byte[] raw, CompressionType compressionType) { -// TODO: Support compression in client telemetry. -if (compressionType == CompressionType.NONE) { -return ByteBuffer.wrap(raw); -} else { -throw new UnsupportedOperationException("Compression is not supported"); +try { +try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) { +try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) { +out.write(raw); +out.flush(); +} +compressedOut.buffer().flip(); +return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer())); +} +} catch (IOException e) { +throw new KafkaException("Failed to compress metrics data", e); +} +} + +public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) { +ByteBuffer data = ByteBuffer.wrap(metrics); +try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create()); +ByteArrayOutputStream out = new ByteArrayOutputStream()) { + +byte[] bytes = new byte[data.capacity() * 2]; +int nRead; +while ((nRead = in.read(bytes, 0, bytes.length)) != -1) { +out.write(bytes, 0, nRead); +} + +out.flush(); +return ByteBuffer.wrap(out.toByteArray()); Review Comment: hi @apoorvmittal10 I have a question: Is it worth 
using `ByteBufferOutputStream` to replace `ByteArrayOutputStream`? We can avoid the array copy by taking the buffer from `ByteBufferOutputStream` directly.
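The copy the reviewer refers to comes from `ByteArrayOutputStream.toByteArray()`, which always returns a fresh copy of the stream's internal array. As a stdlib-only illustration of the same idea (not Kafka's `ByteBufferOutputStream`), the sketch below subclasses `ByteArrayOutputStream` and wraps its protected `buf`/`count` fields directly. GZIP stands in for Kafka's `CompressionType` codecs, and `ZeroCopyDecompress` and `ExposedByteArrayOutputStream` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helpers illustrating the zero-copy idea; GZIP stands in for Kafka's codecs.
class ZeroCopyDecompress {

    // Exposes the internal array so callers can wrap it instead of copying it.
    static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
        ExposedByteArrayOutputStream(int initialSize) {
            super(initialSize);
        }

        // Wraps the first `count` bytes of the internal array; toByteArray() would copy them.
        ByteBuffer asByteBuffer() {
            return ByteBuffer.wrap(buf, 0, count);
        }
    }

    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to compress", e);
        }
        // The GZIP trailer is written on close, so read the bytes only afterwards.
        return out.toByteArray();
    }

    static ByteBuffer decompress(byte[] compressed) {
        ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream(compressed.length * 2);
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to decompress", e);
        }
        return out.asByteBuffer(); // no extra array copy
    }
}
```

The trade-off is that the returned buffer shares the stream's backing array, whose capacity can exceed `remaining()`, so callers must respect `position`/`limit` instead of reading `array()` wholesale.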
Re: [PR] MINOR: Add retry mechanism to EOS example [kafka]
fvaleri commented on PR #15561: URL: https://github.com/apache/kafka/pull/15561#issuecomment-2009370898 To make it easier to review, I'm adding the output of the new example behavior with 1 partition, 1 processor, max.poll.records = 1, and forcing random exceptions.
```sh
$ examples/bin/exactly-once-demo.sh 1 1 10
main - Deleted topics: [input-topic, output-topic]
main - Waiting for topics metadata cleanup
main - Created topics: [input-topic, output-topic]
producer - Sample: record(0, test0), partition(input-topic-0), offset(0)
producer - Sample: record(1, test1), partition(input-topic-0), offset(1)
producer - Sample: record(2, test2), partition(input-topic-0), offset(2)
producer - Sample: record(3, test3), partition(input-topic-0), offset(3)
producer - Sample: record(4, test4), partition(input-topic-0), offset(4)
producer - Sample: record(5, test5), partition(input-topic-0), offset(5)
producer - Sample: record(6, test6), partition(input-topic-0), offset(6)
producer - Sample: record(7, test7), partition(input-topic-0), offset(7)
producer - Sample: record(8, test8), partition(input-topic-0), offset(8)
producer - Sample: record(9, test9), partition(input-topic-0), offset(9)
producer - Sent 10 records
processor-0 - Processing new records
processor-0 - Assigned partitions: [input-topic-0]
>>> partition 0, offset 0
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 10
>>> partition 0, offset 0
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 10
>>> partition 0, offset 0
processor-0 - Remaining records: 9
>>> partition 0, offset 1
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 9
>>> partition 0, offset 1
processor-0 - Remaining records: 8
>>> partition 0, offset 2
processor-0 - Remaining records: 7
>>> partition 0, offset 3
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 7
>>> partition 0, offset 3
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 7
>>> partition 0, offset 3
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 7
>>> partition 0, offset 3
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 7
>>> partition 0, offset 3
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 7
>>> partition 0, offset 3
processor-0 - Aborting transaction: Boom!
processor-0 - Skipping record after 5 retries: test3-ok
processor-0 - Remaining records: 6
>>> partition 0, offset 4
processor-0 - Remaining records: 5
>>> partition 0, offset 5
processor-0 - Remaining records: 4
>>> partition 0, offset 6
processor-0 - Remaining records: 3
>>> partition 0, offset 7
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 3
>>> partition 0, offset 7
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 3
>>> partition 0, offset 7
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 3
>>> partition 0, offset 7
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 3
>>> partition 0, offset 7
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 3
>>> partition 0, offset 7
processor-0 - Aborting transaction: Boom!
processor-0 - Skipping record after 5 retries: test7-ok
processor-0 - Remaining records: 2
>>> partition 0, offset 8
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 2
>>> partition 0, offset 8
processor-0 - Aborting transaction: Boom!
processor-0 - Remaining records: 2
>>> partition 0, offset 8
processor-0 - Remaining records: 1
>>> partition 0, offset 9
processor-0 - Remaining records: 0
processor-0 - Revoked partitions: [input-topic-0]
processor-0 - Processed 8 records
consumer - Subscribed to output-topic
consumer - Assigned partitions: [output-topic-0]
consumer - Sample: record(0, test0-ok), partition(output-topic-0), offset(3)
consumer - Sample: record(1, test1-ok), partition(output-topic-0), offset(7)
consumer - Sample: record(2, test2-ok), partition(output-topic-0), offset(9)
consumer - Sample: record(4, test4-ok), partition(output-topic-0), offset(23)
consumer - Sample: record(5, test5-ok), partition(output-topic-0), offset(25)
consumer - Sample: record(6, test6-ok), partition(output-topic-0), offset(27)
consumer - Sample: record(8, test8-ok), partition(output-topic-0), offset(45)
consumer - Sample: record(9, test9-ok), partition(output-topic-0), offset(47)
main - Timeout after 2 minutes waiting for output read
consumer - Revoked partitions: [output-topic-0]
consumer - Fetched 8 records
```
[jira] [Commented] (KAFKA-15736) KRaft support in PlaintextConsumerTest
[ https://issues.apache.org/jira/browse/KAFKA-15736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828892#comment-17828892 ] Rory commented on KAFKA-15736: -- Hi [~sameert], I think KRaft support is in place for this class, if I am not mistaken? Can this ticket be closed? Thanks > KRaft support in PlaintextConsumerTest > -- > > Key: KAFKA-15736 > URL: https://issues.apache.org/jira/browse/KAFKA-15736 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Sameer Tejani >Priority: Minor > Labels: kraft, kraft-test, newbie > > The following tests in PlaintextConsumerTest in > core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala need to > be updated to support KRaft > 49 : def testHeaders(): Unit = { > 136 : def testDeprecatedPollBlocksForAssignment(): Unit = { > 144 : def testHeadersSerializerDeserializer(): Unit = { > 153 : def testMaxPollRecords(): Unit = { > 169 : def testMaxPollIntervalMs(): Unit = { > 194 : def testMaxPollIntervalMsDelayInRevocation(): Unit = { > 234 : def testMaxPollIntervalMsDelayInAssignment(): Unit = { > 258 : def testAutoCommitOnClose(): Unit = { > 281 : def testAutoCommitOnCloseAfterWakeup(): Unit = { > 308 : def testAutoOffsetReset(): Unit = { > 319 : def testGroupConsumption(): Unit = { > 339 : def testPatternSubscription(): Unit = { > 396 : def testSubsequentPatternSubscription(): Unit = { > 447 : def testPatternUnsubscription(): Unit = { > 473 : def testCommitMetadata(): Unit = { > 494 : def testAsyncCommit(): Unit = { > 513 : def testExpandingTopicSubscriptions(): Unit = { > 527 : def testShrinkingTopicSubscriptions(): Unit = { > 541 : def testPartitionsFor(): Unit = { > 551 : def testPartitionsForAutoCreate(): Unit = { > 560 : def testPartitionsForInvalidTopic(): Unit = { > 566 : def testSeek(): Unit = { > 621 : def testPositionAndCommit(): Unit = { > 653 : def testPartitionPauseAndResume(): Unit = { > 671 : def testFetchInvalidOffset(): Unit = { > 696 : def 
testFetchOutOfRangeOffsetResetConfigEarliest(): Unit = { > 717 : def testFetchOutOfRangeOffsetResetConfigLatest(): Unit = { > 743 : def testFetchRecordLargerThanFetchMaxBytes(): Unit = { > 772 : def testFetchHonoursFetchSizeIfLargeRecordNotFirst(): Unit = { > 804 : def testFetchHonoursMaxPartitionFetchBytesIfLargeRecordNotFirst(): Unit > = { > 811 : def testFetchRecordLargerThanMaxPartitionFetchBytes(): Unit = { > 819 : def testLowMaxFetchSizeForRequestAndPartition(): Unit = { > 867 : def testRoundRobinAssignment(): Unit = { > 903 : def testMultiConsumerRoundRobinAssignor(): Unit = { > 940 : def testMultiConsumerStickyAssignor(): Unit = { > 986 : def testMultiConsumerDefaultAssignor(): Unit = { > 1024 : def testRebalanceAndRejoin(assignmentStrategy: String): Unit = { > 1109 : def testMultiConsumerDefaultAssignorAndVerifyAssignment(): Unit = { > 1141 : def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = { > 1146 : def testMultiConsumerSessionTimeoutOnClose(): Unit = { > 1151 : def testInterceptors(): Unit = { > 1210 : def testAutoCommitIntercept(): Unit = { > 1260 : def testInterceptorsWithWrongKeyValue(): Unit = { > 1286 : def testConsumeMessagesWithCreateTime(): Unit = { > 1303 : def testConsumeMessagesWithLogAppendTime(): Unit = { > 1331 : def testListTopics(): Unit = { > 1351 : def testUnsubscribeTopic(): Unit = { > 1367 : def testPauseStateNotPreservedByRebalance(): Unit = { > 1388 : def testCommitSpecifiedOffsets(): Unit = { > 1415 : def testAutoCommitOnRebalance(): Unit = { > 1454 : def testPerPartitionLeadMetricsCleanUpWithSubscribe(): Unit = { > 1493 : def testPerPartitionLagMetricsCleanUpWithSubscribe(): Unit = { > 1533 : def testPerPartitionLeadMetricsCleanUpWithAssign(): Unit = { > 1562 : def testPerPartitionLagMetricsCleanUpWithAssign(): Unit = { > 1593 : def testPerPartitionLagMetricsWhenReadCommitted(): Unit = { > 1616 : def testPerPartitionLeadWithMaxPollRecords(): Unit = { > 1638 : def testPerPartitionLagWithMaxPollRecords(): Unit = { > 1661 
: def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = { > 1809 : def testConsumingWithNullGroupId(): Unit = { > 1874 : def testConsumingWithEmptyGroupId(): Unit = { > 1923 : def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = > { > Scanned 1951 lines. Found 0 KRaft tests out of 61 tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1531683586 ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -197,11 +197,17 @@ class BrokerLifecycleManagerTest { result } - def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = { -while (!future.isDone || context.mockClient.hasInFlightRequests) { - context.poll() + def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = { +while (ctx.mockChannelManager.unsentQueue.isEmpty) { + // If the manager is idling until scheduled events we need to advance the clock + if (manager.eventQueue.scheduledAfterIdling() +.filter(!_.getClass.getSimpleName.endsWith("TimeoutEvent")) // avoid triggering timeout events Review Comment: That's a good idea. I'm making that change.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1531683014 ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest { @Test def testKraftJBODMetadataVersionUpdateEvent(): Unit = { -val context = new RegistrationTestContext(configProperties) -val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ"))) +val ctx = new RegistrationTestContext(configProperties) +val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ"))) val controllerNode = new Node(3000, "localhost", 8021) -context.controllerNodeProvider.node.set(controllerNode) -manager.start(() => context.highestMetadataOffset.get(), - context.mockChannelManager, context.clusterId, context.advertisedListeners, +ctx.controllerNodeProvider.node.set(controllerNode) + +manager.start(() => ctx.highestMetadataOffset.get(), + ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners, Collections.emptyMap(), OptionalLong.of(10L)) -TestUtils.retry(6) { - assertEquals(1, context.mockChannelManager.unsentQueue.size) - assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch()) -} -context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse( - new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode) -TestUtils.retry(1) { - context.poll() - assertEquals(1000L, manager.brokerEpoch) -} +def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, manager, prepareResponse[T](ctx, response)) +def nextRequest() = doPoll[AbstractRequest](new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) +def nextRegistrationRequest(epoch: Long) = + 
doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(epoch))) + +// Broker registers and response sets epoch to 1000L +assertEquals(10L, nextRegistrationRequest(1000L).data().previousBrokerEpoch()) + +nextRequest() // poll for next request as way to synchronize with the new value into brokerEpoch +assertEquals(1000L, manager.brokerEpoch) + +// Trigger JBOD MV update manager.handleKraftJBODMetadataVersionUpdate() -context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse( - new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode) -TestUtils.retry(6) { - context.time.sleep(100) - context.poll() - manager.eventQueue.wakeup() - assertEquals(1200, manager.brokerEpoch) -} + +// We may have to accept some heartbeats before the new registration is sent +while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])() Review Comment: No, I don't think so. `prepareResponse` delegates to `MockClient`, which expects a predetermined request-response order. It supports a `RequestMatcher`, which `prepareResponse` uses to extract the request, but it does not support preparing a conditional response. Adding support for conditional prepared responses to `MockClient` would require a larger change.
Re: [PR] MINOR: Tuple2 replaced with Map.Entry [kafka]
nizhikov commented on PR #15560: URL: https://github.com/apache/kafka/pull/15560#issuecomment-2009033688 Done.
Re: [PR] KAFKA-14589 Tuple2 replaced with Map.Entry [kafka]
chia7712 commented on PR #15560: URL: https://github.com/apache/kafka/pull/15560#issuecomment-2009028276 @nizhikov This is unrelated to KAFKA-14589, so please use "MINOR:" instead of "KAFKA-14589" :)
Re: [PR] KAFKA-14589 Tuple2 replaced with Map.Entry [kafka]
nizhikov commented on PR #15560: URL: https://github.com/apache/kafka/pull/15560#issuecomment-2009012349 This PR is a follow-up to #14471.
Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]
nizhikov commented on PR #14471: URL: https://github.com/apache/kafka/pull/14471#issuecomment-2009012039 @chia7712 Please take a look: https://github.com/apache/kafka/pull/15560
[PR] KAFKA-14589 Tuple2 replaced with Map.Entry [kafka]
nizhikov opened a new pull request, #15560: URL: https://github.com/apache/kafka/pull/15560 `Tuple2` replaced with the JDK's `Map.Entry` and `SimpleImmutableEntry`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
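For readers unfamiliar with the JDK types involved, `AbstractMap.SimpleImmutableEntry` (and, since Java 9, `Map.entry`) can serve as a generic immutable pair in place of a custom `Tuple2`. The sketch below is illustrative only; `PairExample.minMax` is a hypothetical helper, not code from the PR.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;

class PairExample {
    // Returns (min, max) as an immutable pair instead of a custom Tuple2<Integer, Integer>.
    // Assumes a non-empty list.
    static Map.Entry<Integer, Integer> minMax(List<Integer> values) {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int v : values) {
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        return new SimpleImmutableEntry<>(min, max);
    }
}
```

`SimpleImmutableEntry.setValue` throws `UnsupportedOperationException`, and equality is defined by key and value, so it compares equal to any other `Map.Entry` with the same contents. The trade-off raised in the discussion on #14471 is naming: `getKey()`/`getValue()` suggest a key/value relationship that a plain pair may not have.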
[jira] [Commented] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached
[ https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828654#comment-17828654 ] Kuan Po Tseng commented on KAFKA-16385: --- {quote} [~brandboat], are you clear on what you should do for this ticket? Please let us know if you have any questions. {quote} Thanks, I'm still poking around the source code, but it sounds like we should document the behavior mentioned in this JIRA ticket. If I have any questions, I'll consult with you all again. Huge thanks! > Segment is rolled before segment.ms or segment.bytes breached > - > > Key: KAFKA-16385 > URL: https://issues.apache.org/jira/browse/KAFKA-16385 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.1, 3.7.0 >Reporter: Luke Chen >Assignee: Kuan Po Tseng >Priority: Major > > Steps to reproduce: > 0. Start up a broker with `log.retention.check.interval.ms=1000` to speed up > the test. > 1. Create a topic with the config: segment.ms=7days, segment.bytes=1GB, > retention.ms=1sec. > 2. Send a record "aaa" to the topic. > 3. Wait for 1 second. > Will this segment be rolled? I thought not. > But in my test, it was rolled: > {code:java} > [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, > dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. > (kafka.log.LocalLog) > [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote > producer snapshot at offset 1 with 1 producer ids in 1 ms. > (org.apache.kafka.storage.internals.log.ProducerStateManager) > [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, > dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, > lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to > log retention time 1000ms breach based on the largest record timestamp in the > segment (kafka.log.UnifiedLog) > {code} > The segment is rolled because the log retention time of 1000ms was breached, which is > unexpected.
> Tested in v3.5.1, it has the same issue.
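The log in the ticket points at the mechanism: time-based retention compares the current time with each segment's largest record timestamp, and since a log must always keep at least one segment, deleting every segment (including the active one) first forces a roll. The stdlib-only sketch below is a simplified, hypothetical model of that check, not Kafka's `UnifiedLog` code; `RetentionModel`, `Segment` and `applyRetention` are illustrative names, and the rule that an empty last segment is never deleted is an assumption of the model.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of time-based retention; not Kafka's UnifiedLog code.
class RetentionModel {
    static final class Segment {
        final long baseOffset;
        final int sizeBytes;
        final long largestTimestampMs;

        Segment(long baseOffset, int sizeBytes, long largestTimestampMs) {
            this.baseOffset = baseOffset;
            this.sizeBytes = sizeBytes;
            this.largestTimestampMs = largestTimestampMs;
        }
    }

    // Deletes the oldest segments whose largest record timestamp is older than retentionMs.
    // If that would delete every segment, a new empty active segment starting at
    // nextOffset is rolled first. Returns the number of rolls performed (0 or 1).
    static int applyRetention(List<Segment> segments, long nextOffset, long nowMs, long retentionMs) {
        List<Segment> deletable = new ArrayList<>();
        for (int i = 0; i < segments.size(); i++) {
            Segment s = segments.get(i);
            boolean lastAndEmpty = i == segments.size() - 1 && s.sizeBytes == 0;
            boolean expired = nowMs - s.largestTimestampMs > retentionMs;
            if (lastAndEmpty || !expired) {
                break; // only a contiguous prefix of expired, non-empty segments is deleted
            }
            deletable.add(s);
        }
        int rolls = 0;
        if (!deletable.isEmpty() && deletable.size() == segments.size()) {
            // The log must keep at least one segment, so roll before deleting everything.
            segments.add(new Segment(nextOffset, 0, -1L));
            rolls = 1;
        }
        segments.removeAll(deletable);
        return rolls;
    }
}
```

Neither `segment.ms` nor `segment.bytes` appears in this check, which is consistent with the ticket's log: the single segment holding "aaa" expires after `retention.ms=1000`, so a new segment is rolled at offset 1 before the old one is deleted.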
[jira] [Resolved] (KAFKA-15989) Upgrade existing generic group to consumer group
[ https://issues.apache.org/jira/browse/KAFKA-15989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-15989. - Fix Version/s: 3.8.0 Resolution: Fixed > Upgrade existing generic group to consumer group > > > Key: KAFKA-15989 > URL: https://issues.apache.org/jira/browse/KAFKA-15989 > Project: Kafka > Issue Type: Sub-task >Reporter: Emanuele Sabellico >Assignee: David Jacot >Priority: Minor > Fix For: 3.8.0 > > > It should be possible to upgrade an existing generic group to a new consumer > group, in case it was using either the previous generic protocol or manual > partition assignment and commit.
[jira] [Resolved] (KAFKA-15763) Group Coordinator should not deliver new assignment before previous one is acknowledged
[ https://issues.apache.org/jira/browse/KAFKA-15763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-15763. - Resolution: Won't Fix We went with another approach. > Group Coordinator should not deliver new assignment before previous one is > acknowledged > --- > > Key: KAFKA-15763 > URL: https://issues.apache.org/jira/browse/KAFKA-15763 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > > In the initial implementation of the new consumer group protocol, the group > coordinator waits to receive an acknowledgement from the consumer only when > there are partitions to be revoked. In the case of newly assigned partitions, > a new assignment can be delivered at any time (e.g. in two subsequent > heartbeats). > While implementing the state machine on the client side, we found out that > this caused confusion because the protocol does not treat revocation and > assignment in the same way. We also found out that changing the assignment > before the previous one is fully processed by the member makes the client > side logic more complicated than it should be because the consumer can't > process any new assignment until it has completed the previous one. > In the end, it is better to change the server side to not deliver a new > assignment before the current one is acknowledged by the consumer.
[jira] [Resolved] (KAFKA-16313) Offline group protocol migration
[ https://issues.apache.org/jira/browse/KAFKA-16313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16313. - Fix Version/s: 3.8.0 Assignee: Dongnuo Lyu Resolution: Fixed > Offline group protocol migration > > > Key: KAFKA-16313 > URL: https://issues.apache.org/jira/browse/KAFKA-16313 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongnuo Lyu >Assignee: Dongnuo Lyu >Priority: Major > Fix For: 3.8.0 > >
Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]
dajac merged PR #15546: URL: https://github.com/apache/kafka/pull/15546
Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]
chia7712 commented on PR #14471: URL: https://github.com/apache/kafka/pull/14471#issuecomment-2008956850

> The only issue I can see is naming. Map.Entry has getKey, getValue for the first and second values in a pair, which, to me, implies some kind of relation between the values, while Tuple2 just stores two values (v1, v2) that sit together in some piece of code. Should we go with getKey, getValue for tuple values? Is that naming clear? WDYT?

My point was that we can leverage the Java API instead of creating a new one :)
Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]
nizhikov commented on PR #14471: URL: https://github.com/apache/kafka/pull/14471#issuecomment-2008956417 @chia7712 @jolshan @mimaison @tledkov Guys, thank you for your help with this PR!
Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]
chia7712 merged PR #14471: URL: https://github.com/apache/kafka/pull/14471
Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]
nizhikov commented on PR #14471: URL: https://github.com/apache/kafka/pull/14471#issuecomment-2008931321 @chia7712 I tried to perform the Tuple2 -> Map.Entry substitution. The only issue I can see is naming. `Map.Entry` has `getKey`, `getValue` for the first and second values in a pair, which, to me, implies some kind of relation between the values, while Tuple2 just stores two values (v1, v2) that sit together in some piece of code. Should we go with `getKey`, `getValue` for tuple values? Is that naming clear? WDYT?
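To make the naming trade-off concrete, here is a minimal sketch of returning a pair through the JDK's `Map.Entry` instead of a custom `Tuple2`. `GroupStatePair.collect` and its placeholder data are hypothetical and are not taken from the ConsumerGroupCommand code.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class GroupStatePair {
    // Hypothetical helper: returns (group state, assigned partitions) as one value.
    // Where a custom Tuple2<A, B> would expose v1()/v2(), Map.Entry exposes getKey()/getValue().
    static Map.Entry<Optional<String>, List<String>> collect(String groupId) {
        Optional<String> state = Optional.of("Stable"); // placeholder data for illustration
        List<String> partitions = Arrays.asList(groupId + "-topic-0", groupId + "-topic-1");
        // SimpleImmutableEntry is a JDK-provided, immutable pair implementation.
        return new AbstractMap.SimpleImmutableEntry<>(state, partitions);
    }
}
```

The two values come back through `getKey()` and `getValue()` rather than `v1()` and `v2()`, which is exactly the naming concern raised in the comment. `AbstractMap.SimpleImmutableEntry` is used here because it is available on Java 8 and accepts nulls; `Map.entry(k, v)` is shorter but requires Java 9+ and rejects null keys and values.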
Re: [PR] KAFKA-15853: Move KafkaConfig properties definition out of core [kafka]
nizhikov commented on PR #15501: URL: https://github.com/apache/kafka/pull/15501#issuecomment-2008920265

@OmniaGM
> Started on this, but will continue after Kafka Summit London.

Great! Don't hesitate to ask if you need any help.
Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]
showuon commented on PR #15481: URL: https://github.com/apache/kafka/pull/15481#issuecomment-2008904600 Backported to 3.7 and 3.6. @omkreddy, FYI.
[jira] [Resolved] (KAFKA-16222) KRaft Migration: Incorrect default user-principal quota after migration
[ https://issues.apache.org/jira/browse/KAFKA-16222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-16222.
Resolution: Fixed

> KRaft Migration: Incorrect default user-principal quota after migration
>
> Key: KAFKA-16222
> URL: https://issues.apache.org/jira/browse/KAFKA-16222
> Project: Kafka
> Issue Type: Bug
> Components: kraft, migration
> Affects Versions: 3.7.0, 3.6.1
> Reporter: Dominik
> Assignee: PoAn Yang
> Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
> We observed that our default user quota does not seem to be migrated correctly.
> Before migration:
> bin/kafka-configs.sh --describe --all --entity-type users
> Quota configs for the *default user-principal* are consumer_byte_rate=100.0, producer_byte_rate=100.0
> Quota configs for user-principal 'myuser@prod' are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8
> After migration:
> bin/kafka-configs.sh --describe --all --entity-type users
> Quota configs for *user-principal ''* are consumer_byte_rate=100.0, producer_byte_rate=100.0
> Quota configs for user-principal 'myuser%40prod' are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8
>
> Additional finding: our names contain an "@", which also led to an incorrect state after migration.
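In the "after" output, `@` has become `%40`, which is its percent-encoded (URL-encoded) form, and the default entity has turned into an empty name. Below is a minimal sketch of the name mapping a migration would need, assuming the ZooKeeper node names are URL-encoded and that the default user entity is stored under the node name `<default>`. `QuotaEntityNames.toEntityName` is a hypothetical helper, not the code from the fix in PR #15481.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

final class QuotaEntityNames {
    // Assumption: the default user entity is stored under this ZooKeeper node name.
    private static final String ZK_DEFAULT_ENTITY = "<default>";

    // Hypothetical helper: maps a ZooKeeper user-entity node name to the entity name a
    // migrated quota should carry. null stands for the default user-principal.
    static String toEntityName(String zkNodeName) {
        if (ZK_DEFAULT_ENTITY.equals(zkNodeName)) {
            return null; // the default entity, not a user named "" or "<default>"
        }
        try {
            // Undo percent-encoding, e.g. "myuser%40prod" -> "myuser@prod".
            return URLDecoder.decode(zkNodeName, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }
}
```

Note that `URLDecoder` also turns `+` into a space, so a real implementation has to pair with whatever encoder originally wrote the node names.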
Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]
chia7712 commented on PR #14471: URL: https://github.com/apache/kafka/pull/14471#issuecomment-2008792236 @nizhikov thanks for all your efforts! Please file PRs to address the follow-ups, thanks!
Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]
showuon merged PR #15481: URL: https://github.com/apache/kafka/pull/15481
Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]
showuon commented on PR #15481: URL: https://github.com/apache/kafka/pull/15481#issuecomment-2008777656 Failed tests are unrelated.
[jira] [Commented] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached
[ https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828620#comment-17828620 ]

Luke Chen commented on KAFKA-16385:

Thanks for the response Jun, and thanks for the summary, Chia-Ping. [~brandboat], is it clear what you should do for this ticket? Please let us know if you have any questions.

> Segment is rolled before segment.ms or segment.bytes breached
>
> Key: KAFKA-16385
> URL: https://issues.apache.org/jira/browse/KAFKA-16385
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.5.1, 3.7.0
> Reporter: Luke Chen
> Assignee: Kuan Po Tseng
> Priority: Major
>
> Steps to reproduce:
> 0. Start up a broker with `log.retention.check.interval.ms=1000` to speed up the test.
> 1. Create a topic with the config: segment.ms=7days, segment.bytes=1GB, retention.ms=1sec.
> 2. Send a record "aaa" to the topic.
> 3. Wait for 1 second.
> Will this segment be rolled? I thought not. But in my testing, it is rolled:
> {code:java}
> [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. (kafka.log.LocalLog)
> [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote producer snapshot at offset 1 with 1 producer ids in 1 ms. (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to log retention time 1000ms breach based on the largest record timestamp in the segment (kafka.log.UnifiedLog)
> {code}
> The segment is rolled because the log retention time of 1000ms was breached, which is unexpected.
> Tested in v3.5.1; it has the same issue.
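The log excerpt suggests the roll is a side effect of time-based retention rather than of `segment.ms` or `segment.bytes`. The following is a simplified model, not Kafka's code, built on one stated assumption: when retention would delete every segment including the active one, the broker first rolls a new, empty active segment so the partition is never left without a segment. `RetentionModel` and its members are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of time-based retention on one partition log.
final class RetentionModel {
    static final class Segment {
        final long baseOffset;
        long largestTimestampMs = Long.MIN_VALUE; // Long.MIN_VALUE means "no records yet"

        Segment(long baseOffset) {
            this.baseOffset = baseOffset;
        }
    }

    private final long retentionMs;
    final List<Segment> segments = new ArrayList<>(); // last element is the active segment
    private long nextOffset = 0;
    int rolls = 0;

    RetentionModel(long retentionMs) {
        this.retentionMs = retentionMs;
        segments.add(new Segment(0));
    }

    // Appends one record with the given timestamp to the active segment.
    void append(long timestampMs) {
        Segment active = segments.get(segments.size() - 1);
        active.largestTimestampMs = Math.max(active.largestTimestampMs, timestampMs);
        nextOffset++;
    }

    // Deletes the oldest segments whose largest record timestamp breaches retention.ms.
    // Returns the number of segments deleted.
    int enforceRetention(long nowMs) {
        int toDelete = 0;
        for (Segment s : segments) {
            boolean breached = s.largestTimestampMs != Long.MIN_VALUE
                    && nowMs - s.largestTimestampMs > retentionMs;
            if (!breached) {
                break; // retention only removes a prefix of the log
            }
            toDelete++;
        }
        // Assumption: never delete the last remaining segment without rolling a new one first.
        if (toDelete > 0 && toDelete == segments.size()) {
            segments.add(new Segment(nextOffset));
            rolls++;
        }
        segments.subList(0, toDelete).clear();
        return toDelete;
    }
}
```

Appending one record at t = 0 with retention.ms = 1000 and checking at t = 1500 rolls a new segment at offset 1 and then deletes the segment at base offset 0, the same order of events as in the log above, even though segment.ms and segment.bytes are never consulted.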
Re: [PR] MINOR: Update upgrade docs to refer 3.6.2 version [kafka]
omkreddy closed pull request #15554: MINOR: Update upgrade docs to refer 3.6.2 version URL: https://github.com/apache/kafka/pull/15554