[jira] [Created] (KAFKA-16582) Feature Request: Introduce max.record.size Configuration Parameter for Producers

2024-04-17 Thread Ramiz Mehran (Jira)
Ramiz Mehran created KAFKA-16582:


 Summary: Feature Request: Introduce max.record.size Configuration 
Parameter for Producers
 Key: KAFKA-16582
 URL: https://issues.apache.org/jira/browse/KAFKA-16582
 Project: Kafka
  Issue Type: New Feature
  Components: producer 
Affects Versions: 3.6.2
Reporter: Ramiz Mehran


{*}Summary{*}:

Currently, Kafka producers have a {{max.request.size}} configuration that 
limits the size of the request sent to Kafka brokers, which includes both 
compressed and uncompressed data sizes. However, it is also the maximum size of 
an individual record before it is compressed. This can lead to inefficiencies 
and unexpected behaviours, particularly when records are significantly large 
before compression but fit multiple times into the {{max.request.size}} after 
compression.

{*}Problem{*}:

During spikes in data transmission, large records that compress to within 
{{{}max.request.size{}}} still produce large batches, which increase latency 
and can create a processing backlog. The problem is particularly pronounced 
with highly efficient compression algorithms such as zstd, where the compressed 
size allows large batches that are inefficient to process.

{*}Proposed Solution{*}:

Introduce a new producer configuration parameter: {{{}max.record.size{}}}. This 
parameter will allow administrators to define the maximum size of a record 
before it is compressed. This would help in managing expectations and system 
behavior more predictably by separating uncompressed record size limit from 
compressed request size limit.

{*}Benefits{*}:
 # {*}Predictability{*}: Producers can reject records that exceed the 
{{max.record.size}} before spending resources on compression.
 # {*}Efficiency{*}: Helps in maintaining efficient batch sizes and system 
throughput, especially under high load conditions.
 # {*}System Stability{*}: Avoids the potential for large batch processing 
which can affect latency and throughput negatively.

{*}Example{*}: Consider a scenario where the producer sends records of up to 
20 MB which, when compressed, fit several times into a batch under the 25 MB 
{{max.request.size}}. Such batches can be problematic to process efficiently, 
even though they meet the current maximum request size constraint. With 
{{{}max.record.size{}}}, {{max.request.size}} could be restricted to limiting 
only the compressed request size, which would let us cap it at, say, 5 MB and 
so prevent the very large requests that cause latency spikes.

{*}Steps to Reproduce{*}:
 # Configure a Kafka producer with {{max.request.size}} set to 25 MB.
 # Send multiple uncompressed records close to 20 MB that compress to less than 
25 MB.
 # Observe the impact on Kafka broker performance and client side latency.

{*}Expected Behavior{*}: The producer should allow administrators to set both 
pre-compression record size limits and total request size limits post 
compression.
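The pre-compression check described in the proposal could look roughly like the following. This is a minimal sketch only: `max.record.size` is the setting requested in this ticket and does not exist in the Kafka producer, and `RecordSizeGuard` with its `accepts` method is a hypothetical name chosen for illustration. The size is approximated as key bytes plus value bytes, which ignores headers and batch overhead.

```java
// Illustrative only: Kafka has no max.record.size setting. This guard models
// the proposed pre-compression check as a plain application-side helper.
class RecordSizeGuard {
    private final int maxRecordBytes; // hypothetical uncompressed-size limit

    RecordSizeGuard(int maxRecordBytes) {
        this.maxRecordBytes = maxRecordBytes;
    }

    // Approximates the uncompressed record size as key bytes + value bytes.
    // Record headers and batch overhead are deliberately ignored.
    boolean accepts(byte[] key, byte[] value) {
        long size = (key == null ? 0 : key.length)
                + (long) (value == null ? 0 : value.length);
        return size <= maxRecordBytes;
    }

    public static void main(String[] args) {
        // 20 MB limit, as in the example in the ticket.
        RecordSizeGuard guard = new RecordSizeGuard(20 * 1024 * 1024);
        System.out.println(guard.accepts(new byte[1], new byte[1024]));
        System.out.println(guard.accepts(new byte[1], new byte[21 * 1024 * 1024]));
    }
}
```

Rejecting a record at this point costs only a length comparison, which is the "Predictability" benefit listed above: no compression work is spent on a record that will be refused anyway.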



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10034) Clarify Usage of "batch.size" and "max.request.size" Producer Configs

2024-04-17 Thread Ramiz Mehran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838450#comment-17838450
 ] 

Ramiz Mehran commented on KAFKA-10034:
--

"Firstly, this configuration is a cap on the maximum uncompressed record batch 
size." should be changed to "Firstly, this configuration is a cap on the 
maximum uncompressed record size."

The cap applies to the maximum size of a single uncompressed record.

> Clarify Usage of "batch.size" and "max.request.size" Producer Configs
> -
>
> Key: KAFKA-10034
> URL: https://issues.apache.org/jira/browse/KAFKA-10034
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, producer 
>Reporter: Mark Cox
>Assignee: Badai Aqrandista
>Priority: Minor
>
> The documentation around the producer configurations "batch.size" and 
> "max.request.size", and how they relate to one another, can be confusing.
> In reality, the "max.request.size" is a hard limit on each individual record, 
> but the documentation makes it seem this is the maximum size of a request 
> sent to Kafka.  If there is a situation where "batch.size" is set greater 
> than "max.request.size" (and each individual record is smaller than 
> "max.request.size") you could end up with larger requests than expected sent 
> to Kafka.
> There are a few things that could be considered to make this clearer:
>  # Improve the documentation to clarify the two producer configurations and 
> how they relate to each other
>  # Provide a producer check, and possibly a warning, if "batch.size" is found 
> to be greater than "max.request.size"
>  # The producer could take the _minimum_ of "batch.size" or "max.request.size"
>  
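Suggestions 2 and 3 in the quoted description can be sketched as a small standalone check. This is an assumption-laden illustration, not code from the Kafka client: `ProducerConfigCheck` and both method names are hypothetical. Only the config keys `batch.size` and `max.request.size` and their documented defaults (16384 and 1048576 bytes) come from the producer configuration itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; the Kafka producer does not perform this check.
class ProducerConfigCheck {
    private static int intValue(Map<String, Object> config, String key, int dflt) {
        return Integer.parseInt(String.valueOf(config.getOrDefault(key, dflt)));
    }

    // Suggestion 2: produce a warning when batch.size exceeds max.request.size.
    static Optional<String> warnIfBatchExceedsRequest(Map<String, Object> config) {
        int batchSize = intValue(config, "batch.size", 16384);
        int maxRequestSize = intValue(config, "max.request.size", 1048576);
        if (batchSize > maxRequestSize) {
            return Optional.of("batch.size (" + batchSize
                    + ") is greater than max.request.size (" + maxRequestSize + ")");
        }
        return Optional.empty();
    }

    // Suggestion 3: use the minimum of the two settings as the batch size.
    static int effectiveBatchSize(Map<String, Object> config) {
        return Math.min(intValue(config, "batch.size", 16384),
                intValue(config, "max.request.size", 1048576));
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("batch.size", 2 * 1024 * 1024); // larger than the default request cap
        System.out.println(warnIfBatchExceedsRequest(config).orElse("no warning"));
        System.out.println(effectiveBatchSize(config));
    }
}
```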





[jira] (KAFKA-10034) Clarify Usage of "batch.size" and "max.request.size" Producer Configs

2024-04-17 Thread Ramiz Mehran (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10034 ]


Ramiz Mehran deleted comment on KAFKA-10034:
--

was (Author: JIRAUSER290918):
"Firstly, this configuration is a cap on the maximum uncompressed record batch 
size." should be changed to "Firstly, this configuration is a cap on the 
maximum uncompressed record size."

> Clarify Usage of "batch.size" and "max.request.size" Producer Configs
> -
>
> Key: KAFKA-10034
> URL: https://issues.apache.org/jira/browse/KAFKA-10034
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, producer 
>Reporter: Mark Cox
>Assignee: Badai Aqrandista
>Priority: Minor
>
> The documentation around the producer configurations "batch.size" and 
> "max.request.size", and how they relate to one another, can be confusing.
> In reality, the "max.request.size" is a hard limit on each individual record, 
> but the documentation makes it seem this is the maximum size of a request 
> sent to Kafka.  If there is a situation where "batch.size" is set greater 
> than "max.request.size" (and each individual record is smaller than 
> "max.request.size") you could end up with larger requests than expected sent 
> to Kafka.
> There are a few things that could be considered to make this clearer:
>  # Improve the documentation to clarify the two producer configurations and 
> how they relate to each other
>  # Provide a producer check, and possibly a warning, if "batch.size" is found 
> to be greater than "max.request.size"
>  # The producer could take the _minimum_ of "batch.size" or "max.request.size"
>  





[jira] [Commented] (KAFKA-10034) Clarify Usage of "batch.size" and "max.request.size" Producer Configs

2024-04-17 Thread Ramiz Mehran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838447#comment-17838447
 ] 

Ramiz Mehran commented on KAFKA-10034:
--

"Firstly, this configuration is a cap on the maximum uncompressed record batch 
size." should be changed to "Firstly, this configuration is a cap on the 
maximum uncompressed record size."

> Clarify Usage of "batch.size" and "max.request.size" Producer Configs
> -
>
> Key: KAFKA-10034
> URL: https://issues.apache.org/jira/browse/KAFKA-10034
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, producer 
>Reporter: Mark Cox
>Assignee: Badai Aqrandista
>Priority: Minor
>
> The documentation around the producer configurations "batch.size" and 
> "max.request.size", and how they relate to one another, can be confusing.
> In reality, the "max.request.size" is a hard limit on each individual record, 
> but the documentation makes it seem this is the maximum size of a request 
> sent to Kafka.  If there is a situation where "batch.size" is set greater 
> than "max.request.size" (and each individual record is smaller than 
> "max.request.size") you could end up with larger requests than expected sent 
> to Kafka.
> There are a few things that could be considered to make this clearer:
>  # Improve the documentation to clarify the two producer configurations and 
> how they relate to each other
>  # Provide a producer check, and possibly a warning, if "batch.size" is found 
> to be greater than "max.request.size"
>  # The producer could take the _minimum_ of "batch.size" or "max.request.size"
>  





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-17 Thread via GitHub


vamossagar12 commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2062936947

   @showuon, just checking: did you get a chance to look at the updated test? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16467) Add README to docs folder

2024-04-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838438#comment-17838438
 ] 

ASF GitHub Bot commented on KAFKA-16467:


showuon merged PR #596:
URL: https://github.com/apache/kafka-site/pull/596




> Add README to docs folder
> -
>
> Key: KAFKA-16467
> URL: https://issues.apache.org/jira/browse/KAFKA-16467
> Project: Kafka
>  Issue Type: Improvement
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>
> We don't have a guide in the project root folder or the docs folder showing 
> how to run the website locally. It would be good to provide a way to preview 
> the documentation with the kafka-site repository.
>  
> Option 1: Add links to wiki page 
> [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes]
>  and 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. 
> Option 2: Show how to run the documentation within a container. For example, 
> move `site-docs` from kafka to the kafka-site repository and run `./start-preview.sh`.





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-17 Thread via GitHub


showuon commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2062889530

   @FrankYang0529, there is a checkstyle error:
   `[2024-04-17T14:04:27.072Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15616/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:70:34: Name 'futureDirPattern' must match pattern '(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)'. [ConstantName]`
   
   Please help fix it. Thanks.
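The rule behind this error can be checked directly against the pattern quoted in the message. The sketch below copies that regular expression verbatim; the class name `ConstantNameCheck` and the replacement name `FUTURE_DIR_PATTERN` are assumptions for illustration, the latter being just one name that would satisfy the rule.

```java
import java.util.regex.Pattern;

class ConstantNameCheck {
    // Pattern copied from the checkstyle error message above.
    static final Pattern CONSTANT_NAME =
            Pattern.compile("(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)");

    static boolean isValid(String name) {
        return CONSTANT_NAME.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("futureDirPattern"));   // false: camelCase is rejected
        System.out.println(isValid("FUTURE_DIR_PATTERN")); // true: upper snake case is accepted
    }
}
```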





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-17 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1569846240


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

Review Comment:
   Sounds good.
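To see what the regular expression in the diff captures, here is a small standalone sketch. The pattern is copied from the diff; everything else is an assumption: the class name, the sample directory name `my-topic-0.abc123-future`, the use of `find()`, and the reading of the three groups as topic, partition and a unique suffix.

```java
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class FutureDirName {
    // Regular expression taken from the diff above.
    static final Pattern FUTURE_DIR_PATTERN =
            Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

    // Returns the three captured groups, or null when the name does not match.
    // Interpreting them as {topic, partition, suffix} is an assumption.
    static String[] parse(String dirName) {
        Matcher m = FUTURE_DIR_PATTERN.matcher(dirName);
        if (!m.find()) {
            return null;
        }
        return new String[] {m.group(1), m.group(2), m.group(3)};
    }

    public static void main(String[] args) {
        // Hypothetical directory names, for illustration only.
        System.out.println(Arrays.toString(parse("my-topic-0.abc123-future")));
        System.out.println(Arrays.toString(parse("my-topic-0")));
    }
}
```

Because the first group is greedy, a topic name that itself contains hyphens still ends up entirely in group 1, with only the last hyphen-separated token before the dot going to group 2.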






Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-17 Thread via GitHub


showuon merged PR #15747:
URL: https://github.com/apache/kafka/pull/15747





Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-17 Thread via GitHub


showuon commented on PR #15747:
URL: https://github.com/apache/kafka/pull/15747#issuecomment-2062884304

   Failed tests are unrelated.





Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-17 Thread via GitHub


showuon commented on PR #15748:
URL: https://github.com/apache/kafka/pull/15748#issuecomment-2062882937

   Looks like `LogCleanerParameterizedIntegrationTest` failed due to this 
change. Please take a look. Thanks.
   
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15748/1/#showFailuresLink





Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]

2024-04-17 Thread via GitHub


aaron-ai commented on code in PR #15728:
URL: https://github.com/apache/kafka/pull/15728#discussion_r1569813058


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java:
##
@@ -169,6 +170,7 @@ static Map sourceConsumerConfig(Map props) {
         result.putAll(Utils.entriesWithPrefix(props, CONSUMER_CLIENT_PREFIX));
         result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX));
         result.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
+        result.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

Review Comment:
   @C0urante OK, I will draft a KIP later.
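The effect of the added line can be illustrated with a self-contained approximation of the method in the diff. Several things here are assumptions: the prefix values `"consumer."` and `"source."`, the simplified `entriesWithPrefix` stand-in for `Utils.entriesWithPrefix`, and the class name. The config keys `enable.auto.commit` and `allow.auto.create.topics` are the standard consumer settings.

```java
import java.util.HashMap;
import java.util.Map;

class SourceConsumerConfigSketch {
    // Assumed prefix values; the real constants live in the MirrorMaker 2 config classes.
    static final String CONSUMER_CLIENT_PREFIX = "consumer.";
    static final String SOURCE_PREFIX = "source.";

    // Simplified stand-in: keep entries whose key starts with the prefix, stripping the prefix.
    static Map<String, Object> entriesWithPrefix(Map<String, ?> props, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    static Map<String, Object> sourceConsumerConfig(Map<String, ?> props) {
        Map<String, Object> result = new HashMap<>();
        result.putAll(entriesWithPrefix(props, CONSUMER_CLIENT_PREFIX));
        result.putAll(entriesWithPrefix(props, SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX));
        // Written last, so these values win over anything the user supplied.
        result.put("enable.auto.commit", "false");
        result.put("allow.auto.create.topics", "false");
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("consumer.allow.auto.create.topics", "true");
        props.put("source.consumer.max.poll.records", "100");
        System.out.println(sourceConsumerConfig(props));
    }
}
```

Since the `put` calls come after both `putAll` calls, a user-provided `allow.auto.create.topics=true` would be silently overridden in this sketch.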






[jira] [Commented] (KAFKA-16574) The metrics of LogCleaner disappear after reconfiguration

2024-04-17 Thread Chia Chuan Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838420#comment-17838420
 ] 

Chia Chuan Yu commented on KAFKA-16574:
---

Hi, [~chia7712] 

Can I have this one? Thanks!

> The metrics of LogCleaner disappear after reconfiguration
> -
>
> Key: KAFKA-16574
> URL: https://issues.apache.org/jira/browse/KAFKA-16574
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> see 
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/core/src/main/scala/kafka/log/LogCleaner.scala#L227]
> We don't rebuild the metrics after calling shutdown. The following test can 
> prove that.
> {code:java}
> @Test
> def testMetricsAfterReconfiguration(): Unit = {
>   val logCleaner = new LogCleaner(new CleanerConfig(true),
>     logDirs = Array(TestUtils.tempDir()),
>     logs = new Pool[TopicPartition, UnifiedLog](),
>     logDirFailureChannel = new LogDirFailureChannel(1),
>     time = time)
>   def check(): Unit =
>     LogCleaner.MetricNames.foreach(name => assertNotNull(KafkaYammerMetrics.defaultRegistry.allMetrics().get(logCleaner.metricsGroup
>       .metricName(name, java.util.Collections.emptyMap())), s"$name is gone?"))
>   try {
>     check()
>     logCleaner.reconfigure(new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")),
>       new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")))
>     check()
>   } finally logCleaner.shutdown()
> } {code}
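The failure mode described above (metrics removed on shutdown and not registered again) can be modelled with a toy class. This is not LogCleaner's code: the class, the metric names and the assumption that reconfiguration goes through a shutdown step are all hypothetical and serve only to show the shape of the bug and one possible remedy.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: names and structure are hypothetical, not LogCleaner's code.
class ToyCleaner {
    static final List<String> METRIC_NAMES = Arrays.asList("metric-a", "metric-b");
    private final Set<String> registry; // stand-in for a shared metrics registry

    ToyCleaner(Set<String> registry) {
        this.registry = registry;
        registerMetrics();
    }

    private void registerMetrics() {
        registry.addAll(METRIC_NAMES);
    }

    void shutdown() {
        registry.removeAll(METRIC_NAMES);
    }

    // Mirrors the reported behaviour: shutdown removes the metrics and nothing re-adds them.
    void reconfigureWithoutRebuild() {
        shutdown();
    }

    // One possible remedy: register the metrics again after the restart.
    void reconfigureWithRebuild() {
        shutdown();
        registerMetrics();
    }

    boolean allMetricsPresent() {
        return registry.containsAll(METRIC_NAMES);
    }

    public static void main(String[] args) {
        ToyCleaner buggy = new ToyCleaner(new HashSet<>());
        buggy.reconfigureWithoutRebuild();
        System.out.println(buggy.allMetricsPresent());

        ToyCleaner rebuilt = new ToyCleaner(new HashSet<>());
        rebuilt.reconfigureWithRebuild();
        System.out.println(rebuilt.allMetricsPresent());
    }
}
```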





Re: [PR] MINOR: Modify SocketServer#sendResponse trace msg [kafka]

2024-04-17 Thread via GitHub


TaiJuWu commented on code in PR #15749:
URL: https://github.com/apache/kafka/pull/15749#discussion_r1569739083


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1082,7 +1082,7 @@ private[kafka] class Processor(
   // `protected` for test usage
   protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
     val connectionId = response.request.context.connectionId
-    trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
+    trace(s"Socket server received response from $connectionId, registering for write and sending data: $response")

Review Comment:
   Oh, you’re right. Thanks.






Re: [PR] MINOR: Modify SocketServer#sendResponse trace msg [kafka]

2024-04-17 Thread via GitHub


TaiJuWu closed pull request #15749: MINOR: Modify SocketServer#sendResponse 
trace msg
URL: https://github.com/apache/kafka/pull/15749





[jira] [Updated] (KAFKA-16577) New consumer fails with stop within allotted timeout in consumer_test.py system test

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16577:
--
Description: 
The {{consumer_test.py}} system test intermittently fails with the following 
error:

{code}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   2 minutes 35.925 seconds


AssertionError('Node ubuntu@worker5: did not stop within the specified 
timeout of 30 seconds')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 176, in test_consumer_bounce
self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 39, in rolling_bounce_consumers
consumer.stop_node(node, clean_shutdown)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 419, in stop_node
(str(node.account), str(self.stop_timeout_sec))
AssertionError: Node ubuntu@worker5: did not stop within the specified timeout 
of 30 seconds
{code}

Affected tests:
 * {{test_broker_failure}}
 * {{test_consumer_bounce}}

  was:
The {{consumer_test.py}} system test intermittently fails with the following 
error:

{code}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   2 minutes 35.925 seconds


AssertionError('Node ubuntu@worker5: did not stop within the specified 
timeout of 30 seconds')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 176, in test_consumer_bounce
self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 39, in rolling_bounce_consumers
consumer.stop_node(node, clean_shutdown)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 419, in stop_node
(str(node.account), str(self.stop_timeout_sec))
AssertionError: Node ubuntu@worker5: did not stop within the specified timeout 
of 30 seconds
{code}

Affected tests:
 * {{test_consumer_bounce}}


> New consumer fails with stop within allotted timeout in consumer_test.py 
> system test
> 
>
> Key: KAFKA-16577
> URL: https://issues.apache.org/jira/browse/KAFKA-16577
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: flaky-test, kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test intermittently fails with the following 
> error:
> {code}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> 

[jira] [Updated] (KAFKA-16577) New consumer fails with stop within allotted timeout in consumer_test.py system test

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16577:
--
Summary: New consumer fails with stop within allotted timeout in 
consumer_test.py system test  (was: New consumer fails with stop within 
allotted timeout in consumer_test.py’s test_consumer_bounce system test)

> New consumer fails with stop within allotted timeout in consumer_test.py 
> system test
> 
>
> Key: KAFKA-16577
> URL: https://issues.apache.org/jira/browse/KAFKA-16577
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: flaky-test, kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test intermittently fails with the following 
> error:
> {code}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   2 minutes 35.925 seconds
> AssertionError('Node ubuntu@worker5: did not stop within the specified 
> timeout of 30 seconds')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 176, in test_consumer_bounce
> self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 39, in rolling_bounce_consumers
> consumer.stop_node(node, clean_shutdown)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 419, in stop_node
> (str(node.account), str(self.stop_timeout_sec))
> AssertionError: Node ubuntu@worker5: did not stop within the specified 
> timeout of 30 seconds
> {code}
> Affected tests:
>  * {{test_consumer_bounce}}





Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-04-17 Thread via GitHub


jolshan commented on PR #15685:
URL: https://github.com/apache/kafka/pull/15685#issuecomment-2062753494

   I noticed I need to do Quorum and Broker features which are basically the 
same implementation. Stay tuned.





[jira] [Created] (KAFKA-16581) Implement KafkaRaftClient unittest for membership change

2024-04-17 Thread Jira
José Armando García Sancio created KAFKA-16581:
--

 Summary: Implement KafkaRaftClient unittest for membership change
 Key: KAFKA-16581
 URL: https://issues.apache.org/jira/browse/KAFKA-16581
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio
 Fix For: 3.8.0


Add a new test suite like KafkaRaftClientTest and KafkaRaftClientSnapshotTest 
that validates the implementation.

In addition KafkaRaftClientTest and KafkaRaftClientSnapshotTest should work 
with all versions for the RPCs.

Test upgrades from kraft.version 0 to 1.





[jira] [Updated] (KAFKA-16580) Write simulation tests for kraft membership change

2024-04-17 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-16580:
---
Description: Update RaftEventSimulationTest to validate kraft.version 1 and 
changes made for membership change.

> Write simulation tests for kraft membership change
> --
>
> Key: KAFKA-16580
> URL: https://issues.apache.org/jira/browse/KAFKA-16580
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Priority: Major
> Fix For: 3.8.0
>
>
> Update RaftEventSimulationTest to validate kraft.version 1 and changes made 
> for membership change.





[jira] [Created] (KAFKA-16580) Write simulation tests for kraft membership change

2024-04-17 Thread Jira
José Armando García Sancio created KAFKA-16580:
--

 Summary: Write simulation tests for kraft membership change
 Key: KAFKA-16580
 URL: https://issues.apache.org/jira/browse/KAFKA-16580
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio
 Fix For: 3.8.0








[jira] [Updated] (KAFKA-14094) KIP-853: KRaft controller membership changes

2024-04-17 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14094:
---
Fix Version/s: 3.8.0

> KIP-853: KRaft controller membership changes
> 
>
> Key: KAFKA-14094
> URL: https://issues.apache.org/jira/browse/KAFKA-14094
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: 4.0-blocker
> Fix For: 3.8.0
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes





[jira] [Updated] (KAFKA-14094) KIP-853: KRaft controller membership changes

2024-04-17 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14094:
---
Labels:   (was: 4.0-blocker)

> KIP-853: KRaft controller membership changes
> 
>
> Key: KAFKA-14094
> URL: https://issues.apache.org/jira/browse/KAFKA-14094
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
> Fix For: 3.8.0
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes





[jira] [Commented] (KAFKA-16557) Fix OffsetFetchRequestState.toString()

2024-04-17 Thread Fu Qiao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838402#comment-17838402
 ] 

Fu Qiao commented on KAFKA-16557:
-

Created a PR for this if you don't mind: 
[https://github.com/apache/kafka/pull/15750]
Thanks

> Fix OffsetFetchRequestState.toString()
> --
>
> Key: KAFKA-16557
> URL: https://issues.apache.org/jira/browse/KAFKA-16557
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, logging
> Fix For: 3.8.0
>
>
> The code incorrectly overrides the {{toString()}} method instead of 
> overriding {{toStringBase()}}. This affects debugging and troubleshooting 
> consumer issues.





[PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-17 Thread via GitHub


phooq opened a new pull request, #15750:
URL: https://github.com/apache/kafka/pull/15750

   ### Description
   This PR addresses 
[KAFKA-16557](https://issues.apache.org/jira/browse/KAFKA-16557). It fixes the 
string representation of `OffsetFetchRequestState` to make debugging and 
troubleshooting consumer-related issues easier.
   
   ### Test
   
   `./gradlew jar`
   
   ```
   BUILD SUCCESSFUL in 3m 16s
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16579:
--
Description: 
To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a 
slew of system tests to run both the "old" and "new" implementations. 
KAFKA-16271 updated the system tests in {{consumer_rolling_upgrade_test.py}} so 
it could test the new consumer. However, the test is tailored specifically to 
the "old" Consumer's protocol and assignment strategy upgrade.

Unsurprisingly, when we run those system tests with the new 
{{AsyncKafkaConsumer}}, we get errors like the following:

{code}
test_id:
kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   29.634 seconds


AssertionError("Mismatched assignment: {frozenset(), 
frozenset({TopicPartition(topic='test_topic', partition=0), 
TopicPartition(topic='test_topic', partition=1)})}")
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
 line 77, in rolling_update_test
self._verify_range_assignment(consumer)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
 line 41, in _verify_range_assignment
"Mismatched assignment: %s" % assignment
AssertionError: Mismatched assignment: {frozenset(), 
frozenset({TopicPartition(topic='test_topic', partition=0), 
TopicPartition(topic='test_topic', partition=1)})}
{code}

The task here is to revert the changes made in KAFKA-16271.
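
As a rough illustration of the failure above, the following Python sketch shows how a check of this kind compares per-consumer partition sets and why an empty set next to a two-partition set trips the assertion. The {{TopicPartition}} stand-in, the {{verify_assignment}} helper, the consumer names and the expected split are assumptions made for the example, not the code in {{consumer_rolling_upgrade_test.py}}.

```python
from collections import namedtuple

# Stand-in for the TopicPartition type whose repr appears in the trace (assumption).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def verify_assignment(current_assignment, expected):
    """Compare per-consumer partition sets, ignoring which consumer owns which set."""
    assignment = {frozenset(partitions) for partitions in current_assignment.values()}
    assert assignment == expected, "Mismatched assignment: %s" % assignment


tp0 = TopicPartition("test_topic", 0)
tp1 = TopicPartition("test_topic", 1)

# Hypothetical expectation: each of two consumers owns exactly one partition.
expected = {frozenset([tp0]), frozenset([tp1])}

# Passes: one partition per consumer.
verify_assignment({"consumer-1": [tp0], "consumer-2": [tp1]}, expected)

# Fails in the same shape as the trace: one consumer owns both partitions, the other none.
mismatch_message = None
try:
    verify_assignment({"consumer-1": [], "consumer-2": [tp0, tp1]}, expected)
except AssertionError as e:
    mismatch_message = str(e)
```

Comparing sets of frozensets makes the check independent of which consumer ends up with which partitions, so only the shape of the split matters.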

  was:
To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a 
slew of system tests to run both the "old" and "new" implementations. 
KAFKA-16271 updated the system tests in {{consumer_rolling_upgrade_test.py}} so 
it could test the new consumer. However, the test is tailored specifically to 
the "old" Consumer's protocol and assignment strategy upgrade.

Unsurprisingly, when we run those system tests with the new 
{{AsyncKafkaConsumer}}, we get errors like the following:

{code:java}
test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   6 minutes 3.899 seconds


InsufficientResourcesError('Not enough nodes available to allocate. linux 
nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
 line 919, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py",
 line 97, in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 107, in __init__
self.allocate_nodes()
  File 

[jira] [Updated] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16579:
--
Description: 
To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a 
slew of system tests to run both the "old" and "new" implementations. 
KAFKA-16271 updated the system tests in {{consumer_rolling_upgrade_test.py}} so 
it could test the new consumer. However, the test is tailored specifically to 
the "old" Consumer's protocol and assignment strategy upgrade.

Unsurprisingly, when we run those system tests with the new 
{{AsyncKafkaConsumer}}, we get errors like the following:

{code:java}
test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   6 minutes 3.899 seconds


InsufficientResourcesError('Not enough nodes available to allocate. linux 
nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
 line 919, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py",
 line 97, in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 107, in __init__
self.allocate_nodes()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/cluster.py",
 line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/finite_subcluster.py",
 line 31, in do_alloc
allocated = self._available_nodes.remove_spec(cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/node_container.py",
 line 117, in remove_spec
raise InsufficientResourcesError("Not enough nodes available to allocate. " 
+ msg)
ducktape.cluster.node_container.InsufficientResourcesError: Not enough nodes 
available to allocate. linux nodes requested: 1. linux nodes available: 0
{code}
The task here is to revert the changes made in KAFKA-16272 [PR 
15576|https://github.com/apache/kafka/pull/15576].

  was:
To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a 
slew of system tests to run both the "old" and "new" implementations. 
KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it 
could test the new consumer with Connect. However, we are not supporting 
Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the Connect 
system tests with the new {{{}AsyncKafkaConsumer{}}}, we get errors like the 
following:
{code:java}
test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   6 minutes 3.899 seconds


InsufficientResourcesError('Not enough nodes available to allocate. linux 
nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",

[jira] [Updated] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16578:
--
Description: 
To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a 
slew of system tests to run both the "old" and "new" implementations. 
KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it 
could test the new consumer with Connect. However, we are not supporting 
Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the Connect 
system tests with the new {{AsyncKafkaConsumer}}, we get errors like the 
following:

{code}
test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   6 minutes 3.899 seconds


InsufficientResourcesError('Not enough nodes available to allocate. linux 
nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
 line 919, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py",
 line 97, in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 107, in __init__
self.allocate_nodes()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/cluster.py",
 line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/finite_subcluster.py",
 line 31, in do_alloc
allocated = self._available_nodes.remove_spec(cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/node_container.py",
 line 117, in remove_spec
raise InsufficientResourcesError("Not enough nodes available to allocate. " 
+ msg)
ducktape.cluster.node_container.InsufficientResourcesError: Not enough nodes 
available to allocate. linux nodes requested: 1. linux nodes available: 0
{code}

The task here is to revert the changes made in KAFKA-16272 [PR 
15576|https://github.com/apache/kafka/pull/15576].
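
For readers unfamiliar with the error in the trace, here is a minimal Python sketch of a fixed-size node pool that raises once it is exhausted. The {{FiniteNodePool}} class and the local {{InsufficientResourcesError}} are hypothetical stand-ins written for this example. They are not ducktape's implementation, and the sketch makes no claim about why the pool was empty in the failing run.

```python
class InsufficientResourcesError(Exception):
    """Local stand-in for the exception named in the trace (assumption)."""


class FiniteNodePool:
    """Hypothetical pool holding a fixed number of Linux nodes."""

    def __init__(self, num_nodes):
        self.available = num_nodes

    def alloc(self, requested):
        # Refuse the request outright when the pool cannot satisfy it.
        if requested > self.available:
            raise InsufficientResourcesError(
                "Not enough nodes available to allocate. "
                "linux nodes requested: %d. linux nodes available: %d"
                % (requested, self.available))
        self.available -= requested
        return requested

    def free(self, count):
        # Return nodes to the pool once a service is done with them.
        self.available += count


pool = FiniteNodePool(2)
pool.alloc(2)          # earlier services take every node
try:
    pool.alloc(1)      # one more service asks for a node
except InsufficientResourcesError as e:
    error_message = str(e)
```

In this sketch a request only succeeds again after {{free}} has returned nodes to the pool.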

  was:
To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a 
slew of system tests to run both the "old" and "new" implementations. 
KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it 
could test the new consumer with Connect. However, we are not supporting 
Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the Connect 
system tests with the new {{{}AsyncKafkaConsumer{}}}, we get errors like the 
following:
{code:java}
test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   6 minutes 3.899 seconds


InsufficientResourcesError('Not enough nodes available to allocate. linux 
nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run

[jira] [Created] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer

2024-04-17 Thread Kirk True (Jira)
Kirk True created KAFKA-16579:
-

 Summary: Revert changes to consumer_rolling_upgrade_test.py for 
the new async Consumer
 Key: KAFKA-16579
 URL: https://issues.apache.org/jira/browse/KAFKA-16579
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer, system tests
Affects Versions: 3.8.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a 
slew of system tests to run both the "old" and "new" implementations. 
KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it 
could test the new consumer with Connect. However, we are not supporting 
Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the Connect 
system tests with the new {{AsyncKafkaConsumer}}, we get errors like the 
following:
{code:java}
test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   6 minutes 3.899 seconds


InsufficientResourcesError('Not enough nodes available to allocate. linux 
nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
 line 919, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py",
 line 97, in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 107, in __init__
self.allocate_nodes()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/cluster.py",
 line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/finite_subcluster.py",
 line 31, in do_alloc
allocated = self._available_nodes.remove_spec(cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/node_container.py",
 line 117, in remove_spec
raise InsufficientResourcesError("Not enough nodes available to allocate. " 
+ msg)
ducktape.cluster.node_container.InsufficientResourcesError: Not enough nodes 
available to allocate. linux nodes requested: 1. linux nodes available: 0
{code}
The task here is to revert the changes made in KAFKA-16272 [PR 
15576|https://github.com/apache/kafka/pull/15576].





[jira] [Assigned] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16579:
-

Assignee: Philip Nee  (was: Kirk True)

> Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer
> -
>
> Key: KAFKA-16579
> URL: https://issues.apache.org/jira/browse/KAFKA-16579
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated 
> a slew of system tests to run both the "old" and "new" implementations. 
> KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it 
> could test the new consumer with Connect. However, we are not supporting 
> Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the 
> Connect system tests with the new {{AsyncKafkaConsumer}}, we get errors 
> like the following:
> {code:java}
> test_id:
> kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   6 minutes 3.899 seconds
> InsufficientResourcesError('Not enough nodes available to allocate. linux 
> nodes requested: 1. linux nodes available: 0')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
>  line 919, in test_exactly_once_source
> consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
> self.source.topic, consumer_timeout_ms=1000, print_key=True)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py",
>  line 97, in __init__
> BackgroundThreadService.__init__(self, context, num_nodes)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
>  line 26, in __init__
> super(BackgroundThreadService, self).__init__(context, num_nodes, 
> cluster_spec, *args, **kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
>  line 107, in __init__
> self.allocate_nodes()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
>  line 217, in allocate_nodes
> self.nodes = self.cluster.alloc(self.cluster_spec)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/cluster.py",
>  line 54, in alloc
> allocated = self.do_alloc(cluster_spec)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/finite_subcluster.py",
>  line 31, in do_alloc
> allocated = self._available_nodes.remove_spec(cluster_spec)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/node_container.py",
>  line 117, in remove_spec
> raise InsufficientResourcesError("Not enough nodes available to allocate. 
> " + msg)
> ducktape.cluster.node_container.InsufficientResourcesError: Not enough nodes 
> available to allocate. linux nodes requested: 1. linux nodes available: 0
> {code}
> The task here is to revert the changes made in KAFKA-16272 [PR 
> 15576|https://github.com/apache/kafka/pull/15576].





[jira] [Created] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer

2024-04-17 Thread Kirk True (Jira)
Kirk True created KAFKA-16578:
-

 Summary: Revert changes to connect_distributed_test.py for the new 
async Consumer
 Key: KAFKA-16578
 URL: https://issues.apache.org/jira/browse/KAFKA-16578
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer, system tests
Affects Versions: 3.8.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a 
slew of system tests to run both the "old" and "new" implementations. 
KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it 
could test the new consumer with Connect. However, we are not supporting 
Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the Connect 
system tests with the new {{AsyncKafkaConsumer}}, we get errors like the 
following:
{code:java}
test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   6 minutes 3.899 seconds


InsufficientResourcesError('Not enough nodes available to allocate. linux 
nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
 line 919, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/console_consumer.py",
 line 97, in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 107, in __init__
self.allocate_nodes()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/service.py",
 line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/cluster.py",
 line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/finite_subcluster.py",
 line 31, in do_alloc
allocated = self._available_nodes.remove_spec(cluster_spec)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/cluster/node_container.py",
 line 117, in remove_spec
raise InsufficientResourcesError("Not enough nodes available to allocate. " 
+ msg)
ducktape.cluster.node_container.InsufficientResourcesError: Not enough nodes 
available to allocate. linux nodes requested: 1. linux nodes available: 0
{code}
The task here is to revert the changes made in KAFKA-16272 [PR 
15576|https://github.com/apache/kafka/pull/15576].





[jira] [Updated] (KAFKA-16576) New consumer fails with assert in consumer_test.py’s test_consumer_failure system test

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16576:
--
Labels: flaky-test kip-848-client-support system-tests  (was: 
kip-848-client-support system-tests)

> New consumer fails with assert in consumer_test.py’s test_consumer_failure 
> system test
> --
>
> Key: KAFKA-16576
> URL: https://issues.apache.org/jira/browse/KAFKA-16576
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: flaky-test, kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test intermittently fails with the following 
> error:
> {code}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   42.582 seconds
> AssertionError()
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 399, in test_consumer_failure
> assert partition_owner is not None
> AssertionError
> {code}
> Affected tests:
>  * {{test_consumer_failure}}
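
The failing assertion, {{assert partition_owner is not None}}, implies that no group member reported owning the partition when the test looked it up. A minimal Python sketch of such a lookup follows. The {{owner}} helper and the shape of the assignment map are assumptions for illustration only, not the code in {{consumer_test.py}} or {{verifiable_consumer.py}}.

```python
def owner(assignments, partition):
    """Return the member whose assignment contains `partition`, or None if nobody owns it."""
    for member, partitions in assignments.items():
        if partition in partitions:
            return member
    return None


partition = ("test_topic", 0)

# A member has reported the partition: the lookup finds it.
settled = {"consumer-1": {partition}, "consumer-2": set()}
found = owner(settled, partition)

# No member has reported an assignment yet: the lookup yields None,
# and an `assert ... is not None` on the result would fail.
unsettled = {"consumer-1": set(), "consumer-2": set()}
missing = owner(unsettled, partition)
```

A lookup of this kind depends on when it runs relative to the assignment being reported, which would be consistent with the intermittent nature of the failure described above.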





[jira] [Updated] (KAFKA-16577) New consumer fails with stop within allotted timeout in consumer_test.py’s test_consumer_bounce system test

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16577:
--
Labels: flaky-test kip-848-client-support system-tests  (was: 
kip-848-client-support system-tests)

> New consumer fails with stop within allotted timeout in consumer_test.py’s 
> test_consumer_bounce system test
> ---
>
> Key: KAFKA-16577
> URL: https://issues.apache.org/jira/browse/KAFKA-16577
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: flaky-test, kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test intermittently fails with the following 
> error:
> {code}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   2 minutes 35.925 seconds
> AssertionError('Node ubuntu@worker5: did not stop within the specified 
> timeout of 30 seconds')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 176, in test_consumer_bounce
> self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 39, in rolling_bounce_consumers
> consumer.stop_node(node, clean_shutdown)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 419, in stop_node
> (str(node.account), str(self.stop_timeout_sec))
> AssertionError: Node ubuntu@worker5: did not stop within the specified 
> timeout of 30 seconds
> {code}
> Affected tests:
>  * {{test_consumer_bounce}}





[jira] [Updated] (KAFKA-16577) New consumer fails with stop within allotted timeout in consumer_test.py’s test_consumer_bounce system test

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16577:
--
Description: 
The {{consumer_test.py}} system test intermittently fails with the following 
error:

{code}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   2 minutes 35.925 seconds


AssertionError('Node ubuntu@worker5: did not stop within the specified 
timeout of 30 seconds')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 176, in test_consumer_bounce
self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 39, in rolling_bounce_consumers
consumer.stop_node(node, clean_shutdown)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
 line 419, in stop_node
(str(node.account), str(self.stop_timeout_sec))
AssertionError: Node ubuntu@worker5: did not stop within the specified timeout 
of 30 seconds
{code}

Affected tests:
 * {{test_consumer_bounce}}

  was:
The {{consumer_test.py}} system test intermittently fails with the following 
error:

{code}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   42.582 seconds


AssertionError()
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 399, in test_consumer_failure
assert partition_owner is not None
AssertionError
Notify
{code}

Affected tests:
 * {{test_consumer_failure}}


> New consumer fails with stop within allotted timeout in consumer_test.py’s 
> test_consumer_bounce system test
> ---
>
> Key: KAFKA-16577
> URL: https://issues.apache.org/jira/browse/KAFKA-16577
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test intermittently fails with the following 
> error:
> {code}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   2 minutes 35.925 seconds
> AssertionError('Node ubuntu@worker5: did not stop within the specified 
> timeout of 30 seconds')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return 

[jira] [Updated] (KAFKA-16577) New consumer fails with stop within allotted timeout in consumer_test.py’s test_consumer_bounce system test

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16577:
--
Summary: New consumer fails with stop within allotted timeout in 
consumer_test.py’s test_consumer_bounce system test  (was: New consumer fails 
with stop timeout in consumer_test.py’s test_consumer_bounce system test)

> New consumer fails with stop within allotted timeout in consumer_test.py’s 
> test_consumer_bounce system test
> ---
>
> Key: KAFKA-16577
> URL: https://issues.apache.org/jira/browse/KAFKA-16577
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test intermittently fails with the following 
> error:
> {code}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   42.582 seconds
> AssertionError()
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 399, in test_consumer_failure
> assert partition_owner is not None
> AssertionError
> Notify
> {code}
> Affected tests:
>  * {{test_consumer_failure}}





[jira] [Created] (KAFKA-16577) New consumer fails with stop timeout in consumer_test.py’s test_consumer_bounce system test

2024-04-17 Thread Kirk True (Jira)
Kirk True created KAFKA-16577:
-

 Summary: New consumer fails with stop timeout in 
consumer_test.py’s test_consumer_bounce system test
 Key: KAFKA-16577
 URL: https://issues.apache.org/jira/browse/KAFKA-16577
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


The {{consumer_test.py}} system test intermittently fails with the following 
error:

{code}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   42.582 seconds


AssertionError()
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 399, in test_consumer_failure
assert partition_owner is not None
AssertionError
Notify
{code}

Affected tests:
 * {{test_consumer_failure}}





[jira] [Updated] (KAFKA-16576) New consumer fails with assert in consumer_test.py’s test_consumer_failure system test

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16576:
--
Description: 
The {{consumer_test.py}} system test intermittently fails with the following 
error:

{code}
test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   42.582 seconds


AssertionError()
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 399, in test_consumer_failure
assert partition_owner is not None
AssertionError
Notify
{code}

Affected tests:
 * {{test_consumer_failure}}

  was:
The {{consumer_test.py}} system test fails with the following errors:

{quote}
* Timed out waiting for consumption
{quote}

Affected tests:

* {{test_broker_failure}}
* {{test_consumer_bounce}}
* {{test_static_consumer_bounce}}


> New consumer fails with assert in consumer_test.py’s test_consumer_failure 
> system test
> --
>
> Key: KAFKA-16576
> URL: https://issues.apache.org/jira/browse/KAFKA-16576
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test intermittently fails with the following 
> error:
> {code}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   42.582 seconds
> AssertionError()
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 399, in test_consumer_failure
> assert partition_owner is not None
> AssertionError
> Notify
> {code}
> Affected tests:
>  * {{test_consumer_failure}}





[jira] [Created] (KAFKA-16576) New consumer fails with assert in consumer_test.py’s test_consumer_failure system test

2024-04-17 Thread Kirk True (Jira)
Kirk True created KAFKA-16576:
-

 Summary: New consumer fails with assert in consumer_test.py’s 
test_consumer_failure system test
 Key: KAFKA-16576
 URL: https://issues.apache.org/jira/browse/KAFKA-16576
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


The {{consumer_test.py}} system test fails with the following errors:

{quote}
* Timed out waiting for consumption
{quote}

Affected tests:

* {{test_broker_failure}}
* {{test_consumer_bounce}}
* {{test_static_consumer_bounce}}





[jira] [Commented] (KAFKA-16557) Fix OffsetFetchRequestState.toString()

2024-04-17 Thread Fu Qiao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838384#comment-17838384
 ] 

Fu Qiao commented on KAFKA-16557:
-

Hey [~kirktrue]: I am new to Kafka and would like to start with some tasks.
Would you mind if I pick this up?
Thanks

> Fix OffsetFetchRequestState.toString()
> --
>
> Key: KAFKA-16557
> URL: https://issues.apache.org/jira/browse/KAFKA-16557
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, logging
> Fix For: 3.8.0
>
>
> The code incorrectly overrides the {{toString()}} method instead of 
> overriding {{{}toStringBase(){}}}. This affects debugging and troubleshooting 
> consumer issues.
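
To illustrate why the override target matters, here is a minimal Python model of the pattern. It assumes (this thread does not confirm it) that the base class's `toString()` owns the outer formatting and delegates the field list to `toStringBase()`. All class and field names below are simplified stand-ins, not the Java originals.

```python
class RequestState:
    """Simplified stand-in for the Java base class (assumed shape)."""

    def __init__(self, owner, retry_backoff_ms):
        self.owner = owner
        self.retry_backoff_ms = retry_backoff_ms

    def to_string_base(self):
        # Hook that subclasses extend to append their own fields.
        return f"owner='{self.owner}', retryBackoffMs={self.retry_backoff_ms}"

    def __str__(self):
        # The base class owns the outer "ClassName{...}" formatting.
        return f"{type(self).__name__}{{{self.to_string_base()}}}"


class BrokenOffsetFetchState(RequestState):
    def __init__(self, owner, retry_backoff_ms, partitions):
        super().__init__(owner, retry_backoff_ms)
        self.partitions = partitions

    def __str__(self):
        # Overriding the outer method silently drops the base-class fields.
        return f"BrokenOffsetFetchState{{partitions={self.partitions}}}"


class FixedOffsetFetchState(RequestState):
    def __init__(self, owner, retry_backoff_ms, partitions):
        super().__init__(owner, retry_backoff_ms)
        self.partitions = partitions

    def to_string_base(self):
        # Extending the hook keeps the base-class fields in the output.
        return f"{super().to_string_base()}, partitions={self.partitions}"


print(BrokenOffsetFetchState("consumer", 100, ["t-0"]))
print(FixedOffsetFetchState("consumer", 100, ["t-0"]))
```

In this model the broken variant loses the backoff field that is typically needed when troubleshooting retries, while the fixed variant prints both the inherited and the subclass fields.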





Re: [PR] MINOR: Modify SocketServer#sendResponse trace msg [kafka]

2024-04-17 Thread via GitHub


splett2 commented on code in PR #15749:
URL: https://github.com/apache/kafka/pull/15749#discussion_r1569578496


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1082,7 +1082,7 @@ private[kafka] class Processor(
   // `protected` for test usage
   protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
 val connectionId = response.request.context.connectionId
-trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
+trace(s"Socket server received response from $connectionId, registering for write and sending data: $response")

Review Comment:
   Why is the suggested message more correct? We are sending the response _to_ 
the connection; we are not receiving a response from it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-17 Thread via GitHub


junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1569562746


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will 
be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the 
log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset 
metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
_localLogStartOffset) {

Review Comment:
   @kamalcph : Thanks for the explanation. I understand the problem now.
   
   As for the fix, it seems that it could work for the HWM. However, I am not sure 
that we could always do the same for the LastStableOffset. For example, if we 
lose the local data in all replicas, the lastStableOffset could still be in the 
middle of a tiered segment, and moving it to localLogStartOffset immediately 
would be incorrect.
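
   The bounding rule in the hunk above can be sketched as a plain clamp. This is a simplified Python model, not the Scala implementation: offsets are bare integers rather than `LogOffsetMetadata`, and the function and parameter names are hypothetical.

```python
def bound_high_watermark(suggested, local_log_start_offset, log_end_offset):
    """Clamp a suggested high watermark into
    [local_log_start_offset, log_end_offset]."""
    if suggested < local_log_start_offset:
        # Lower bound: never point below the first locally available offset.
        return local_log_start_offset
    if suggested >= log_end_offset:
        # Upper bound: never point past the end of the log.
        return log_end_offset
    return suggested


print(bound_high_watermark(5, 10, 20))   # below the local start
print(bound_high_watermark(15, 10, 20))  # already in range
print(bound_high_watermark(25, 10, 20))  # past the log end
```

   The concern raised above is that the lower-bound branch may be wrong for the last stable offset, whose correct value can sit inside a tiered segment below the local log start offset.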
   
   Here is another potential approach. Note that OffsetMetadata 
(segmentBaseOffset and relativePositionInSegment) is only used in DelayedFetch 
for estimating the amount of available bytes. If occasionally OffsetMetadata is 
not available, we don't have to force an exception in 
convertToOffsetMetadataOrThrow(). Instead, we can leave the OffsetMetadata as 
empty and just use a conservative 1 byte for estimating the amount of available 
bytes. This approach will apply to both HWM and LSO.  The inaccurate byte 
estimate will be ok as long as it's infrequent. What do you think?
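
   A rough sketch of that fallback, in Python for brevity. Every name here (`OffsetMetadata`, `has_position`, `estimate_available_bytes`) is hypothetical, and the position arithmetic is a simplification rather than what DelayedFetch actually does; only the conservative 1-byte fallback comes from the comment above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class OffsetMetadata:
    message_offset: int
    # Both position fields are None when the metadata could not be resolved.
    segment_base_offset: Optional[int] = None
    relative_position_in_segment: Optional[int] = None

    def has_position(self):
        return (self.segment_base_offset is not None
                and self.relative_position_in_segment is not None)


def estimate_available_bytes(fetch, end):
    """Estimate the bytes available between a fetch offset and an end offset."""
    if end.message_offset <= fetch.message_offset:
        return 0
    if not (fetch.has_position() and end.has_position()):
        # Position metadata unavailable: do not raise, assume 1 byte.
        return 1
    if fetch.segment_base_offset != end.segment_base_offset:
        # Simplification in this sketch: positions in different segments
        # are not comparable, so fall back to the same conservative value.
        return 1
    return max(end.relative_position_in_segment
               - fetch.relative_position_in_segment, 0)


print(estimate_available_bytes(OffsetMetadata(10, 0, 100),
                               OffsetMetadata(20, 0, 350)))
print(estimate_available_bytes(OffsetMetadata(10),
                               OffsetMetadata(20, 0, 350)))
```

   With a 1-byte estimate a delayed fetch would usually be judged as not yet having enough data, so the cost of the fallback is extra waiting rather than an error.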






Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-17 Thread via GitHub


philipnee commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1569554430


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -98,15 +98,15 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
 return EMPTY;
 
 if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
-NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest();
 return new NetworkClientDelegate.PollResult(request);
 }
 
 return new 
NetworkClientDelegate.PollResult(coordinatorRequestState.remainingBackoffMs(currentTimeMs));
 }
 
-NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long 
currentTimeMs) {
-coordinatorRequestState.onSendAttempt(currentTimeMs);
+NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest() {
+coordinatorRequestState.onSendAttempt();

Review Comment:
   Thanks. This will fix some of the no-backoff issues.






[PR] MINOR: Modify SocketServer#sendResponse trace msg [kafka]

2024-04-17 Thread via GitHub


TaiJuWu opened a new pull request, #15749:
URL: https://github.com/apache/kafka/pull/15749

   As title.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-17 Thread via GitHub


lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2062253184

   Thanks for the comments @kirktrue, all addressed. 
   
   On your comment about tests 
[here](https://github.com/apache/kafka/pull/15698#pullrequestreview-2004676007),
 we have that covered by the existing `testHeartbeatResponseOnErrorHandling`. 
That test validates that we get the right `nextHeartbeatMs` time (which 
takes the timer into account) for each specific error type. Is that what you were 
looking for? Thanks!
   





Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-17 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1569511540


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -231,6 +231,35 @@ public void testTimerNotDue() {
 assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
 }
 
+@Test
+public void testHeartbeatNotSentIfAnotherOneInFlight() {
+mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+// Heartbeat sent (no response received)
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+NetworkClientDelegate.UnsentRequest inflightReq = 
result.unsentRequests.get(0);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a " +
+"previous on in-flight");

Review Comment:
   Fixed (not nit-picky at all, ugly typo, good catch!)
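
   The behaviour this test checks can be modelled in a few lines of Python. This is a sketch with hypothetical names (`HeartbeatSender`, `poll`, `on_response`), not the `HeartbeatRequestManager` API: a request is returned only when the interval has elapsed and no earlier request is still awaiting a response.

```python
class HeartbeatSender:
    """Toy model: at most one heartbeat may be in flight at a time."""

    def __init__(self, interval_ms):
        self.interval_ms = interval_ms
        self.last_sent_ms = None
        self.in_flight = False

    def poll(self, now_ms):
        """Return a list with one request if a heartbeat may be sent, else []."""
        if self.in_flight:
            # A previous heartbeat has no response yet: send nothing.
            return []
        if (self.last_sent_ms is not None
                and now_ms - self.last_sent_ms < self.interval_ms):
            # The interval has not elapsed since the last heartbeat.
            return []
        self.in_flight = True
        self.last_sent_ms = now_ms
        return [f"heartbeat@{now_ms}"]

    def on_response(self):
        # A response (success or error) clears the in-flight marker.
        self.in_flight = False


sender = HeartbeatSender(interval_ms=1000)
first = sender.poll(1000)    # sent, no response received yet
second = sender.poll(1000)   # suppressed: previous one still in flight
print(len(first), len(second))
```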






Re: [PR] KAFKA-16571: reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition [kafka]

2024-04-17 Thread via GitHub


jolshan commented on PR #15739:
URL: https://github.com/apache/kafka/pull/15739#issuecomment-2062215298

   I ran the test 3 times and it seems to be working ok. I can run a few more 
times if we want to confirm the flakiness is gone.





Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-17 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1569504137


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -482,6 +482,15 @@ public long nextHeartbeatMs(final long currentTimeMs) {
 return heartbeatTimer.remainingMs();
 }
 
+public void onFailedAttempt(final long currentTimeMs) {
+// Reset timer to allow sending HB after a failure without waiting 
for the interval.
+// After a failure, a next HB may be needed with backoff (ex. 
errors that lead to
+// retries, like coordinator load error), or immediately (ex. 
errors that lead to
+// rejoining, like fencing errors).
+heartbeatTimer.reset(0);
+super.onFailedAttempt(currentTimeMs);

Review Comment:
   No, it doesn't. The timer only indicates that an interval should be 
respected. On failure we don't want to follow the interval, so we reset the 
timer to 0. After each error, the client will either:
   - send the next HB based on other conditions (e.g. as soon as the coordinator 
is discovered, or when releasing the assignment finishes after getting fenced), or
   - not send another HB at all (fatal errors)
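
   A small Python model of the idea, assuming an interval timer and a separate retry backoff. The names (`Timer`, `HeartbeatState`, `can_send`) are illustrative only, the current time is passed in explicitly, and the backoff is a fixed value rather than whatever the real request state computes.

```python
class Timer:
    """Counts down from a timeout; remaining time never goes below zero."""

    def __init__(self, now_ms, timeout_ms):
        self.deadline_ms = now_ms + timeout_ms

    def reset(self, now_ms, timeout_ms):
        self.deadline_ms = now_ms + timeout_ms

    def remaining_ms(self, now_ms):
        return max(self.deadline_ms - now_ms, 0)


class HeartbeatState:
    def __init__(self, now_ms, interval_ms, retry_backoff_ms):
        self.interval_ms = interval_ms
        self.retry_backoff_ms = retry_backoff_ms
        self.timer = Timer(now_ms, interval_ms)
        self.backoff_until_ms = 0

    def on_send(self, now_ms):
        # A successful send restarts the regular interval.
        self.timer.reset(now_ms, self.interval_ms)

    def on_failed_attempt(self, now_ms):
        # Expire the interval timer so it no longer delays the next HB;
        # from here on only the backoff (or other conditions) gates sending.
        self.timer.reset(now_ms, 0)
        self.backoff_until_ms = now_ms + self.retry_backoff_ms

    def can_send(self, now_ms):
        return (self.timer.remaining_ms(now_ms) == 0
                and now_ms >= self.backoff_until_ms)


state = HeartbeatState(now_ms=0, interval_ms=5000, retry_backoff_ms=100)
state.on_send(0)
state.on_failed_attempt(50)
print(state.can_send(100), state.can_send(150))
```

   Without the reset, the next heartbeat in this model would wait for the full 5000 ms interval even though the previous attempt failed after 50 ms.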






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-17 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1569498394


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -380,7 +380,7 @@ private void onErrorResponse(final 
ConsumerGroupHeartbeatResponse response,
 break;
 
 case UNRELEASED_INSTANCE_ID:
-logger.error("GroupHeartbeatRequest failed due to the instance 
id {} was not released: {}",
+logger.error("GroupHeartbeatRequest failed due to unreleased 
instance id {}: {}",

Review Comment:
   Both. The change to the default log is needed: it did not include the 
errorMessage, which makes it hard to know what happened when you get errors 
like INVALID_REQUEST (I personally hit it and lost time investigating, so I fixed 
it). The other log changes are just improvements made while I was already there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove unneeded explicit type arguments [kafka]

2024-04-17 Thread via GitHub


mimaison merged PR #15736:
URL: https://github.com/apache/kafka/pull/15736





Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-17 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1569436277


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -80,10 +80,10 @@ class ZkAdminManager(val config: KafkaConfig,
   private val configHelper = new ConfigHelper(metadataCache, config, new 
ZkConfigRepository(adminZkClient))
 
   private val createTopicPolicy =
-Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, 
classOf[CreateTopicPolicy]))
+Option(config.getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, 
classOf[CreateTopicPolicy]))
 
   private val alterConfigPolicy =
-Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, 
classOf[AlterConfigPolicy]))
+Option(config.getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, 
classOf[AlterConfigPolicy]))

Review Comment:
   fix it 






Re: [PR] MINOR: Remove unneeded explicit type arguments [kafka]

2024-04-17 Thread via GitHub


mimaison commented on PR #15736:
URL: https://github.com/apache/kafka/pull/15736#issuecomment-2062104700

   None of the test failures are related, merging to trunk





Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1569413973


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -777,6 +778,59 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String 
memberId) {
+if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+consumerGroup.groupId());
+return false;
+} else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() <= 1) {
+log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+consumerGroup.groupId());
+}
+return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List records) {
+consumerGroup.createGroupTombstoneRecords(records);
+ClassicGroup classicGroup;
+try {
+classicGroup = consumerGroup.toClassicGroup(
+leavingMemberId,
+logContext,
+time,
+consumerGroupSessionTimeoutMs,
+metadataImage,
+records
+);
+} catch (SchemaException e) {
+log.warn("Cannot downgrade the consumer group " + 
consumerGroup.groupId() + ": fail to parse " +
+"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + 
".", e);
+
+throw new GroupIdNotFoundException(String.format("Cannot downgrade 
the classic group %s: %s.",
+consumerGroup.groupId(), e.getMessage()));
+}
+
+groups.put(consumerGroup.groupId(), classicGroup);

Review Comment:
   Ah yes. Thanks for the catch!






Re: [PR] Fix typo [kafka]

2024-04-17 Thread via GitHub


birdoplank closed pull request #15743: Fix typo
URL: https://github.com/apache/kafka/pull/15743





Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1569405350


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String 
memberId) {
+if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+consumerGroup.groupId());
+return false;
+} else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() <= 1) {
+log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+consumerGroup.groupId());
+}
+return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List records) {
+consumerGroup.createGroupTombstoneRecords(records);
+ClassicGroup classicGroup;
+try {
+classicGroup = consumerGroup.toClassicGroup(
+leavingMemberId,
+logContext,
+time,
+consumerGroupSessionTimeoutMs,
+metadataImage,
+records
+);
+} catch (SchemaException e) {
+log.warn("Cannot downgrade the consumer group " + 
consumerGroup.groupId() + ": fail to parse " +
+"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + 
".", e);
+
+throw new GroupIdNotFoundException(String.format("Cannot downgrade 
the classic group %s: %s.",
+consumerGroup.groupId(), e.getMessage()));
+}
+
+groups.put(consumerGroup.groupId(), classicGroup);
+metrics.onClassicGroupStateTransition(null, 
classicGroup.currentState());
+
+CompletableFuture appendFuture = new CompletableFuture<>();
+appendFuture.whenComplete((__, t) -> {
+if (t == null) {
+classicGroup.allMembers().forEach(member -> 
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+prepareRebalance(classicGroup, String.format("Downgrade group 
%s.", classicGroup.groupId()));

Review Comment:
   Yeah that makes sense.






Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]

2024-04-17 Thread via GitHub


lianetm commented on code in PR #15737:
URL: https://github.com/apache/kafka/pull/15737#discussion_r1569396277


##
tests/kafkatest/services/verifiable_consumer.py:
##
@@ -140,22 +150,32 @@ class 
IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
 def __init__(self, node, verify_offsets, idx):
 super().__init__(node, verify_offsets, idx)
 
-def handle_partitions_revoked(self, event):
+def handle_partitions_revoked(self, event, node, logger):
 self.revoked_count += 1
 self.state = ConsumerState.Rebalancing
 self.position = {}
+revoked = []
+
 for topic_partition in event["partitions"]:
-topic = topic_partition["topic"]
-partition = topic_partition["partition"]
-self.assignment.remove(TopicPartition(topic, partition))
+tp = _create_partition_from_dict(topic_partition)
 
-def handle_partitions_assigned(self, event):
+if tp in self.assignment:
+self.assignment.remove(tp)
+revoked.append(tp)
+else:
+logger.warn("Could not remove topic partition %s from 
assignment as it was not previously assigned to %s" % (tp, 
node.account.hostname))

Review Comment:
   Do we understand why this situation is happening? Is it perhaps related to the 
mismatched-assignment failure we've seen elsewhere in the tests? My point is just 
to make sure we're not hiding the real failure with this change. I wouldn't 
expect the consumer to ever receive a partition to revoke if it was not 
previously assigned, right?
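
   One way to tolerate the unexpected revocation without hiding it is to record it, so a test can still assert on it later. The following is only a minimal sketch under that assumption: `AssignmentTracker` and `unexpected_revocations` are hypothetical names and are not part of `verifiable_consumer.py`.

```python
from collections import namedtuple

TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


class AssignmentTracker:
    """Hypothetical, simplified stand-in for the consumer event handler."""

    def __init__(self):
        self.assignment = []
        self.unexpected_revocations = []

    def handle_partitions_assigned(self, event):
        for tp_dict in event["partitions"]:
            tp = TopicPartition(tp_dict["topic"], tp_dict["partition"])
            if tp not in self.assignment:
                self.assignment.append(tp)

    def handle_partitions_revoked(self, event):
        revoked = []
        for tp_dict in event["partitions"]:
            tp = TopicPartition(tp_dict["topic"], tp_dict["partition"])
            if tp in self.assignment:
                self.assignment.remove(tp)
                revoked.append(tp)
            else:
                # Keep the evidence instead of only logging a warning,
                # so the test can fail on it if it should never happen.
                self.unexpected_revocations.append(tp)
        return revoked


tracker = AssignmentTracker()
tracker.handle_partitions_assigned({"partitions": [{"topic": "t", "partition": 0}]})
revoked = tracker.handle_partitions_revoked(
    {"partitions": [{"topic": "t", "partition": 0}, {"topic": "t", "partition": 1}]}
)
```

   With this shape, a test that expects no spurious revocations can assert that `unexpected_revocations` is empty at the end of the run.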






Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-17 Thread via GitHub


kamalcph commented on PR #15631:
URL: https://github.com/apache/kafka/pull/15631#issuecomment-2062050733

   > @kamalcph : Do you want this PR to be in 3.7 and 3.6? If so, could you 
create a separate cherry-pick PR? Thanks.
   
   Opened #15747 and #15748 to backport it to 3.7 and 3.6 branches. PTAL. 
Thanks for the review!





[PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-17 Thread via GitHub


kamalcph opened a new pull request, #15748:
URL: https://github.com/apache/kafka/pull/15748

   cherry-picked from: d092787487047178c674bd60a182129a8d997c4a
   
   Co-authored-by: hzh0425 <642256...@qq.com>
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-17 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1569356239


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1223,6 +1223,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 s"but we only have log segments starting from offset: 
$logStartOffset.")
   }
 
+  private def checkLocalLogStartOffset(offset: Long): Unit = {

Review Comment:
   Agree on this. The `checkLocalLogStartOffset` is used only in the 
`convertToOffsetMetadataOrThrow` method which reads from local-disk.






Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-17 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will 
be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the 
log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset 
metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
_localLogStartOffset) {

Review Comment:
   > In the rare case, the restarted broker is elected as the leader before 
caught up through unclean election. Is this the case that you want to address?
   
   yes, we want to address this case too. And, the issue can also happen during 
clean preferred-leader-election:
   
   ```
   Call stack: The replica (1002) has full data but HW is invalid, then the 
fetch-offset will be equal to LeaderLog(1001).highWatermark
   
   Leader (1001):
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLocalLog
   Partition.fetchRecords
   Partition.updateFollowerFetchState
   Partition.maybeExpandIsr
   Partition.submitAlterPartition
   ...
   ...
   ...
   # If there is not enough data to respond and there is no remote 
data, we will let the fetch request wait for new data.
   # parks the request in the DelayedFetchPurgatory
   
   
   Another thread, runs Preferred-Leader-Election in controller (1003), since 
the replica 1002 joined the ISR list, it can be elected as the preferred 
leader. The controller sends LeaderAndIsr requests to all the brokers.
   
   KafkaController.processReplicaLeaderElection
   KafkaController.onReplicaElection
   PartitionStateMachine.handleStateChanges
   PartitionStateMachine.doHandleStateChanges
   PartitionStateMachine.electLeaderForPartitions
   ControllerChannelManager.sendRequestsToBrokers
   
   
   Replica 1002 got elected as Leader and have invalid highWatermark since it 
didn't process the fetch-response from the previous leader 1001, throws 
OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that 
in LeaderAndIsr request even if one partition fails, then the remaining 
partitions in that request won't be processed.
   
   KafkaApis.handleLeaderAndIsrRequest
   ReplicaManager.becomeLeaderOrFollower
   ReplicaManager.makeLeaders
   Partition.makeLeader
   Partition.maybeIncrementLeaderHW
   UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
   UnifiedLog.fetchHighWatermarkMetadata
   
   
   The controller assumes that the current-leader for the tp0 is 1002, but the 
broker 1002 couldn't process the LISR. The controller retries the LISR until 
the broker 1002 becomes leader for tp0. During this time, the producers won't 
be able to send messages, as the node 1002, sends NOT_LEADER_FOR_PARTITION 
error-code to the producer.
   
   During this time, if a follower sends the FETCH request to read from the 
current-leader 1002, then OFFSET_OUT_OF_RANGE error will be returned by the 
leader:
   
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLog
   Partition.fetchRecords
   # readFromLocalLog
   Partition.updateFollowerFetchState
   Partition.maybeIncrementLeaderHW
   LeaderLog.maybeIncrementHighWatermark
   UnifiedLog.fetchHighWatermarkMetadata
   UnifiedLog.convertToOffsetMetadataOrThrow
   LocalLog.convertToOffsetMetadataOrThrow
   LocalLog.read
   # OffsetOutOfRangeException
   ```
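
   The bounding rule stated in the updated doc comment of `updateHighWatermark` (lower bound at the local-log-start-offset, upper bound at the log-end-offset) can be sketched as follows. This is an illustration only: it treats offsets as plain integers and ignores `LogOffsetMetadata`, and `bound_high_watermark` is a hypothetical name.

```python
def bound_high_watermark(suggested_hw, local_log_start_offset, log_end_offset):
    """Clamp a suggested high watermark into [local_log_start_offset, log_end_offset]."""
    if suggested_hw < local_log_start_offset:
        return local_log_start_offset
    if suggested_hw >= log_end_offset:
        return log_end_offset
    return suggested_hw


# A replica whose local log covers offsets 100..500:
too_low = bound_high_watermark(40, 100, 500)
in_range = bound_high_watermark(250, 100, 500)
too_high = bound_high_watermark(900, 100, 500)
```

   With the old lower bound (the log-start-offset, which may point at data that only exists in remote storage), the first case could leave the high watermark at an offset that has no local segment.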




[PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-17 Thread via GitHub


kamalcph opened a new pull request, #15747:
URL: https://github.com/apache/kafka/pull/15747

   cherry-picked from: d092787487047178c674bd60a182129a8d997c4a
   
   Co-authored-by: hzh0425 <642256...@qq.com>
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16575) Automatically remove KTable aggregation result when group becomes empty

2024-04-17 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16575:
---

 Summary: Automatically remove KTable aggregation result when group 
becomes empty
 Key: KAFKA-16575
 URL: https://issues.apache.org/jira/browse/KAFKA-16575
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Using `KTable.groupBy(...).aggregate(...)` can handle updates (inserts, 
deletes, actual updates) of the input KTable, by calling the provided `Adder` 
and `Subtractor`. However, when all records from the input table (which map to 
the same group/row in the result table) get removed, the result entry is not 
removed automatically.

For example, if we implement a "count" and all input records for a group get 
deleted, the count for that group goes to zero by default instead of the row 
being removed from the result.

Users can let their `Subtractor` return `null` for this case, to actually 
delete the row, but it's not well documented and it seems it should be a 
built-in feature of the table-aggregation to remove "empty groups" from the 
result, instead of relying on "correct" behavior of user-code.

(Also the built-in `count()` does not return `null`, but actually zero...)

An internal counter of how many elements are in a group should be sufficient. Of 
course, there are backward-compatibility questions we need to answer.
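
The idea of an internal per-group counter can be sketched with a small simulation. This is not Kafka Streams code: `aggregate_count` and `remove_empty_groups` are hypothetical names, and each change is modelled as an `(old_group, new_group)` pair for one input row, where `None` means "no row".

```python
def aggregate_count(changes, remove_empty_groups):
    """Simulate a table "count" aggregation over (old_group, new_group) changes."""
    counts = {}       # result table: group -> aggregate value
    group_sizes = {}  # internal counter: group -> number of contributing rows
    for old_group, new_group in changes:
        if old_group is not None:
            counts[old_group] -= 1        # what a count "Subtractor" does
            group_sizes[old_group] -= 1
            if remove_empty_groups and group_sizes[old_group] == 0:
                # Proposed behavior: drop the row once the group is empty.
                del counts[old_group]
                del group_sizes[old_group]
        if new_group is not None:
            counts[new_group] = counts.get(new_group, 0) + 1  # the "Adder"
            group_sizes[new_group] = group_sizes.get(new_group, 0) + 1
    return counts


# Two rows land in group "a", one row lands in group "b" and is then deleted.
changes = [(None, "a"), (None, "a"), (None, "b"), ("b", None)]
current_behavior = aggregate_count(changes, remove_empty_groups=False)
proposed_behavior = aggregate_count(changes, remove_empty_groups=True)
```

For a count the aggregate and the internal counter happen to coincide; the counter is kept separate here because for other aggregations (a sum, for example) the aggregate value alone cannot tell whether a group is empty.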



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-17 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1569328393


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will 
be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the 
log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset 
metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
_localLogStartOffset) {

Review Comment:
   > For the makeLeaders path, it will call 
UnifiedLog.convertToOffsetMetadataOrThrow. Within it, 
checkLogStartOffset(offset) shouldn't throw OFFSET_OUT_OF_RANGE since we are 
comparing the offset with logStartOffset. Do you know which part throws 
OFFSET_OUT_OF_RANGE error?
   
   The next line, `localLog.convertToOffsetMetadataOrThrow` in the 
[convertToOffsetMetadataOrThrow](https://sourcegraph.com/github.com/apache/kafka@a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L1429)
 method, reads the segment from disk, and that is where the error is thrown. The call 
[segments.floorSegment(startOffset)](https://sourcegraph.com/github.com/apache/kafka@a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/-/blob/core/src/main/scala/kafka/log/LocalLog.scala?L366)
 in LocalLog fails to find a segment for the log-start-offset, so 
OffsetOutOfRangeException is thrown.
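
   The failing lookup can be illustrated with a simplified floor search over segment base offsets. This is a sketch of the idea only, not the `LocalLog` implementation; `floor_segment` and `OffsetOutOfRangeError` are hypothetical names.

```python
import bisect


class OffsetOutOfRangeError(Exception):
    """Hypothetical stand-in for OffsetOutOfRangeException."""


def floor_segment(base_offsets, offset):
    """Return the largest segment base offset <= offset, or raise if none exists."""
    idx = bisect.bisect_right(base_offsets, offset) - 1
    if idx < 0:
        raise OffsetOutOfRangeError(
            "offset %d is below the first local segment %d" % (offset, base_offsets[0])
        )
    return base_offsets[idx]


# Local segments start at offset 100; older data lives only in remote storage.
local_segments = [100, 200, 300]
found = floor_segment(local_segments, 250)

try:
    # A high watermark pinned to a log-start-offset of 50 has no local segment.
    floor_segment(local_segments, 50)
    lookup_failed = False
except OffsetOutOfRangeError:
    lookup_failed = True
```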
   
   > For the follower fetch path, it's bounded by LogEndOffset. So it shouldn't 
need to call UnifiedLog.fetchHighWatermarkMetadata, right? The regular consumer 
will call UnifiedLog.fetchHighWatermarkMetadata.
   
   yes, you're right. I attached the wrong call stack for handling the follower 
request. Please find the updated call stack below:
   
   Leader with invalid high-watermark handles the FETCH requests from follower 
and throws OFFSET_OUT_OF_RANGE error:
   
   ```
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLog
   Partition.fetchRecords
   # readFromLocalLog
   Partition.updateFollowerFetchState
   Partition.maybeIncrementLeaderHW
   LeaderLog.maybeIncrementHighWatermark
   UnifiedLog.fetchHighWatermarkMetadata
   UnifiedLog.convertToOffsetMetadataOrThrow
   LocalLog.convertToOffsetMetadataOrThrow
   LocalLog.read
   # OffsetOutOfRangeException
   
   ```
   







Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-17 Thread via GitHub


chia7712 commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1569310965


##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -82,6 +82,10 @@ public Builder setNumBrokerNodes(int numBrokerNodes) {
 return setBrokerNodes(numBrokerNodes, 1);
 }
 
+public Builder setNumBrokerNodes(int numBrokerNodes, int 
disksPerBroker) {

Review Comment:
   Why do we need this method? It is equivalent to `setBrokerNodes`.



##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -155,29 +161,31 @@ public ClusterConfig copyOf() {
 }
 
 public static Builder defaultClusterBuilder() {
-return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
+return new Builder(Type.ZK, 1, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
 }
 
-public static Builder clusterBuilder(Type type, int brokers, int 
controllers, boolean autoStart,
+public static Builder clusterBuilder(Type type, int brokers, int 
controllers, int disksPerBroker, boolean autoStart,
  SecurityProtocol securityProtocol, 
MetadataVersion metadataVersion) {
-return new Builder(type, brokers, controllers, autoStart, 
securityProtocol, metadataVersion);
+return new Builder(type, brokers, controllers, disksPerBroker, 
autoStart, securityProtocol, metadataVersion);
 }
 
 public static class Builder {
 private Type type;
 private int brokers;
 private int controllers;
+private int disksPerBroker;
 private String name;
 private boolean autoStart;
 private SecurityProtocol securityProtocol;
 private String listenerName;
 private File trustStoreFile;
 private MetadataVersion metadataVersion;
 
-Builder(Type type, int brokers, int controllers, boolean autoStart, 
SecurityProtocol securityProtocol, MetadataVersion metadataVersion) {
+Builder(Type type, int brokers, int controllers, int disksPerBroker, 
boolean autoStart, SecurityProtocol securityProtocol, MetadataVersion 
metadataVersion) {

Review Comment:
   Could we call setters instead of having a complicated constructor? We should 
not waste the builder pattern. This is unrelated to this PR, and it already 
has a Jira (https://issues.apache.org/jira/browse/KAFKA-16560). However, we 
can do a bit of refactoring here.
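
   The shape being suggested (defaults in the builder, one chained setter per field, no wide constructor) can be sketched as below. It is written in Python for brevity and is not the `ClusterConfig` code; the class and setter names are hypothetical, and the defaults mirror the values passed in `defaultClusterBuilder()` above.

```python
class ClusterConfigBuilder:
    """Hypothetical builder: defaults live here, callers override via setters."""

    def __init__(self):
        self._values = {
            "brokers": 1,
            "controllers": 1,
            "disks_per_broker": 1,
            "auto_start": True,
        }

    def set_brokers(self, brokers):
        self._values["brokers"] = brokers
        return self  # returning self allows chaining

    def set_controllers(self, controllers):
        self._values["controllers"] = controllers
        return self

    def set_disks_per_broker(self, disks_per_broker):
        self._values["disks_per_broker"] = disks_per_broker
        return self

    def set_auto_start(self, auto_start):
        self._values["auto_start"] = auto_start
        return self

    def build(self):
        # Return a copy so later setter calls do not mutate a built config.
        return dict(self._values)


config = ClusterConfigBuilder().set_brokers(3).set_disks_per_broker(2).build()
```

   Adding a new field such as `disksPerBroker` then means adding one default and one setter, without touching every existing call site.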






[jira] [Created] (KAFKA-16574) The metrics of LogCleaner disappear after reconfiguration

2024-04-17 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16574:
--

 Summary: The metrics of LogCleaner disappear after reconfiguration
 Key: KAFKA-16574
 URL: https://issues.apache.org/jira/browse/KAFKA-16574
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


see 
[https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/core/src/main/scala/kafka/log/LogCleaner.scala#L227]

We don't rebuild the metrics after calling shutdown. The following test can 
prove that.
{code:java}
@Test
def testMetricsAfterReconfiguration(): Unit = {
  val logCleaner = new LogCleaner(new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)

  def check(): Unit =
LogCleaner.MetricNames.foreach(name => 
assertNotNull(KafkaYammerMetrics.defaultRegistry.allMetrics().get(logCleaner.metricsGroup
  .metricName(name, java.util.Collections.emptyMap())), s"$name is gone?"))

  try {
check()
logCleaner.reconfigure(new KafkaConfig(TestUtils.createBrokerConfig(1, 
"localhost:2181")),
  new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")))
check()
  } finally logCleaner.shutdown()
} {code}
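
The failure mode can be modelled in a few lines. This is a toy sketch, not the `LogCleaner` code: it assumes that metrics are registered once at construction, that shutdown removes them, and that reconfiguration is a shutdown followed by a startup, which is what the linked line and the test above suggest. `ToyCleaner`, `rebuild_on_startup`, and the metric names are hypothetical.

```python
METRIC_NAMES = ["metric-a", "metric-b"]  # placeholder names, not real metrics


class ToyCleaner:
    def __init__(self, registry, rebuild_on_startup):
        self.registry = registry
        self.rebuild_on_startup = rebuild_on_startup
        self._register_metrics()  # metrics are created once, at construction

    def _register_metrics(self):
        for name in METRIC_NAMES:
            self.registry[name] = 0

    def startup(self):
        # A possible fix: re-register whatever shutdown() removed.
        if self.rebuild_on_startup:
            self._register_metrics()

    def shutdown(self):
        for name in METRIC_NAMES:
            self.registry.pop(name, None)

    def reconfigure(self):
        self.shutdown()
        self.startup()

    def missing_metrics(self):
        return [name for name in METRIC_NAMES if name not in self.registry]


buggy = ToyCleaner({}, rebuild_on_startup=False)
buggy.reconfigure()

fixed = ToyCleaner({}, rebuild_on_startup=True)
fixed.reconfigure()
```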





[jira] [Commented] (KAFKA-16522) Admin client changes for adding and removing voters

2024-04-17 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-16522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838347#comment-17838347
 ] 

José Armando García Sancio commented on KAFKA-16522:


[~phong260702] thanks for assigning this issue to yourself. I updated the issue 
to show that it is currently blocked by two other issues that need to be 
implemented first.

> Admin client changes for adding and removing voters
> ---
>
> Key: KAFKA-16522
> URL: https://issues.apache.org/jira/browse/KAFKA-16522
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Assignee: Quoc Phong Dang
>Priority: Major
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes#KIP853:KRaftControllerMembershipChanges-Admin





[jira] [Resolved] (KAFKA-16405) Mismatch assignment error when running consumer rolling upgrade system tests

2024-04-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16405.
---
  Reviewer: Lucas Brutschy
Resolution: Fixed

> Mismatch assignment error when running consumer rolling upgrade system tests
> 
>
> Key: KAFKA-16405
> URL: https://issues.apache.org/jira/browse/KAFKA-16405
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> relevant to [https://github.com/apache/kafka/pull/15578]
>  
> We are seeing:
> {code:java}
> 
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.11.4
> session_id:   2024-03-21--001
> run time: 3 minutes 24.632 seconds
> tests run:7
> passed:   5
> flaky:0
> failed:   2
> ignored:  0
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   24.599 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   26.638 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 77, in rolling_update_test
> self._verify_range_assignment(consumer)
>   File 
> "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 38, in _verify_range_assignment
> assert assignment == set([
> AssertionError: Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
> status: PASS
> run time:   29.815 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
> status: PASS
> run time:   29.766 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
> status: PASS
> run time:   30.086 seconds
> 
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   35.965 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=3), 
> TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1), 
> TopicPartition(topic='test_topic', partition=2)})}")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>   

Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-17 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will 
be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the 
log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset 
metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
_localLogStartOffset) {

Review Comment:
   > In the rare case, the restarted broker is elected as the leader before 
caught up through unclean election. Is this the case that you want to address?
   
   yes, we want to address this case too. And, the issue can also happen during 
clean preferred-leader-election:
   
   ```
   Call stack: The replica (1002) has full data but HW is invalid, then the 
fetch-offset will be equal to LeaderLog(1001).highWatermark
   
   Leader (1001):
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLocalLog
   Partition.fetchRecords
   Partition.updateFollowerFetchState
   Partition.maybeExpandIsr
   Partition.submitAlterPartition
   ...
   ...
   ...
   # If there is not enough data to respond and there is no remote 
data, we will let the fetch request wait for new data.
   # parks the request in the DelayedFetchPurgatory
   
   
   Another thread, runs Preferred-Leader-Election in controller (1003), since 
the replica 1002 joined the ISR list, it can be elected as the preferred 
leader. The controller sends LeaderAndIsr requests to all the brokers.
   
   KafkaController.processReplicaLeaderElection
   KafkaController.onReplicaElection
   PartitionStateMachine.handleStateChanges
   PartitionStateMachine.doHandleStateChanges
   PartitionStateMachine.electLeaderForPartitions
   ControllerChannelManager.sendRequestsToBrokers
   
   
   Replica 1002 got elected as Leader and have invalid highWatermark since it 
didn't process the fetch-response from the previous leader 1001, throws 
OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that 
in LeaderAndIsr request even if one partition fails, then the remaining 
partitions in that request won't be processed.
   
   KafkaApis.handleLeaderAndIsrRequest
   ReplicaManager.becomeLeaderOrFollower
   ReplicaManager.makeLeaders
   Partition.makeLeader
   Partition.maybeIncrementLeaderHW
   UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
   UnifiedLog.fetchHighWatermarkMetadata
   
   
   The controller assumes that the current leader for tp0 is 1002, but 
broker 1002 couldn't process the LISR. The controller retries the LISR until 
broker 1002 becomes leader for tp0. During this time, producers won't 
be able to send messages, because node 1002 returns the NOT_LEADER_FOR_PARTITION 
error code to them.
   
   During this time, if a follower sends a FETCH request to read from the 
current leader 1002, an OFFSET_OUT_OF_RANGE error will be returned by the 
leader:
   
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLog
   Partition.fetchRecords
   Partition.readRecords
   UnifiedLog.read 
   UnifiedLog.fetchHighWatermarkMetadata
   UnifiedLog.convertToOffsetMetadataOrThrow
   LocalLog.convertToOffsetMetadataOrThrow
   LocalLog.read
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-17 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1569178614


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,50 +1216,63 @@ class LogManager(logDirs: Seq[File],
   val sourceLog = currentLogs.get(topicPartition)
   val destLog = futureLogs.get(topicPartition)
 
-  info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
   if (sourceLog == null)
 throw new KafkaStorageException(s"The current replica for 
$topicPartition is offline")
   if (destLog == null)
 throw new KafkaStorageException(s"The future replica for 
$topicPartition is offline")
 
-  destLog.renameDir(UnifiedLog.logDirName(topicPartition), 
shouldReinitialize = true)
-  // the metrics tags still contain "future", so we have to remove it.
-  // we will add metrics back after sourceLog remove the metrics
-  destLog.removeLogMetrics()
-  destLog.updateHighWatermark(sourceLog.highWatermark)
+  replaceCurrentWithFutureLog(Option(sourceLog), destLog, 
updateHighWatermark = true)
+}
+  }
 
-  // Now that future replica has been successfully renamed to be the 
current replica
-  // Update the cached map and log cleaner as appropriate.
-  futureLogs.remove(topicPartition)
-  currentLogs.put(topicPartition, destLog)
-  if (cleaner != null) {
-cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, 
destLog.parentDirFile)
-resumeCleaning(topicPartition)
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: 
UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+val topicPartition = destLog.topicPartition
+info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")

Review Comment:
   `sourceLog` could be empty now, so maybe we need to revise the log message



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): 
Unit = {
+val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+  val tp = futureLog.topicPartition
+  if (cleaner != null) {

Review Comment:
   We can replace this with `abortAndPauseCleaning(tp)`



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): 
Unit = {
+val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+  val tp = futureLog.topicPartition
+  if (cleaner != null) {
+cleaner.abortAndPauseCleaning(tp)
+  }
+
+  replaceCurrentWithFutureLog(currentLog, futureLog)
+
+  info(s"Successfully renamed abandoned future log for $tp")

Review Comment:
   Line 1189 will print `info(s"The current replica is successfully replaced 
with the future replica for $topicPartition")`, which does not accurately 
describe what happened here, right? 



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): 
Unit = {
+val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+  val tp = futureLog.topicPartition
+  if (cleaner != null) {

Review Comment:
   I guess `cleaner.abortAndPauseCleaning` is added because we call 
`resumeCleaning` later, and it will cause an error if we don't call 
`abortAndPauseCleaning` here? 






Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-17 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1569269269


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2117,6 +2117,20 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
 return response;
 }
 
+ControllerResult 
getPartitionElrUpdatesForConfigChanges(Optional topicName) {
+if (!isElrEnabled()) return 
ControllerResult.of(Collections.emptyList(), null);
+
+List records = new ArrayList<>();
+if (topicName.isPresent()) {
+generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, 
NO_LEADER, NO_LEADER, records,
+
brokersToElrs.partitionsWithElr(topicsByName.get(topicName.get())));
+} else {
+generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, 
NO_LEADER, NO_LEADER, records,
+brokersToElrs.partitionsWithElr());
+}

Review Comment:
   @CalvinConfluent makes sense, thanks






[jira] [Updated] (KAFKA-16003) The znode /config/topics is not updated during KRaft migration in "dual-write" mode

2024-04-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-16003:
-
Fix Version/s: 3.7.1

> The znode /config/topics is not updated during KRaft migration in 
> "dual-write" mode
> ---
>
> Key: KAFKA-16003
> URL: https://issues.apache.org/jira/browse/KAFKA-16003
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.6.1
>Reporter: Paolo Patierno
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> I tried the following scenario ...
> I have a ZooKeeper-based cluster and create a my-topic-1 topic (without 
> specifying any specific configuration for it). The correct znodes are created 
> under /config/topics and /brokers/topics.
> I start a migration to KRaft but not moving forward from "dual write" mode. 
> While in this mode, I create a new my-topic-2 topic (still without any 
> specific config). I see that a new znode is created under /brokers/topics but 
> NOT under /config/topics. It seems that the KRaft controller is not updating 
> this information in ZooKeeper during the dual-write. The controller log shows 
> that the write to ZooKeeper was done, but not everything I would say:
> {code:java}
> 2023-12-13 10:23:26,229 TRACE [KRaftMigrationDriver id=3] Create Topic 
> my-topic-2, ID Macbp8BvQUKpzmq2vG_8dA. Transitioned migration state from 
> ZkMigrationLeadershipState{kraftControllerId=3, kraftControllerEpoch=7, 
> kraftMetadataOffset=445, kraftMetadataEpoch=7, 
> lastUpdatedTimeMs=1702462785587, migrationZkVersion=236, controllerZkEpoch=3, 
> controllerZkVersion=3} to ZkMigrationLeadershipState{kraftControllerId=3, 
> kraftControllerEpoch=7, kraftMetadataOffset=445, kraftMetadataEpoch=7, 
> lastUpdatedTimeMs=1702462785587, migrationZkVersion=237, controllerZkEpoch=3, 
> controllerZkVersion=3} 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver) 
> [controller-3-migration-driver-event-handler]
> 2023-12-13 10:23:26,229 DEBUG [KRaftMigrationDriver id=3] Made the following 
> ZK writes when handling KRaft delta: {CreateTopic=1} 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver) 
> [controller-3-migration-driver-event-handler] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-04-17 Thread via GitHub


junrao commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1569221453


##
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##
@@ -70,7 +70,7 @@ else if (isolationLevel == IsolationLevel.READ_COMMITTED)
 minVersion = 2;
 else if (requireTimestamp)
 minVersion = 1;
-return new Builder(minVersion, 
ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
+return new Builder(minVersion, 
ApiKeys.LIST_OFFSETS.latestVersion(false), CONSUMER_REPLICA_ID, isolationLevel);

Review Comment:
   Hmm, does that mean we can't test the latest unstable version in the client?



##
core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala:
##
@@ -219,7 +219,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 TestUtils.generateAndProduceMessages(servers, topic, 9)
 TestUtils.produceMessage(servers, topic, "test-10", 
System.currentTimeMillis() + 10L)
 
-for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to 
ApiKeys.LIST_OFFSETS.latestVersion) {
+for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to 
ApiKeys.LIST_OFFSETS.latestVersion(false)) {

Review Comment:
   Do we have tests that enable the latest unstable version?






Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-17 Thread via GitHub


mimaison commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1569237791


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -80,10 +80,10 @@ class ZkAdminManager(val config: KafkaConfig,
   private val configHelper = new ConfigHelper(metadataCache, config, new 
ZkConfigRepository(adminZkClient))
 
   private val createTopicPolicy =
-Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, 
classOf[CreateTopicPolicy]))
+Option(config.getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, 
classOf[CreateTopicPolicy]))
 
   private val alterConfigPolicy =
-Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, 
classOf[AlterConfigPolicy]))
+Option(config.getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, 
classOf[AlterConfigPolicy]))

Review Comment:
   This one was correct! We need to undo this






Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-17 Thread via GitHub


kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1569228945


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+// Complete async commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());

Review Comment:
   Interesting. Good to know. Fuzzy temporal logic 🤦‍♂️ 






Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-17 Thread via GitHub


kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1569232314


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+// Complete async commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());
+final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+commitEvent.future().complete(null);
+
+// Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(), 
Duration.ofMillis(100)));
+}
+
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() 
{
+final TopicPartition tp = new TopicPartition("foo", 0);
+testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+// Complete async commit event and sync commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());
+final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+commitEvent.future().complete(null);
+
+// Commit async is completed, so commit sync completes immediately 
(since offsets are empty)
+assertDoesNotThrow(() -> 
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), 
Duration.ofMillis(100)));
+}
+
+@Test
+public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() {
+final TopicPartition tp = new TopicPartition("foo", 0);
+testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+// Complete exceptionally async commit event and sync commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());
+final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+commitEvent.future().completeExceptionally(new KafkaException("Test 
exception"));
+
+// Commit async is completed exceptionally, but this will be handled 
by commit callback - commit sync should not fail.
+assertDoesNotThrow(() -> 
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), 
Duration.ofMillis(100)));
+}
+
+private void 
testSyncCommitTimesoutAfterIncompleteAsyncCommit(TopicPartition tp) {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);

Review Comment:
   Yes, that's what I meant  






Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-17 Thread via GitHub


kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1569231774


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();

Review Comment:
    When I see duplicate (or nearly duplicated) code, my brain turns up its 
sensitivity because I assume there's some devil-in-the-details reason that the 
code wasn't reused.
   
   I'd assumed that it could be ever-so-slightly refactored into using 
`testSyncCommitTimesoutAfterIncompleteAsyncCommit()`, but no?






Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-17 Thread via GitHub


kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1569226961


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -793,9 +795,9 @@ public void commitAsync(Map offsets, OffsetCo
 }
 
 private CompletableFuture commit(final CommitEvent commitEvent) {
-maybeThrowFencedInstanceException();
-maybeInvokeCommitCallbacks();
 maybeThrowInvalidGroupIdException();
+maybeThrowFencedInstanceException();
+offsetCommitCallbackInvoker.executeCallbacks();

Review Comment:
   Works for me. Thanks!






Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-17 Thread via GitHub


kirktrue commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1569217494


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
 rebalanceListenerInvoker,
 event.methodName(),
 event.partitions(),
 event.future()
 );
 applicationEventHandler.add(invokedEvent);
+if (invokedEvent.error().isPresent()) {
+throw invokedEvent.error().get();

Review Comment:
   `ConsumerRebalanceListenerCallbackNeededEvent` handles 'assign', 'revoke', 
and 'lose' callbacks. It was my understanding—I could be wrong—that we wanted 
to _wait_ to throw the exception after the reconciliation was fully processed. 
That is, not necessarily on the first callback 🤔



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
 rebalanceListenerInvoker,
 event.methodName(),
 event.partitions(),
 event.future()
 );
 applicationEventHandler.add(invokedEvent);
+if (invokedEvent.error().isPresent()) {
+throw invokedEvent.error().get();

Review Comment:
   However, this implementation has the interesting property that it will both 
throw the exception _and_ continue processing. It seems like this could 
potentially yield **two** exceptions, if, say, both the `onPartitionsRevoked()` 
_and_ `onPartitionsAssigned()` threw exceptions. Is that the intent?
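
The alternative raised in these two comments (run every callback first, then surface a single error) could look roughly like the sketch below. It is only an illustration under assumptions: `DeferredCallbackErrors` and `runAll` are hypothetical names, plain `Runnable`s stand in for the rebalance listener callbacks, and none of this is the actual `AsyncKafkaConsumer` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class DeferredCallbackErrors {

    /**
     * Runs every callback even if an earlier one fails. The first failure is
     * remembered; later failures are attached to it as suppressed exceptions,
     * so the caller sees one exception instead of two.
     */
    static Optional<RuntimeException> runAll(List<Runnable> callbacks) {
        RuntimeException first = null;
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        return Optional.ofNullable(first);
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for onPartitionsRevoked() and onPartitionsAssigned().
        List<Runnable> callbacks = new ArrayList<>();
        callbacks.add(() -> { throw new IllegalStateException("revoke failed"); });
        callbacks.add(() -> { throw new IllegalStateException("assign failed"); });

        Optional<RuntimeException> error = runAll(callbacks);
        // Only after all callbacks have run would the caller decide whether to throw.
        error.ifPresent(e ->
            System.out.println(e.getMessage() + ", suppressed=" + e.getSuppressed().length));
    }
}
```

Whether the second failure should be suppressed, logged, or dropped is a design choice this sketch does not settle; it only shows that deferring the throw avoids raising twice.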






Re: [PR] MINOR: move topic configuration defaults [kafka]

2024-04-17 Thread via GitHub


cmccabe closed pull request #10589: MINOR: move topic configuration defaults
URL: https://github.com/apache/kafka/pull/10589





Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-17 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1569214320


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2117,6 +2117,20 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
 return response;
 }
 
+ControllerResult 
getPartitionElrUpdatesForConfigChanges(Optional topicName) {
+if (!isElrEnabled()) return 
ControllerResult.of(Collections.emptyList(), null);
+
+List records = new ArrayList<>();
+if (topicName.isPresent()) {
+generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, 
NO_LEADER, NO_LEADER, records,
+
brokersToElrs.partitionsWithElr(topicsByName.get(topicName.get())));
+} else {
+generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, 
NO_LEADER, NO_LEADER, records,
+brokersToElrs.partitionsWithElr());
+}

Review Comment:
   Yes, we should only update the partition (remove the ELRs) when min ISR 
decreases. If the min ISR decreases, the partition can advance the HWM 
with fewer ISR members, which can invalidate the ELR members as potential 
leaders.
   Sure, I can add them.
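
A minimal sketch of the "only when min ISR decreases" rule described above, assuming the old and new `min.insync.replicas` values are both available at the point of the config change. `MinIsrChange` and `shouldClearElr` are hypothetical names for illustration, not part of `ReplicationControlManager`.

```java
public class MinIsrChange {

    /**
     * Returns true only when the new min ISR is strictly lower than the old
     * one. Raising min ISR (or leaving it unchanged) leaves the ELR alone.
     */
    static boolean shouldClearElr(int oldMinIsr, int newMinIsr) {
        return newMinIsr < oldMinIsr;
    }

    public static void main(String[] args) {
        System.out.println(shouldClearElr(3, 2)); // decrease: ELR may be stale
        System.out.println(shouldClearElr(2, 3)); // increase: keep ELR
        System.out.println(shouldClearElr(2, 2)); // unchanged: keep ELR
    }
}
```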






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-17 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1569214270


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3003,9 +3008,10 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 }
 
+// Forwarding has not happened yet, so handle both ZK and KRaft cases here
 if (remaining.resources().isEmpty) {
   sendResponse(Some(new IncrementalAlterConfigsResponseData()))
-} else if ((!request.isForwarded) && metadataSupport.canForward()) {
+} else if ((!request.isForwarded) && metadataSupport.canForward() && 
isKRaftController) {

Review Comment:
   It's kind of inconsequential that the ZK controller was handling a forwarded 
IncrementalAlterConfigs. All of the IAC handling is done in KafkaApis and never 
even checks if it's the active controller. 
   
   Basically when handling the broker-specific configs, the broker handling the 
forwarded request (i.e., the ZK controller) would see that the broker ID didn't 
match and fail. 
   
   TBH, a better fix would be to always forward to the ZK controller, and 
update the ZK brokers to react to the ZNode change event to process BROKER and 
BROKER_LOGGER changes. But that's a much bigger change :-/ 






Re: [PR] MINOR: Fix some flaky behavior in DeleteConsumerGroupsTest and clean up [kafka]

2024-04-17 Thread via GitHub


cmccabe closed pull request #8115: MINOR: Fix some flaky behavior in 
DeleteConsumerGroupsTest and clean up
URL: https://github.com/apache/kafka/pull/8115





Re: [PR] KAFKA-14370: Simplify the ImageWriter API by adding freeze [kafka]

2024-04-17 Thread via GitHub


cmccabe closed pull request #12833: KAFKA-14370: Simplify the ImageWriter API 
by adding freeze
URL: https://github.com/apache/kafka/pull/12833





Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-17 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1569210550


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
 void handleUncleanBrokerShutdown(int brokerId, List 
records) {
 replicationControl.handleBrokerUncleanShutdown(brokerId, records);
 }
+
+void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws 
InterruptedException, ExecutionException {
+appendWriteEvent("partitionUpdateForMinIsrChange", 
OptionalLong.empty(),
+() -> 
replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+}

Review Comment:
   Makes sense; I had misunderstood how the controller events work. Will 
update. Thanks!






Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-17 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1569208039


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, 
there will always be a /controller ZNode
+throw new ControllerMovedException(s"Cannot set entity configs when 
there is no controller.")
+}
+
+// If there is a KRaft controller defined, don't even attempt this write. 
The broker will soon get a UMR
+// from the new KRaft controller that lets it know about the new 
controller. It will then forward
+// IncrementalAlterConfig requests instead of processing directly.
+if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+  throw new ControllerMovedException(s"Cannot set entity configs directly 
when there is a KRaft controller.")
+}

Review Comment:
   I don't think this method is in the dual-write path. All of the ZK writes 
for configs in dual-write mode should happen in ZkConfigMigrationClient






[jira] [Updated] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful

2024-04-17 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-16570:
---
Description: 
When we want to fence a producer using the admin client, we send an 
InitProducerId request.

There is logic in that API to fence (and abort) any ongoing transactions and 
that is what the API relies on to fence the producer. However, this handling 
also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because we 
want to actually get a new producer ID and want to retry until the ID is 
supplied or we time out.  
[https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
 
[https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
 

In the case of fence producer, we don't retry and instead we have no handling 
for concurrent transactions and log a message about an unexpected error.
[https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
 

This is not unexpected though and the operation was successful. We should just 
swallow this error and treat this as a successful run of the command. 

  was:
When we want to fence a producer using the admin client, we send an 
InitProducerId request.

There is logic in that API to fence (and abort) any ongoing transactions and 
that is what the API relies on to fence the producer. However, this handling 
also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because we 
want to actually get a new producer ID and want to retry until the ID is 
supplied or we time out.  
[https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
 
[https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
 

In the case of fence producer, we don't retry and instead we have no handling 
for concurrent transactions and log a message about an unexpected error.
[https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
 

This is not unexpected though and the operation was successful. We should just 
swallow this error and treat this as a successful run of the command. 


> FenceProducers API returns "unexpected error" when successful
> -
>
> Key: KAFKA-16570
> URL: https://issues.apache.org/jira/browse/KAFKA-16570
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> When we want to fence a producer using the admin client, we send an 
> InitProducerId request.
> There is logic in that API to fence (and abort) any ongoing transactions and 
> that is what the API relies on to fence the producer. However, this handling 
> also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because 
> we want to actually get a new producer ID and want to retry until the ID 
> is supplied or we time out.  
> [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
>  
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
>  
> In the case of fence producer, we don't retry and instead we have no handling 
> for concurrent transactions and log a message about an unexpected error.
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
>  
> This is not unexpected though and the operation was successful. We should 
> just swallow this error and treat this as a successful run of the command. 
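
The proposed handling can be sketched as follows. This is only an illustration of the idea in the description, not the actual `FenceProducersHandler` code: the `FenceError` and `FenceOutcome` enums and the `FenceResponseSketch` class are hypothetical names, and the choice of which other errors are retried or failed is an assumption made for the example.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the few broker error codes this sketch cares about.
enum FenceError { NONE, CONCURRENT_TRANSACTIONS, COORDINATOR_NOT_AVAILABLE, CLUSTER_AUTHORIZATION_FAILED }

// Hypothetical outcome of handling one InitProducerId response.
enum FenceOutcome { COMPLETED, RETRY, FAILED }

final class FenceResponseSketch {
    // CONCURRENT_TRANSACTIONS means the coordinator has already started
    // aborting the ongoing transaction, so the old producer is fenced.
    private static final Set<FenceError> FENCED =
        EnumSet.of(FenceError.NONE, FenceError.CONCURRENT_TRANSACTIONS);

    static FenceOutcome handle(FenceError error) {
        if (FENCED.contains(error)) {
            // Swallow the error and report success instead of "unexpected error".
            return FenceOutcome.COMPLETED;
        }
        if (error == FenceError.COORDINATOR_NOT_AVAILABLE) {
            // Assumed to be retriable for the purpose of this sketch.
            return FenceOutcome.RETRY;
        }
        return FenceOutcome.FAILED;
    }
}
```

Unlike the regular producer path, nothing is retried for `CONCURRENT_TRANSACTIONS` here, because the admin client does not need the new producer ID, only the fencing side effect.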



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]

2024-04-17 Thread via GitHub


ableegoldman commented on PR #13283:
URL: https://github.com/apache/kafka/pull/13283#issuecomment-2061827689

   No pressure! Just wanted to make sure you weren't still waiting on me for 
this. Or rather, just wanted to say, if you do want to try and get this in to 
3.8 I will make sure to help you. But it's no problem if you don't have time or 
have other things on your plate  
   
   Again, just sorry this one was neglected for so long.





Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]

2024-04-17 Thread via GitHub


C0urante commented on code in PR #15728:
URL: https://github.com/apache/kafka/pull/15728#discussion_r1569197938


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java:
##
@@ -169,6 +170,7 @@ static Map sourceConsumerConfig(Map props) {
 result.putAll(Utils.entriesWithPrefix(props, CONSUMER_CLIENT_PREFIX));
 result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX + 
CONSUMER_CLIENT_PREFIX));
 result.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
+result.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

Review Comment:
   While this may be more ideal for many use cases, it's a potentially-breaking 
change that I don't believe we can make without waiting for a major release 
(right now, that'd be 4.0.0), and an accepted KIP. If you're unfamiliar with 
the KIP process, you can read more about it here: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-Purpose






[PR] KAFKA-16461: New consumer fails to consume records in security_test.py system test [kafka]

2024-04-17 Thread via GitHub


kirktrue opened a new pull request, #15746:
URL: https://github.com/apache/kafka/pull/15746

   The system test was failing because the `VerifiableConsumer` failed with a 
`NullPointerException` during startup. The reason for the NPE was an attempt to 
put a `null` as the value of `--group-remote-assignor` in the `Consumer`'s 
configuration.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-17 Thread via GitHub


akhileshchg commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1569190991


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode
+throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.")
+}
+
+// If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR
+// from the new KRaft controller that lets it know about the new controller. It will then forward
+// IncrementalAlterConfig requests instead of processing directly.
+if (controllerRegistration.kraftEpoch.exists(epoch => epoch > 0)) {
+  throw new ControllerMovedException(s"Cannot set entity configs directly when there is a KRaft controller.")
+}

Review Comment:
   Wouldn't this fail when the KRaft controller tries to perform a dual write to 
both the KRaft log and ZK?



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3003,9 +3008,10 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 }
 
+// Forwarding has not happened yet, so handle both ZK and KRaft cases here
 if (remaining.resources().isEmpty) {
   sendResponse(Some(new IncrementalAlterConfigsResponseData()))
-} else if ((!request.isForwarded) && metadataSupport.canForward()) {
+} else if ((!request.isForwarded) && metadataSupport.canForward() && isKRaftController) {

Review Comment:
   I have one question. Why can't the Zk controller take care of the forwarded 
alterConfig requests for other BROKER config entities?






Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-17 Thread via GitHub


gaurav-narula commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1569160539


##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -93,7 +93,7 @@ public List getAdditionalExtensions() {
 TestKitNodes nodes = new TestKitNodes.Builder().
 
setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
 setCombined(isCombined).
-setNumBrokerNodes(clusterConfig.numBrokers()).
+setNumBrokerNodes(clusterConfig.numBrokers(), 
clusterConfig.numDisksPerBroker()).

Review Comment:
   ```suggestion
   setBrokerNodes(clusterConfig.numBrokers(), clusterConfig.numDisksPerBroker()).
   ```



##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -114,6 +118,25 @@ public void testClusterTests() {
 }
 }
 
+@ClusterTests({
+@ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, 
disksPerBroker = 1),
+@ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, 
disksPerBroker = 2),
+@ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, 
disksPerBroker = 2)
+})
+public void testClusterTestWithDisksPerBroker() throws ExecutionException, 
InterruptedException {
+Admin admin = clusterInstance.createAdminClient();
+
+DescribeLogDirsResult result = 
admin.describeLogDirs(clusterInstance.brokerIds());
+result.allDescriptions().get().forEach((brokerId, 
logDirDescriptionMap) -> {
+if 
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) {
+Assertions.assertEquals(1, logDirDescriptionMap.size());
+} else if 
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.RAFT)) {
+Assertions.assertEquals(2, logDirDescriptionMap.size());
+} else {
+Assertions.fail("Unknown cluster type " + 
clusterInstance.clusterType());
+}
+});
+}

Review Comment:
   Nit: new line after the function






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-17 Thread via GitHub


chia7712 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2061751961

   @FrankYang0529 Could you reduce the number of partitions of the offsets 
topic? It seems the timeout is caused by the coordinator waiting for the offsets 
partitions, and our CI could be too busy to complete the assignments.





Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-17 Thread via GitHub


junrao commented on PR #15631:
URL: https://github.com/apache/kafka/pull/15631#issuecomment-2061739168

   @kamalcph : Do you want this PR to be in 3.7 and 3.6? If so, could you 
create a separate cherry-pick PR? Thanks.





Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-17 Thread via GitHub


junrao merged PR #15631:
URL: https://github.com/apache/kafka/pull/15631





Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-17 Thread via GitHub


junrao commented on PR #15631:
URL: https://github.com/apache/kafka/pull/15631#issuecomment-2061732009

   @kamalcph : Thanks for triaging the failed tests. The PR LGTM.





Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-17 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1569131419


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File],
   val sourceLog = currentLogs.get(topicPartition)
   val destLog = futureLogs.get(topicPartition)
 
-  info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
   if (sourceLog == null)
 throw new KafkaStorageException(s"The current replica for 
$topicPartition is offline")
   if (destLog == null)
 throw new KafkaStorageException(s"The future replica for 
$topicPartition is offline")
 
-  destLog.renameDir(UnifiedLog.logDirName(topicPartition), 
shouldReinitialize = true)
-  // the metrics tags still contain "future", so we have to remove it.
-  // we will add metrics back after sourceLog remove the metrics
-  destLog.removeLogMetrics()
-  destLog.updateHighWatermark(sourceLog.highWatermark)
+  replaceCurrentWithFutureLog(Option(sourceLog), destLog, 
updateHighWatermark = true)
+}
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: 
UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+val topicPartition = destLog.topicPartition
+info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
 
-  // Now that future replica has been successfully renamed to be the 
current replica
-  // Update the cached map and log cleaner as appropriate.
-  futureLogs.remove(topicPartition)
-  currentLogs.put(topicPartition, destLog)
-  if (cleaner != null) {
-cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, 
destLog.parentDirFile)
-resumeCleaning(topicPartition)
-  }
+destLog.renameDir(UnifiedLog.logDirName(topicPartition), 
shouldReinitialize = true)
+// the metrics tags still contain "future", so we have to remove it.
+// we will add metrics back after sourceLog remove the metrics
+destLog.removeLogMetrics()
+if (updateHighWatermark && sourceLog.isDefined) {
+  destLog.updateHighWatermark(sourceLog.get.highWatermark)
+}
 
-  try {
-sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), 
shouldReinitialize = true)
+// Now that future replica has been successfully renamed to be the current 
replica
+// Update the cached map and log cleaner as appropriate.
+futureLogs.remove(topicPartition)
+currentLogs.put(topicPartition, destLog)
+if (cleaner != null) {
+  cleaner.alterCheckpointDir(topicPartition, 
sourceLog.map(_.parentDirFile), destLog.parentDirFile)

Review Comment:
   Addressed in 
[062e932](https://github.com/apache/kafka/pull/15136/commits/062e932f260ce9e1df9571b2fc982c63cbaf0f7c)






[jira] [Created] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-17 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16573:
---

 Summary: Streams does not specify where a Serde is needed
 Key: KAFKA-16573
 URL: https://issues.apache.org/jira/browse/KAFKA-16573
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari


Example topology:
{code:java}
 builder
   .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
   .groupBy((key, value) => new KeyValue(value, key))
   .count()
   .toStream()
   .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
 {code}
At runtime, we get the following exception 
{code:java}
Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
    at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
    at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
{code}
The error does not give any information about the line or the processor causing 
the issue.

Here a Grouped was missing inside the groupBy, but because the groupBy API 
does not require a Grouped, it is easy to leave out, and the omission can be 
difficult to spot in a more complex topology. 

This is also a problem for someone who needs explicit control over serdes in 
the topology and doesn't want to define default serdes.
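
For reference, a minimal Java sketch of the same topology with the missing Grouped supplied is shown below. It assumes the kafka-streams library is on the classpath; the class name `GroupedSerdeSketch` is made up for the example, and the topic names are taken from the example topology above. The topology is only built and described, not run.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical class name; only builds the topology, it does not start Kafka Streams.
public class GroupedSerdeSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder
            .table("input", Consumed.with(Serdes.String(), Serdes.String()))
            // Passing Grouped explicitly gives the repartition topic and the
            // count store their serdes, so no default serde lookup is needed.
            .groupBy(
                (key, value) -> KeyValue.pair(value, key),
                Grouped.with(Serdes.String(), Serdes.String()))
            .count()
            .toStream()
            .to("output", Produced.with(Serdes.String(), Serdes.Long()));

        Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}
```

Without the second argument to `groupBy`, the topology still builds, which is why the problem only surfaces at runtime when the state store is initialized.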

 

  





Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-17 Thread via GitHub


mimaison commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1569073857


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -79,10 +80,10 @@ class ZkAdminManager(val config: KafkaConfig,
   private val configHelper = new ConfigHelper(metadataCache, config, new 
ZkConfigRepository(adminZkClient))
 
   private val createTopicPolicy =
-
Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, 
classOf[CreateTopicPolicy]))
+Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, 
classOf[CreateTopicPolicy]))

Review Comment:
   This should be `CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG`



##
core/src/test/scala/unit/kafka/log/LogConfigTest.scala:
##
@@ -399,18 +400,18 @@ class LogConfigTest {
 }
   }
 
-  /* Verify that when the deprecated config 
LogMessageTimestampDifferenceMaxMsProp has non default value the new configs
-   * LogMessageTimestampBeforeMaxMsProp and LogMessageTimestampAfterMaxMsProp 
are not changed from the default we are using
+  /* Verify that when the deprecated config 
LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_PROP has non default value the new 
configs

Review Comment:
   `LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_PROP` has been renamed to 
`LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG`
   Same below



##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1152,12 +1151,12 @@ class KafkaConfigTest {
 defaults.setProperty(KafkaConfig.BrokerIdProp, "1")
 defaults.setProperty(KafkaConfig.ListenersProp, 
"PLAINTEXT://127.0.0.1:1122")
 defaults.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, 
"127.0.0.1:2, 127.0.0.2:3")
-defaults.setProperty(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
-defaults.setProperty(KafkaConfig.LogRollTimeHoursProp, "12")
-defaults.setProperty(KafkaConfig.LogRollTimeJitterHoursProp, "11")
-defaults.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "10")
-//For LogFlushIntervalMsProp
-defaults.setProperty(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
+defaults.setProperty(KafkaLogConfigs.LOG_DIR_CONFIG, "/tmp1,/tmp2")
+defaults.setProperty(KafkaLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, "12")
+defaults.setProperty(KafkaLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG, 
"11")
+defaults.setProperty(KafkaLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, "10")
+//For LOG_FLUSH_INTERVAL_MS_PROP

Review Comment:
   This has been renamed to `LOG_FLUSH_INTERVAL_MS_CONFIG`






[PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-17 Thread via GitHub


FrankYang0529 opened a new pull request, #15745:
URL: https://github.com/apache/kafka/pull/15745

   We introduced `disksPerBroker` in `TestKitNodes` in 
https://issues.apache.org/jira/browse/KAFKA-16559. We can support configuring 
it in `ClusterTest` as well, so that it is more convenient for integration tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-17 Thread via GitHub


mumrah commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1569103206


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
*/
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+val controllerRegistration = getControllerRegistration match {
+  case Some(registration) => registration
+  case None =>
+// This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode

Review Comment:
   I found that the integration test would fail occasionally because the config 
change would happen before an active controller was seen (so zkVersion was 0). 
Adding this exception allows the test to retry the alter config call.
   
   In production, this could only be hit if someone was actively calling alter 
configs as the cluster was being deployed and the call was handled in between 
the time that the first broker came up and when the controller became active. 






Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1569077848


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -777,6 +778,59 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String 
memberId) {
+if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+consumerGroup.groupId());
+return false;
+} else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() <= 1) {
+log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+consumerGroup.groupId());
+}
+return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List records) {
+consumerGroup.createGroupTombstoneRecords(records);
+ClassicGroup classicGroup;
+try {
+classicGroup = consumerGroup.toClassicGroup(
+leavingMemberId,
+logContext,
+time,
+consumerGroupSessionTimeoutMs,
+metadataImage,
+records
+);
+} catch (SchemaException e) {
+log.warn("Cannot downgrade the consumer group " + 
consumerGroup.groupId() + ": fail to parse " +
+"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + 
".", e);
+
+throw new GroupIdNotFoundException(String.format("Cannot downgrade 
the classic group %s: %s.",
+consumerGroup.groupId(), e.getMessage()));
+}
+
+groups.put(consumerGroup.groupId(), classicGroup);

Review Comment:
   I think that we should explicitly remove the previous group before adding 
the new one because we update metrics when the previous group is removed. We 
could likely call `removeGroup` for this purpose.
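
The concern can be illustrated with a small self-contained sketch. It does not use the real `GroupMetadataManager`; `GroupType`, `GroupRegistrySketch`, and its methods are hypothetical names, and the per-type counters merely stand in for the group metrics mentioned above.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical group types, standing in for ConsumerGroup and ClassicGroup.
enum GroupType { CONSUMER, CLASSIC }

final class GroupRegistrySketch {
    private final Map<String, GroupType> groups = new HashMap<>();
    // Stand-in for per-type group count metrics.
    private final Map<GroupType, Integer> counts = new EnumMap<>(GroupType.class);

    void addGroup(String groupId, GroupType type) {
        groups.put(groupId, type);
        counts.merge(type, 1, Integer::sum);
    }

    // Removal is where the metric for the old group type is decremented.
    void removeGroup(String groupId) {
        GroupType previous = groups.remove(groupId);
        if (previous != null) {
            counts.merge(previous, -1, Integer::sum);
        }
    }

    // Remove first, then add: overwriting the map entry directly would
    // leave the CONSUMER counter one too high.
    void convertToClassic(String groupId) {
        removeGroup(groupId);
        addGroup(groupId, GroupType.CLASSIC);
    }

    int count(GroupType type) {
        return counts.getOrDefault(type, 0);
    }
}
```

With a plain `put` over the existing key, the map would end up correct but the counter for the old group type would never be decremented, which is the kind of drift the explicit removal avoids.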



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


