[jira] [Commented] (KAFKA-16356) Remove class-name dispatch in RemoteLogMetadataSerde

2024-03-25 Thread Linu Shibu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830781#comment-17830781
 ] 

Linu Shibu commented on KAFKA-16356:


Yes [~yondy], I will update this soon, thanks!

> Remove class-name dispatch in RemoteLogMetadataSerde
> 
>
> Key: KAFKA-16356
> URL: https://issues.apache.org/jira/browse/KAFKA-16356
> Project: Kafka
>  Issue Type: Task
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Greg Harris
>Assignee: Linu Shibu
>Priority: Trivial
>  Labels: newbie
>
> The RemoteLogMetadataSerde#serialize method receives a RemoteLogMetadata object and 
> has to dispatch to one of four serializers depending on its type. This is 
> done by taking the class name of the RemoteLogMetadata and looking it up in 
> maps to find the corresponding serializer for that class.
> This later requires an unchecked cast, because the RemoteLogMetadataTransform 
> is generic. This is all type-unsafe, and can be replaced with type-safe 
> if/else-if statements that may also be faster than the double-indirect 
> map lookups.
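The type-safe dispatch the ticket proposes could be sketched as below. The metadata subtype names follow the ticket and the serde's inputs, but the method shape and integer tags are illustrative assumptions, not Kafka's actual API:

```java
// Sketch: replace the class-name -> serializer map lookup with statically
// typed if/else-if dispatch. Subtype names mirror the ticket; the apiKey
// values here are illustrative, not Kafka's real wire tags.
interface RemoteLogMetadata {}

final class RemoteLogSegmentMetadata implements RemoteLogMetadata {}
final class RemoteLogSegmentMetadataUpdate implements RemoteLogMetadata {}
final class RemoteLogSegmentMetadataSnapshot implements RemoteLogMetadata {}
final class RemotePartitionDeleteMetadata implements RemoteLogMetadata {}

final class RemoteLogMetadataSerde {
    // Each branch handles one concrete subtype, so no unchecked cast and
    // no double-indirect map lookup is needed.
    int apiKey(RemoteLogMetadata metadata) {
        if (metadata instanceof RemoteLogSegmentMetadata) {
            return 0;
        } else if (metadata instanceof RemoteLogSegmentMetadataUpdate) {
            return 1;
        } else if (metadata instanceof RemoteLogSegmentMetadataSnapshot) {
            return 2;
        } else if (metadata instanceof RemotePartitionDeleteMetadata) {
            return 3;
        }
        throw new IllegalArgumentException("Unknown metadata type: " + metadata.getClass());
    }
}
```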



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: add docker usage documentation link in README.md [kafka]

2024-03-25 Thread via GitHub


showuon commented on code in PR #15600:
URL: https://github.com/apache/kafka/pull/15600#discussion_r1538541164


##
README.md:
##
@@ -285,6 +285,16 @@ See [tests/README.md](tests/README.md).
 
 See [vagrant/README.md](vagrant/README.md).
 
+### Running in Docker ###
+You can use the official Docker image to run Kafka in Docker. It is 
available at [Docker Hub](https://hub.docker.com/r/apache/kafka).
+
+For example, to start a single-node Kafka, you can run the following command: 
+
+ docker compose -f 
docker/examples/jvm/single-node/plaintext/docker-compose.yml up
+
+

Review Comment:
   nit: additional empty line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16416: Use NetworkClientTest to replace RequestResponseTest to be the example of log4j output [kafka]

2024-03-25 Thread via GitHub


showuon commented on code in PR #15596:
URL: https://github.com/apache/kafka/pull/15596#discussion_r1538537571


##
README.md:
##
@@ -56,7 +56,11 @@ Follow instructions in https://kafka.apache.org/quickstart
 ### Running a particular unit/integration test with log4j output ###
 Change the log4j setting in either 
`clients/src/test/resources/log4j.properties` or 
`core/src/test/resources/log4j.properties`

Review Comment:
   I think there are many other `log4j.properties` files that need to be 
modified if you are running tests for that project, not just these 2 files. For 
example, when you are running tests in `TopicCommandTest`, you should update 
`tools/src/test/resources/log4j.properties` instead of the files listed here. 
I think we should rephrase it in a more general way. Maybe:
   
   By default, only a small number of logs is output while testing. You can 
adjust this by changing the `log4j.properties` file in the test project 
directory. 
   For example, if you want to see more logs for clients project tests, you can 
modify the line in `clients/src/test/resources/log4j.properties` to 
`log4j.logger.org.apache.kafka=INFO` and then run: 
   
   
   WDYT?







Re: [PR] KAFKA-15949: Unify metadata.version format in log and error message [kafka]

2024-03-25 Thread via GitHub


showuon commented on PR #15505:
URL: https://github.com/apache/kafka/pull/15505#issuecomment-2019263725

   Re-running CI build: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15505/6/





[PR] MINOR: add docker usage documentation link in README.md [kafka]

2024-03-25 Thread via GitHub


KevinZTW opened a new pull request, #15600:
URL: https://github.com/apache/kafka/pull/15600

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16385: Enhance documentation for retention.ms and retention.bytes configurations [kafka]

2024-03-25 Thread via GitHub


brandboat commented on code in PR #15588:
URL: https://github.com/apache/kafka/pull/15588#discussion_r1538483814


##
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##
@@ -67,13 +67,17 @@ public class TopicConfig {
 "(which consists of log segments) can grow to before we will discard 
old log segments to free up space if we " +
 "are using the \"delete\" retention policy. By default there is no 
size limit only a time limit. " +
 "Since this limit is enforced at the partition level, multiply it by 
the number of partitions to compute " +
-"the topic retention in bytes.";
+"the topic retention in bytes. Additionally, the retention.bytes 
configuration " +
+"operates independently of \"segment.ms\" and \"segment.bytes\" 
configurations. " +
+"Moreover, it triggers the expiration of the active segment if 
retention.bytes is configured to zero.";
 
 public static final String RETENTION_MS_CONFIG = "retention.ms";
 public static final String RETENTION_MS_DOC = "This configuration controls 
the maximum time we will retain a " +
 "log before we will discard old log segments to free up space if we 
are using the " +
 "\"delete\" retention policy. This represents an SLA on how soon 
consumers must read " +
-"their data. If set to -1, no time limit is applied.";
+"their data. If set to -1, no time limit is applied. Additionally, the 
retention.ms configuration " +
+"operates independently of \"segment.ms\" and \"segment.bytes\" 
configurations. " +
+"Moreover, it triggers the expiration of the active segment.";

Review Comment:
   Thanks for the feedback! Do you think this is OK?
   ```
   Moreover, it triggers the expiration of the active segment. 
   Segment expiration refers to the complete removal of segments from 
   the partition once the retention.ms condition is satisfied.
   ```
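As an aside, the partition-level arithmetic in the doc string above ("multiply it by the number of partitions") works out like this (the values are illustrative, not defaults):

```java
// Worked example of the partition-level semantics of retention.bytes:
// the limit applies to each partition independently, so the effective
// topic-wide retention is the per-partition limit times partition count.
final class RetentionExample {
    static long topicRetentionBytes(long retentionBytesPerPartition, int partitions) {
        return retentionBytesPerPartition * partitions;
    }

    public static void main(String[] args) {
        long perPartition = 1_073_741_824L; // retention.bytes = 1 GiB per partition
        int partitions = 12;
        // 12 partitions * 1 GiB = 12884901888 bytes (~12 GiB) for the topic
        System.out.println(topicRetentionBytes(perPartition, partitions));
    }
}
```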






Re: [PR] KAFKA-16385: Enhance documentation for retention.ms and retention.bytes configurations [kafka]

2024-03-25 Thread via GitHub


brandboat commented on code in PR #15588:
URL: https://github.com/apache/kafka/pull/15588#discussion_r1538478333


##
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##
@@ -67,13 +67,17 @@ public class TopicConfig {
 "(which consists of log segments) can grow to before we will discard 
old log segments to free up space if we " +
 "are using the \"delete\" retention policy. By default there is no 
size limit only a time limit. " +
 "Since this limit is enforced at the partition level, multiply it by 
the number of partitions to compute " +
-"the topic retention in bytes.";
+"the topic retention in bytes. Additionally, retention.bytes 
configuration " +
+"operates independently of \"segment.ms\" and \"segment.byte\" 
configurations. " +
+"Moreover, it triggers the expiration of active segment if 
retention.bytes is configured to zero.";

Review Comment:
   Segment expiration means the segment is completely removed once the 
retention limits are met.
   Maybe add a sentence like this?
   ```
   Segment expiration refers to the complete removal of segments 
   from the partition once the retention.bytes condition is satisfied.
   ```







Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-25 Thread via GitHub


showuon commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1538452997


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4164,16 +4164,13 @@ class ReplicaManagerTest {
 mock(classOf[FetchDataInfo])
   }).when(spyRLM).read(any())
 
-  // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-  val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
+  val curExpiresPerSec = 
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
   replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 10, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
   // advancing the clock to expire the delayed remote fetch
   timer.advanceClock(2000L)
 
-  // verify the metric value is incremented since the delayed remote fetch 
is expired
-  TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-"The ExpiresPerSec value is not incremented. Current value is: " +
-  
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+  // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
+  assertEquals(curExpiresPerSec + 1, 
DelayedRemoteFetchMetrics.expiredRequestMeter.count())

Review Comment:
   I think we should use `TestUtils.waitUntilTrue` here, because 
`DelayedRemoteFetchMetrics` is marked in a separate thread and we can't be sure 
it will be triggered immediately. 
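The polling pattern being suggested can be sketched with a minimal stand-in for `TestUtils.waitUntilTrue` (a simplified illustration, not Kafka's actual helper):

```java
import java.util.function.BooleanSupplier;

final class WaitUntil {
    // Poll the condition until it holds or the timeout elapses instead of
    // asserting once, so an update made by another thread has time to land.
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(10L); // brief back-off between checks
        }
        return condition.getAsBoolean(); // one final check at the deadline
    }
}
```

The test would then wrap the meter-count comparison in the condition instead of calling `assertEquals` directly.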






[jira] [Resolved] (KAFKA-16409) kafka-delete-records / DeleteRecordsCommand should use standard exception handling

2024-03-25 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16409.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> kafka-delete-records / DeleteRecordsCommand should use standard exception 
> handling
> --
>
> Key: KAFKA-16409
> URL: https://issues.apache.org/jira/browse/KAFKA-16409
> Project: Kafka
>  Issue Type: Task
>  Components: tools
>Affects Versions: 3.7.0
>Reporter: Greg Harris
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: newbie
> Fix For: 3.8.0
>
>
> When an exception is thrown in kafka-delete-records, it propagates through 
> `main` to the JVM, producing the following message:
> {noformat}
> bin/kafka-delete-records.sh --bootstrap-server localhost:9092 
> --offset-json-file /tmp/does-not-exist
> Exception in thread "main" java.io.IOException: Unable to read file 
> /tmp/does-not-exist
>         at 
> org.apache.kafka.common.utils.Utils.readFileAsString(Utils.java:787)
>         at 
> org.apache.kafka.tools.DeleteRecordsCommand.execute(DeleteRecordsCommand.java:105)
>         at 
> org.apache.kafka.tools.DeleteRecordsCommand.main(DeleteRecordsCommand.java:64)
> Caused by: java.nio.file.NoSuchFileException: /tmp/does-not-exist
>         at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>         at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>         at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>         at 
> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>         at java.nio.file.Files.newByteChannel(Files.java:361)
>         at java.nio.file.Files.newByteChannel(Files.java:407)
>         at java.nio.file.Files.readAllBytes(Files.java:3152)
>         at 
> org.apache.kafka.common.utils.Utils.readFileAsString(Utils.java:784)
>         ... 2 more{noformat}
> This is in contrast to the error handling used in other tools, such as the 
> kafka-log-dirs:
> {noformat}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe 
> --command-config /tmp/does-not-exist
> /tmp/does-not-exist
> java.nio.file.NoSuchFileException: /tmp/does-not-exist
>         at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>         at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>         at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>         at 
> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>         at java.nio.file.Files.newByteChannel(Files.java:361)
>         at java.nio.file.Files.newByteChannel(Files.java:407)
>         at 
> java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
>         at java.nio.file.Files.newInputStream(Files.java:152)
>         at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:686)
>         at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:673)
>         at 
> org.apache.kafka.tools.LogDirsCommand.createAdminClient(LogDirsCommand.java:149)
>         at 
> org.apache.kafka.tools.LogDirsCommand.execute(LogDirsCommand.java:68)
>         at 
> org.apache.kafka.tools.LogDirsCommand.mainNoExit(LogDirsCommand.java:54)
>         at 
> org.apache.kafka.tools.LogDirsCommand.main(LogDirsCommand.java:49){noformat}





Re: [PR] KAFKA-16409: DeleteRecordsCommand should use standard exception handling [kafka]

2024-03-25 Thread via GitHub


showuon merged PR #15586:
URL: https://github.com/apache/kafka/pull/15586





Re: [PR] KAFKA-16409: DeleteRecordsCommand should use standard exception handling [kafka]

2024-03-25 Thread via GitHub


showuon commented on PR #15586:
URL: https://github.com/apache/kafka/pull/15586#issuecomment-2019187474

   Failed tests are unrelated.





[jira] [Commented] (KAFKA-15265) Remote copy/fetch quotas for tiered storage.

2024-03-25 Thread Henry Cai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830734#comment-17830734
 ] 

Henry Cai commented on KAFKA-15265:
---

What is the progress of this tiered storage quota work? We are also facing 
performance problems (large network bursts / CPU saturation) during the initial 
conversion of topics to remote.storage.enabled.

> Remote copy/fetch quotas for tiered storage.
> 
>
> Key: KAFKA-15265
> URL: https://issues.apache.org/jira/browse/KAFKA-15265
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Assignee: Abhijeet Kumar
>Priority: Major
>
> Related KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas





[jira] [Commented] (KAFKA-16240) Flaky test PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft

2024-03-25 Thread Zhiyuan Lei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830733#comment-17830733
 ] 

Zhiyuan Lei commented on KAFKA-16240:
-

 
```
TimeUnit.MILLISECONDS.sleep(1)
val result1 = client.deleteRecords(Map(topicPartition -> 
RecordsToDelete.beforeOffset(117L)).asJava)
```

I found something interesting: this issue is indeed easily reproducible 
locally. After some investigation, it seems to be caused by the timing of the 
low-watermark update in the replica. As long as we add a wait time here, this 
test case is guaranteed to pass 100% of the time.

Waiting patiently is not a good solution, though; further work is still 
required.

> Flaky test 
> PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft
> -
>
> Key: KAFKA-16240
> URL: https://issues.apache.org/jira/browse/KAFKA-16240
> Project: Kafka
>  Issue Type: Test
>Reporter: Gantigmaa Selenge
>Priority: Minor
>
> Failed run 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15300/8/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords_String__quorum_kraft_2/]
> Stack trace
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: deleteRecords(api=DELETE_RECORDS) at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>  at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>  at 
> kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:860)
> {code}





Re: [PR] KAFKA-16272: Update connect_distributed_test.py to support KIP-848’s group protocol config [kafka]

2024-03-25 Thread via GitHub


philipnee commented on PR #15576:
URL: https://github.com/apache/kafka/pull/15576#issuecomment-2019090272

   @kirktrue @lucasbru - mind taking a second look? For this specific PR I only 
modified the tests that use the console consumer, which is why several test 
cases were omitted.





Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-03-25 Thread via GitHub


philipnee commented on code in PR #15594:
URL: https://github.com/apache/kafka/pull/15594#discussion_r1538356233


##
tests/kafkatest/services/connect.py:
##
@@ -534,33 +535,40 @@ def received_messages(self):
 
 def start(self):
 self.logger.info("Creating connector VerifiableSinkConnector %s", 
self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.VerifiableSinkConnector',
 'tasks.max': self.tasks,
 'topics': ",".join(self.topics)
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol
+self.cc.create_connector(connector_config)
 
 class MockSink(object):
 
-def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", 
consumer_group_protocol=None):
 self.cc = cc
 self.logger = self.cc.logger
 self.name = name
 self.mode = mode
 self.delay_sec = delay_sec
 self.topics = topics
+self.consumer_group_protocol = consumer_group_protocol
 
 def start(self):
 self.logger.info("Creating connector MockSinkConnector %s", self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.MockSinkConnector',
 'tasks.max': 1,
 'topics': ",".join(self.topics),
 'mock_mode': self.mode,
 'delay_ms': self.delay_sec * 1000
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol

Review Comment:
   are you sure this is what the group protocol config is called in Connect?






Re: [PR] KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]

2024-03-25 Thread via GitHub


jolshan merged PR #15559:
URL: https://github.com/apache/kafka/pull/15559





Re: [PR] KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]

2024-03-25 Thread via GitHub


jolshan commented on PR #15559:
URL: https://github.com/apache/kafka/pull/15559#issuecomment-2019067710

   
[testAlterSinkConnectorOffsetsOverriddenConsumerGroupId](https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest=testAlterSinkConnectorOffsetsOverriddenConsumerGroupId)
   
   
[testSeparateOffsetsTopic](https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest=testSeparateOffsetsTopic)
   
   These caught my eye, but they have been flaky/failing since before this 
change. Here are the JIRAs for them: https://issues.apache.org/jira/browse/KAFKA-15914 and 
https://issues.apache.org/jira/browse/KAFKA-14089
   
   The other tests look known and/or unrelated. I will go ahead and merge.





[jira] [Commented] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic

2024-03-25 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830728#comment-17830728
 ] 

Justine Olshan commented on KAFKA-14089:


I've seen this again as well. 
https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest=testSeparateOffsetsTopic

> Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
> ---
>
> Key: KAFKA-14089
> URL: https://issues.apache.org/jira/browse/KAFKA-14089
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Priority: Major
> Fix For: 3.3.0
>
> Attachments: failure.txt, 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic.test.stdout
>
>
> It looks like the sequence got broken around "65535, 65537, 65536, 65539, 
> 65538, 65541, 65540, 65543"





[jira] [Commented] (KAFKA-15914) Flaky test - testAlterSinkConnectorOffsetsOverriddenConsumerGroupId - OffsetsApiIntegrationTest

2024-03-25 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830727#comment-17830727
 ] 

Justine Olshan commented on KAFKA-15914:


I'm seeing this as well. 
https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest=testAlterSinkConnectorOffsetsOverriddenConsumerGroupId

> Flaky test - testAlterSinkConnectorOffsetsOverriddenConsumerGroupId - 
> OffsetsApiIntegrationTest
> ---
>
> Key: KAFKA-15914
> URL: https://issues.apache.org/jira/browse/KAFKA-15914
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Lucas Brutschy
>Priority: Major
>  Labels: flaky-test
>
> Test intermittently gives the following result:
> {code}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 3. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
>   at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:917)
>   at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.alterAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:396)
>   at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId(OffsetsApiIntegrationTest.java:297)
> {code}





[PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-03-25 Thread via GitHub


kirktrue opened a new pull request, #15599:
URL: https://github.com/apache/kafka/pull/15599

   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16084: Simplify and deduplicate standalone herder test mocking [kafka]

2024-03-25 Thread via GitHub


gharris1727 commented on code in PR #15389:
URL: https://github.com/apache/kafka/pull/15389#discussion_r1538282799


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -115,64 +115,61 @@ public class StandaloneHerderTest {
 private static final String TOPICS_LIST_STR = "topic1,topic2";
 private static final String WORKER_ID = "localhost:8083";
 private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
+private static final Long WAIT_TIME = 1000L;

Review Comment:
   nit: 1 second is probably good enough, but it costs nothing to increase this 
now.
   Also the constant should probably include the unit, so `WAIT_TIME_MS`.
   ```suggestion
   private static final Long WAIT_TIME = 15000L;
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -227,59 +222,48 @@ public void testCreateConnectorAlreadyExists() throws 
Throwable {
 
 @Test
 public void testCreateSinkConnector() throws Exception {
-connector = mock(BogusSinkConnector.class);
 expectAdd(SourceSink.SINK);
 
 Map<String, String> config = connectorConfig(SourceSink.SINK);
-Connector connectorMock = mock(SinkConnector.class);
-expectConfigValidation(connectorMock, true, config);
+expectConfigValidation(SourceSink.SINK, config);
 
 herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
-Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS);
+Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME, TimeUnit.MILLISECONDS);
 assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
 }
 
 @Test
 public void testCreateConnectorWithStoppedInitialState() throws Exception {
-connector = mock(BogusSinkConnector.class);
 Map<String, String> config = connectorConfig(SourceSink.SINK);
 Connector connectorMock = mock(SinkConnector.class);

Review Comment:
   nit: unused



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -1168,25 +1090,27 @@ private static Map<String, String> taskConfig(SourceSink sourceSink) {
 }
 
 private void expectConfigValidation(
-Connector connectorMock,
-boolean shouldCreateConnector,
+SourceSink sourceSink,
Map<String, String>... configs
 ) {
 // config validation
+Connector connectorMock = sourceSink == SourceSink.SOURCE ? 
mock(SourceConnector.class) : mock(SinkConnector.class);
 when(worker.configTransformer()).thenReturn(transformer);
 final ArgumentCaptor<Map<String, String>> configCapture = ArgumentCaptor.forClass(Map.class);
 
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
 when(worker.getPlugins()).thenReturn(plugins);
 when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
 when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
-if (shouldCreateConnector) {
-when(worker.getPlugins()).thenReturn(plugins);
-when(plugins.newConnector(anyString())).thenReturn(connectorMock);
-}
+
+// Assume the connector should always be created
+when(worker.getPlugins()).thenReturn(plugins);
+when(plugins.newConnector(anyString())).thenReturn(connectorMock);
 when(connectorMock.config()).thenReturn(new ConfigDef());
 
-for (Map<String, String> config : configs)
+// Set up validation for each config
+for (Map<String, String> config : configs) {
 when(connectorMock.validate(config)).thenReturn(new 
Config(Collections.emptyList()));
+}
 }
 
 // We need to use a real class here due to some issue with mocking 
java.lang.Class

Review Comment:
   nit: My IDE is also warning me that these should be `static`, which is fine 
for how these are used. Could you also fix that?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -726,22 +668,21 @@ public void testAccessors() throws Exception {
 
doNothing().when(tasksConfigCb).onCompletion(any(NotFoundException.class), 
isNull());
 
doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), 
isNull());
 
-// Create connector
-connector = mock(BogusSourceConnector.class);
+
 expectAdd(SourceSink.SOURCE);
-expectConfigValidation(connector, true, connConfig);
+expectConfigValidation(SourceSink.SOURCE, connConfig);
 
 // Validate accessors with 1 connector
 doNothing().when(listConnectorsCb).onCompletion(null, 
singleton(CONNECTOR_NAME));
 ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, 
Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)),

Review Comment:
   

Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-25 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2018886221

   Hey @lucasbru - Thanks for taking the time to review this PR.  Let me know 
if there's anything to add to the PR.





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-25 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1538195311


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##
@@ -423,11 +432,11 @@ private CompletableFuture 
sendListOffsetsRequestsAndResetPositions(
 });
 });
 
-if (unsentRequests.size() > 0) {
+if (unsentRequests.isEmpty()) {

Review Comment:
   @lucasbru - Switched the order as !__.isEmpty is rather difficult to read






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-25 Thread via GitHub


lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2018835239

   Hey @Phuc-Hong-Tran , thanks a lot for the PR! I had a first pass and left 
some comments.





Re: [PR] KAFKA-16385: Enhance documentation for retention.ms and retention.bytes configurations [kafka]

2024-03-25 Thread via GitHub


soarez commented on code in PR #15588:
URL: https://github.com/apache/kafka/pull/15588#discussion_r1538177857


##
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##
@@ -67,13 +67,17 @@ public class TopicConfig {
 "(which consists of log segments) can grow to before we will discard 
old log segments to free up space if we " +
 "are using the \"delete\" retention policy. By default there is no 
size limit only a time limit. " +
 "Since this limit is enforced at the partition level, multiply it by 
the number of partitions to compute " +
-"the topic retention in bytes.";
+"the topic retention in bytes. Additionally, retention.bytes 
configuration " +
+"operates independently of \"segment.ms\" and \"segment.byte\" 
configurations. " +
+"Moreover, it triggers the expiration of active segment if 
retention.bytes is configured to zero.";
 
 public static final String RETENTION_MS_CONFIG = "retention.ms";
 public static final String RETENTION_MS_DOC = "This configuration controls 
the maximum time we will retain a " +
 "log before we will discard old log segments to free up space if we 
are using the " +
 "\"delete\" retention policy. This represents an SLA on how soon 
consumers must read " +
-"their data. If set to -1, no time limit is applied.";
+"their data. If set to -1, no time limit is applied. Additionally, 
retention.ms configuration " +
+"operates independently of \"segment.ms\" and \"segment.byte\" 
configurations. " +
+"Moreover, it triggers the expiration of active segment.";

Review Comment:
   This sentence seems incomplete.



##
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##
@@ -67,13 +67,17 @@ public class TopicConfig {
 "(which consists of log segments) can grow to before we will discard 
old log segments to free up space if we " +
 "are using the \"delete\" retention policy. By default there is no 
size limit only a time limit. " +
 "Since this limit is enforced at the partition level, multiply it by 
the number of partitions to compute " +
-"the topic retention in bytes.";
+"the topic retention in bytes. Additionally, retention.bytes 
configuration " +
+"operates independently of \"segment.ms\" and \"segment.byte\" 
configurations. " +
+"Moreover, it triggers the expiration of active segment if 
retention.bytes is configured to zero.";

Review Comment:
   Should we explain what "segment expiration" entails? I don't think it's 
clear from the description what it would mean in practical terms for the active 
segment to expire.






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-25 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538180339


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1667,6 +1667,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition 
topicPartition, OffsetAnd
 public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
 maybeThrowFencedInstanceException();
 maybeInvokeCommitCallbacks();
+if (subscriptions.hasPatternSubscription()) {
+updatePatternSubscription(metadata.fetch());
+}

Review Comment:
   Just for consistency, what about we encapsulate this in something like 
`maybeUpdateSubscriptionMetadata`? It would align nicely with the above funcs 
(and also that's how the similar functionality is named in the legacy 
coordinator so would be helpful to understand how that piece of logic 
translates into the new consumer) 






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-25 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538173798


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 } else {
 // SubscribedTopicRegex - only sent if has changed since the 
last heartbeat
 //  - not supported yet
+TreeSet<String> subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription());
+if (sendAllFields || 
!subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
+data.setSubscribedTopicNames(new 
ArrayList<>(this.subscriptions.subscription()));
+sentFields.subscribedTopicNames = subscribedTopicNames;
+}

Review Comment:
   With this addition we end up with the exact same code repeated for the `if` 
and `else`, so I would say we should find a better way of doing this. First 
solution that comes to mind is to remove the if/else. In the end, we have a 
single case to handle here: send explicit subscriptions (topic names) to the 
broker (from the HB Mgr POV and to the broker, it does not make a diff if the 
topic list came from a call to subscribe with topics or a call to subscribe 
with Pattern that we internally resolved on the client) 
   
   When we take on the next task of supporting the new regex, we'll actually 
have to send something different here, so we can decide then how to best 
differentiate the 2 cases. For now, including this PR, we only support 1 case 
regarding the content of what we send in the HB regarding subscription. Makes 
sense?
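The "only send the field if it changed since the last heartbeat" pattern discussed here can be sketched roughly as follows. This is a standalone illustration with made-up names, not the actual `HeartbeatRequestManager` code: the real implementation tracks several fields in its `SentFields` holder, while this sketch tracks only the subscribed topic names.

```java
import java.util.Set;
import java.util.TreeSet;

public class SentFieldsSketch {
    // Last value actually sent to the broker; null until the first send.
    private TreeSet<String> sentSubscribedTopicNames;

    // Returns the value to include in the heartbeat, or null to omit the field.
    TreeSet<String> topicNamesToSend(Set<String> current, boolean sendAllFields) {
        TreeSet<String> sorted = new TreeSet<>(current);
        if (sendAllFields || !sorted.equals(sentSubscribedTopicNames)) {
            sentSubscribedTopicNames = sorted;
            return sorted; // changed (or full heartbeat requested): send it
        }
        return null; // unchanged since last heartbeat: omit the field
    }

    public static void main(String[] args) {
        SentFieldsSketch s = new SentFieldsSketch();
        System.out.println(s.topicNamesToSend(Set.of("t1", "t2"), false)); // [t1, t2]
        System.out.println(s.topicNamesToSend(Set.of("t2", "t1"), false)); // null (same set)
    }
}
```

Using a sorted `TreeSet` for the comparison makes the equality check independent of the order in which the subscription set iterates, which is why the snippet under review copies the subscription into a `TreeSet` before comparing.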






[jira] [Comment Edited] (KAFKA-16344) Internal topic mm2-offset-syncsinternal created with single partition is putting more load on the broker

2024-03-25 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830685#comment-17830685
 ] 

Greg Harris edited comment on KAFKA-16344 at 3/25/24 8:06 PM:
--

Hi [~janardhanag] Yes, you should consider increasing offset.lag.max. 0 is best 
for precise offset translation, but can significantly increase the amount of 
traffic on the offset-syncs topic.

For topics without offset gaps (e.g. not compacted, not transactional) the 
offset.lag.max currently behaves as a ratio between the amount of records/sec 
in a mirrored topic, and that topic's additional load on the offset-syncs 
topic. There's a semaphore 
[https://github.com/apache/kafka/blob/f8ce7feebcede6c6da93c649413a64695bc8d4c1/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L95]
 that prevents this load from slowing down the mirror source connector, but it 
won't protect the brokers holding the mm2 offsets topic from being overloaded.

For example, if you were to mirror 1000000 records, offset.lag.max=0 could 
write 1000000 records to the offset-syncs topic, generating huge load. For 
1000000 records with offset.lag.max=100, the number of offset syncs could be 
10000, 100x less. For offset.lag.max=10000, the number of offset syncs could be 
100, 10000x less. To answer your questions:

> 1. What is the proportion of mm2-offsetsyncsinternal topic writes to overall 
>MM2 traffic?

Configurable with offset.lag.max for topics with contiguous offsets. For 
transactional topics it isn't configurable, as each transaction may cause 1 or 
2 offset syncs and that isn't limited by the offset.lag.max, so reducing the 
throughput may require increasing the transaction intervals on your source 
applications.

> 2. Is there a way to tune the internal topic writes with increased traffic 
> MM2?

As the throughput of your MM2 instance increases, you should increase the 
offset.lag.max to keep the offset-syncs topic fixed.

> 3. I want to understand the expected mm2-offsetsyncsinternal write TPS for a 
> given amount of MM2 traffic. Are there any tunable parameters to reduce these 
> writes, and what are the consequences of tuning if any?

There isn't a closed formula, because a sync may be initiated by multiple 
conditions 
[https://github.com/apache/kafka/blob/f8ce7feebcede6c6da93c649413a64695bc8d4c1/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L360-L363]
 and prevented by the semaphore when the producer latency becomes significant.

The consequence of decreasing throughput on this topic is less precise offset 
translation in the MirrorCheckpointConnector, and increased redelivery when 
failing over from source to target consumer offsets. If you aren't using this 
connector, then the offset syncs topic isn't read from at all. You currently 
can't turn the topic off, but there is a KIP open for that: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector]
 . If for example you were satisfied with 30 seconds of redelivery for a topic 
which had 1000 records/second, then you could set offset.lag.max to 30*1000.
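The ratio and the redelivery arithmetic above can be sketched as a back-of-envelope helper. This is not a Kafka API; the class and method names are made up, and real sync counts also depend on the sync conditions and the semaphore mentioned earlier, so treat it only as a rough sizing aid for topics with contiguous offsets.

```java
public class OffsetLagMaxSizing {
    // syncs/sec is roughly records/sec divided by offset.lag.max
    // (contiguous offsets; offset.lag.max=0 can sync every record).
    static double offsetSyncsPerSec(double recordsPerSec, long offsetLagMax) {
        return offsetLagMax == 0 ? recordsPerSec : recordsPerSec / offsetLagMax;
    }

    // tolerated redelivery window (seconds) * throughput -> offset.lag.max,
    // matching the "30 seconds at 1000 records/second -> 30*1000" example.
    static long lagMaxForRedelivery(long recordsPerSec, long toleratedRedeliverySec) {
        return recordsPerSec * toleratedRedeliverySec;
    }

    public static void main(String[] args) {
        System.out.println(offsetSyncsPerSec(1000, 100));   // 10.0
        System.out.println(lagMaxForRedelivery(1000, 30));  // 30000
    }
}
```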

> 4. In a larger Kafka cluster, if a single broker is overloaded with 
> mm2-offsetsyncsinternal traffic, can it lead to a broker crash? Are there any 
> guidelines available for such scenarios? Currently, we have 6 brokers with 
> 24K MM2 traffic, and internal writes are at 10K, resulting in a 20% CPU 
> increase on one broker.

I'm not familiar with the failure conditions for overloaded brokers. If you are 
seeing failures then I would definitely recommend trying to tune MM2, or try to 
use quotas [https://kafka.apache.org/documentation/#design_quotas] to get this 
topic-partition under control.

> 5. Are there any limitations on Kafka brokers scaling , I mean how much kafka 
>broker can be expanded in a single Kafka cluster due to the 
>mm2-offsetsyncsinternal topic?

Because the offset syncs topic only supports a single partition, adding brokers 
will not solve this problem, other than having a broker dedicated to only 
serving this one topic. At some point, you would need two separate offset-syncs 
topics and separate MirrorSourceConnectors to shard the load.

> 6. How can the system be dimensioned to handle MM2 internal topic writes 
> effectively? Are there any recommended figures available? For instance, for a 
> given amount of traffic (X), what percentage increase in CPU (Y) should each 
> broker have to handle MM2 internal topic writes? Note that in other pods, 
> this resource may not be utilized.

The Kafka project doesn't publish "expected" dimensions, other than the default 
values of configurations. We don't publish performance analysis on any 
particular hardware setups, because the diversity of hardware running Kafka is 
just too vast to capture properly.


was (Author: gharris1727):
Hi 

[jira] [Commented] (KAFKA-16344) Internal topic mm2-offset-syncsinternal created with single partition is putting more load on the broker

2024-03-25 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830685#comment-17830685
 ] 

Greg Harris commented on KAFKA-16344:
-

Hi [~janardhanag] Yes, you should consider increasing offset.lag.max. 0 is best 
for precise offset translation, but can significantly increase the amount of 
traffic on the offset-syncs topic.

For topics without offset gaps (e.g. not compacted, not transactional) the 
offset.lag.max currently behaves as a ratio between the amount of records/sec 
in a mirrored topic, and that topic's additional load on the offset-syncs 
topic. There's a semaphore 
https://github.com/apache/kafka/blob/f8ce7feebcede6c6da93c649413a64695bc8d4c1/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L95
 that prevents this load from slowing down the mirror source connector, but it 
won't protect the brokers holding the mm2 offsets topic from being overloaded.

For example, if you were to mirror 1000000 records, offset.lag.max=0 could 
write 1000000 records to the offset-syncs topic, generating huge load. For 
1000000 records with offset.lag.max=100, the number of offset syncs could be 
10000, 100x less. For offset.lag.max=10000, the number of offset syncs could be 
100, 10000x less. To answer your questions:

> 1. What is the proportion of mm2-offsetsyncsinternal topic writes to overall 
>MM2 traffic?

Configurable with offset.lag.max for topics with contiguous offsets. For 
transactional topics it isn't configurable, as each transaction may cause 1 or 
2 offset syncs and that isn't limited by the offset.lag.max, so reducing the 
throughput may require increasing the transaction intervals on your source 
applications.

> 2. Is there a way to tune the internal topic writes with increased traffic 
> MM2?

As the throughput of your MM2 instance increases, you should decrease the 
offset.lag.max to keep the offset-syncs topic fixed.

> 3. I want to understand the expected mm2-offsetsyncsinternal write TPS for a 
> given amount of MM2 traffic. Are there any tunable parameters to reduce these 
> writes, and what are the consequences of tuning if any?

There isn't a closed formula, because a sync may be initiated by multiple 
conditions 
[https://github.com/apache/kafka/blob/f8ce7feebcede6c6da93c649413a64695bc8d4c1/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L360-L363]
 and prevented by the semaphore when the producer latency becomes significant.

The consequence of decreasing throughput on this topic is less precise offset 
translation in the MirrorCheckpointConnector, and increased redelivery when 
failing over from source to target consumer offsets. If you aren't using this 
connector, then the offset syncs topic isn't read from at all. You currently 
can't turn the topic off, but there is a KIP open for that: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector]
 . If for example you were satisfied with 30 seconds of redelivery for a topic 
which had 1000 records/second, then you could set offset.lag.max to 30*1000.

> 4. In a larger Kafka cluster, if a single broker is overloaded with 
> mm2-offsetsyncsinternal traffic, can it lead to a broker crash? Are there any 
> guidelines available for such scenarios? Currently, we have 6 brokers with 
> 24K MM2 traffic, and internal writes are at 10K, resulting in a 20% CPU 
> increase on one broker.

I'm not familiar with the failure conditions for overloaded brokers. If you are 
seeing failures then I would definitely recommend trying to tune MM2, or try to 
use quotas [https://kafka.apache.org/documentation/#design_quotas] to get this 
topic-partition under control.

> 5. Are there any limitations on Kafka brokers scaling , I mean how much kafka 
>broker can be expanded in a single Kafka cluster due to the 
>mm2-offsetsyncsinternal topic?

Because the offset syncs topic only supports a single partition, adding brokers 
will not solve this problem, other than having a broker dedicated to only 
serving this one topic. At some point, you would need two separate offset-syncs 
topics and separate MirrorSourceConnectors to shard the load.

> 6. How can the system be dimensioned to handle MM2 internal topic writes 
> effectively? Are there any recommended figures available? For instance, for a 
> given amount of traffic (X), what percentage increase in CPU (Y) should each 
> broker have to handle MM2 internal topic writes? Note that in other pods, 
> this resource may not be utilized.

The Kafka project doesn't publish "expected" dimensions, other than the default 
values of configurations. We don't publish performance analysis on any 
particular hardware setups, because the diversity of hardware running Kafka is 
just too vast to capture properly.

> Internal topic mm2-offset-syncsinternal created with single 
> partition is putting 

[jira] [Closed] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation

2024-03-25 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna closed KAFKA-16224.
-

> Fix handling of deleted topic when auto-committing before revocation
> 
>
> Key: KAFKA-16224
> URL: https://issues.apache.org/jira/browse/KAFKA-16224
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> Current logic for auto-committing offsets when partitions are revoked will 
> retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
> member not completing the revocation in time. We should consider this as an 
> indication of the topic being deleted, and in the context of committing 
> offsets to revoke partitions, we should abort the commit attempt and move on 
> to complete and ack the revocation (meaning that we would effectively 
> consider UnknownTopicOrPartitionException as non-retriable, even though it 
> extends RetriableException, only when committing offsets before revocation as 
> part of this task) 
> Note that legacy coordinator behaviour around this seems to be the same as 
> the new consumer currently has, tracked with a related issue given that it 
> would require a separate fix for the legacy consumer.
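The behavior described above, treating a normally-retriable error as fatal only in the commit-before-revocation path, can be illustrated with stand-in types. This is not the real consumer internals: the exception classes below are local placeholders mirroring the relationship between Kafka's `UnknownTopicOrPartitionException` and `RetriableException`.

```java
public class RevocationCommitSketch {
    static class RetriableException extends RuntimeException {}
    static class UnknownTopicOrPartitionException extends RetriableException {}

    static boolean shouldRetry(RuntimeException e, boolean committingBeforeRevocation) {
        // Topic was likely deleted: abort the commit so revocation can
        // complete and be acked in time, even though the exception type
        // is nominally retriable.
        if (e instanceof UnknownTopicOrPartitionException && committingBeforeRevocation)
            return false;
        return e instanceof RetriableException;
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(new UnknownTopicOrPartitionException(), true));  // false
        System.out.println(shouldRetry(new UnknownTopicOrPartitionException(), false)); // true
    }
}
```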





Re: [PR] KAFKA-16224: Do not retry committing if topic or partition deleted [kafka]

2024-03-25 Thread via GitHub


cadonna merged PR #15581:
URL: https://github.com/apache/kafka/pull/15581





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-25 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538161659


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -433,7 +433,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
*/
   // TODO: enable this test for the consumer group protocol when support for 
pattern subscriptions is implemented.

Review Comment:
   ditto



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -486,7 +486,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
*/
   // TODO: enable this test for the consumer group protocol when support for 
pattern subscriptions is implemented.

Review Comment:
   ditto






Re: [PR] KAFKA-16224: Do not retry committing if topic or partition deleted [kafka]

2024-03-25 Thread via GitHub


cadonna commented on PR #15581:
URL: https://github.com/apache/kafka/pull/15581#issuecomment-2018802319

   Build failures are unrelated.





Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-25 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538161334


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -374,7 +374,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
*/
   // TODO: enable this test for the consumer group protocol when support for 
pattern subscriptions is implemented.

Review Comment:
   we should remove this TODO now






Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-03-25 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1538158322


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {

Review Comment:
   (https://github.com/apache/kafka/blob/f8ce7feebcede6c6da93c649413a64695bc8d4c1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1466). What do you think? 






[jira] [Updated] (KAFKA-16420) Add thread-safe alternative to utils.Exit

2024-03-25 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16420:

Summary: Add thread-safe alternative to utils.Exit  (was: Replace 
utils.Exit with a thread-safe alternative)

> Add thread-safe alternative to utils.Exit
> -
>
> Key: KAFKA-16420
> URL: https://issues.apache.org/jira/browse/KAFKA-16420
> Project: Kafka
>  Issue Type: Wish
>  Components: connect, core, tools
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> The Exit class is not thread-safe, and exposes our tests to race conditions 
> and inconsistent execution. It is not possible to make it thread-safe due to 
> the static design of the API.
> We should add an alternative to the Exit class, and migrate the existing 
> usages to the replacement, before finally removing the legacy Exit.





[jira] [Updated] (KAFKA-16294) Add group protocol migration enabling config

2024-03-25 Thread Dongnuo Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongnuo Lyu updated KAFKA-16294:

Description: 
The online upgrade is triggered when a consumer group heartbeat request is 
received in a classic group. The downgrade is triggered when any old protocol 
request is received in a consumer group. We only accept upgrade/downgrade if 
the corresponding group migration config policy is enabled.

This is the first part of the implementation of online group protocol 
migration, adding the kafka config group protocol migration. The config has 
four valid values – both (both upgrade and downgrade are allowed), upgrade (only 
upgrade is allowed), downgrade (only downgrade is allowed) and none (neither is 
allowed).

At present the default value is NONE. When we start enabling the migration, we 
expect to set BOTH to default so that it's easier to roll back to the old 
protocol as a quick fix for anything wrong in the new protocol; when using 
consumer groups becomes default and the migration is near finished, we will set 
the default policy to UPGRADE to prevent unwanted downgrade causing too 
frequent migration. DOWNGRADE could be useful for revert or debug purposes.

  was:
The online upgrade is triggered when a consumer group heartbeat request is 
received in a classic group. The downgrade is triggered when any old protocol 
request is received in a consumer group. We only accept upgrade/downgrade if 
the corresponding group migration config policy is enabled.

This is the first part of the implementation of online group protocol 
migration, adding the kafka config group protocol migration. The config has 
four valid values – both (both upgrade and downgrade are allowed), upgrade (only 
upgrade is allowed), downgrade (only downgrade is allowed) and none (neither is 
allowed).

At present the default value is NONE. When we start enabling the migration, we 
expect to set BOTH to default so that it's easier to roll back to the old 
protocol as a quick fix for anything wrong in the new protocol; when using 
consumer groups becomes default and the migration is near finished, we will set 
the default policy to upgrade to prevent unwanted downgrade causing too 
frequent migration.


> Add group protocol migration enabling config
> 
>
> Key: KAFKA-16294
> URL: https://issues.apache.org/jira/browse/KAFKA-16294
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
>
> The online upgrade is triggered when a consumer group heartbeat request is 
> received in a classic group. The downgrade is triggered when any old protocol 
> request is received in a consumer group. We only accept upgrade/downgrade if 
> the corresponding group migration config policy is enabled.
> This is the first part of the implementation of online group protocol 
> migration, adding the kafka config group protocol migration. The config has 
> four valid values – both(both upgrade and downgrade are allowed), 
> upgrade(only upgrade is allowed), downgrade(only downgrade is allowed) and 
> none(neither is allowed).
> At present the default value is NONE. When we start enabling the migration, 
> we expect to set BOTH to default so that it's easier to roll back to the old 
> protocol as a quick fix for anything wrong in the new protocol; when using 
> consumer groups becomes default and the migration is near finished, we will 
> set the default policy to UPGRADE to prevent unwanted downgrade causing too 
> frequent migration. DOWNGRADE could be useful for revert or debug purposes.
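The policy gating described above can be sketched as a small enum. This is a hypothetical `MigrationPolicy` type for illustration only; the actual config name, constants, and types in the PR may differ:

```java
// Hypothetical sketch of the four-valued migration policy described above.
// The names BOTH/UPGRADE/DOWNGRADE/NONE mirror the config values in the
// description; they are illustrative, not the actual Kafka constants.
public enum MigrationPolicy {
    BOTH, UPGRADE, DOWNGRADE, NONE;

    // An upgrade (classic group -> consumer group) is accepted only under BOTH or UPGRADE.
    public boolean upgradeAllowed() {
        return this == BOTH || this == UPGRADE;
    }

    // A downgrade (consumer group -> classic group) is accepted only under BOTH or DOWNGRADE.
    public boolean downgradeAllowed() {
        return this == BOTH || this == DOWNGRADE;
    }
}
```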



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]

2024-03-25 Thread via GitHub


cmccabe commented on PR #15584:
URL: https://github.com/apache/kafka/pull/15584#issuecomment-2018746846

   > I don't think it's a good idea to introduce the new terms mangling & 
unmangling when there are already equivalent terms in the codebase – 
sanitizing/desanitizing – it makes it unnecessarily confusing.
   
   That's a fair point. I will remove the references to "mangling" and replace 
them with "sanitization"
   
   (although I don't really agree, I think "sanitization" implies discarding 
bad data, not mangling it)
   
   But let's not change ZK terminology at this point :)
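As a rough illustration of the mangling/sanitization being discussed (for example, '@' becomes "%40" in a znode name), a minimal codec can be built on plain URL percent-encoding. Kafka's actual org.apache.kafka.common.utils.Sanitizer applies a few additional rules, so this is only a sketch with an assumed class name:

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Minimal sketch of the "sanitization" under discussion: entity names are
// percent-encoded before being used as ZooKeeper znode names.
// DefaultNameCodec is a hypothetical name; the real Sanitizer differs in details.
public final class DefaultNameCodec {
    public static String sanitize(String name) {
        // Percent-encodes reserved characters, e.g. '@' -> "%40".
        return URLEncoder.encode(name, StandardCharsets.UTF_8);
    }

    public static String desanitize(String znodeName) {
        // Undoes the encoding when reading the znode name back.
        return URLDecoder.decode(znodeName, StandardCharsets.UTF_8);
    }
}
```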


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16294) Add group protocol migration enabling config

2024-03-25 Thread Dongnuo Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongnuo Lyu updated KAFKA-16294:

Description: 
The online upgrade is triggered when a consumer group heartbeat request is 
received in a classic group. The downgrade is triggered when any old protocol 
request is received in a consumer group. We only accept upgrade/downgrade if 
the corresponding group migration config policy is enabled.

This is the first part of the implementation of online group protocol 
migration, adding the kafka config group protocol migration. The config has 
four valid values – both(both upgrade and downgrade are allowed), upgrade(only 
upgrade is allowed), downgrade(only downgrade is allowed) and none(neither is 
allowed).

At present the default value is NONE. When we start enabling the migration, we 
expect to set BOTH to default so that it's easier to roll back to the old 
protocol as a quick fix for anything wrong in the new protocol; when using 
consumer groups becomes default and the migration is near finished, we will set 
the default policy to upgrade to prevent unwanted downgrade causing too 
frequent migration.

  was:
The offline upgrade is triggered when a consumer group heartbeat request is 
received in a classic group. The downgrade is triggered when any old protocol 
request is received in a consumer group. We only accept upgrade/downgrade if 1) 
the group migration config is enabled 2) the group is empty.

This is the first part of the implementation of offline group protocol 
migration, adding the kafka config group protocol migration. The config has 
four valid values – both(both upgrade and downgrade are allowed), upgrade(only 
upgrade is allowed), downgrade(only downgrade is allowed) and none(neither is 
allowed).


> Add group protocol migration enabling config
> 
>
> Key: KAFKA-16294
> URL: https://issues.apache.org/jira/browse/KAFKA-16294
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
>
> The online upgrade is triggered when a consumer group heartbeat request is 
> received in a classic group. The downgrade is triggered when any old protocol 
> request is received in a consumer group. We only accept upgrade/downgrade if 
> the corresponding group migration config policy is enabled.
> This is the first part of the implementation of online group protocol 
> migration, adding the kafka config group protocol migration. The config has 
> four valid values – both(both upgrade and downgrade are allowed), 
> upgrade(only upgrade is allowed), downgrade(only downgrade is allowed) and 
> none(neither is allowed).
> At present the default value is NONE. When we start enabling the migration, 
> we expect to set BOTH to default so that it's easier to roll back to the old 
> protocol as a quick fix for anything wrong in the new protocol; when using 
> consumer groups becomes default and the migration is near finished, we will 
> set the default policy to upgrade to prevent unwanted downgrade causing too 
> frequent migration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16084: Simplify and deduplicate standalone herder test mocking [kafka]

2024-03-25 Thread via GitHub


ahmedsobeh commented on code in PR #15389:
URL: https://github.com/apache/kafka/pull/15389#discussion_r1538122992


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -726,21 +683,21 @@ public void testAccessors() throws Exception {
 
doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), 
isNull());
 
 // Create connector
-connector = mock(BogusSourceConnector.class);
+Connector connector = mock(BogusSourceConnector.class);

Review Comment:
   Thanks for the walkthrough! I now understand the full picture. I think I 
addressed all your comments now, let me know if I missed anything



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]

2024-03-25 Thread via GitHub


cmccabe commented on code in PR #15584:
URL: https://github.com/apache/kafka/pull/15584#discussion_r1538115580


##
core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala:
##
@@ -50,44 +51,54 @@ class ZkConfigMigrationClient(
 
   val adminZkClient = new AdminZkClient(zkClient)
 
-
   /**
-   * In ZK, we use the special string "&lt;default&gt;" to represent the 
default entity.
-   * In KRaft, we use an empty string. This method builds an EntityData that 
converts the special ZK string
-   * to the special KRaft string.
+   * In ZK, we use the special string "&lt;default&gt;" to represent the 
default config entity.
+   * In KRaft, we use an empty string. This method converts between the 
two conventions.
*/
-  private def fromZkEntityName(entityName: String): String = {
-if (entityName.equals(ConfigEntityName.DEFAULT)) {
+  private def fromZkConfigfEntityName(entityName: String): String = {
+if (entityName.equals(ZooKeeperInternals.DEFAULT_STRING)) {
   ""
 } else {
   entityName
 }
   }
 
-  private def toZkEntityName(entityName: String): String = {
+  private def toZkConfigEntityName(entityName: String): String = {
 if (entityName.isEmpty) {
-  ConfigEntityName.DEFAULT
+  ZooKeeperInternals.DEFAULT_STRING
 } else {
   entityName
 }
   }
 
-  private def buildEntityData(entityType: String, entityName: String): 
EntityData = {
-new 
EntityData().setEntityType(entityType).setEntityName(fromZkEntityName(entityName))
+  private def buildClientQuotaEntityData(
+entityType: String,
+znodeName: String
+  ): EntityData = {
+val result = new EntityData().setEntityType(entityType)
+if (znodeName.equals(ZooKeeperInternals.DEFAULT_STRING)) {
+  // Default __client quota__ entity names are null. This is different 
than default __configs__,
+  // which have their names set to the empty string instead.
+  result.setEntityName(null)
+} else {
+  // ZNode names are mangled before being stored in ZooKeeper.
+  // For example, @ is turned into %40. Undo the mangling here.
+  result.setEntityName(Sanitizer.desanitize(znodeName))
+}
+result
   }
 
-
   override def iterateClientQuotas(visitor: ClientQuotaVisitor): Unit = {
 def migrateEntityType(zkEntityType: String, entityType: String): Unit = {
-  adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (name, 
props) =>
-val entity = List(buildEntityData(entityType, name)).asJava
+  adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case 
(znodeName, props) =>
+val entity = List(buildClientQuotaEntityData(entityType, 
znodeName)).asJava
 
 ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { 
mechanism =>
   val propertyValue = props.getProperty(mechanism.mechanismName)
   if (propertyValue != null) {
 val scramCredentials = 
ScramCredentialUtils.credentialFromString(propertyValue)
 logAndRethrow(this, s"Error in client quota visitor for SCRAM 
credential. User was $entity.") {
-  visitor.visitScramCredential(name, mechanism, scramCredentials)
+  visitor.visitScramCredential(Sanitizer.desanitize(znodeName), 
mechanism, scramCredentials)

Review Comment:
   That is correct. The previous change to `ZkMigrationClient` from KAFKA-16222 
has been removed in this PR, as you can see.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]

2024-03-25 Thread via GitHub


cmccabe commented on code in PR #15584:
URL: https://github.com/apache/kafka/pull/15584#discussion_r1538115580


##
core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala:
##
@@ -50,44 +51,54 @@ class ZkConfigMigrationClient(
 
   val adminZkClient = new AdminZkClient(zkClient)
 
-
   /**
-   * In ZK, we use the special string "&lt;default&gt;" to represent the 
default entity.
-   * In KRaft, we use an empty string. This method builds an EntityData that 
converts the special ZK string
-   * to the special KRaft string.
+   * In ZK, we use the special string "&lt;default&gt;" to represent the 
default config entity.
+   * In KRaft, we use an empty string. This method converts between the 
two conventions.
*/
-  private def fromZkEntityName(entityName: String): String = {
-if (entityName.equals(ConfigEntityName.DEFAULT)) {
+  private def fromZkConfigfEntityName(entityName: String): String = {
+if (entityName.equals(ZooKeeperInternals.DEFAULT_STRING)) {
   ""
 } else {
   entityName
 }
   }
 
-  private def toZkEntityName(entityName: String): String = {
+  private def toZkConfigEntityName(entityName: String): String = {
 if (entityName.isEmpty) {
-  ConfigEntityName.DEFAULT
+  ZooKeeperInternals.DEFAULT_STRING
 } else {
   entityName
 }
   }
 
-  private def buildEntityData(entityType: String, entityName: String): 
EntityData = {
-new 
EntityData().setEntityType(entityType).setEntityName(fromZkEntityName(entityName))
+  private def buildClientQuotaEntityData(
+entityType: String,
+znodeName: String
+  ): EntityData = {
+val result = new EntityData().setEntityType(entityType)
+if (znodeName.equals(ZooKeeperInternals.DEFAULT_STRING)) {
+  // Default __client quota__ entity names are null. This is different 
than default __configs__,
+  // which have their names set to the empty string instead.
+  result.setEntityName(null)
+} else {
+  // ZNode names are mangled before being stored in ZooKeeper.
+  // For example, @ is turned into %40. Undo the mangling here.
+  result.setEntityName(Sanitizer.desanitize(znodeName))
+}
+result
   }
 
-
   override def iterateClientQuotas(visitor: ClientQuotaVisitor): Unit = {
 def migrateEntityType(zkEntityType: String, entityType: String): Unit = {
-  adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (name, 
props) =>
-val entity = List(buildEntityData(entityType, name)).asJava
+  adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case 
(znodeName, props) =>
+val entity = List(buildClientQuotaEntityData(entityType, 
znodeName)).asJava
 
 ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { 
mechanism =>
   val propertyValue = props.getProperty(mechanism.mechanismName)
   if (propertyValue != null) {
 val scramCredentials = 
ScramCredentialUtils.credentialFromString(propertyValue)
 logAndRethrow(this, s"Error in client quota visitor for SCRAM 
credential. User was $entity.") {
-  visitor.visitScramCredential(name, mechanism, scramCredentials)
+  visitor.visitScramCredential(Sanitizer.desanitize(znodeName), 
mechanism, scramCredentials)

Review Comment:
   That is correct. The previous change from KAFKA-16222 has been removed in 
this PR, as you can see.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16148: Implement GroupMetadataManager#onUnloaded [kafka]

2024-03-25 Thread via GitHub


jeffkbkim commented on PR #15446:
URL: https://github.com/apache/kafka/pull/15446#issuecomment-2018694290

   thanks @dajac. I have addressed your comments


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16200) Enforce that RequestManager implementations respect user-provided timeout

2024-03-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16200:
-

Assignee: Kirk True  (was: Bruno Cadonna)

> Enforce that RequestManager implementations respect user-provided timeout
> -
>
> Key: KAFKA-16200
> URL: https://issues.apache.org/jira/browse/KAFKA-16200
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to 
> block waiting for the event to complete. The application thread will block 
> for the timeout, but there is not yet a consistent manner in which events are 
> timed out.
> Enforce at the request manager layer that timeouts are respected per the 
> design in KAFKA-15848.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16420: Add instance methods to Exit to replace static methods [kafka]

2024-03-25 Thread via GitHub


gharris1727 opened a new pull request, #15598:
URL: https://github.com/apache/kafka/pull/15598

   This adds a thread-safe alternative to the current Exit implementation, 
which doesn't make use of static mutable fields. This solves a number of 
problems:
   
   1. Thread isolation: Two tests can inject exit procedures concurrently, 
without conflicting with one another
   2. The test-only methods for manipulating the state of Exit are present on 
the main class, and so could be called in a non-test environment.
   3. When fatal procedures are intercepted by Exit, shutdown hooks are delayed 
until the real JVM exit, possibly leaking resources/files across tests.
   4. When fatal procedures are intercepted by Exit, those exceptions can go 
un-noticed, and may not cause the test to fail.
   5. When shutdown hooks are intercepted by Exit, they are dropped and never 
executed
   6. It isn't clear to code calling exit() that an exception might be thrown 
instead when the procedures are mocked. 
   
   The Java and Scala Exit classes don't use any instance methods, so I was 
able to directly add instance methods to the existing classes. This should 
prevent the need for a large rename later, such as appears necessary with the 
other options I could think of:
   
   * Add it as a different class SafeExit: large amount of code churn when Exit 
is removed, silly name
   * Moving "Exit" to "UnsafeExit" now: large amount of code churn up-front, 
immediate merge conflicts
   * Add it to a different package as "utils.temp.Exit": moderate amount of 
code churn when Exit is removed, harder to review
   
   Because static and instance methods cannot share signatures, I had to 
perturb the names slightly. I think the new names are acceptable, and shouldn't 
cause any additional code churn.
   * exit to exitOrThrow
   * halt to haltOrThrow
   * addShutdownHook to addShutdownRunnable
   
   In the interest of not testing tests, I've omitted tests for these changes. 
The existing ExitTest is testing the mocking methods, which are not part of the 
main API now, just the MockExit API.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
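A minimal sketch of the instance-method approach described in this PR follows. The class and method names are illustrative only, not the actual code added by the change:

```java
import java.util.function.BiConsumer;

// Hypothetical sketch of an instance-based Exit: the exit procedure is a
// per-instance field instead of static mutable state, so two tests can
// inject different procedures concurrently without conflicting.
public class InstanceExit {
    private final BiConsumer<Integer, String> exitProcedure;

    // Tests inject a mock procedure here; no static setExitProcedure needed.
    public InstanceExit(BiConsumer<Integer, String> exitProcedure) {
        this.exitProcedure = exitProcedure;
    }

    // Mirrors the renamed exitOrThrow: a mocked procedure may throw an
    // exception instead of terminating the JVM, and the name signals that
    // to callers.
    public void exitOrThrow(int statusCode, String message) {
        exitProcedure.accept(statusCode, message);
    }
}
```

In production code the injected procedure would call `Runtime.getRuntime().exit(code)`; in tests it records or throws instead.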
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-25 Thread via GitHub


wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1538043755


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##
@@ -236,6 +252,91 @@ private List<TopicPartition> topicPartitionsForStore(final 
StateStore store) {
 }
 return topicPartitions;
 }
+@SuppressWarnings("unchecked")
+private void reprocessState(final List<TopicPartition> topicPartitions,
+final Map<TopicPartition, Long> highWatermarks,
+final InternalTopologyBuilder.ReprocessFactory 
reprocessFactory,
+final String storeName) {
+final Processor source = reprocessFactory.processorSupplier().get();
+source.init(globalProcessorContext);
+
+for (final TopicPartition topicPartition : topicPartitions) {
+long currentDeadline = NO_DEADLINE;
+
+globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
+final Long checkpoint = checkpointFileCache.get(topicPartition);
+if (checkpoint != null) {
+globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
+} else {
+
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+offset = getGlobalConsumerOffset(topicPartition);
+}
+final Long highWatermark = highWatermarks.get(topicPartition);
+stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
+
+long restoreCount = 0L;
+
+while (offset < highWatermark) {
+// we add `request.timeout.ms` to `poll.ms` because `poll.ms` 
might be too short
+// to give a fetch request a fair chance to actually complete 
and we don't want to
+// start `task.timeout.ms` too early
+//
+// TODO with https://issues.apache.org/jira/browse/KAFKA-10315 
we can just call
+//  `poll(pollMS)` without adding the request timeout and 
do a more precise
+//  timeout handling
+final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(pollMsPlusRequestTimeout);
+if (records.isEmpty()) {
+currentDeadline = 
maybeUpdateDeadlineOrThrow(currentDeadline);
+} else {
+currentDeadline = NO_DEADLINE;
+}
+
+for (final ConsumerRecord<byte[], byte[]> record : 
records.records(topicPartition)) {
+final ProcessorRecordContext recordContext =
+new ProcessorRecordContext(
+record.timestamp(),
+record.offset(),
+record.partition(),
+record.topic(),
+record.headers());
+globalProcessorContext.setRecordContext(recordContext);
+
+try {
+if (record.key() != null) {
+source.process(new Record<>(
+
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
+
reprocessFactory.valueDeserializer().deserialize(record.topic(), 
record.value()),
+record.timestamp(),
+record.headers()));
+restoreCount++;
+}
+} catch (final Exception deserializationException) {
+handleDeserializationFailure(

Review Comment:
   If you still think we need to refactor RecordDeserializer I think we should 
do that in a follow up PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop

2024-03-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16160:
--
Priority: Blocker  (was: Major)

> AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
> --
>
> Key: KAFKA-16160
> URL: https://issues.apache.org/jira/browse/KAFKA-16160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Observing some excessive logging running AsyncKafkaConsumer and observing 
> excessive logging of :
> {code:java}
> 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, 
> groupId=concurrent_consumer] Node is not ready, handle the request in the 
> next event loop: node=worker4:9092 (id: 2147483644 rack: null), 
> request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', 
> memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, 
> rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], 
> serverAssignor=null, topicPartitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, 
> partitions=[0])]), 
> handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b,
>  node=Optional[worker4:9092 (id: 2147483644 rack: null)], 
> timer=org.apache.kafka.common.utils.Timer@55ed4733} 
> (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code}
> This seems to be triggered by a tight poll loop of the network thread.  The 
> right thing to do is to backoff a bit for that given node and retry later.
> This should be a blocker for 3.8 release.
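The suggested per-node backoff could look roughly like the following: a generic capped exponential backoff sketch. The constants and the class name are assumptions for illustration, not what the consumer actually configures:

```java
// Sketch of the per-node backoff suggested above: instead of retrying a
// not-ready node on every pass of the network thread's tight loop, wait an
// exponentially growing interval (capped) before the next attempt.
// INITIAL_BACKOFF_MS and MAX_BACKOFF_MS are illustrative values.
public final class NodeBackoff {
    static final long INITIAL_BACKOFF_MS = 100;
    static final long MAX_BACKOFF_MS = 1000;

    // Backoff doubles per consecutive failed attempt, up to the cap.
    public static long backoffMs(int failedAttempts) {
        long backoff = INITIAL_BACKOFF_MS << Math.min(failedAttempts, 30);
        return Math.min(backoff, MAX_BACKOFF_MS);
    }
}
```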



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15250) ConsumerNetworkThread is running tight loop

2024-03-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15250:
--
Priority: Critical  (was: Blocker)

> ConsumerNetworkThread is running tight loop
> ---
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles.  I 
> think we need to reexamine the timeout passed to networkClientDelegate.poll.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15950) Serialize broker heartbeat requests

2024-03-25 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15950.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

merged the PR to trunk.

> Serialize broker heartbeat requests
> ---
>
> Key: KAFKA-15950
> URL: https://issues.apache.org/jira/browse/KAFKA-15950
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Jun Rao
>Assignee: Igor Soarez
>Priority: Major
> Fix For: 3.8.0
>
>
> This is a followup issue from the discussion in 
> [https://github.com/apache/kafka/pull/14836#discussion_r1409739363].
> {{KafkaEventQueue}} does de-duping and only allows one outstanding 
> {{CommunicationEvent}} in the queue. But it seems that duplicated 
> {{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
> calls {{sendBrokerHeartbeat}} that calls the following.
> {code:java}
> _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
> handler){code}
> The problem is that we have another queue in 
> {{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
> {{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
> {{HeartbeatRequest}} will be queued in 
> {{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
> {{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When 
> it's processed, another {{HeartbeatRequest}} will be queued in 
> {{{}NodeToControllerChannelManagerImpl{}}}.
> This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
> practice since {{CommunicationEvent}} is typically queued in 
> {{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
> {{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
> {{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
> reason about tests.
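One way to picture the missing de-duplication is a single in-flight guard spanning both queues. This is an illustrative sketch with hypothetical names, not the approach taken in the merged PR:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the gap described above: KafkaEventQueue de-dupes
// CommunicationEvents, but the channel manager's queue does not, so a
// second HeartbeatRequest can be enqueued while the first is still in
// flight. A single in-flight flag covering both queues closes that window.
public final class HeartbeatSender {
    private final AtomicBoolean inFlight = new AtomicBoolean(false);
    private int sent = 0;

    // Sends only when no heartbeat is currently outstanding.
    public boolean maybeSendHeartbeat() {
        if (!inFlight.compareAndSet(false, true)) {
            return false; // a heartbeat is already queued or in flight
        }
        sent++; // stand-in for channelManager.sendRequest(...)
        return true;
    }

    // Invoked by the response handler, allowing the next heartbeat.
    public void onHeartbeatResponse() {
        inFlight.set(false);
    }

    public int sentCount() {
        return sent;
    }
}
```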



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-25 Thread via GitHub


junrao merged PR #14903:
URL: https://github.com/apache/kafka/pull/14903


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15250) ConsumerNetworkThread is running tight loop

2024-03-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15250:
--
Priority: Blocker  (was: Major)

> ConsumerNetworkThread is running tight loop
> ---
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles.  I 
> think we need to reexamine the timeout passed to networkClientDelegate.poll.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-25 Thread via GitHub


junrao commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-2018530584

   @soarez : Thanks for triaging the test failures. Will merge the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16271) Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol config

2024-03-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16271:
-

Assignee: Philip Nee

> Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol 
> config
> --
>
> Key: KAFKA-16271
> URL: https://issues.apache.org/jira/browse/KAFKA-16271
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in 
> {{consumer_rolling_upgrade_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
> The tricky wrinkle here is that the existing test relies on client-side 
> assignment strategies that aren't applicable with the new KIP-848-enabled 
> consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-25 Thread via GitHub


soarez commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-2018506864

   @junrao all the failed tests are being tracked.
   
   These were already tracked:
   
   * KAFKA-8041 
kafka.server.LogDirFailureTest.testIOExceptionDuringCheckpoint(String).quorum=kraft
   * KAFKA-8115 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   * KAFKA-15772 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest.testAbortTransactionTimeout(String).quorum=kraft
   * KAFKA-15897 
kafka.server.ControllerRegistrationManagerTest.testWrongIncarnationId()
   * KAFKA-15898 
org.apache.kafka.controller.QuorumControllerTest.testFenceMultipleBrokers()
   * KAFKA-15921 
kafka.api.SaslScramSslEndToEndAuthorizationTest.testAuthentications(String).quorum=kraft
   * KAFKA-15927 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateSourceDefault()
   * KAFKA-15927 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault()
   * KAFKA-15928 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   * KAFKA-15945 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testSyncTopicConfigs()
   * KAFKA-15961 
kafka.controller.ControllerIntegrationTest.testTopicIdPersistsThroughControllerRestart()
   * KAFKA-16225 
kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(String).quorum=kraft
   * KAFKA-16323 
kafka.server.ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric()
   
   This one was not, so I created a JIRA:
   
   * KAFKA-16422 
org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest."testFailingOverIncrementsNewActiveControllerCount(boolean).true"
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16422) Flaky test org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest."testFailingOverIncrementsNewActiveControllerCount(boolean).true"

2024-03-25 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16422:
---

 Summary: Flaky test 
org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest."testFailingOverIncrementsNewActiveControllerCount(boolean).true"
 Key: KAFKA-16422
 URL: https://issues.apache.org/jira/browse/KAFKA-16422
 Project: Kafka
  Issue Type: Bug
Reporter: Igor Soarez


{code:java}
[2024-03-22T10:39:59.911Z] Gradle Test Run :metadata:test > Gradle Test 
Executor 92 > QuorumControllerMetricsIntegrationTest > 
testFailingOverIncrementsNewActiveControllerCount(boolean) > 
"testFailingOverIncrementsNewActiveControllerCount(boolean).true" FAILED
[2024-03-22T10:39:59.912Z]     org.opentest4j.AssertionFailedError: expected: 
<1> but was: <2>
[2024-03-22T10:39:59.912Z]         at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
[2024-03-22T10:39:59.912Z]         at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
[2024-03-22T10:39:59.912Z]         at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
[2024-03-22T10:39:59.912Z]         at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:166)
[2024-03-22T10:39:59.912Z]         at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:161)
[2024-03-22T10:39:59.912Z]         at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:632)
[2024-03-22T10:39:59.912Z]         at 
app//org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.lambda$testFailingOverIncrementsNewActiveControllerCount$1(QuorumControllerMetricsIntegrationTest.java:107)
[2024-03-22T10:39:59.912Z]         at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
[2024-03-22T10:39:59.912Z]         at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:412)
[2024-03-22T10:39:59.912Z]         at 
app//org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testFailingOverIncrementsNewActiveControllerCount(QuorumControllerMetricsIntegrationTest.java:105)
 {code}





[jira] [Resolved] (KAFKA-14246) Update threading model for Consumer

2024-03-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-14246.
---
Resolution: Fixed

> Update threading model for Consumer
> ---
>
> Key: KAFKA-14246
> URL: https://issues.apache.org/jira/browse/KAFKA-14246
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Hi community,
>  
> We are refactoring the current KafkaConsumer and making it more asynchronous. 
>  This is the master Jira to track the project's progress; subtasks will be 
> linked to this ticket.  Please review the design document and feel free to 
> use this thread for discussion. 
>  
> The design document is here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor]
>  
> The original email thread is here: 
> [https://lists.apache.org/thread/13jvwzkzmb8c6t7drs4oj2kgkjzcn52l]
>  
> I will continue to update the 1pager as reviews and comments come.
>  
> Thanks, 
> P





[jira] [Commented] (KAFKA-16420) Replace utils.Exit with a thread-safe alternative

2024-03-25 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830603#comment-17830603
 ] 

Greg Harris commented on KAFKA-16420:
-

I can't find a single ticket for "Remove ZooKeeper", but I think that is also a 
precondition for implementing this change. I found a number of ZK-specific code 
paths that would make this migration significantly more involved, so I think it 
is best that those are removed first.

> Replace utils.Exit with a thread-safe alternative
> -
>
> Key: KAFKA-16420
> URL: https://issues.apache.org/jira/browse/KAFKA-16420
> Project: Kafka
>  Issue Type: Wish
>  Components: connect, core, tools
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> The Exit class is not thread-safe, and exposes our tests to race conditions 
> and inconsistent execution. It is not possible to make it thread-safe due to 
> the static design of the API.
> We should add an alternative to the Exit class, and migrate the existing 
> usages to the replacement, before finally removing the legacy Exit.





[jira] [Created] (KAFKA-16421) Refactor CommandDefaultOptions subclasses to throw exceptions instead of calling exit.

2024-03-25 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16421:
---

 Summary: Refactor CommandDefaultOptions subclasses to throw 
exceptions instead of calling exit.
 Key: KAFKA-16421
 URL: https://issues.apache.org/jira/browse/KAFKA-16421
 Project: Kafka
  Issue Type: Wish
  Components: tools
Reporter: Greg Harris


Many command-line utilities use the "mainNoExit()" idiom to provide a testable 
entrypoint to the command-line utility that doesn't include calling 
System.exit. This allows tests to safely exercise the command-line utility 
end-to-end, without risk that the JVM will stop.

Often, command implementations themselves adhere to this idiom, and don't call 
Exit. However, this is compromised by the CommandLineUtils functions, which 
call Exit.exit when an error is encountered while parsing the command-line 
arguments. 

These utilities are pervasively used in subclasses of CommandDefaultOptions, 
across hundreds of call-sites. We should figure out a way to replace this exit 
behavior with exceptions that are eventually propagated from the *Options 
constructors. This will allow the command-line implementations to handle these 
errors, and return the appropriate exit code from mainNoExit.
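For illustration, here is a minimal sketch of the "mainNoExit" idiom described above (class and method names are hypothetical, not the actual Kafka tools):

```java
// Hypothetical sketch of the mainNoExit idiom: the testable entrypoint
// returns an exit code instead of calling System.exit, so tests can
// exercise the tool end-to-end without stopping the JVM.
public class SomeTool {
    public static void main(String[] args) {
        // Only the real entrypoint terminates the JVM.
        System.exit(mainNoExit(args));
    }

    static int mainNoExit(String[] args) {
        try {
            run(args);
            return 0;
        } catch (RuntimeException e) {
            // An *Options constructor that throws (rather than calling
            // Exit.exit) lets the failure be translated into an exit code here.
            System.err.println(e.getMessage());
            return 1;
        }
    }

    static void run(String[] args) {
        if (args.length == 0) {
            throw new RuntimeException("missing required argument");
        }
        // ... the command's actual work would go here ...
    }
}
```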





[jira] [Commented] (KAFKA-16349) ShutdownableThread fails build by calling Exit with race condition

2024-03-25 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830590#comment-17830590
 ] 

Greg Harris commented on KAFKA-16349:
-

I added a tactical fix for the Exit class in my PR to resolve this bug. I'll 
pursue this refactor in a separate ticket KAFKA-16420.

> ShutdownableThread fails build by calling Exit with race condition
> --
>
> Key: KAFKA-16349
> URL: https://issues.apache.org/jira/browse/KAFKA-16349
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Priority: Minor
>
> `ShutdownableThread` calls `Exit.exit()` when the thread's operation throws 
> FatalExitError. In normal operation, this calls System.exit, and exits the 
> process. In tests, the exit procedure is masked with Exit.setExitProcedure to 
> prevent tests that encounter a FatalExitError from crashing the test JVM.
> Masking of exit procedures is usually done in BeforeEach/AfterEach 
> annotations, with the exit procedures cleaned up immediately after the test 
> finishes. If the body of the test creates a ShutdownableThread that outlives 
> the test, such as by omitting `ShutdownableThread#awaitShutdown`, by having 
> `ShutdownableThread#awaitShutdown` be interrupted by a test timeout, or by a 
> race condition between `Exit.resetExitProcedure` and `Exit.exit`, then 
> System.exit() can be called erroneously.
>  
> {noformat}
> // First, in the test thread:
> Exit.setExitProcedure(...)
> try {
> new ShutdownableThread(...).start()
> } finally {
> Exit.resetExitProcedure()
> }
> // Second, in the ShutdownableThread:
> try {
> throw new FatalExitError(...)
> } catch (FatalExitError e) {
> Exit.exit(...) // Calls real System.exit()
> }{noformat}
>  
> This can be resolved by one of the following:
>  # Eliminate FatalExitError usages in code when setExitProcedure is in-use
>  # Eliminate the Exit.exit call from ShutdownableThread, and instead 
> propagate this error to another thread to handle without a race-condition
>  # Eliminate resetExitProcedure by refactoring Exit to be non-static
> FatalExitError is in use in a small number of places, but may be difficult to 
> eliminate:
>  * FinalizedFeatureChangeListener
>  * InterBrokerSendThread
>  * TopicBasedRemoteLogMetadataManager
> There are many other places where Exit is called from a background thread, 
> including some implementations of ShutdownableThread which don't use 
> FatalExitError.
> The effect of this bug is that the build is flaky, as race 
> conditions/timeouts in tests can cause the gradle executors to exit with 
> status code 1, which has happened 26 times in the last 28 days. I have not 
> yet been able to confirm this bug is happening in other tests, but I do have 
> a deterministic reproduction case with the exact same symptoms:
> {noformat}
> Gradle Test Run :core:test > Gradle Test Executor 38 > ShutdownableThreadTest 
> > testShutdownWhenTestTimesOut(boolean) > 
> "testShutdownWhenTestTimesOut(boolean).isInterruptible=true" SKIPPED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':core:test'.
> > Process 'Gradle Test Executor 38' finished with non-zero exit value 1
>   This problem might be caused by incorrect test process configuration.
>   For more on test execution, please refer to 
> https://docs.gradle.org/8.6/userguide/java_testing.html#sec:test_execution in 
> the Gradle documentation.{noformat}
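To illustrate resolution #2 above, here is a minimal sketch (names are illustrative, not Kafka's actual API) in which the background thread reports the fatal condition to its owner instead of calling Exit.exit() itself, so no exit procedure can run after a test has reset it:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: the background thread completes a future with the
// exit code instead of exiting in-thread; the owning thread decides what
// to do with it, eliminating the race with Exit.resetExitProcedure.
public class FatalErrorPropagation {
    static final CompletableFuture<Integer> fatalCode = new CompletableFuture<>();

    static void backgroundWork() {
        try {
            throw new IllegalStateException("fatal");  // stand-in for FatalExitError
        } catch (IllegalStateException e) {
            fatalCode.complete(1);  // report the exit code; do not exit in-thread
        }
    }

    public static void main(String[] args) throws Exception {
        Thread t = new Thread(FatalErrorPropagation::backgroundWork);
        t.start();
        t.join();
        // Production code would call System.exit with the reported code here;
        // tests would simply assert on it.
        System.out.println("reported exit code: " + fatalCode.getNow(null));
    }
}
```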





Re: [PR] KAFKA-15853: Move KafkaConfig Replication properties and docs out of … [kafka]

2024-03-25 Thread via GitHub


OmniaGM commented on PR #15575:
URL: https://github.com/apache/kafka/pull/15575#issuecomment-2018444788

   > Hello @AndrewJSchofield @OmniaGM
   > 
   > Looks like the dependency on server tests is not required but was 
introduced in this PR. I have plans to double-check this after the minor 
comment in #15569 is resolved.
   
   Updated the PR now.





[jira] [Created] (KAFKA-16420) Replace utils.Exit with a thread-safe alternative

2024-03-25 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16420:
---

 Summary: Replace utils.Exit with a thread-safe alternative
 Key: KAFKA-16420
 URL: https://issues.apache.org/jira/browse/KAFKA-16420
 Project: Kafka
  Issue Type: Wish
  Components: connect, core, tools
Reporter: Greg Harris
Assignee: Greg Harris


The Exit class is not thread-safe, and exposes our tests to race conditions and 
inconsistent execution. It is not possible to make it thread-safe due to the 
static design of the API.

We should add an alternative to the Exit class, and migrate the existing usages 
to the replacement, before finally removing the legacy Exit.
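As a rough sketch of what a thread-safe replacement could look like (illustrative only, not the actual Kafka API): each component receives its own exit handler instance, so tests swap behavior per-instance with no shared mutable state and no reset step to race against.

```java
import java.util.function.IntConsumer;

// Hypothetical non-static alternative to utils.Exit: the exit procedure is
// injected per-instance instead of being set/reset on a shared static field.
public class ExitHandler {
    private final IntConsumer procedure;

    public ExitHandler(IntConsumer procedure) {
        this.procedure = procedure;
    }

    // Production wiring delegates to the JVM.
    public static ExitHandler system() {
        return new ExitHandler(System::exit);
    }

    public void exit(int statusCode) {
        procedure.accept(statusCode);
    }
}
```

A test would construct its component with `new ExitHandler(capturedCode::set)` and assert on the captured value, with no cleanup required afterwards.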





Re: [PR] KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]

2024-03-25 Thread via GitHub


jolshan commented on PR #15559:
URL: https://github.com/apache/kafka/pull/15559#issuecomment-2018411363

   Getting a fresh build before merging  





[jira] [Commented] (KAFKA-16398) mirror-maker2 running into OOM while filtering (dropping) high number of messages

2024-03-25 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830576#comment-17830576
 ] 

Greg Harris commented on KAFKA-16398:
-

Hi [~srivignesh] Thank you for providing your configuration.

Please note that this part of the configuration:
{noformat}
  "transforms": "Filter",
  "transforms.Filter.type": 
"org.apache.kafka.connect.transforms.Filter",{noformat}
will drop every record unconditionally, so you won't get any of the load/memory 
pressure that the producers provide. This would explain why adding the 
predicate (which then allows some records through) would cause OOMs.
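For comparison, a predicate-guarded Filter would look roughly like this (the predicate name and header key are illustrative; HasHeaderKey is one of the built-in Connect predicates):
{noformat}
  "transforms": "Filter",
  "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.Filter.predicate": "HasMyHeader",
  "predicates": "HasMyHeader",
  "predicates.HasMyHeader.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "predicates.HasMyHeader.name": "my-header"{noformat}
With this form only records carrying the header are dropped, and the rest flow through to the producers.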

On that note, you have a lot of tuning parameters in your connect-distributed 
config, some of which don't appear to have any effect. One that does appear to 
have an effect is `producer.buffer.memory=8388608`.
For 1500 tasks with that config, I would expect just the producer buffers to 
consume 12GiB, without taking into account any other buffers/memory overhead. 
If all of the tasks were started on a single node, this could very easily cause 
OOMs.
What size cluster are you using, and does the cluster ever shrink to a single 
node, such as with a full reboot/cold start? If so, you may want to reduce the 
number of tasks, or size of each task's producer buffer.
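The arithmetic behind that estimate is a simple back-of-envelope check (8388608 bytes = 8 MiB per task buffer):

```java
// Back-of-envelope check for the figure above: 1500 tasks, each with an
// 8 MiB (8388608-byte) producer buffer, before any other overhead.
public class BufferMath {
    static long totalBufferBytes(int tasks, long bufferMemoryPerTask) {
        return tasks * bufferMemoryPerTask;
    }

    public static void main(String[] args) {
        long total = totalBufferBytes(1500, 8_388_608L);
        // 12_582_912_000 bytes, i.e. roughly 12 GiB just for producer buffers
        System.out.printf("%.1f GiB%n", total / (1024.0 * 1024 * 1024));
    }
}
```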

> mirror-maker2 running into OOM while filtering (dropping) high number of 
> messages
> -
>
> Key: KAFKA-16398
> URL: https://issues.apache.org/jira/browse/KAFKA-16398
> Project: Kafka
>  Issue Type: Bug
>  Components: connect, mirrormaker
>Affects Versions: 3.6.1
>Reporter: Srivignesh
>Priority: Critical
> Attachments: connect-distributed.properties.template, 
> mm2.config.template
>
>
> Based on a custom predicate, our application is filtering messages during 
> mirroring.
> When the HasHeader:test method of the predicate returns true (when it has to 
> drop messages from mirroring), it encounters the exceptions below.
> However, when it returns false (the messages are forwarded for mirroring), it 
> works fine without OOM.
> Note: This issue doesn't occur with the same load in version 2.8.0.
> The JVM heap size was increased to 15G, but the OOM still occurs.
> Exception stacktraces:
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
>     at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289)
>     at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252)
>     at org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270)
>     at 
> org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
>     at 
> org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
>     at 
> org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153)
>     at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
>     at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
>     at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
>     at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
>     at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
>     at 
> org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
>     at 
> org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x7f55cc4c3d78.run(Unknown
>  Source)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:840) {code}
> {code:java}
> line java.lang.OutOfMemoryError: Java heap space line     at 
> 

Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-25 Thread via GitHub


junrao commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-2018408733

   @soarez : Are the failed tests in the latest run being tracked?





Re: [PR] KAFKA-15949: Unify metadata.version format in log and error message [kafka]

2024-03-25 Thread via GitHub


FrankYang0529 commented on PR #15505:
URL: https://github.com/apache/kafka/pull/15505#issuecomment-2018399116

   > @FrankYang0529 , there are some tests failing because of your change. 
Please take a look. Thanks.
   > 
   > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15505/4
   
   Sorry about that. I will double-check the test results next time. I have 
fixed the errors in `testScramWithBadMetadataVersion`, 
`testMetadataVersionChangeExceptionToString`, and 
`testCreateClusterInvalidMetadataVersion`. Thank you.





[PR] MINOR: AbstractConfig cleanup [kafka]

2024-03-25 Thread via GitHub


gharris1727 opened a new pull request, #15597:
URL: https://github.com/apache/kafka/pull/15597

   Add Utils.castToStringObjectMap and fix unchecked casts in AbstractConfig
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
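
    For context, a hypothetical sketch of what such a helper might look like 
(illustrative only; the actual `Utils` method may differ): copying into a new 
map yields a genuine `Map<String, Object>` instead of an unchecked cast.

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical helper: validates keys and copies entries, avoiding an
    // unchecked cast from Map<?, ?> to Map<String, Object>.
    public class CastSketch {
        public static Map<String, Object> castToStringObjectMap(Map<?, ?> map) {
            Map<String, Object> result = new LinkedHashMap<>(map.size());
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                if (!(entry.getKey() instanceof String)) {
                    throw new ClassCastException("Key " + entry.getKey() + " is not a String");
                }
                result.put((String) entry.getKey(), entry.getValue());
            }
            return result;
        }
    }
    ```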
   





[jira] [Updated] (KAFKA-16419) Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simply the process

2024-03-25 Thread Johnny Hsu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnny Hsu updated KAFKA-16419:
---
Description: 
Currently in the 
[LogValidator.validateMessagesAndAssignOffsetsCompressed|https://github.com/apache/kafka/blob/51c9b0d0ad408754b1c5883a9c7fcc63a5f57eb8/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L315],
 there are lots of if-else checks based on the `magic` and `CompressionType`, 
which makes the code complicated and increases the difficulty of maintenance. 

The flow of the validation can be separated into 5 steps:
 # IBP validation
 ## whether the compression type is valid for this IBP
 # In-place assignment enablement check
 ## based on the magic value and compression type, decide whether we can do 
in-place assignment
 # batch level validation
 ## based on the batch origin (client, controller, etc) and magic version
 # record level validation
 ## based on whether we can do in-place assignment, choose different iterator 
 ## based on the magic and compression type, do different validation
 # return validated results
 ## based on whether we can do in-place assignment, build the records or assign 
it

This whole flow can be extracted into an interface, and the 
LogValidator.validateMessagesAndAssignOffsetsCompressed can init an 
implementation based on the passed-in records.

The implementation class will have the following fields:
 # magic value
 # source compression type
 # target compression type
 # origin
 # records
 # timestamp type

  was:
Currently in the 
[LogValidator.validateMessagesAndAssignOffsetsCompressed|http://example.com](https://github.com/apache/kafka/blob/51c9b0d0ad408754b1c5883a9c7fcc63a5f57eb8/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L315),
 there are lots of if-else checks based on the `magic` and `CompressionType`, 
which makes the code complicated and increase the difficulties of maintaining. 

The flow of the validation can be separated into 5 steps:
 # IBP validation
 ## whether the compression type is valid for this IBP
 # In-place assignment enablement check
 ## based on the magic value and compression type, decide whether we can do 
in-place assignment
 # batch level validation
 ## based on the batch origin (client, controller, etc) and magic version
 # record level validation
 ## based on whether we can do in-place assignment, choose different iterator 
 ## based on the magic and compression type, do different validation
 # return validated results
 ## based on whether we can do in-place assignment, build the records or assign 
it

This whole flow can be extracted into an interface, and the 
LogValidator.validateMessagesAndAssignOffsetsCompressed can init an 
implementation based on the passed-in records.

The implementation class will have the following fields:
 # magic value
 # source compression type
 # target compression type
 # origin
 # records
 # timestamp type


> Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simply 
> the process
> -
>
> Key: KAFKA-16419
> URL: https://issues.apache.org/jira/browse/KAFKA-16419
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Johnny Hsu
>Assignee: Johnny Hsu
>Priority: Major
>
> Currently in the 
> [LogValidator.validateMessagesAndAssignOffsetsCompressed|https://github.com/apache/kafka/blob/51c9b0d0ad408754b1c5883a9c7fcc63a5f57eb8/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L315],
>  there are lots of if-else checks based on the `magic` and `CompressionType`, 
> which makes the code complicated and increases the difficulty of 
> maintenance. 
> The flow of the validation can be separated into 5 steps:
>  # IBP validation
>  ## whether the compression type is valid for this IBP
>  # In-place assignment enablement check
>  ## based on the magic value and compression type, decide whether we can do 
> in-place assignment
>  # batch level validation
>  ## based on the batch origin (client, controller, etc) and magic version
>  # record level validation
>  ## based on whether we can do in-place assignment, choose different iterator 
>  ## based on the magic and compression type, do different validation
>  # return validated results
>  ## based on whether we can do in-place assignment, build the records or 
> assign it
> This whole flow can be extracted into an interface, and the 
> LogValidator.validateMessagesAndAssignOffsetsCompressed can init an 
> implementation based on the passed-in records.
> The implementation class will have the following fields:
>  # magic value
>  # source compression type
>  # target compression type
>  # origin
>  # records
>  # timestamp type
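
The five steps above could be sketched as an interface along these lines (names and signatures are illustrative, not the actual Kafka code):

{code:java}
// Hypothetical sketch of the proposed extraction: each implementation is
// chosen from the batch's magic value and compression types, and the caller
// drives the five steps in a fixed order.
public class ValidatorSketch {

    public interface CompressedRecordsValidator {
        void validateIbp();                   // 1: IBP vs. compression type
        boolean inPlaceAssignment();          // 2: magic + compression decide this
        void validateBatch();                 // 3: origin + magic checks
        void validateRecords();               // 4: per-record checks
        String buildResult(boolean inPlace);  // 5: rebuild records or assign in place
    }

    // validateMessagesAndAssignOffsetsCompressed would pick an implementation
    // from the passed-in records and then run the steps in order.
    public static String validate(CompressedRecordsValidator v) {
        v.validateIbp();
        boolean inPlace = v.inPlaceAssignment();
        v.validateBatch();
        v.validateRecords();
        return v.buildResult(inPlace);
    }
}
{code}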




[jira] [Updated] (KAFKA-16419) Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simply the process

2024-03-25 Thread Johnny Hsu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnny Hsu updated KAFKA-16419:
---
Description: 
Currently in the 
[LogValidator.validateMessagesAndAssignOffsetsCompressed|http://example.com](https://github.com/apache/kafka/blob/51c9b0d0ad408754b1c5883a9c7fcc63a5f57eb8/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L315),
 there are lots of if-else checks based on the `magic` and `CompressionType`, 
which makes the code complicated and increase the difficulties of maintaining. 

The flow of the validation can be separated into 5 steps:
 # IBP validation
 ## whether the compression type is valid for this IBP
 # In-place assignment enablement check
 ## based on the magic value and compression type, decide whether we can do 
in-place assignment
 # batch level validation
 ## based on the batch origin (client, controller, etc) and magic version
 # record level validation
 ## based on whether we can do in-place assignment, choose different iterator 
 ## based on the magic and compression type, do different validation
 # return validated results
 ## based on whether we can do in-place assignment, build the records or assign 
it

This whole flow can be extracted into an interface, and the 
LogValidator.validateMessagesAndAssignOffsetsCompressed can init an 
implementation based on the passed-in records.

The implementation class will have the following fields:
 # magic value
 # source compression type
 # target compression type
 # origin
 # records
 # timestamp type

  was:
Currently in the LogValidator.validateMessagesAndAssignOffsetsCompressed, there 
are lots of if-else checks based on the `magic` and `CompressionType`, which 
makes the code complicated and increase the difficulties of maintaining. 

The flow of the validation can be separated into 5 steps:
 # IBP validation
 ## whether the compression type is valid for this IBP
 # In-place assignment enablement check
 ## based on the magic value and compression type, decide whether we can do 
in-place assignment
 # batch level validation
 ## based on the batch origin (client, controller, etc) and magic version
 # record level validation
 ## based on whether we can do in-place assignment, choose different iterator 
 ## based on the magic and compression type, do different validation
 # return validated results
 ## based on whether we can do in-place assignment, build the records or assign 
it

This whole flow can be extracted into an interface, and the 
LogValidator.validateMessagesAndAssignOffsetsCompressed can init an 
implementation based on the passed-in records.

The implementation class will have the following fields:
 # magic value
 # source compression type
 # target compression type
 # origin
 # records
 # timestamp type


> Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simply 
> the process
> -
>
> Key: KAFKA-16419
> URL: https://issues.apache.org/jira/browse/KAFKA-16419
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Johnny Hsu
>Assignee: Johnny Hsu
>Priority: Major
>
> Currently in the 
> [LogValidator.validateMessagesAndAssignOffsetsCompressed|http://example.com](https://github.com/apache/kafka/blob/51c9b0d0ad408754b1c5883a9c7fcc63a5f57eb8/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L315),
>  there are lots of if-else checks based on the `magic` and `CompressionType`, 
> which makes the code complicated and increase the difficulties of 
> maintaining. 
> The flow of the validation can be separated into 5 steps:
>  # IBP validation
>  ## whether the compression type is valid for this IBP
>  # In-place assignment enablement check
>  ## based on the magic value and compression type, decide whether we can do 
> in-place assignment
>  # batch level validation
>  ## based on the batch origin (client, controller, etc) and magic version
>  # record level validation
>  ## based on whether we can do in-place assignment, choose different iterator 
>  ## based on the magic and compression type, do different validation
>  # return validated results
>  ## based on whether we can do in-place assignment, build the records or 
> assign it
> This whole flow can be extracted into an interface, and the 
> LogValidator.validateMessagesAndAssignOffsetsCompressed can init an 
> implementation based on the passed-in records.
> The implementation class will have the following fields:
>  # magic value
>  # source compression type
>  # target compression type
>  # origin
>  # records
>  # timestamp type





Re: [PR] KAFKA-16409: DeleteRecordsCommand should use standard exception handling [kafka]

2024-03-25 Thread via GitHub


FrankYang0529 commented on PR #15586:
URL: https://github.com/apache/kafka/pull/15586#issuecomment-2018334375

   > Failed with the same error again. Please help fix the errors. Thanks.
   
   Sorry, I will double-check the test results before creating a PR next time. 
I have updated the case. Thank you.





[jira] [Updated] (KAFKA-16419) Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simply the process

2024-03-25 Thread Johnny Hsu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnny Hsu updated KAFKA-16419:
---
Description: 
Currently in the LogValidator.validateMessagesAndAssignOffsetsCompressed, there 
are lots of if-else checks based on the `magic` and `CompressionType`, which 
makes the code complicated and increase the difficulties of maintaining. 

The flow of the validation can be separated into 5 steps:
 # IBP validation
 ## whether the compression type is valid for this IBP
 # In-place assignment enablement check
 ## based on the magic value and compression type, decide whether we can do 
in-place assignment
 # batch level validation
 ## based on the batch origin (client, controller, etc) and magic version
 # record level validation
 ## based on whether we can do in-place assignment, choose different iterator 
 ## based on the magic and compression type, do different validation
 # return validated results
 ## based on whether we can do in-place assignment, build the records or assign 
it

This whole flow can be extracted into an interface, and the 
LogValidator.validateMessagesAndAssignOffsetsCompressed can init an 
implementation based on the passed-in records.

The implementation class will have the following fields:
 # magic value
 # source compression type
 # target compression type
 # origin
 # records
 # timestamp type

  was:
Currently in the LogValidator.validateMessagesAndAssignOffsetsCompressed, there 
are lots of if-else checks based on the `magic` and `CompressionType`, which 
makes the code complicated and increase the difficulties of maintaining. 

The flow of the validation can be separated into x steps:
 # IBP validation
 ## whether the compression type is valid for this IBP
 # In-place assignment enablement check
 ## based on the magic value and compression type, decide whether we can do 
in-place assignment
 # batch level validation
 ## based on the batch origin (client, controller, etc) and magic version
 # record level validation
 ## based on whether we can do in-place assignment, choose different iterator 
 ## based on the magic and compression type, do different validation
 # return validated results
 ## based on whether we can do in-place assignment, build the records or assign 
it

This whole flow can be extracted into an interface, and the 
LogValidator.validateMessagesAndAssignOffsetsCompressed can init an 
implementation based on the passed-in records.

The implementation class will have the following fields:
 # magic value
 # source compression type
 # target compression type
 # origin
 # records
 # timestamp type


> Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simplify 
> the process
> -
>
> Key: KAFKA-16419
> URL: https://issues.apache.org/jira/browse/KAFKA-16419
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Johnny Hsu
>Assignee: Johnny Hsu
>Priority: Major
>
> Currently in the LogValidator.validateMessagesAndAssignOffsetsCompressed, 
> there are lots of if-else checks based on the `magic` and `CompressionType`, 
> which make the code complicated and increase the difficulty of 
> maintenance. 
> The flow of the validation can be separated into 5 steps:
>  # IBP validation
>  ## whether the compression type is valid for this IBP
>  # In-place assignment enablement check
>  ## based on the magic value and compression type, decide whether we can do 
> in-place assignment
>  # batch level validation
>  ## based on the batch origin (client, controller, etc) and magic version
>  # record level validation
>  ## based on whether we can do in-place assignment, choose different iterator 
>  ## based on the magic and compression type, do different validation
>  # return validated results
>  ## based on whether we can do in-place assignment, build the records or 
> assign it
> This whole flow can be extracted into an interface, and the 
> LogValidator.validateMessagesAndAssignOffsetsCompressed can init an 
> implementation based on the passed-in records.
> The implementation class will have the following fields:
>  # magic value
>  # source compression type
>  # target compression type
>  # origin
>  # records
>  # timestamp type



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16419) Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simplify the process

2024-03-25 Thread Johnny Hsu (Jira)
Johnny Hsu created KAFKA-16419:
--

 Summary: Abstract validateMessagesAndAssignOffsetsCompressed of 
LogValidator to simplify the process
 Key: KAFKA-16419
 URL: https://issues.apache.org/jira/browse/KAFKA-16419
 Project: Kafka
  Issue Type: Improvement
Reporter: Johnny Hsu
Assignee: Johnny Hsu


Currently in the LogValidator.validateMessagesAndAssignOffsetsCompressed, there 
are lots of if-else checks based on the `magic` and `CompressionType`, which 
make the code complicated and increase the difficulty of maintenance. 

The flow of the validation can be separated into 5 steps:
 # IBP validation
 ## whether the compression type is valid for this IBP
 # In-place assignment enablement check
 ## based on the magic value and compression type, decide whether we can do 
in-place assignment
 # batch level validation
 ## based on the batch origin (client, controller, etc) and magic version
 # record level validation
 ## based on whether we can do in-place assignment, choose different iterator 
 ## based on the magic and compression type, do different validation
 # return validated results
 ## based on whether we can do in-place assignment, build the records or assign 
it

This whole flow can be extracted into an interface, and the 
LogValidator.validateMessagesAndAssignOffsetsCompressed can init an 
implementation based on the passed-in records.

The implementation class will have the following fields:
 # magic value
 # source compression type
 # target compression type
 # origin
 # records
 # timestamp type
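
For illustration, the interface proposed above might look roughly like the following sketch. All names here (`ValidationSketch`, `CompressedRecordsValidator`, `forRecords`, the enums) and the in-place condition are assumptions for this example, not Kafka's actual code:

```java
// Hypothetical sketch of the proposed refactoring -- all names and the
// in-place condition are illustrative, NOT Kafka's actual API.
import java.util.List;

public class ValidationSketch {

    enum CompressionType { NONE, GZIP, SNAPPY, LZ4, ZSTD }
    enum BatchOrigin { CLIENT, COORDINATOR, LOG_APPEND }
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // The interface hides steps 2-5 of the flow behind two methods.
    interface CompressedRecordsValidator {
        // Step 2: decide once whether offsets can be assigned in place.
        boolean canAssignOffsetsInPlace();

        // Steps 3-5: batch/record validation, then offset assignment.
        // Long offsets stand in for real record batches in this sketch.
        List<Long> validateAndAssignOffsets(List<Long> recordOffsets);
    }

    // Dispatch happens once here, from the fields the ticket lists,
    // instead of via if-else checks scattered through the method.
    static CompressedRecordsValidator forRecords(byte magic,
                                                 CompressionType source,
                                                 CompressionType target,
                                                 BatchOrigin origin,
                                                 TimestampType timestampType) {
        // Illustrative condition, not the real rule: in-place assignment
        // only when no recompression is needed on a v2+ batch.
        boolean inPlace = magic >= 2 && source == target;
        return new CompressedRecordsValidator() {
            @Override
            public boolean canAssignOffsetsInPlace() {
                return inPlace;
            }

            @Override
            public List<Long> validateAndAssignOffsets(List<Long> recordOffsets) {
                // A real implementation would iterate batches and records;
                // the sketch passes offsets through unchanged.
                return recordOffsets;
            }
        };
    }

    public static void main(String[] args) {
        CompressedRecordsValidator v = forRecords((byte) 2,
                CompressionType.GZIP, CompressionType.GZIP,
                BatchOrigin.CLIENT, TimestampType.CREATE_TIME);
        System.out.println(v.canAssignOffsetsInPlace()); // prints "true"
    }
}
```

The point of the sketch is that the (magic, compression) dispatch happens once in the factory method, so the later validation steps no longer need nested if-else checks.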





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-25 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1537804695


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -411,8 +411,8 @@ public int memberEpoch() {
 public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData 
response) {
 if (response.errorCode() != Errors.NONE.code()) {
 String errorMessage = String.format(
-"Unexpected error in Heartbeat response. Expected no 
error, but received: %s",

Review Comment:
   Good call. I think it was the editor's auto-correction. Reverting it.






[jira] [Commented] (KAFKA-16318) Add javadoc to KafkaMetric

2024-03-25 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830539#comment-17830539
 ] 

Johnny Hsu commented on KAFKA-16318:


The PR is merged; closing this ticket.

> Add javadoc to KafkaMetric
> --
>
> Key: KAFKA-16318
> URL: https://issues.apache.org/jira/browse/KAFKA-16318
> Project: Kafka
>  Issue Type: Bug
>  Components: docs
>Reporter: Mickael Maison
>Assignee: Johnny Hsu
>Priority: Major
> Fix For: 3.8.0
>
>
> KafkaMetric is part of the public API but it's missing javadoc describing the 
> class and several of its methods.





Re: [PR] MINOR: optimize EvictableKey/LastUsedKey compareTo function [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #9725: MINOR: optimize EvictableKey/LastUsedKey 
compareTo function
URL: https://github.com/apache/kafka/pull/9725





Re: [PR] MINOR: add comments to explain why it needs to add synchronization on… [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #9517: MINOR: add comments to explain why it needs 
to add synchronization on…
URL: https://github.com/apache/kafka/pull/9517





Re: [PR] KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not… [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #8978: KAFKA-10234 The key/value deserializer used 
by ConsoleConsumer is not…
URL: https://github.com/apache/kafka/pull/8978





Re: [PR] KAFKA-10498 Consumer should do offset/epoch validation through when … [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #9796: KAFKA-10498 Consumer should do offset/epoch 
validation through  when …
URL: https://github.com/apache/kafka/pull/9796





Re: [PR] MINOR: apply FilterByKeyIterator and FlattenedIterator to code base [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #9954: MINOR: apply FilterByKeyIterator and 
FlattenedIterator to code base
URL: https://github.com/apache/kafka/pull/9954





Re: [PR] MINOR: fix the soft link created by ducktape when running system test… [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #9798: MINOR: fix the soft link created by 
ducktape when running system test…
URL: https://github.com/apache/kafka/pull/9798





Re: [PR] MINOR: Convert connect assignment schemas to use generated protocol [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #9641: MINOR: Convert connect assignment schemas 
to use generated protocol
URL: https://github.com/apache/kafka/pull/9641





Re: [PR] KAFKA-12410 KafkaAPis ought to group fetch data before generating fet… [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch 
data before generating fet…
URL: https://github.com/apache/kafka/pull/10269





Re: [PR] MINOR: KRPC supports to get true type from entity type [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #10283: MINOR: KRPC supports to get true type from 
entity type
URL: https://github.com/apache/kafka/pull/10283





Re: [PR] MINOR: using INFO level to log 'no meta.properties' for broker server [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #10261: MINOR: using INFO level to log 'no 
meta.properties' for broker server
URL: https://github.com/apache/kafka/pull/10261





Re: [PR] MINOR: main function of o.a.k.c.p.t.Type does not show all types [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #10248: MINOR: main function of o.a.k.c.p.t.Type 
does not show all types
URL: https://github.com/apache/kafka/pull/10248





Re: [PR] KAFKA-12309 The revocation algorithm produces uneven distributions [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #10077: KAFKA-12309 The revocation algorithm 
produces uneven distributions
URL: https://github.com/apache/kafka/pull/10077





Re: [PR] MINOR: check duplicate advertised listeners based on resolved host [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #10577: MINOR: check duplicate advertised 
listeners based on resolved host
URL: https://github.com/apache/kafka/pull/10577





Re: [PR] MINOR: remove partition-level error from MetadataResponse#errorCounts [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #11128: MINOR: remove partition-level error from 
MetadataResponse#errorCounts
URL: https://github.com/apache/kafka/pull/11128





Re: [PR] MINOR: don't record the throttled rate when there is no throttled par… [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #12528: MINOR: don't record the throttled rate 
when there is no throttled par…
URL: https://github.com/apache/kafka/pull/12528





Re: [PR] KAFKA-13874 Avoid synchronization in SocketServer metrics [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #13285: KAFKA-13874 Avoid synchronization in 
SocketServer metrics
URL: https://github.com/apache/kafka/pull/13285





Re: [PR] MINOR: don't disconnect stale controller if the network client is res… [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #13395: MINOR: don't disconnect stale controller 
if the network client is res…
URL: https://github.com/apache/kafka/pull/13395





Re: [PR] KAFKA-14811 The forwarding requests are discarded when network client… [kafka]

2024-03-25 Thread via GitHub


chia7712 closed pull request #13405: KAFKA-14811 The forwarding requests are 
discarded when network client…
URL: https://github.com/apache/kafka/pull/13405





Re: [PR] KAFKA-16224: Do not retry committing if topic or partition deleted [kafka]

2024-03-25 Thread via GitHub


cadonna commented on PR #15581:
URL: https://github.com/apache/kafka/pull/15581#issuecomment-2018163251

   Sorry for that! I do not know why I am so sloppy with this PR. I am going to 
fix this now. 





Re: [PR] KAFKA-16416: Use NetworkClientTest to replace RequestResponseTest to be the example of log4j output [kafka]

2024-03-25 Thread via GitHub


KevinZTW commented on code in PR #15596:
URL: https://github.com/apache/kafka/pull/15596#discussion_r1537700914


##
README.md:
##
@@ -56,7 +56,11 @@ Follow instructions in https://kafka.apache.org/quickstart
 ### Running a particular unit/integration test with log4j output ###
 Change the log4j setting in either 
`clients/src/test/resources/log4j.properties` or 
`core/src/test/resources/log4j.properties`
 
-./gradlew clients:test --tests RequestResponseTest
+For example, you can modify the line in 
`clients/src/test/resources/log4j.properties` to 
`log4j.logger.org.apache.kafka=INFO` and then run:
+
+./gradlew cleanTest clients:test --tests NetworkClientTest --info   

Review Comment:
   I see. I added `--info` because I wanted users to also be able to see the logs on 
the console directly. I agree it does print out a lot of info though...






Re: [PR] KAFKA-16406: Splitting consumer integration test [kafka]

2024-03-25 Thread via GitHub


lucasbru merged PR #15535:
URL: https://github.com/apache/kafka/pull/15535





Re: [PR] KAFKA-16406: Splitting consumer integration test [kafka]

2024-03-25 Thread via GitHub


lucasbru commented on PR #15535:
URL: https://github.com/apache/kafka/pull/15535#issuecomment-2018145845

   Two related flaky tests: 
`kafka.api.PlaintextConsumerTest.testExpandingTopicSubscriptions(String, 
String)[4]` and 
`org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest.testDescribeExistingGroupWithNoMembers(String,
 String)[4]`. They don't seem to be caused by this PR, though, as they have flaked 
on trunk before.





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-25 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1537690828


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java:
##
@@ -119,7 +119,8 @@ public Queue<StreamsException> missingSourceTopicExceptions() {
 return new StreamsException(
 new MissingSourceTopicException(String.format(
 "Missing source topics %s for subtopology %d of topology 
%s",
-missingSourceTopics, subtopologyId, topologyName)),
+missingSourceTopics, subtopologyId, topologyName),
+missingSourceTopics),

Review Comment:
   FYI, I wrote skeleton code below! 
   Does it make sense to you as well?
   ```java
   public class RepartitionTopics {
       ...
       // Add new field (private)
       private final Set<String> missingTopics = new HashSet<>(); 

       ...
       public Set<String> topologiesWithMissingInputTopics() { ... }
       public Queue<StreamsException> missingSourceTopicExceptions() { ... }

       // Add new method (package-private)
       Set<String> getMissingTopics() {
           return this.missingTopics;
       }
       ...
   }
   ```






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-25 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1537690828


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java:
##
@@ -119,7 +119,8 @@ public Queue<StreamsException> missingSourceTopicExceptions() {
 return new StreamsException(
 new MissingSourceTopicException(String.format(
 "Missing source topics %s for subtopology %d of topology 
%s",
-missingSourceTopics, subtopologyId, topologyName)),
+missingSourceTopics, subtopologyId, topologyName),
+missingSourceTopics),

Review Comment:
   FYI, I wrote skeleton code below! 
   Does it make sense to you as well?
   ```java
   public class RepartitionTopics {
       ...
       // Add new field (private)
       private final Set<String> missingTopics = new HashSet<>(); 

       ...
       public Queue<StreamsException> missingSourceTopicExceptions() { ... }

       // Add new method (package-private)
       Set<String> getMissingTopics() {
           return this.missingTopics;
       }
       ...
   }
   ```






Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-25 Thread via GitHub


johnnychhsu commented on PR #15463:
URL: https://github.com/apache/kafka/pull/15463#issuecomment-2018127603

   Updated, @showuon. 
   It works on my local machine, thanks for sharing!




