[jira] [Commented] (KAFKA-16356) Remove class-name dispatch in RemoteLogMetadataSerde
[ https://issues.apache.org/jira/browse/KAFKA-16356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830781#comment-17830781 ]

Linu Shibu commented on KAFKA-16356:
------------------------------------

Yes [~yondy], I will update this soon, thanks!

> Remove class-name dispatch in RemoteLogMetadataSerde
> ----------------------------------------------------
>
>                 Key: KAFKA-16356
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16356
>             Project: Kafka
>          Issue Type: Task
>          Components: Tiered-Storage
>    Affects Versions: 3.7.0
>            Reporter: Greg Harris
>            Assignee: Linu Shibu
>            Priority: Trivial
>              Labels: newbie
>
> RemoteLogMetadataSerde#serialize receives a RemoteLogMetadata object and
> has to dispatch to one of four serializers depending on its type. This is
> done by taking the class name of the RemoteLogMetadata and looking it up in
> maps to find the corresponding serializer for that class.
> This later requires an unchecked cast, because the RemoteLogMetadataTransform
> is generic. This is all type-unsafe, and can be replaced with type-safe
> if-elseif-else statements that may also be faster than the double-indirect
> map lookups.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
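The if-elseif-else dispatch the ticket proposes can be sketched with simplified stand-in types. Note this is a hypothetical illustration — the class names below are stand-ins, not the actual remote-storage serializer classes:

```java
// Hypothetical stand-ins for the real metadata classes.
abstract class RemoteLogMetadata { }

class RemoteLogSegmentMetadata extends RemoteLogMetadata { }

class RemoteLogSegmentMetadataUpdate extends RemoteLogMetadata { }

public class DispatchSketch {
    // Type-safe dispatch: each branch statically knows the concrete type,
    // so no class-name map lookup and no unchecked cast are needed.
    static String serialize(RemoteLogMetadata metadata) {
        if (metadata instanceof RemoteLogSegmentMetadata) {
            return "segment";
        } else if (metadata instanceof RemoteLogSegmentMetadataUpdate) {
            return "segment-update";
        } else {
            throw new IllegalArgumentException("Unknown metadata type: " + metadata.getClass());
        }
    }

    public static void main(String[] args) {
        System.out.println(serialize(new RemoteLogSegmentMetadata()));
        System.out.println(serialize(new RemoteLogSegmentMetadataUpdate()));
    }
}
```

Compared with a map keyed by class name, the chain is checked by the compiler and avoids the double map indirection the ticket mentions.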
Re: [PR] MINOR: add docker usage documentation link in README.md [kafka]
showuon commented on code in PR #15600:
URL: https://github.com/apache/kafka/pull/15600#discussion_r1538541164


##########
README.md:
##########
@@ -285,6 +285,16 @@ See [tests/README.md](tests/README.md).
 
 See [vagrant/README.md](vagrant/README.md).
 
+### Running in Docker ###
+You could use the official docker image to run Kafka in Docker. It is available at [Docker Hub](https://hub.docker.com/r/apache/kafka).
+
+For example, to start a single-node Kafka, you can run the following command:
+
+    docker compose -f docker/examples/jvm/single-node/plaintext/docker-compose.yml up
+
+

Review Comment:
   nit: additional empty line

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-16416: Use NetworkClientTest to replace RequestResponseTest to be the example of log4j output [kafka]
showuon commented on code in PR #15596:
URL: https://github.com/apache/kafka/pull/15596#discussion_r1538537571

##########
README.md:
##########
@@ -56,7 +56,11 @@ Follow instructions in https://kafka.apache.org/quickstart
 
 ### Running a particular unit/integration test with log4j output ###
 Change the log4j setting in either `clients/src/test/resources/log4j.properties` or `core/src/test/resources/log4j.properties`

Review Comment:
   I think there are many other `log4j.properties` files that need to be modified if you are running tests for that project, not just these 2 files. For example, when you are running tests in `TopicCommandTest`, you should update `tools/src/test/resources/log4j.properties` instead of the files listed here. I'm thinking we should rephrase it in a more general way. Maybe:
   
   By default, there will be only a small number of logs output while testing. You can adjust it by changing the `log4j.properties` file in the test project directory. For example, if you want to see more logs for clients project tests, you can modify the line in `clients/src/test/resources/log4j.properties` to `log4j.logger.org.apache.kafka=INFO` and then run:
   
   WDYT?
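For readers following this thread, the kind of edit being discussed is a one-line change in the relevant test project's log4j configuration. A hypothetical fragment (the exact default contents of these files may differ):

```properties
# clients/src/test/resources/log4j.properties (fragment)
# Raise the org.apache.kafka logger from its quieter default to INFO
# to see more log output while running that project's tests.
log4j.logger.org.apache.kafka=INFO
```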
Re: [PR] KAFKA-15949: Unify metadata.version format in log and error message [kafka]
showuon commented on PR #15505:
URL: https://github.com/apache/kafka/pull/15505#issuecomment-2019263725

   Re-running CI build: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15505/6/
[PR] MINOR: add docker usage documentation link in README.md [kafka]
KevinZTW opened a new pull request, #15600:
URL: https://github.com/apache/kafka/pull/15600

   *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16385: Enhance documentation for retention.ms and retention.bytes configurations [kafka]
brandboat commented on code in PR #15588:
URL: https://github.com/apache/kafka/pull/15588#discussion_r1538483814

##########
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##########
@@ -67,13 +67,17 @@ public class TopicConfig {
         "(which consists of log segments) can grow to before we will discard old log segments to free up space if we " +
         "are using the \"delete\" retention policy. By default there is no size limit only a time limit. " +
         "Since this limit is enforced at the partition level, multiply it by the number of partitions to compute " +
-        "the topic retention in bytes.";
+        "the topic retention in bytes. Additionally, retention.bytes configuration " +
+        "operates independently of \"segment.ms\" and \"segment.byte\" configurations. " +
+        "Moreover, it triggers the expiration of active segment if retention.bytes is configured to zero.";

     public static final String RETENTION_MS_CONFIG = "retention.ms";
     public static final String RETENTION_MS_DOC = "This configuration controls the maximum time we will retain a " +
         "log before we will discard old log segments to free up space if we are using the " +
         "\"delete\" retention policy. This represents an SLA on how soon consumers must read " +
         "their data. If set to -1, no time limit is applied. Additionally, retention.ms configuration " +
+        "operates independently of \"segment.ms\" and \"segment.byte\" configurations. " +
+        "Moreover, it triggers the expiration of active segment.";

Review Comment:
   Thanks for the feedback! Do you think this is ok?
   ```
   Moreover, it triggers the expiration of the active segment; segment expiration refers to the complete removal of segments from the partition once the retention.ms condition is satisfied.
   ```
Re: [PR] KAFKA-16385: Enhance documentation for retention.ms and retention.bytes configurations [kafka]
brandboat commented on code in PR #15588:
URL: https://github.com/apache/kafka/pull/15588#discussion_r1538478333

##########
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##########
@@ -67,13 +67,17 @@ public class TopicConfig {
         "(which consists of log segments) can grow to before we will discard old log segments to free up space if we " +
         "are using the \"delete\" retention policy. By default there is no size limit only a time limit. " +
         "Since this limit is enforced at the partition level, multiply it by the number of partitions to compute " +
-        "the topic retention in bytes.";
+        "the topic retention in bytes. Additionally, retention.bytes configuration " +
+        "operates independently of \"segment.ms\" and \"segment.byte\" configurations. " +
+        "Moreover, it triggers the expiration of active segment if retention.bytes is configured to zero.";

Review Comment:
   Segment expiration means the segment is completely removed once the retention limits are met. Maybe add a sentence like this?
   ```
   Segment expiration refers to the complete removal of segments from the partition once the retention.bytes condition is satisfied.
   ```
Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]
showuon commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1538452997

##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -4164,16 +4164,13 @@ class ReplicaManagerTest {
       mock(classOf[FetchDataInfo])
     }).when(spyRLM).read(any())

-    // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec metric value before fetching
-    val curExpiresPerSec = safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
+    val curExpiresPerSec = DelayedRemoteFetchMetrics.expiredRequestMeter.count()

     replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 10, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, fetchCallback)

     // advancing the clock to expire the delayed remote fetch
     timer.advanceClock(2000L)

-    // verify the metric value is incremented since the delayed remote fetch is expired
-    TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-      "The ExpiresPerSec value is not incremented. Current value is: " +
-        safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+    // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is called since the delayed remote fetch is expired
+    assertEquals(curExpiresPerSec + 1, DelayedRemoteFetchMetrics.expiredRequestMeter.count())

Review Comment:
   I think we should use `TestUtils.waitUntilTrue` here, because the `DelayedRemoteFetchMetrics` meter is marked in a separate thread and we can't be sure it will be triggered immediately.
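The race showuon describes — a metric marked on another thread while the test asserts on the main thread — is usually handled by polling with a timeout rather than asserting once. A minimal self-contained sketch of that pattern (not the actual `TestUtils.waitUntilTrue` implementation):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

public class WaitUntilTrueSketch {
    // Poll a condition until it holds or the timeout elapses.
    static void waitUntilTrue(BooleanSupplier condition, String msg, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(msg);
            }
            Thread.sleep(10); // back off briefly between checks
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicLong meter = new AtomicLong(); // stand-in for the expired-request meter
        long before = meter.get();
        // Simulate the metric being marked on a separate thread, slightly later.
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            meter.incrementAndGet();
        }).start();
        // A single assertEquals here could run before the other thread marks the
        // meter; polling makes the check robust to that scheduling race.
        waitUntilTrue(() -> meter.get() == before + 1, "meter was never marked", 5000);
        System.out.println("ok");
    }
}
```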
[jira] [Resolved] (KAFKA-16409) kafka-delete-records / DeleteRecordsCommand should use standard exception handling
[ https://issues.apache.org/jira/browse/KAFKA-16409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-16409.
-------------------------------
    Fix Version/s: 3.8.0
       Resolution: Fixed

> kafka-delete-records / DeleteRecordsCommand should use standard exception handling
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-16409
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16409
>             Project: Kafka
>          Issue Type: Task
>          Components: tools
>    Affects Versions: 3.7.0
>            Reporter: Greg Harris
>            Assignee: PoAn Yang
>            Priority: Minor
>              Labels: newbie
>             Fix For: 3.8.0
>
> When an exception is thrown in kafka-delete-records, it propagates through
> `main` to the JVM, producing the following message:
> {noformat}
> bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file /tmp/does-not-exist
> Exception in thread "main" java.io.IOException: Unable to read file /tmp/does-not-exist
>         at org.apache.kafka.common.utils.Utils.readFileAsString(Utils.java:787)
>         at org.apache.kafka.tools.DeleteRecordsCommand.execute(DeleteRecordsCommand.java:105)
>         at org.apache.kafka.tools.DeleteRecordsCommand.main(DeleteRecordsCommand.java:64)
> Caused by: java.nio.file.NoSuchFileException: /tmp/does-not-exist
>         at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>         at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>         at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>         at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>         at java.nio.file.Files.newByteChannel(Files.java:361)
>         at java.nio.file.Files.newByteChannel(Files.java:407)
>         at java.nio.file.Files.readAllBytes(Files.java:3152)
>         at org.apache.kafka.common.utils.Utils.readFileAsString(Utils.java:784)
>         ... 2 more{noformat}
> This is in contrast to the error handling used in other tools, such as the
> kafka-log-dirs:
> {noformat}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --command-config /tmp/does-not-exist
> /tmp/does-not-exist
> java.nio.file.NoSuchFileException: /tmp/does-not-exist
>         at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>         at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>         at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>         at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>         at java.nio.file.Files.newByteChannel(Files.java:361)
>         at java.nio.file.Files.newByteChannel(Files.java:407)
>         at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
>         at java.nio.file.Files.newInputStream(Files.java:152)
>         at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:686)
>         at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:673)
>         at org.apache.kafka.tools.LogDirsCommand.createAdminClient(LogDirsCommand.java:149)
>         at org.apache.kafka.tools.LogDirsCommand.execute(LogDirsCommand.java:68)
>         at org.apache.kafka.tools.LogDirsCommand.mainNoExit(LogDirsCommand.java:54)
>         at org.apache.kafka.tools.LogDirsCommand.main(LogDirsCommand.java:49){noformat}
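The "standard" handling the ticket points at is visible in the kafka-log-dirs stack frames: a `mainNoExit` wrapper catches the exception, prints it, and converts it to an exit code instead of letting it escape `main`. A simplified sketch of that pattern — stand-in code under assumed names, not the actual DeleteRecordsCommand fix:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class DeleteRecordsSketch {
    // Standard CLI error handling: no raw "Exception in thread main" dump
    // from the JVM; the failure is printed and mapped to an exit code.
    static int mainNoExit(String... args) {
        try {
            execute(args);
            return 0;
        } catch (Exception e) {
            System.err.println(e); // e.g. "java.nio.file.NoSuchFileException: <path>"
            return 1;
        }
    }

    // Stand-in for the real work: reading the offset JSON file.
    static void execute(String... args) throws IOException {
        Files.readAllBytes(Paths.get(args[0]));
    }

    public static void main(String[] args) {
        System.exit(mainNoExit(args));
    }
}
```

Splitting `main` from `mainNoExit` keeps the exit-code path testable without the test JVM itself exiting.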
Re: [PR] KAFKA-16409: DeleteRecordsCommand should use standard exception handling [kafka]
showuon merged PR #15586:
URL: https://github.com/apache/kafka/pull/15586
Re: [PR] KAFKA-16409: DeleteRecordsCommand should use standard exception handling [kafka]
showuon commented on PR #15586:
URL: https://github.com/apache/kafka/pull/15586#issuecomment-2019187474

   Failed tests are unrelated.
[jira] [Commented] (KAFKA-15265) Remote copy/fetch quotas for tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830734#comment-17830734 ]

Henry Cai commented on KAFKA-15265:
-----------------------------------

What is the progress of this tiered storage quota work? We are also facing performance problems (large network bursts / CPU saturation) during the initial conversion of a topic to remote.storage.enabled.

> Remote copy/fetch quotas for tiered storage.
> --------------------------------------------
>
>                 Key: KAFKA-15265
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15265
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Satish Duggana
>            Assignee: Abhijeet Kumar
>            Priority: Major
>
> Related KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
[jira] [Commented] (KAFKA-16240) Flaky test PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft
[ https://issues.apache.org/jira/browse/KAFKA-16240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830733#comment-17830733 ]

Zhiyuan Lei commented on KAFKA-16240:
-------------------------------------

```
TimeUnit.MILLISECONDS.sleep(1)
val result1 = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(117L)).asJava)
```

I found something interesting. This issue is indeed easily reproducible locally. After some investigation, it seems to be caused by the timing of the low-watermark update in the replica. As long as we add a wait time here, this test case is guaranteed to pass 100% of the time. Waiting patiently is not a good solution; further work is still required.

> Flaky test PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16240
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16240
>             Project: Kafka
>          Issue Type: Test
>            Reporter: Gantigmaa Selenge
>            Priority: Minor
>
> Failed run [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15300/8/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords_String__quorum_kraft_2/]
>
> Stack trace
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: deleteRecords(api=DELETE_RECORDS)
>     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>     at kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:860)
> {code}
Re: [PR] KAFKA-16272: Update connect_distributed_test.py to support KIP-848’s group protocol config [kafka]
philipnee commented on PR #15576:
URL: https://github.com/apache/kafka/pull/15576#issuecomment-2019090272

   @kirktrue @lucasbru - mind taking a second look? For this specific PR I only modified the tests using the console consumer, which is why several test cases were omitted.
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
philipnee commented on code in PR #15594:
URL: https://github.com/apache/kafka/pull/15594#discussion_r1538356233

##########
tests/kafkatest/services/connect.py:
##########
@@ -534,33 +535,40 @@ def received_messages(self):
 
     def start(self):
         self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 'org.apache.kafka.connect.tools.VerifiableSinkConnector',
             'tasks.max': self.tasks,
             'topics': ",".join(self.topics)
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol
+        self.cc.create_connector(connector_config)
 
 class MockSink(object):
 
-    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", consumer_group_protocol=None):
         self.cc = cc
         self.logger = self.cc.logger
         self.name = name
         self.mode = mode
         self.delay_sec = delay_sec
         self.topics = topics
+        self.consumer_group_protocol = consumer_group_protocol
 
     def start(self):
         self.logger.info("Creating connector MockSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 'org.apache.kafka.connect.tools.MockSinkConnector',
             'tasks.max': 1,
             'topics': ",".join(self.topics),
             'mock_mode': self.mode,
             'delay_ms': self.delay_sec * 1000
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol

Review Comment:
   Are you sure this is what the group protocol config is called in Connect?
Re: [PR] KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]
jolshan merged PR #15559:
URL: https://github.com/apache/kafka/pull/15559
Re: [PR] KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]
jolshan commented on PR #15559:
URL: https://github.com/apache/kafka/pull/15559#issuecomment-2019067710

   [testAlterSinkConnectorOffsetsOverriddenConsumerGroupId](https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest=testAlterSinkConnectorOffsetsOverriddenConsumerGroupId)
   [testSeparateOffsetsTopic](https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest=testSeparateOffsetsTopic)
   
   These caught my eye but have been flaky/failing since before this change. Here are the JIRAs for them: https://issues.apache.org/jira/browse/KAFKA-15914 and https://issues.apache.org/jira/browse/KAFKA-14089
   
   The other tests look known and/or unrelated. I will go ahead and merge.
[jira] [Commented] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
[ https://issues.apache.org/jira/browse/KAFKA-14089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830728#comment-17830728 ]

Justine Olshan commented on KAFKA-14089:
----------------------------------------

I've seen this again as well. https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest=testSeparateOffsetsTopic

> Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
> ---------------------------------------------------------------
>
>                 Key: KAFKA-14089
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14089
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.3.0
>            Reporter: Mickael Maison
>            Priority: Major
>             Fix For: 3.3.0
>
>         Attachments: failure.txt, org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic.test.stdout
>
> It looks like the sequence got broken around "65535, 65537, 65536, 65539, 65538, 65541, 65540, 65543"
[jira] [Commented] (KAFKA-15914) Flaky test - testAlterSinkConnectorOffsetsOverriddenConsumerGroupId - OffsetsApiIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830727#comment-17830727 ]

Justine Olshan commented on KAFKA-15914:
----------------------------------------

I'm seeing this as well. https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest=testAlterSinkConnectorOffsetsOverriddenConsumerGroupId

> Flaky test - testAlterSinkConnectorOffsetsOverriddenConsumerGroupId - OffsetsApiIntegrationTest
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15914
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15914
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>            Reporter: Lucas Brutschy
>            Priority: Major
>              Labels: flaky-test
>
> Test intermittently gives the following result:
> {code}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 3. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected: <true> but was: <false>
>     at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
>     at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:917)
>     at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.alterAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:396)
>     at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId(OffsetsApiIntegrationTest.java:297)
> {code}
[PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue opened a new pull request, #15599:
URL: https://github.com/apache/kafka/pull/15599

   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16084: Simplify and deduplicate standalone herder test mocking [kafka]
gharris1727 commented on code in PR #15389: URL: https://github.com/apache/kafka/pull/15389#discussion_r1538282799 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -115,64 +115,61 @@ public class StandaloneHerderTest { private static final String TOPICS_LIST_STR = "topic1,topic2"; private static final String WORKER_ID = "localhost:8083"; private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA"; +private static final Long WAIT_TIME = 1000L; Review Comment: nit: 1 second is probably good enough, but it costs nothing to increase this now. Also the constant should probably include the unit, so `WAIT_TIME_MS`. ```suggestion private static final Long WAIT_TIME = 15000L; ``` ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -227,59 +222,48 @@ public void testCreateConnectorAlreadyExists() throws Throwable { @Test public void testCreateSinkConnector() throws Exception { -connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map config = connectorConfig(SourceSink.SINK); -Connector connectorMock = mock(SinkConnector.class); -expectConfigValidation(connectorMock, true, config); +expectConfigValidation(SourceSink.SINK, config); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); -Herder.Created connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); +Herder.Created connectorInfo = createCallback.get(WAIT_TIME, TimeUnit.MILLISECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); } @Test public void testCreateConnectorWithStoppedInitialState() throws Exception { -connector = mock(BogusSinkConnector.class); Map config = connectorConfig(SourceSink.SINK); Connector connectorMock = mock(SinkConnector.class); Review Comment: nit: unused ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -1168,25 +1090,27 @@ private static 
Map taskConfig(SourceSink sourceSink) { } private void expectConfigValidation( -Connector connectorMock, -boolean shouldCreateConnector, +SourceSink sourceSink, Map... configs ) { // config validation +Connector connectorMock = sourceSink == SourceSink.SOURCE ? mock(SourceConnector.class) : mock(SinkConnector.class); when(worker.configTransformer()).thenReturn(transformer); final ArgumentCaptor> configCapture = ArgumentCaptor.forClass(Map.class); when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> configCapture.getValue()); when(worker.getPlugins()).thenReturn(plugins); when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader); when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap); -if (shouldCreateConnector) { -when(worker.getPlugins()).thenReturn(plugins); -when(plugins.newConnector(anyString())).thenReturn(connectorMock); -} + +// Assume the connector should always be created +when(worker.getPlugins()).thenReturn(plugins); +when(plugins.newConnector(anyString())).thenReturn(connectorMock); when(connectorMock.config()).thenReturn(new ConfigDef()); -for (Map config : configs) +// Set up validation for each config +for (Map config : configs) { when(connectorMock.validate(config)).thenReturn(new Config(Collections.emptyList())); +} } // We need to use a real class here due to some issue with mocking java.lang.Class Review Comment: nit: My IDE is also warning me that these should be `static`, which is fine for how these are used. Could you also fix that? 
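The refactor quoted above derives the connector mock inside the helper from a `SourceSink` enum, instead of having every caller build its own mock and pass a `shouldCreateConnector` boolean. A minimal standalone sketch of that dispatch idea (plain classes stand in for Connect's interfaces and for Mockito mocks; all names here are illustrative, not the real test code):

```java
// Illustrative sketch only: pick the connector flavor from a SourceSink enum
// inside the helper, as the refactored expectConfigValidation does. Plain
// classes stand in for Kafka Connect's SourceConnector/SinkConnector.
interface Connector {}
class SourceConnector implements Connector {}
class SinkConnector implements Connector {}

enum SourceSink { SOURCE, SINK }

class ExpectConfigValidationSketch {
    // The helper derives the right type itself; callers only say SOURCE or SINK.
    static Connector connectorFor(SourceSink sourceSink) {
        return sourceSink == SourceSink.SOURCE ? new SourceConnector() : new SinkConnector();
    }
}
```

This removes the type-juggling from every call site, which is the deduplication the PR title describes.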
## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -726,22 +668,21 @@ public void testAccessors() throws Exception { doNothing().when(tasksConfigCb).onCompletion(any(NotFoundException.class), isNull()); doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), isNull()); -// Create connector -connector = mock(BogusSourceConnector.class); + expectAdd(SourceSink.SOURCE); -expectConfigValidation(connector, true, connConfig); +expectConfigValidation(SourceSink.SOURCE, connConfig); // Validate accessors with 1 connector doNothing().when(listConnectorsCb).onCompletion(null, singleton(CONNECTOR_NAME)); ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), Review Comment:
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2018886221 Hey @lucasbru - Thanks for taking the time to review this PR. Let me know if there's anything to add to the PR.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1538195311 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ## @@ -423,11 +432,11 @@ private CompletableFuture sendListOffsetsRequestsAndResetPositions( }); }); -if (unsentRequests.size() > 0) { +if (unsentRequests.isEmpty()) { Review Comment: @lucasbru - Switched the order as !__.isEmpty is rather difficult to read
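The readability point above is that a negated `!isEmpty()` condition reads worse than swapping the two branches so the test stays positive. A tiny self-contained sketch of the reordering (the method body is illustrative, not the real request manager):

```java
// Illustrative sketch: prefer isEmpty() with swapped branches over !isEmpty().
import java.util.List;

class EmptyCheck {
    // Returns a label describing what would be done with the pending requests.
    static String dispatch(List<String> unsentRequests) {
        if (unsentRequests.isEmpty()) {
            return "complete-immediately";   // nothing left to send
        } else {
            return "send-" + unsentRequests.size();
        }
    }
}
```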
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2018835239 Hey @Phuc-Hong-Tran , thanks a lot for the PR! I had a first pass and left some comments.
Re: [PR] KAFKA-16385: Enhance documentation for retention.ms and retention.bytes configurations [kafka]
soarez commented on code in PR #15588: URL: https://github.com/apache/kafka/pull/15588#discussion_r1538177857 ## clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java: ## @@ -67,13 +67,17 @@ public class TopicConfig { "(which consists of log segments) can grow to before we will discard old log segments to free up space if we " + "are using the \"delete\" retention policy. By default there is no size limit only a time limit. " + "Since this limit is enforced at the partition level, multiply it by the number of partitions to compute " + -"the topic retention in bytes."; +"the topic retention in bytes. Additionally, retention.bytes configuration " + +"operates independently of \"segment.ms\" and \"segment.byte\" configurations. " + +"Moreover, it triggers the expiration of active segment if retention.bytes is configured to zero."; public static final String RETENTION_MS_CONFIG = "retention.ms"; public static final String RETENTION_MS_DOC = "This configuration controls the maximum time we will retain a " + "log before we will discard old log segments to free up space if we are using the " + "\"delete\" retention policy. This represents an SLA on how soon consumers must read " + -"their data. If set to -1, no time limit is applied."; +"their data. If set to -1, no time limit is applied. Additionally, retention.ms configuration " + +"operates independently of \"segment.ms\" and \"segment.byte\" configurations. " + +"Moreover, it triggers the expiration of active segment."; Review Comment: This sentence seems incomplete. ## clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java: ## @@ -67,13 +67,17 @@ public class TopicConfig { "(which consists of log segments) can grow to before we will discard old log segments to free up space if we " + "are using the \"delete\" retention policy. By default there is no size limit only a time limit. 
" + "Since this limit is enforced at the partition level, multiply it by the number of partitions to compute " + -"the topic retention in bytes."; +"the topic retention in bytes. Additionally, retention.bytes configuration " + +"operates independently of \"segment.ms\" and \"segment.byte\" configurations. " + +"Moreover, it triggers the expiration of active segment if retention.bytes is configured to zero."; Review Comment: Should we explain what "segment expiration" entails? I don't think it's clear from the description what it would mean in practical terms for the active segment to expire.
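For readers tuning the two retention settings discussed above, a hypothetical example of applying both limits to a topic with the stock `kafka-configs.sh` tool (the broker address and topic name are placeholders):

```shell
# Retain data for ~3 days OR until the partition exceeds ~1 GiB, whichever
# limit is hit first under the "delete" cleanup policy.
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name my-topic \
  --add-config retention.ms=259200000,retention.bytes=1073741824
```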
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1538180339 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1667,6 +1667,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd public boolean updateAssignmentMetadataIfNeeded(Timer timer) { maybeThrowFencedInstanceException(); maybeInvokeCommitCallbacks(); +if (subscriptions.hasPatternSubscription()) { +updatePatternSubscription(metadata.fetch()); +} Review Comment: Just for consistency, what about we encapsulate this in something like `maybeUpdateSubscriptionMetadata`? It would align nicely with the above funcs (and also that's how the similar functionality is named in the legacy coordinator so would be helpful to understand how that piece of logic translates into the new consumer)
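The suggestion above is to hide the `hasPatternSubscription()` check behind a `maybeUpdateSubscriptionMetadata()` helper. A self-contained sketch of that shape, with tiny stand-in classes (these are illustrative, not the real `AsyncKafkaConsumer`/`SubscriptionState` APIs):

```java
// Illustrative sketch of wrapping the pattern-subscription refresh in a
// maybeUpdateSubscriptionMetadata() helper, mirroring the legacy coordinator's
// naming. Stand-in classes only; not the real consumer internals.
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

class SubscriptionState {
    private Pattern pattern;                       // non-null only for pattern subscriptions
    private final Set<String> topics = new HashSet<>();

    void subscribe(Pattern p) { this.pattern = p; }
    boolean hasPatternSubscription() { return pattern != null; }
    Set<String> subscription() { return topics; }

    // Resolve the regex against the cluster's current topic list.
    void updatePatternSubscription(Set<String> clusterTopics) {
        topics.clear();
        for (String t : clusterTopics)
            if (pattern.matcher(t).matches()) topics.add(t);
    }
}

class ConsumerSketch {
    final SubscriptionState subscriptions = new SubscriptionState();
    final Set<String> clusterTopics = new HashSet<>();

    // The encapsulation the review asks for: callers no longer check
    // hasPatternSubscription() themselves.
    void maybeUpdateSubscriptionMetadata() {
        if (subscriptions.hasPatternSubscription())
            subscriptions.updatePatternSubscription(clusterTopics);
    }
}
```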
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1538173798 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } else { // SubscribedTopicRegex - only sent if has changed since the last heartbeat // - not supported yet +TreeSet subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription()); +if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) { +data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); +sentFields.subscribedTopicNames = subscribedTopicNames; +} Review Comment: With this addition we end up with the exact same code repeated for the `if` and `else`, so I would say we should find a better way of doing this. First solution that comes to mind is to remove the if/else. In the end, we have a single case to handle here: send explicit subscriptions (topic names) to the broker (from the HB Mgr POV and to the broker, it does not make a diff if the topic list came from a call to subscribe with topics or a call to subscribe with Pattern that we internally resolved on the client) When we take on the next task of supporting the new regex, we'll actually have to send something different here, so we can decide then how to best differentiate the 2 cases. For now, including this PR, we only support 1 case regarding the content of what we send in the HB regarding subscription. Makes sense?
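The diff above implements a "send the field only if it changed since the last heartbeat" pattern by comparing a sorted copy of the subscription against what was last sent. A standalone sketch of that pattern (class and method names are illustrative, not the real `HeartbeatRequestManager`):

```java
// Illustrative sketch of the send-if-changed heartbeat field pattern: a sorted
// snapshot is compared against the last value actually sent to the broker.
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class HeartbeatFieldTracker {
    private TreeSet<String> sentSubscribedTopicNames;   // last value put in a request

    // Returns the list to set on the request, or null if the field is unchanged
    // and can be omitted from this heartbeat.
    List<String> subscribedTopicNamesToSend(Set<String> current, boolean sendAllFields) {
        TreeSet<String> sorted = new TreeSet<>(current);
        if (sendAllFields || !sorted.equals(sentSubscribedTopicNames)) {
            sentSubscribedTopicNames = sorted;
            return new ArrayList<>(sorted);
        }
        return null;
    }
}
```

As the comment notes, once both explicit and pattern subscriptions resolve to a topic list on the client, a single code path like this suffices.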
[jira] [Comment Edited] (KAFKA-16344) Internal topic mm2-offset-syncsinternal created with single partition is putting more load on the broker
[ https://issues.apache.org/jira/browse/KAFKA-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830685#comment-17830685 ] Greg Harris edited comment on KAFKA-16344 at 3/25/24 8:06 PM: -- Hi [~janardhanag] Yes, you should consider increasing offset.lag.max. 0 is best for precise offset translation, but can significantly increase the amount of traffic on the offset-syncs topic. For topics without offset gaps (e.g. not compacted, not transactional) the offset.lag.max currently behaves as a ratio between the amount of records/sec in a mirrored topic, and that topic's additional load on the offset-syncs topic. There's a semaphore [https://github.com/apache/kafka/blob/f8ce7feebcede6c6da93c649413a64695bc8d4c1/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L95] that prevents this load from slowing down the mirror source connector, but it won't protect the brokers holding the mm2 offsets topic from being overloaded. For example, if you were to mirror 100 records, offset.lag.max=0 could write 100 records to the offset-syncs topic, generating huge load. For 100 records with offset.lag.max=100, the number of offset syncs could be 1, 100x less. For offset.lag.max=1, the number of offset syncs could be 100, 1x less. To answer your questions: > 1. What is the proportion of mm2-offsetsyncsinternal topic writes to overall >MM2 traffic? Configurable with offset.lag.max for topics with contiguous offsets. For transactional topics it isn't configurable, as each transaction may cause 1 or 2 offset syncs and that isn't limited by the offset.lag.max, so reducing the throughput may require increasing the transaction intervals on your source applications. > 2. Is there a way to tune the internal topic writes with increased traffic > MM2? As the throughput of your MM2 instance increases, you should increase the offset.lag.max to keep the offset-syncs topic fixed. > 3. 
I want to understand the expected mm2-offsetsyncsinternal write TPS for a > given amount of MM2 traffic. Are there any tunable parameters to reduce these > writes, and what are the consequences of tuning if any? There isn't a closed formula, because a sync may be initiated by multiple conditions [https://github.com/apache/kafka/blob/f8ce7feebcede6c6da93c649413a64695bc8d4c1/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L360-L363] and prevented by the semaphore when the producer latency becomes significant. The consequence of decreasing throughput on this topic is less precise offset translation in the MirrorCheckpointConnector, and increased redelivery when failing over from source to target consumer offsets. If you aren't using this connector, then the offset syncs topic isn't read from at all. You currently can't turn the topic off, but there is a KIP open for that: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector] . If for example you were satisfied with 30 seconds of redelivery for a topic which had 1000 records/second, then you could set offset.lag.max to 30*1000. > 4. In a larger Kafka cluster, if a single broker is overloaded with > mm2-offsetsyncsinternal traffic, can it lead to a broker crash? Are there any > guidelines available for such scenarios? Currently, we have 6 brokers with > 24K MM2 traffic, and internal writes are at 10K, resulting in a 20% CPU > increase on one broker. I'm not familiar with the failure conditions for overloaded brokers. If you are seeing failures then I would definitely recommend trying to tune MM2, or try to use quotas [https://kafka.apache.org/documentation/#design_quotas] to get this topic-partition under control. > 5. Are there any limitations on Kafka brokers scaling , I mean how much kafka >broker can be expanded in single Kakfa cluster due to the >mm2-offsetsyncsinternal topic? 
Because the offset syncs topic only supports a single partition, adding brokers will not solve this problem, other than having a broker dedicated to only serving this one topic. At some point, you would need two separate offset-syncs topics and separate MirrorSourceConnectors to shard the load. > 6. How can the system be dimensioned to handle MM2 internal topic writes > effectively? Are there any recommended figures available? For instance, for a > given amount of traffic (X), what percentage increase in CPU (Y) should each > broker have to handle MM2 internal topic writes? Note that in other pods, > this resource may not be utilized. The Kafka project doesn't publish "expected" dimensions, other than the default values of configurations. We don't publish performance analysis on any particular hardware setups, because the diversity of hardware running Kafka is just too vast to capture properly. was (Author: gharris1727): Hi
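The sizing guidance in the comment above (pick offset.lag.max as the acceptable redelivery window in seconds times the topic's records per second, e.g. 30 * 1000) can be sketched as a one-line calculation; the helper name is illustrative:

```java
// Illustrative sizing rule of thumb from the comment above:
// offset.lag.max = (acceptable redelivery window, seconds) x (records/sec).
class OffsetLagSizing {
    static long offsetLagMax(long redeliverySeconds, long recordsPerSecond) {
        return redeliverySeconds * recordsPerSecond;
    }
}
```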
[jira] [Commented] (KAFKA-16344) Internal topic mm2-offset-syncsinternal created with single partition is putting more load on the broker
[ https://issues.apache.org/jira/browse/KAFKA-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830685#comment-17830685 ] Greg Harris commented on KAFKA-16344: - Hi [~janardhanag] Yes, you should consider increasing offset.lag.max. 0 is best for precise offset translation, but can significantly increase the amount of traffic on the offset-syncs topic. For topics without offset gaps (e.g. not compacted, not transactional) the offset.lag.max currently behaves as a ratio between the amount of records/sec in a mirrored topic, and that topic's additional load on the offset-syncs topic. There's a semaphore https://github.com/apache/kafka/blob/f8ce7feebcede6c6da93c649413a64695bc8d4c1/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L95 that prevents this load from slowing down the mirror source connector, but it won't protect the brokers holding the mm2 offsets topic from being overloaded. For example, if you were to mirror 100 records, offset.lag.max=0 could write 100 records to the offset-syncs topic, generating huge load. For 100 records with offset.lag.max=100, the number of offset syncs could be 1, 100x less. For offset.lag.max=1, the number of offset syncs could be 100, 1x less. To answer your questions: > 1. What is the proportion of mm2-offsetsyncsinternal topic writes to overall >MM2 traffic? Configurable with offset.lag.max for topics with contiguous offsets. For transactional topics it isn't configurable, as each transaction may cause 1 or 2 offset syncs and that isn't limited by the offset.lag.max, so reducing the throughput may require increasing the transaction intervals on your source applications. > 2. Is there a way to tune the internal topic writes with increased traffic > MM2? As the throughput of your MM2 instance increases, you should decrease the offset.lag.max to keep the offset-syncs topic fixed. > 3. 
I want to understand the expected mm2-offsetsyncsinternal write TPS for a > given amount of MM2 traffic. Are there any tunable parameters to reduce these > writes, and what are the consequences of tuning if any? There isn't a closed formula, because a sync may be initiated by multiple conditions [https://github.com/apache/kafka/blob/f8ce7feebcede6c6da93c649413a64695bc8d4c1/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L360-L363] and prevented by the semaphore when the producer latency becomes significant. The consequence of decreasing throughput on this topic is less precise offset translation in the MirrorCheckpointConnector, and increased redelivery when failing over from source to target consumer offsets. If you aren't using this connector, then the offset syncs topic isn't read from at all. You currently can't turn the topic off, but there is a KIP open for that: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector] . If for example you were satisfied with 30 seconds of redelivery for a topic which had 1000 records/second, then you could set offset.lag.max to 30*1000. > 4. In a larger Kafka cluster, if a single broker is overloaded with > mm2-offsetsyncsinternal traffic, can it lead to a broker crash? Are there any > guidelines available for such scenarios? Currently, we have 6 brokers with > 24K MM2 traffic, and internal writes are at 10K, resulting in a 20% CPU > increase on one broker. I'm not familiar with the failure conditions for overloaded brokers. If you are seeing failures then I would definitely recommend trying to tune MM2, or try to use quotas [https://kafka.apache.org/documentation/#design_quotas] to get this topic-partition under control. > 5. Are there any limitations on Kafka brokers scaling , I mean how much kafka >broker can be expanded in single Kakfa cluster due to the >mm2-offsetsyncsinternal topic? 
Because the offset syncs topic only supports a single partition, adding brokers will not solve this problem, other than having a broker dedicated to only serving this one topic. At some point, you would need two separate offset-syncs topics and separate MirrorSourceConnectors to shard the load. > 6. How can the system be dimensioned to handle MM2 internal topic writes > effectively? Are there any recommended figures available? For instance, for a > given amount of traffic (X), what percentage increase in CPU (Y) should each > broker have to handle MM2 internal topic writes? Note that in other pods, > this resource may not be utilized. The Kafka project doesn't publish "expected" dimensions, other than the default values of configurations. We don't publish performance analysis on any particular hardware setups, because the diversity of hardware running Kafka is just too vast to capture properly. > Internal topic mm2-offset-syncsinternal created with single > partition is putting
[jira] [Closed] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation
[ https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna closed KAFKA-16224. - > Fix handling of deleted topic when auto-committing before revocation > > > Key: KAFKA-16224 > URL: https://issues.apache.org/jira/browse/KAFKA-16224 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Bruno Cadonna >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > Current logic for auto-committing offsets when partitions are revoked will > retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the > member not completing the revocation in time. We should consider this as an > indication of the topic being deleted, and in the context of committing > offsets to revoke partitions, we should abort the commit attempt and move on > to complete and ack the revocation (meaning that we would effectively > consider UnknownTopicOrPartitionException as non-retriable, even though it > extends RetriableException, only when committing offsets before revocation as > part of this task) > Note that legacy coordinator behaviour around this seems to be the same as > the new consumer currently has, tracked with a related issue given that it > would require a separate fix for the legacy consumer.
Re: [PR] KAFKA-16224: Do not retry committing if topic or partition deleted [kafka]
cadonna merged PR #15581: URL: https://github.com/apache/kafka/pull/15581
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1538161659 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -433,7 +433,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { */ // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented. Review Comment: ditto ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -486,7 +486,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { */ // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented. Review Comment: ditto
Re: [PR] KAFKA-16224: Do not retry committing if topic or partition deleted [kafka]
cadonna commented on PR #15581: URL: https://github.com/apache/kafka/pull/15581#issuecomment-2018802319 Build failures are unrelated.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1538161334 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -374,7 +374,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { */ // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented. Review Comment: we should remove this TODO now
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1538158322 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optionalhttps://github.com/apache/kafka/blob/f8ce7feebcede6c6da93c649413a64695bc8d4c1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1466)) . What do you think?
[jira] [Updated] (KAFKA-16420) Add thread-safe alternative to utils.Exit
[ https://issues.apache.org/jira/browse/KAFKA-16420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16420: Summary: Add thread-safe alternative to utils.Exit (was: Replace utils.Exit with a thread-safe alternative) > Add thread-safe alternative to utils.Exit > - > > Key: KAFKA-16420 > URL: https://issues.apache.org/jira/browse/KAFKA-16420 > Project: Kafka > Issue Type: Wish > Components: connect, core, tools >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > The Exit class is not thread-safe, and exposes our tests to race conditions > and inconsistent execution. It is not possible to make it thread-safe due to > the static design of the API. > We should add an alternative to the Exit class, and migrate the existing > usages to the replacement, before finally removing the legacy Exit.
[jira] [Updated] (KAFKA-16294) Add group protocol migration enabling config
[ https://issues.apache.org/jira/browse/KAFKA-16294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongnuo Lyu updated KAFKA-16294: Description: The online upgrade is triggered when a consumer group heartbeat request is received in a classic group. The downgrade is triggered when any old protocol request is received in a consumer group. We only accept upgrade/downgrade if the corresponding group migration config policy is enabled. This is the first part of the implementation of online group protocol migration, adding the kafka config group protocol migration. The config has four valid values – both(both upgrade and downgrade are allowed), upgrade(only upgrade is allowed), downgrade(only downgrade is allowed) and none(neither is allowed.). At present the default value is NONE. When we start enabling the migration, we expect to set BOTH to default so that it's easier to roll back to the old protocol as a quick fix for anything wrong in the new protocol; when using consumer groups becomes default and the migration is near finished, we will set the default policy to UPGRADE to prevent unwanted downgrade causing too frequent migration. DOWNGRADE could be useful for revert or debug purposes. was: The online upgrade is triggered when a consumer group heartbeat request is received in a classic group. The downgrade is triggered when any old protocol request is received in a consumer group. We only accept upgrade/downgrade if the corresponding group migration config policy is enabled. This is the first part of the implementation of online group protocol migration, adding the kafka config group protocol migration. The config has four valid values – both(both upgrade and downgrade are allowed), upgrade(only upgrade is allowed), downgrade(only downgrade is allowed) and none(neither is allowed.). At present the default value is NONE. 
When we start enabling the migration, we expect to set BOTH to default so that it's easier to roll back to the old protocol as a quick fix for anything wrong in the new protocol; when using consumer groups becomes default and the migration is near finished, we will set the default policy to upgrade to prevent unwanted downgrade causing too frequent migration. > Add group protocol migration enabling config > > > Key: KAFKA-16294 > URL: https://issues.apache.org/jira/browse/KAFKA-16294 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongnuo Lyu >Assignee: Dongnuo Lyu >Priority: Major > > The online upgrade is triggered when a consumer group heartbeat request is > received in a classic group. The downgrade is triggered when any old protocol > request is received in a consumer group. We only accept upgrade/downgrade if > the corresponding group migration config policy is enabled. > This is the first part of the implementation of online group protocol > migration, adding the kafka config group protocol migration. The config has > four valid values – both(both upgrade and downgrade are allowed), > upgrade(only upgrade is allowed), downgrade(only downgrade is allowed) and > none(neither is allowed.). > At present the default value is NONE. When we start enabling the migration, > we expect to set BOTH to default so that it's easier to roll back to the old > protocol as a quick fix for anything wrong in the new protocol; when using > consumer groups becomes default and the migration is near finished, we will > set the default policy to UPGRADE to prevent unwanted downgrade causing too > frequent migration. DOWNGRADE could be useful for revert or debug purposes.
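The four-valued migration policy described in the issue (both, upgrade, downgrade, none) maps naturally onto an enum with two predicates. A hypothetical sketch of that shape; the real config name and enum are not shown in this thread, so these names are purely illustrative:

```java
// Hypothetical sketch of the four-valued group protocol migration policy:
// BOTH allows upgrade and downgrade, UPGRADE and DOWNGRADE allow one direction,
// NONE (the current default) allows neither.
enum MigrationPolicy {
    BOTH, UPGRADE, DOWNGRADE, NONE;

    boolean upgradeAllowed()   { return this == BOTH || this == UPGRADE; }
    boolean downgradeAllowed() { return this == BOTH || this == DOWNGRADE; }
}
```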
Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]
cmccabe commented on PR #15584: URL: https://github.com/apache/kafka/pull/15584#issuecomment-2018746846 > I don't think it's a good idea to introduce the new terms mangling & unmangling when there are already equivalent terms in the codebase – sanitizing/desanitizing – it makes it unnecessarily confusing. That's a fair point. I will remove the references to "mangling" and replace them with "sanitization" (although I don't really agree, I think "sanitization" implies discarding bad data, not mangling it) But let's not change ZK terminology at this point :)
[jira] [Updated] (KAFKA-16294) Add group protocol migration enabling config
[ https://issues.apache.org/jira/browse/KAFKA-16294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongnuo Lyu updated KAFKA-16294: Description: The online upgrade is triggered when a consumer group heartbeat request is received in a classic group. The downgrade is triggered when any old protocol request is received in a consumer group. We only accept upgrade/downgrade if the corresponding group migration config policy is enabled. This is the first part of the implementation of online group protocol migration, adding the kafka config group protocol migration. The config has four valid values – both(both upgrade and downgrade are allowed), upgrade(only upgrade is allowed), downgrade(only downgrade is allowed) and none(neither is allowed.). At present the default value is NONE. When we start enabling the migration, we expect to set BOTH to default so that it's easier to roll back to the old protocol as a quick fix for anything wrong in the new protocol; when using consumer groups becomes default and the migration is near finished, we will set the default policy to upgrade to prevent unwanted downgrade causing too frequent migration. was: The offline upgrade is triggered when a consumer group heartbeat request is received in a classic group. The downgrade is triggered when any old protocol request is received in a consumer group. We only accept upgrade/downgrade if 1) the group migration config is enabled 2) the group is empty. This is the first part of the implementation of offline group protocol migration, adding the kafka config group protocol migration. The config has four valid values – both(both upgrade and downgrade are allowed), upgrade(only upgrade is allowed), downgrade(only downgrade is allowed) and none(neither is allowed.). 
> Add group protocol migration enabling config > > > Key: KAFKA-16294 > URL: https://issues.apache.org/jira/browse/KAFKA-16294 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongnuo Lyu >Assignee: Dongnuo Lyu >Priority: Major > > The online upgrade is triggered when a consumer group heartbeat request is > received in a classic group. The downgrade is triggered when any old protocol > request is received in a consumer group. We only accept upgrade/downgrade if > the corresponding group migration config policy is enabled. > This is the first part of the implementation of online group protocol > migration, adding the kafka config group protocol migration. The config has > four valid values – both(both upgrade and downgrade are allowed), > upgrade(only upgrade is allowed), downgrade(only downgrade is allowed) and > none(neither is allowed.). > At present the default value is NONE. When we start enabling the migration, > we expect to set BOTH to default so that it's easier to roll back to the old > protocol as a quick fix for anything wrong in the new protocol; when using > consumer groups becomes default and the migration is near finished, we will > set the default policy to upgrade to prevent unwanted downgrade causing too > frequent migration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
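For illustration, the four-valued migration config described in this ticket could be modeled roughly as follows. This is a hypothetical sketch: the enum and method names are illustrative, not the actual Kafka config class.

```java
// Hypothetical model of the group protocol migration policy from KAFKA-16294.
// The four values and their meanings follow the ticket description; the
// enum name and accessor methods are illustrative only.
enum GroupProtocolMigrationPolicy {
    BOTH,       // both upgrade and downgrade are allowed
    UPGRADE,    // only upgrade is allowed
    DOWNGRADE,  // only downgrade is allowed
    NONE;       // neither is allowed (the current default)

    // Whether an upgrade from the classic protocol to the consumer protocol is permitted.
    boolean upgradeEnabled() {
        return this == BOTH || this == UPGRADE;
    }

    // Whether a downgrade back to the classic protocol is permitted.
    boolean downgradeEnabled() {
        return this == BOTH || this == DOWNGRADE;
    }
}
```

Under such a model, the rollout plan in the ticket amounts to moving the default from NONE to BOTH, and eventually to UPGRADE once migrations are mostly finished.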
Re: [PR] KAFKA-16084: Simplify and deduplicate standalone herder test mocking [kafka]
ahmedsobeh commented on code in PR #15389: URL: https://github.com/apache/kafka/pull/15389#discussion_r1538122992 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ## @@ -726,21 +683,21 @@ public void testAccessors() throws Exception { doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), isNull()); // Create connector -connector = mock(BogusSourceConnector.class); +Connector connector = mock(BogusSourceConnector.class); Review Comment: Thanks for the walkthrough! I now understand the full picture. I think I addressed all your comments now, let me know if I missed anything -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]
cmccabe commented on code in PR #15584: URL: https://github.com/apache/kafka/pull/15584#discussion_r1538115580 ## core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala: ## @@ -50,44 +51,54 @@ class ZkConfigMigrationClient( val adminZkClient = new AdminZkClient(zkClient) - /** - * In ZK, we use the special string "default" to represent the default entity. - * In KRaft, we use an empty string. This method builds an EntityData that converts the special ZK string - * to the special KRaft string. + * In ZK, we use the special string "default" to represent the default config entity. + * In KRaft, we use an empty string. This method converts the between the two conventions. */ - private def fromZkEntityName(entityName: String): String = { -if (entityName.equals(ConfigEntityName.DEFAULT)) { + private def fromZkConfigfEntityName(entityName: String): String = { +if (entityName.equals(ZooKeeperInternals.DEFAULT_STRING)) { "" } else { entityName } } - private def toZkEntityName(entityName: String): String = { + private def toZkConfigEntityName(entityName: String): String = { if (entityName.isEmpty) { - ConfigEntityName.DEFAULT + ZooKeeperInternals.DEFAULT_STRING } else { entityName } } - private def buildEntityData(entityType: String, entityName: String): EntityData = { -new EntityData().setEntityType(entityType).setEntityName(fromZkEntityName(entityName)) + private def buildClientQuotaEntityData( +entityType: String, +znodeName: String + ): EntityData = { +val result = new EntityData().setEntityType(entityType) +if (znodeName.equals(ZooKeeperInternals.DEFAULT_STRING)) { + // Default __client quota__ entity names are null. This is different than default __configs__, + // which have their names set to the empty string instead. + result.setEntityName(null) +} else { + // ZNode names are mangled before being stored in ZooKeeper. + // For example, @ is turned into %40. Undo the mangling here. 
+ result.setEntityName(Sanitizer.desanitize(znodeName)) +} +result } - override def iterateClientQuotas(visitor: ClientQuotaVisitor): Unit = { def migrateEntityType(zkEntityType: String, entityType: String): Unit = { - adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (name, props) => -val entity = List(buildEntityData(entityType, name)).asJava + adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (znodeName, props) => +val entity = List(buildClientQuotaEntityData(entityType, znodeName)).asJava ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism => val propertyValue = props.getProperty(mechanism.mechanismName) if (propertyValue != null) { val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue) logAndRethrow(this, s"Error in client quota visitor for SCRAM credential. User was $entity.") { - visitor.visitScramCredential(name, mechanism, scramCredentials) + visitor.visitScramCredential(Sanitizer.desanitize(znodeName), mechanism, scramCredentials) Review Comment: That is correct. The previous change to `ZkMigrationClient` from KAFKA-16222 has been removed in this PR, as you can see. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
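As a rough illustration of the mangling discussed above, plain percent-encoding already turns `@` into `%40`. Kafka's actual `Sanitizer` applies additional rules beyond standard URL encoding, so this sketch only approximates its behavior:

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Simplified illustration of ZNode name sanitization via percent-encoding.
// This is NOT Kafka's real Sanitizer, which has extra rules; it only shows
// the basic round trip described in the diff comment above.
class ZNodeNames {
    // Mangle a name so it is safe to store as a ZooKeeper node name.
    static String sanitize(String name) {
        return URLEncoder.encode(name, StandardCharsets.UTF_8);
    }

    // Undo the mangling when reading the name back out of ZooKeeper.
    static String desanitize(String znodeName) {
        return URLDecoder.decode(znodeName, StandardCharsets.UTF_8);
    }
}
```

For example, a user entity named `user@host` would be stored under the ZNode name `user%40host` and desanitized back on migration.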
Re: [PR] KAFKA-16148: Implement GroupMetadataManager#onUnloaded [kafka]
jeffkbkim commented on PR #15446: URL: https://github.com/apache/kafka/pull/15446#issuecomment-2018694290 thanks @dajac. I have addressed your comments -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16200) Enforce that RequestManager implementations respect user-provided timeout
[ https://issues.apache.org/jira/browse/KAFKA-16200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16200: - Assignee: Kirk True (was: Bruno Cadonna) > Enforce that RequestManager implementations respect user-provided timeout > - > > Key: KAFKA-16200 > URL: https://issues.apache.org/jira/browse/KAFKA-16200 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to > block waiting for the event to complete. The application thread will block > for the timeout, but there is not yet a consistent manner in which events are > timed out. > Enforce at the request manager layer that timeouts are respected per the > design in KAFKA-15848. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16420: Add instance methods to Exit to replace static methods [kafka]
gharris1727 opened a new pull request, #15598: URL: https://github.com/apache/kafka/pull/15598 This adds a thread-safe alternative to the current Exit implementation, which doesn't make use of static mutable fields. This solves a number of problems: 1. Thread isolation: Two tests can inject exit procedures concurrently, without conflicting with one-another 2. The test-only methods for manipulating the state of Exit are present on the main class, and so could be called in a non-test environment. 3. When fatal procedures are intercepted by Exit, shutdown hooks are delayed until the real JVM exit, possibly leaking resources/files across tests. 4. When fatal procedures are intercepted by Exit, those exceptions can go un-noticed, and may not cause the test to fail. 5. When shutdown hooks are intercepted by Exit, they are dropped and never executed 6. It isn't clear to code calling exit() that an exception might be thrown instead when the procedures are mocked. The Java and Scala Exit classes don't use any instance methods, so I was able to directly add instance methods to the existing classes. This should prevent the need for a large rename later, such as appears necessary with the other options I could think of: * Add it as a different class SafeExit: large amount of code churn when Exit is removed, silly name * Moving "Exit" to "UnsafeExit" now: large amount of code churn up-front, immediate merge conflicts * Add it to a different package as "utils.temp.Exit": moderate amount of code churn when Exit is removed, harder to review Because static and instance methods cannot share signatures, I had to perturb the names slightly. I think the new names are acceptable, and shouldn't cause any additional code churn. * exit to exitOrThrow * halt to haltOrThrow * addShutdownHook to addShutdownRunnable In the interest of not testing tests, I've omitted tests for these changes. 
The existing ExitTest is testing the mocking methods, which are not part of the main API now, just the MockExit API. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
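A minimal sketch of the instance-based idea from this PR description follows. The `exitOrThrow` name comes from the PR; the class body here is illustrative, not the actual implementation:

```java
import java.util.function.BiConsumer;

// Illustrative sketch of an instance-based, thread-safe Exit: each instance
// carries its own immutable exit procedure, so two tests can inject different
// procedures concurrently without sharing static mutable state.
class InstanceExit {
    private final BiConsumer<Integer, String> procedure;

    InstanceExit(BiConsumer<Integer, String> procedure) {
        this.procedure = procedure;
    }

    // Tests inject a procedure that records the status code (or throws);
    // production code uses the real JVM exit instead.
    void exitOrThrow(int statusCode, String message) {
        procedure.accept(statusCode, message);
    }

    // Production instance: actually terminates the JVM when invoked.
    static InstanceExit real() {
        return new InstanceExit((code, msg) -> Runtime.getRuntime().halt(code));
    }
}
```

Because the procedure is a constructor argument rather than a static field, there is no "reset" step to race against, which is the core of the thread-isolation argument above.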
Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]
wcarlson5 commented on code in PR #15414: URL: https://github.com/apache/kafka/pull/15414#discussion_r1538043755 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java: ## @@ -236,6 +252,91 @@ private List topicPartitionsForStore(final StateStore store) { } return topicPartitions; } +@SuppressWarnings("unchecked") +private void reprocessState(final List topicPartitions, +final Map highWatermarks, +final InternalTopologyBuilder.ReprocessFactory reprocessFactory, +final String storeName) { +final Processor source = reprocessFactory.processorSupplier().get(); +source.init(globalProcessorContext); + +for (final TopicPartition topicPartition : topicPartitions) { +long currentDeadline = NO_DEADLINE; + +globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; +final Long checkpoint = checkpointFileCache.get(topicPartition); +if (checkpoint != null) { +globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; +} else { + globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +offset = getGlobalConsumerOffset(topicPartition); +} +final Long highWatermark = highWatermarks.get(topicPartition); +stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); + +long restoreCount = 0L; + +while (offset < highWatermark) { +// we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short +// to give a fetch request a fair chance to actually complete and we don't want to +// start `task.timeout.ms` too early +// +// TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call +// `poll(pollMS)` without adding the request timeout and do a more precise +// timeout handling +final ConsumerRecords records = globalConsumer.poll(pollMsPlusRequestTimeout); +if (records.isEmpty()) { +currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline); +} else { +currentDeadline = NO_DEADLINE; +} + +for (final ConsumerRecord record : 
records.records(topicPartition)) { +final ProcessorRecordContext recordContext = +new ProcessorRecordContext( +record.timestamp(), +record.offset(), +record.partition(), +record.topic(), +record.headers()); +globalProcessorContext.setRecordContext(recordContext); + +try { +if (record.key() != null) { +source.process(new Record<>( + reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()), + reprocessFactory.valueDeserializer().deserialize(record.topic(), record.value()), +record.timestamp(), +record.headers())); +restoreCount++; +} +} catch (final Exception deserializationException) { +handleDeserializationFailure( Review Comment: If you still think we need to refactor RecordDeserializer I think we should do that in a follow up PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
[ https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16160: -- Priority: Blocker (was: Major) > AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop > -- > > Key: KAFKA-16160 > URL: https://issues.apache.org/jira/browse/KAFKA-16160 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Blocker > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > Observing some excessive logging running AsyncKafkaConsumer and observing > excessive logging of : > {code:java} > 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, > groupId=concurrent_consumer] Node is not ready, handle the request in the > next event loop: node=worker4:9092 (id: 2147483644 rack: null), > request=UnsentRequest{requestBuil > der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', > memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, > rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], > serverAssignor=null, topicP > artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, > node=Optional[worker4:9092 (id: 2147483644 rack: null)] , > timer=org.apache.kafka.common.utils.Timer@55ed4733} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} > This seems to be triggered by a tight poll loop of the network thread. The > right thing to do is to backoff a bit for that given node and retry later. > This should be a blocker for 3.8 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
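The fix the ticket suggests, backing off per node rather than retrying in a tight loop, could look roughly like this. This is an illustrative sketch, not the NetworkClient's real connection state machine:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative per-node exponential backoff: each consecutive failure to a
// node doubles the retry delay, up to a cap, so a disconnected node is not
// hammered on every pass of the network thread's event loop.
class NodeBackoff {
    private final long baseMs;
    private final long maxMs;
    private final Map<String, Integer> failures = new HashMap<>();

    NodeBackoff(long baseMs, long maxMs) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
    }

    // Record one failed connection attempt to the given node.
    void onFailure(String node) {
        failures.merge(node, 1, Integer::sum);
    }

    // Delay before the next attempt: base * 2^(failures - 1), capped at maxMs.
    long delayMs(String node) {
        int n = failures.getOrDefault(node, 0);
        if (n == 0) return 0;
        long delay = baseMs << Math.min(n - 1, 20);  // clamp the shift to avoid overflow
        return Math.min(delay, maxMs);
    }
}
```

A node with no recorded failures is retried immediately, while a repeatedly failing node quickly reaches the cap, which is the behavior the ticket asks for.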
[jira] [Updated] (KAFKA-15250) ConsumerNetworkThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15250: -- Priority: Critical (was: Blocker) > ConsumerNetworkThread is running tight loop > --- > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout pass to networkclientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15950) Serialize broker heartbeat requests
[ https://issues.apache.org/jira/browse/KAFKA-15950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-15950. - Fix Version/s: 3.8.0 Resolution: Fixed merged the PR to trunk. > Serialize broker heartbeat requests > --- > > Key: KAFKA-15950 > URL: https://issues.apache.org/jira/browse/KAFKA-15950 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0 >Reporter: Jun Rao >Assignee: Igor Soarez >Priority: Major > Fix For: 3.8.0 > > > This is a followup issue from the discussion in > [https://github.com/apache/kafka/pull/14836#discussion_r1409739363]. > {{KafkaEventQueue}} does de-duping and only allows one outstanding > {{CommunicationEvent}} in the queue. But it seems that duplicated > {{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} > calls {{sendBrokerHeartbeat}} that calls the following. > {code:java} > _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), > handler){code} > The problem is that we have another queue in > {{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a > {{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a > {{HeartbeatRequest}} will be queued in > {{{}NodeToControllerChannelManagerImpl{}}}. At this point, another > {{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When > it's processed, another {{HeartbeatRequest}} will be queued in > {{{}NodeToControllerChannelManagerImpl{}}}. > This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in > practice since {{CommunicationEvent}} is typically queued in > {{KafkaEventQueue}} for heartbeat interval. By that time, other pending > {{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to > {{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to > reason about tests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
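The two-queue interaction described above can be sketched minimally: the first queue de-dupes (like `KafkaEventQueue`), the second does not, so duplicates can still appear at the lower layer. Class and method names are illustrative:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Minimal illustration of the two-queue problem from KAFKA-15950: the upper
// queue allows only one outstanding event per tag, but once that event is
// dequeued and its request forwarded to a plain lower queue, a second event
// with the same tag can be enqueued, yielding duplicate in-flight requests.
class TwoQueues {
    final Queue<String> dedupQueue = new ArrayDeque<>();   // like KafkaEventQueue
    final Queue<String> channelQueue = new ArrayDeque<>(); // like NodeToControllerChannelManagerImpl

    void enqueueEvent(String tag) {
        if (!dedupQueue.contains(tag)) {  // de-dupe: one outstanding event per tag
            dedupQueue.add(tag);
        }
    }

    void processOneEvent() {
        String tag = dedupQueue.poll();
        if (tag != null) {
            channelQueue.add(tag + "-request");  // no de-duping at this layer
        }
    }
}
```

The de-dupe check only sees the upper queue, so after `processOneEvent()` a fresh event with the same tag is accepted even though the earlier request is still sitting in the channel queue.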
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao merged PR #14903: URL: https://github.com/apache/kafka/pull/14903 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15250) ConsumerNetworkThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15250: -- Priority: Blocker (was: Major) > ConsumerNetworkThread is running tight loop > --- > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout pass to networkclientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-2018530584 @soarez : Thanks for triaging the test failures. Will merge the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16271) Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16271: - Assignee: Philip Nee > Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol > config > -- > > Key: KAFKA-16271 > URL: https://issues.apache.org/jira/browse/KAFKA-16271 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in > {{consumer_rolling_upgrade_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. > The tricky wrinkle here is that the existing test relies on client-side > assignment strategies that aren't applicable with the new KIP-848-enabled > consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-2018506864 @junrao all the failed tests are being tracked. These were already tracked: * KAFKA-8041 kafka.server.LogDirFailureTest.testIOExceptionDuringCheckpoint(String).quorum=kraft * KAFKA-8115 org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated() * KAFKA-15772 org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest.testAbortTransactionTimeout(String).quorum=kraft * KAFKA-15897 kafka.server.ControllerRegistrationManagerTest.testWrongIncarnationId() * KAFKA-15898 org.apache.kafka.controller.QuorumControllerTest.testFenceMultipleBrokers() * KAFKA-15921 kafka.api.SaslScramSslEndToEndAuthorizationTest.testAuthentications(String).quorum=kraft * KAFKA-15927 org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateSourceDefault() * KAFKA-15927 org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault() * KAFKA-15928 org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders() * KAFKA-15945 org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testSyncTopicConfigs() * KAFKA-15961 kafka.controller.ControllerIntegrationTest.testTopicIdPersistsThroughControllerRestart() * KAFKA-16225 kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(String).quorum=kraft * KAFKA-16323 kafka.server.ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric() This one was not, so I created a JIRA: * KAFKA-16422 org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest."testFailingOverIncrementsNewActiveControllerCount(boolean).true" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16422) Flaky test org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest."testFailingOverIncrementsNewActiveControllerCount(boolean).true"
Igor Soarez created KAFKA-16422: --- Summary: Flaky test org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest."testFailingOverIncrementsNewActiveControllerCount(boolean).true" Key: KAFKA-16422 URL: https://issues.apache.org/jira/browse/KAFKA-16422 Project: Kafka Issue Type: Bug Reporter: Igor Soarez {code:java} [2024-03-22T10:39:59.911Z] Gradle Test Run :metadata:test > Gradle Test Executor 92 > QuorumControllerMetricsIntegrationTest > testFailingOverIncrementsNewActiveControllerCount(boolean) > "testFailingOverIncrementsNewActiveControllerCount(boolean).true" FAILED [2024-03-22T10:39:59.912Z] org.opentest4j.AssertionFailedError: expected: <1> but was: <2> [2024-03-22T10:39:59.912Z] at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) [2024-03-22T10:39:59.912Z] at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) [2024-03-22T10:39:59.912Z] at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) [2024-03-22T10:39:59.912Z] at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:166) [2024-03-22T10:39:59.912Z] at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:161) [2024-03-22T10:39:59.912Z] at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:632) [2024-03-22T10:39:59.912Z] at app//org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.lambda$testFailingOverIncrementsNewActiveControllerCount$1(QuorumControllerMetricsIntegrationTest.java:107) [2024-03-22T10:39:59.912Z] at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444) [2024-03-22T10:39:59.912Z] at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:412) [2024-03-22T10:39:59.912Z] at app//org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testFailingOverIncrementsNewActiveControllerCount(QuorumControllerMetricsIntegrationTest.java:105) {code} -- 
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14246) Update threading model for Consumer
[ https://issues.apache.org/jira/browse/KAFKA-14246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True resolved KAFKA-14246. --- Resolution: Fixed > Update threading model for Consumer > --- > > Key: KAFKA-14246 > URL: https://issues.apache.org/jira/browse/KAFKA-14246 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > Hi community, > > We are refactoring the current KafkaConsumer and making it more asynchronous. > This is the master Jira to track the project's progress; subtasks will be > linked to this ticket. Please review the design document and feel free to > use this thread for discussion. > > The design document is here: > [https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor] > > The original email thread is here: > [https://lists.apache.org/thread/13jvwzkzmb8c6t7drs4oj2kgkjzcn52l] > > I will continue to update the 1pager as reviews and comments come. > > Thanks, > P -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16420) Replace utils.Exit with a thread-safe alternative
[ https://issues.apache.org/jira/browse/KAFKA-16420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830603#comment-17830603 ] Greg Harris commented on KAFKA-16420: - I can't find a single ticket for "Remove Zookeeper", but I think that is also a precondition for implementing this change. I found a number of code-paths which are ZK specific that would make this migration significantly more involved, so I think it is best that those are removed first. > Replace utils.Exit with a thread-safe alternative > - > > Key: KAFKA-16420 > URL: https://issues.apache.org/jira/browse/KAFKA-16420 > Project: Kafka > Issue Type: Wish > Components: connect, core, tools >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > The Exit class is not thread-safe, and exposes our tests to race conditions > and inconsistent execution. It is not possible to make it thread-safe due to > the static design of the API. > We should add an alternative to the Exit class, and migrate the existing > usages to the replacement, before finally removing the legacy Exit. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16421) Refactor CommandDefaultOptions subclasses to throw exceptions instead of calling exit.
Greg Harris created KAFKA-16421: --- Summary: Refactor CommandDefaultOptions subclasses to throw exceptions instead of calling exit. Key: KAFKA-16421 URL: https://issues.apache.org/jira/browse/KAFKA-16421 Project: Kafka Issue Type: Wish Components: tools Reporter: Greg Harris Many command-line utilities use the "mainNoExit()" idiom to provide a testable entrypoint to the command-line utility that doesn't include calling System.exit. This allows tests to safely exercise the command-line utility end-to-end, without risk that the JVM will stop. Often, command implementations themselves adhere to this idiom, and don't call Exit. However, this is compromised by the CommandLineUtils functions, which call Exit.exit when an error is encountered while parsing the command-line arguments. These utilities are pervasively used in subclasses of CommandDefaultOptions, across hundreds of call-sites. We should figure out a way to replace this exit behavior with exceptions that are eventually propagated from the *Options constructors. This will allow the command-line implementations to handle these errors, and return the appropriate exit code from mainNoExit. -- This message was sent by Atlassian Jira (v8.20.10#820010)
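The `mainNoExit()` idiom referenced in this ticket can be sketched as follows. The tool class and its argument handling are hypothetical; only the shape of the idiom comes from the ticket:

```java
// Hypothetical command-line tool following the "mainNoExit" idiom: main is a
// thin wrapper that converts the returned status code into a real JVM exit,
// while mainNoExit is safe for tests to call end-to-end because it never
// calls System.exit itself.
class SomeTool {
    public static void main(String[] args) {
        System.exit(mainNoExit(args));
    }

    static int mainNoExit(String[] args) {
        try {
            // Argument parsing throws instead of exiting, so errors propagate
            // back here and become an exit code.
            if (args.length == 0) {
                throw new IllegalArgumentException("missing required argument");
            }
            return 0;  // success
        } catch (IllegalArgumentException e) {
            return 1;  // failure reported via exit code; test JVM keeps running
        }
    }
}
```

The refactor proposed in the ticket would let the `*Options` constructors throw in the same way, instead of calling `Exit.exit` deep inside `CommandLineUtils`.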
[jira] [Commented] (KAFKA-16349) ShutdownableThread fails build by calling Exit with race condition
[ https://issues.apache.org/jira/browse/KAFKA-16349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830590#comment-17830590 ] Greg Harris commented on KAFKA-16349: - I added a tactical fix for the Exit class in my PR to resolve this bug. I'll pursue this refactor in a separate ticket KAFKA-16420. > ShutdownableThread fails build by calling Exit with race condition > -- > > Key: KAFKA-16349 > URL: https://issues.apache.org/jira/browse/KAFKA-16349 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.8.0 >Reporter: Greg Harris >Priority: Minor > > `ShutdownableThread` calls `Exit.exit()` when the thread's operation throws > FatalExitError. In normal operation, this calls System.exit, and exits the > process. In tests, the exit procedure is masked with Exit.setExitProcedure to > prevent tests that encounter a FatalExitError from crashing the test JVM. > Masking of exit procedures is usually done in BeforeEach/AfterEach > annotations, with the exit procedures cleaned up immediately after the test > finishes. If the body of the test creates a ShutdownableThread that outlives > the test, such as by omitting `ShutdownableThread#awaitShutdown`, by having > `ShutdownableThread#awaitShutdown` be interrupted by a test timeout, or by a > race condition between `Exit.resetExitProcedure` and `Exit.exit`, then > System.exit() can be called erroneously. > > {noformat} > // First, in the test thread: > Exit.setExitProcedure(...) > try { > new ShutdownableThread(...).start() > } finally { > Exit.resetExitProcedure() > } > // Second, in the ShutdownableThread: > try { > throw new FatalExitError(...) > } catch (FatalExitError e) { > Exit.exit(...) 
// Calls real System.exit() > }{noformat} > > This can be resolved by one of the following: > # Eliminate FatalExitError usages in code when setExitProcedure is in-use > # Eliminate the Exit.exit call from ShutdownableThread, and instead > propagate this error to another thread to handle without a race-condition > # Eliminate resetExitProcedure by refactoring Exit to be non-static > FatalExitError is in use in a small number of places, but may be difficult to > eliminate: > * FinalizedFeatureChangeListener > * InterBrokerSendThread > * TopicBasedRemoteLogMetadataManager > There are many other places where Exit is called from a background thread, > including some implementations of ShutdownableThread which don't use > FatalExitError. > The effect of this bug is that the build is flaky, as race > conditions/timeouts in tests can cause the gradle executors to exit with > status code 1, which has happened 26 times in the last 28 days. I have not > yet been able to confirm this bug is happening in other tests, but I do have > a deterministic reproduction case with the exact same symptoms: > {noformat} > Gradle Test Run :core:test > Gradle Test Executor 38 > ShutdownableThreadTest > > testShutdownWhenTestTimesOut(boolean) > > "testShutdownWhenTestTimesOut(boolean).isInterruptible=true" SKIPPED > FAILURE: Build failed with an exception. > * What went wrong: > Execution failed for task ':core:test'. > > Process 'Gradle Test Executor 38' finished with non-zero exit value 1 > This problem might be caused by incorrect test process configuration. > For more on test execution, please refer to > https://docs.gradle.org/8.6/userguide/java_testing.html#sec:test_execution in > the Gradle documentation.{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
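Option 2 above (propagating the error to another thread instead of calling Exit.exit from the background thread) could be sketched as follows. This is a minimal illustration, not Kafka's ShutdownableThread API; all names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Sketch: the worker thread completes a future with the fatal status code,
// and the supervising thread decides what to do with it. No Exit.exit is
// called from the background thread, so there is no race with
// resetExitProcedure.
public class PropagateFatalError {
    static class FatalError extends RuntimeException {
        final int statusCode;
        FatalError(int statusCode) {
            super("fatal error, status " + statusCode);
            this.statusCode = statusCode;
        }
    }

    public static int runAndSupervise(Runnable operation) {
        CompletableFuture<Integer> exitStatus = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            try {
                operation.run();
                exitStatus.complete(0);
            } catch (FatalError e) {
                // Propagate instead of calling Exit.exit here.
                exitStatus.complete(e.statusCode);
            }
        });
        worker.start();
        try {
            worker.join();
            // The supervising thread, not the worker, owns the exit decision.
            return exitStatus.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
```

The key property is that the thread that installed the exit procedure is also the thread that reacts to the fatal error, so the test's setup/teardown window no longer races with the worker.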
Re: [PR] KAFKA-15853: Move KafkaConfig Replication properties and docs out of … [kafka]
OmniaGM commented on PR #15575: URL: https://github.com/apache/kafka/pull/15575#issuecomment-2018444788 > Hello @AndrewJSchofield @OmniaGM > > Looks like dependency on server tests not required but introduced in this PR. I have plans to double check this after minor comment in #15569 will be resolved. Updated the PR now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16420) Replace utils.Exit with a thread-safe alternative
Greg Harris created KAFKA-16420: --- Summary: Replace utils.Exit with a thread-safe alternative Key: KAFKA-16420 URL: https://issues.apache.org/jira/browse/KAFKA-16420 Project: Kafka Issue Type: Wish Components: connect, core, tools Reporter: Greg Harris Assignee: Greg Harris The Exit class is not thread-safe, and exposes our tests to race conditions and inconsistent execution. It is not possible to make it thread-safe due to the static design of the API. We should add an alternative to the Exit class, and migrate the existing usages to the replacement, before finally removing the legacy Exit.
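A non-static replacement along the lines this ticket wishes for might look like the sketch below. None of these names exist in Kafka today; this only illustrates why instance-scoped state removes the race inherent in setExitProcedure/resetExitProcedure.

```java
import java.util.function.IntConsumer;

// Sketch of an instance-based Exit: each caller (or test) holds its own
// instance, so there is no shared mutable static procedure to reset and
// therefore no window in which another thread sees the wrong procedure.
public class InstanceExit {
    private final IntConsumer exitProcedure;

    public InstanceExit(IntConsumer exitProcedure) {
        this.exitProcedure = exitProcedure;
    }

    // Production code would use the real JVM exit.
    public static InstanceExit system() {
        return new InstanceExit(System::exit);
    }

    public void exit(int statusCode) {
        exitProcedure.accept(statusCode);
    }
}
```

A test would construct `new InstanceExit(code -> captured = code)` and inject it, instead of mutating global state before and after the test body.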
Re: [PR] KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]
jolshan commented on PR #15559: URL: https://github.com/apache/kafka/pull/15559#issuecomment-2018411363 Getting a fresh build before merging.
[jira] [Commented] (KAFKA-16398) mirror-maker2 running into OOM while filtering (dropping) high number of messages
[ https://issues.apache.org/jira/browse/KAFKA-16398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830576#comment-17830576 ] Greg Harris commented on KAFKA-16398: - Hi [~srivignesh] Thank you for providing your configuration. Please note that this part of the configuration: {noformat} "transforms": "Filter", "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",{noformat} will drop every record unconditionally, so you won't get any of the load/memory pressure that the producers provide. This would explain why adding the predicate (which then allows some records through) would cause OOMs. On that note, you have a lot of tuning parameters in your connect-distributed config, some of which don't appear to have any effect. One that does appear to have an effect is `producer.buffer.memory=8388608`. For 1500 tasks with that config, I would expect just the producer buffers to consume 12GiB, without taking into account any other buffers/memory overhead. If all of the tasks were started on a single node, this could very easily cause OOMs. What size cluster are you using, and does the cluster ever shrink to a single node, such as with a full reboot/cold start? If so, you may want to reduce the number of tasks, or size of each task's producer buffer. > mirror-maker2 running into OOM while filtering (dropping) high number of > messages > - > > Key: KAFKA-16398 > URL: https://issues.apache.org/jira/browse/KAFKA-16398 > Project: Kafka > Issue Type: Bug > Components: connect, mirrormaker >Affects Versions: 3.6.1 >Reporter: Srivignesh >Priority: Critical > Attachments: connect-distributed.properties.template, > mm2.config.template > > > Based on custom predicate, our application is filtering messages during > mirroring. > When the HasHeader:test method of the predicate returns true (when it has to > drop messages from mirroring), it encounters below exceptions. 
> However when it returns false (the messages are forwarded for mirroring), it > works fine without OOM. > Note: This issue doesn't occur with the same load in version 2.8.0. > JVM heap size increased till 15G, but still OOM hits. > Exception stacktraces: > {code:java} > line java.lang.OutOfMemoryError: Java heap space > line at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289) > line at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252) > line at > org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270) > line at > org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) > line at > org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300) > line at > org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263) > line at > org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340) > line at > org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306) > line at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262) > line at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186) > line at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) > line at > org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153) > line at > org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469) > line at > org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357) > line at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204) > line at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259) > line at > org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77) > line at > 
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236) > line at > org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x7f55cc4c3d78.run(Unknown > Source) > line at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) > line at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > line at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) > line at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) > line at java.base/java.lang.Thread.run(Thread.java:840) {code} > {code:java} > line java.lang.OutOfMemoryError: Java heap space line at >
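The producer-buffer estimate in the comment above (1500 tasks, each with producer.buffer.memory = 8388608 bytes) can be checked with quick arithmetic:

```java
// Back-of-the-envelope check of the memory estimate discussed above:
// 1500 tasks x 8 MiB of producer buffer each.
public class BufferMath {
    public static void main(String[] args) {
        long perTaskBytes = 8_388_608L; // producer.buffer.memory from the config
        long tasks = 1_500L;
        double gib = (tasks * perTaskBytes) / (1024.0 * 1024.0 * 1024.0);
        System.out.printf("total producer buffer memory: about %.1f GiB%n", gib);
        // about 11.7 GiB, before any other buffers or JVM overhead
    }
}
```

That is just the producer buffers; fetch buffers, in-flight batches, and general JVM overhead come on top, which is why a single node hosting all tasks can exceed a 15G heap.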
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-2018408733 @soarez : Are the failed tests in the latest run being tracked?
Re: [PR] KAFKA-15949: Unify metadata.version format in log and error message [kafka]
FrankYang0529 commented on PR #15505: URL: https://github.com/apache/kafka/pull/15505#issuecomment-2018399116 > @FrankYang0529 , there are some tests failing because of your change. Please take a look. Thanks. > > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15505/4 Sorry for that. I will double-check the test results next time. I have fixed the errors in `testScramWithBadMetadataVersion`, `testMetadataVersionChangeExceptionToString`, and `testCreateClusterInvalidMetadataVersion`. Thank you.
[PR] MINOR: AbstractConfig cleanup [kafka]
gharris1727 opened a new pull request, #15597: URL: https://github.com/apache/kafka/pull/15597 Add Utils.castToStringObjectMap and fix unchecked casts in AbstractConfig ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
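A helper like the `castToStringObjectMap` mentioned in this PR's summary might be sketched as below: instead of an unchecked `(Map<String, Object>)` cast, validate each key at runtime. This is only a guess at the shape of the utility, not the PR's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: replace an unchecked cast with a checked copy,
// so a non-String key fails loudly at the cast site instead of later.
public final class MapCastSketch {
    public static Map<String, Object> castToStringObjectMap(Map<?, ?> map) {
        Map<String, Object> result = new LinkedHashMap<>(map.size());
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            Object key = entry.getKey();
            if (!(key instanceof String))
                throw new ClassCastException("Non-string key found in map: " + key);
            result.put((String) key, entry.getValue());
        }
        return result;
    }
}
```

The trade-off is an O(n) copy in exchange for an immediate, precise error instead of a deferred ClassCastException at an arbitrary use site.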
[jira] [Updated] (KAFKA-16419) Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simplify the process
[ https://issues.apache.org/jira/browse/KAFKA-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johnny Hsu updated KAFKA-16419: --- Description: Currently in the [LogValidator.validateMessagesAndAssignOffsetsCompressed|https://github.com/apache/kafka/blob/51c9b0d0ad408754b1c5883a9c7fcc63a5f57eb8/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L315], there are lots of if-else checks based on the `magic` and `CompressionType`, which makes the code complicated and increase the difficulties of maintaining. The flow of the validation can be separated into 5 steps: # IBP validation ## whether the compression type is valid for this IBP # In-place assignment enablement check ## based on the magic value and compression type, decide whether we can do in-place assignment # batch level validation ## based on the batch origin (client, controller, etc) and magic version # record level validation ## based on whether we can do in-place assignment, choose different iterator ## based on the magic and compression type, do different validation # return validated results ## based on whether we can do in-place assignment, build the records or assign it This whole flow can be extracted into an interface, and the LogValidator.validateMessagesAndAssignOffsetsCompressed can init an implementation based on the passed-in records. The implementation class will have the following fields: # magic value # source compression type # target compression type # origin # records # timestamp type was: Currently in the [LogValidator.validateMessagesAndAssignOffsetsCompressed|http://example.com](https://github.com/apache/kafka/blob/51c9b0d0ad408754b1c5883a9c7fcc63a5f57eb8/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L315), there are lots of if-else checks based on the `magic` and `CompressionType`, which makes the code complicated and increase the difficulties of maintaining. 
The flow of the validation can be separated into 5 steps: # IBP validation ## whether the compression type is valid for this IBP # In-place assignment enablement check ## based on the magic value and compression type, decide whether we can do in-place assignment # batch level validation ## based on the batch origin (client, controller, etc) and magic version # record level validation ## based on whether we can do in-place assignment, choose different iterator ## based on the magic and compression type, do different validation # return validated results ## based on whether we can do in-place assignment, build the records or assign it This whole flow can be extracted into an interface, and the LogValidator.validateMessagesAndAssignOffsetsCompressed can init an implementation based on the passed-in records. The implementation class will have the following fields: # magic value # source compression type # target compression type # origin # records # timestamp type > Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simply > the process > - > > Key: KAFKA-16419 > URL: https://issues.apache.org/jira/browse/KAFKA-16419 > Project: Kafka > Issue Type: Improvement >Reporter: Johnny Hsu >Assignee: Johnny Hsu >Priority: Major > > Currently in the > [LogValidator.validateMessagesAndAssignOffsetsCompressed|https://github.com/apache/kafka/blob/51c9b0d0ad408754b1c5883a9c7fcc63a5f57eb8/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L315], > there are lots of if-else checks based on the `magic` and `CompressionType`, > which makes the code complicated and increase the difficulties of > maintaining. 
> The flow of the validation can be separated into 5 steps: > # IBP validation > ## whether the compression type is valid for this IBP > # In-place assignment enablement check > ## based on the magic value and compression type, decide whether we can do > in-place assignment > # batch level validation > ## based on the batch origin (client, controller, etc) and magic version > # record level validation > ## based on whether we can do in-place assignment, choose different iterator > ## based on the magic and compression type, do different validation > # return validated results > ## based on whether we can do in-place assignment, build the records or > assign it > This whole flow can be extracted into an interface, and the > LogValidator.validateMessagesAndAssignOffsetsCompressed can init an > implementation based on the passed-in records. > The implementation class will have the following fields: > # magic value > # source compression type > # target compression type > # origin > # records > # timestamp type -- This message was sent by Atlassian Jira
[jira] [Updated] (KAFKA-16419) Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simplify the process
[ https://issues.apache.org/jira/browse/KAFKA-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johnny Hsu updated KAFKA-16419: --- Description: Currently in the [LogValidator.validateMessagesAndAssignOffsetsCompressed|http://example.com](https://github.com/apache/kafka/blob/51c9b0d0ad408754b1c5883a9c7fcc63a5f57eb8/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L315), there are lots of if-else checks based on the `magic` and `CompressionType`, which makes the code complicated and increase the difficulties of maintaining. The flow of the validation can be separated into 5 steps: # IBP validation ## whether the compression type is valid for this IBP # In-place assignment enablement check ## based on the magic value and compression type, decide whether we can do in-place assignment # batch level validation ## based on the batch origin (client, controller, etc) and magic version # record level validation ## based on whether we can do in-place assignment, choose different iterator ## based on the magic and compression type, do different validation # return validated results ## based on whether we can do in-place assignment, build the records or assign it This whole flow can be extracted into an interface, and the LogValidator.validateMessagesAndAssignOffsetsCompressed can init an implementation based on the passed-in records. The implementation class will have the following fields: # magic value # source compression type # target compression type # origin # records # timestamp type was: Currently in the LogValidator.validateMessagesAndAssignOffsetsCompressed, there are lots of if-else checks based on the `magic` and `CompressionType`, which makes the code complicated and increase the difficulties of maintaining. 
The flow of the validation can be separated into 5 steps: # IBP validation ## whether the compression type is valid for this IBP # In-place assignment enablement check ## based on the magic value and compression type, decide whether we can do in-place assignment # batch level validation ## based on the batch origin (client, controller, etc) and magic version # record level validation ## based on whether we can do in-place assignment, choose different iterator ## based on the magic and compression type, do different validation # return validated results ## based on whether we can do in-place assignment, build the records or assign it This whole flow can be extracted into an interface, and the LogValidator.validateMessagesAndAssignOffsetsCompressed can init an implementation based on the passed-in records. The implementation class will have the following fields: # magic value # source compression type # target compression type # origin # records # timestamp type > Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simply > the process > - > > Key: KAFKA-16419 > URL: https://issues.apache.org/jira/browse/KAFKA-16419 > Project: Kafka > Issue Type: Improvement >Reporter: Johnny Hsu >Assignee: Johnny Hsu >Priority: Major > > Currently in the > [LogValidator.validateMessagesAndAssignOffsetsCompressed|http://example.com](https://github.com/apache/kafka/blob/51c9b0d0ad408754b1c5883a9c7fcc63a5f57eb8/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L315), > there are lots of if-else checks based on the `magic` and `CompressionType`, > which makes the code complicated and increase the difficulties of > maintaining. 
> The flow of the validation can be separated into 5 steps: > # IBP validation > ## whether the compression type is valid for this IBP > # In-place assignment enablement check > ## based on the magic value and compression type, decide whether we can do > in-place assignment > # batch level validation > ## based on the batch origin (client, controller, etc) and magic version > # record level validation > ## based on whether we can do in-place assignment, choose different iterator > ## based on the magic and compression type, do different validation > # return validated results > ## based on whether we can do in-place assignment, build the records or > assign it > This whole flow can be extracted into an interface, and the > LogValidator.validateMessagesAndAssignOffsetsCompressed can init an > implementation based on the passed-in records. > The implementation class will have the following fields: > # magic value > # source compression type > # target compression type > # origin > # records > # timestamp type -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16409: DeleteRecordsCommand should use standard exception handling [kafka]
FrankYang0529 commented on PR #15586: URL: https://github.com/apache/kafka/pull/15586#issuecomment-2018334375 > Failed with the same error again. Please help fix the errors. Thanks. Sorry, I will double-check the test results before creating a PR next time. I have updated the case. Thank you.
[jira] [Updated] (KAFKA-16419) Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simplify the process
[ https://issues.apache.org/jira/browse/KAFKA-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johnny Hsu updated KAFKA-16419: --- Description: Currently in the LogValidator.validateMessagesAndAssignOffsetsCompressed, there are lots of if-else checks based on the `magic` and `CompressionType`, which makes the code complicated and increase the difficulties of maintaining. The flow of the validation can be separated into 5 steps: # IBP validation ## whether the compression type is valid for this IBP # In-place assignment enablement check ## based on the magic value and compression type, decide whether we can do in-place assignment # batch level validation ## based on the batch origin (client, controller, etc) and magic version # record level validation ## based on whether we can do in-place assignment, choose different iterator ## based on the magic and compression type, do different validation # return validated results ## based on whether we can do in-place assignment, build the records or assign it This whole flow can be extracted into an interface, and the LogValidator.validateMessagesAndAssignOffsetsCompressed can init an implementation based on the passed-in records. The implementation class will have the following fields: # magic value # source compression type # target compression type # origin # records # timestamp type was: Currently in the LogValidator.validateMessagesAndAssignOffsetsCompressed, there are lots of if-else checks based on the `magic` and `CompressionType`, which makes the code complicated and increase the difficulties of maintaining. 
The flow of the validation can be separated into x steps: # IBP validation ## whether the compression type is valid for this IBP # In-place assignment enablement check ## based on the magic value and compression type, decide whether we can do in-place assignment # batch level validation ## based on the batch origin (client, controller, etc) and magic version # record level validation ## based on whether we can do in-place assignment, choose different iterator ## based on the magic and compression type, do different validation # return validated results ## based on whether we can do in-place assignment, build the records or assign it This whole flow can be extracted into an interface, and the LogValidator.validateMessagesAndAssignOffsetsCompressed can init an implementation based on the passed-in records. The implementation class will have the following fields: # magic value # source compression type # target compression type # origin # records # timestamp type > Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simply > the process > - > > Key: KAFKA-16419 > URL: https://issues.apache.org/jira/browse/KAFKA-16419 > Project: Kafka > Issue Type: Improvement >Reporter: Johnny Hsu >Assignee: Johnny Hsu >Priority: Major > > Currently in the LogValidator.validateMessagesAndAssignOffsetsCompressed, > there are lots of if-else checks based on the `magic` and `CompressionType`, > which makes the code complicated and increase the difficulties of > maintaining. 
> The flow of the validation can be separated into 5 steps: > # IBP validation > ## whether the compression type is valid for this IBP > # In-place assignment enablement check > ## based on the magic value and compression type, decide whether we can do > in-place assignment > # batch level validation > ## based on the batch origin (client, controller, etc) and magic version > # record level validation > ## based on whether we can do in-place assignment, choose different iterator > ## based on the magic and compression type, do different validation > # return validated results > ## based on whether we can do in-place assignment, build the records or > assign it > This whole flow can be extracted into an interface, and the > LogValidator.validateMessagesAndAssignOffsetsCompressed can init an > implementation based on the passed-in records. > The implementation class will have the following fields: > # magic value > # source compression type > # target compression type > # origin > # records > # timestamp type -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16419) Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simplify the process
Johnny Hsu created KAFKA-16419: -- Summary: Abstract validateMessagesAndAssignOffsetsCompressed of LogValidator to simplify the process Key: KAFKA-16419 URL: https://issues.apache.org/jira/browse/KAFKA-16419 Project: Kafka Issue Type: Improvement Reporter: Johnny Hsu Assignee: Johnny Hsu Currently in the LogValidator.validateMessagesAndAssignOffsetsCompressed, there are lots of if-else checks based on the `magic` and `CompressionType`, which makes the code complicated and increases the difficulty of maintaining it. The flow of the validation can be separated into x steps: # IBP validation ## whether the compression type is valid for this IBP # In-place assignment enablement check ## based on the magic value and compression type, decide whether we can do in-place assignment # batch level validation ## based on the batch origin (client, controller, etc) and magic version # record level validation ## based on whether we can do in-place assignment, choose different iterator ## based on the magic and compression type, do different validation # return validated results ## based on whether we can do in-place assignment, build the records or assign it This whole flow can be extracted into an interface, and the LogValidator.validateMessagesAndAssignOffsetsCompressed can init an implementation based on the passed-in records. The implementation class will have the following fields: # magic value # source compression type # target compression type # origin # records # timestamp type
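The interface the ticket proposes could be sketched roughly as follows. All names and types here are hypothetical simplifications (compression types and timestamp types are Strings), only illustrating how the if-else dispatch would collapse into per-strategy implementations.

```java
// Illustrative sketch of the proposed abstraction; not Kafka's actual code.
public class LogValidatorSketch {
    // validateMessagesAndAssignOffsetsCompressed would pick an implementation
    // based on the passed-in records, instead of branching inline.
    interface CompressedRecordsValidator {
        boolean canAssignOffsetsInPlace(); // step 2 of the flow above
        String validate();                 // steps 1, 3, 4, 5 collapsed for brevity
    }

    // One concrete strategy, holding a subset of the fields from the ticket
    // (magic value, source compression, target compression).
    static class ExampleValidator implements CompressedRecordsValidator {
        final byte magic;
        final String sourceCompression;
        final String targetCompression;

        ExampleValidator(byte magic, String sourceCompression, String targetCompression) {
            this.magic = magic;
            this.sourceCompression = sourceCompression;
            this.targetCompression = targetCompression;
        }

        public boolean canAssignOffsetsInPlace() {
            // e.g. same codec on both sides and a current magic value
            return sourceCompression.equals(targetCompression) && magic >= 2;
        }

        public String validate() {
            return canAssignOffsetsInPlace() ? "in-place" : "rebuilt";
        }
    }
}
```

Each (magic, compression) combination would get its own implementation, so adding a new codec or magic version means adding a class rather than threading another condition through the existing branches.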
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1537804695 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -411,8 +411,8 @@ public int memberEpoch() { public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response) { if (response.errorCode() != Errors.NONE.code()) { String errorMessage = String.format( -"Unexpected error in Heartbeat response. Expected no error, but received: %s", Review Comment: good call. I think it was editor's auto correction. Reverting it.
[jira] [Commented] (KAFKA-16318) Add javadoc to KafkaMetric
[ https://issues.apache.org/jira/browse/KAFKA-16318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830539#comment-17830539 ] Johnny Hsu commented on KAFKA-16318: The PR is merged; closing this ticket. > Add javadoc to KafkaMetric > -- > > Key: KAFKA-16318 > URL: https://issues.apache.org/jira/browse/KAFKA-16318 > Project: Kafka > Issue Type: Bug > Components: docs >Reporter: Mickael Maison >Assignee: Johnny Hsu >Priority: Major > Fix For: 3.8.0 > > > KafkaMetric is part of the public API but it's missing javadoc describing the > class and several of its methods.
Re: [PR] MINOR: optimize EvictableKey/LastUsedKey compareTo function [kafka]
chia7712 closed pull request #9725: MINOR: optimize EvictableKey/LastUsedKey compareTo function URL: https://github.com/apache/kafka/pull/9725
Re: [PR] MINOR: add comments to explain why it needs to add synchronization on… [kafka]
chia7712 closed pull request #9517: MINOR: add comments to explain why it needs to add synchronization on… URL: https://github.com/apache/kafka/pull/9517
Re: [PR] KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not… [kafka]
chia7712 closed pull request #8978: KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not… URL: https://github.com/apache/kafka/pull/8978
Re: [PR] KAFKA-10498 Consumer should do offset/epoch validation through when … [kafka]
chia7712 closed pull request #9796: KAFKA-10498 Consumer should do offset/epoch validation through when …
URL: https://github.com/apache/kafka/pull/9796
Re: [PR] MINOR: apply FilterByKeyIterator and FlattenedIterator to code base [kafka]
chia7712 closed pull request #9954: MINOR: apply FilterByKeyIterator and FlattenedIterator to code base
URL: https://github.com/apache/kafka/pull/9954
Re: [PR] MINOR: fix the soft link created by ducktape when running system test… [kafka]
chia7712 closed pull request #9798: MINOR: fix the soft link created by ducktape when running system test…
URL: https://github.com/apache/kafka/pull/9798
Re: [PR] MINOR: Convert connect assignment schemas to use generated protocol [kafka]
chia7712 closed pull request #9641: MINOR: Convert connect assignment schemas to use generated protocol
URL: https://github.com/apache/kafka/pull/9641
Re: [PR] KAFKA-12410 KafkaAPis ought to group fetch data before generating fet… [kafka]
chia7712 closed pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…
URL: https://github.com/apache/kafka/pull/10269
Re: [PR] MINOR: KRPC supports to get true type from entity type [kafka]
chia7712 closed pull request #10283: MINOR: KRPC supports to get true type from entity type
URL: https://github.com/apache/kafka/pull/10283
Re: [PR] MINOR: using INFO level to log 'no meta.properties' for broker server [kafka]
chia7712 closed pull request #10261: MINOR: using INFO level to log 'no meta.properties' for broker server
URL: https://github.com/apache/kafka/pull/10261
Re: [PR] MINOR: main function of o.a.k.c.p.t.Type does not show all types [kafka]
chia7712 closed pull request #10248: MINOR: main function of o.a.k.c.p.t.Type does not show all types
URL: https://github.com/apache/kafka/pull/10248
Re: [PR] KAFKA-12309 The revocation algorithm produces uneven distributions [kafka]
chia7712 closed pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions
URL: https://github.com/apache/kafka/pull/10077
Re: [PR] MINOR: check duplicate advertised listeners based on resolved host [kafka]
chia7712 closed pull request #10577: MINOR: check duplicate advertised listeners based on resolved host
URL: https://github.com/apache/kafka/pull/10577
Re: [PR] MINOR: remove partition-level error from MetadataResponse#errorCounts [kafka]
chia7712 closed pull request #11128: MINOR: remove partition-level error from MetadataResponse#errorCounts
URL: https://github.com/apache/kafka/pull/11128
Re: [PR] MINOR: don't record the throttled rate when there is no throttled par… [kafka]
chia7712 closed pull request #12528: MINOR: don't record the throttled rate when there is no throttled par…
URL: https://github.com/apache/kafka/pull/12528
Re: [PR] KAFKA-13874 Avoid synchronization in SocketServer metrics [kafka]
chia7712 closed pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics
URL: https://github.com/apache/kafka/pull/13285
Re: [PR] MINOR: don't disconnect stale controller if the network client is res… [kafka]
chia7712 closed pull request #13395: MINOR: don't disconnect stale controller if the network client is res…
URL: https://github.com/apache/kafka/pull/13395
Re: [PR] KAFKA-14811 The forwarding requests are discarded when network client… [kafka]
chia7712 closed pull request #13405: KAFKA-14811 The forwarding requests are discarded when network client…
URL: https://github.com/apache/kafka/pull/13405
Re: [PR] KAFKA-16224: Do not retry committing if topic or partition deleted [kafka]
cadonna commented on PR #15581:
URL: https://github.com/apache/kafka/pull/15581#issuecomment-2018163251

Sorry for that! I do not know why I am so sloppy with this PR. I am going to fix this now.
Re: [PR] KAFKA-16416: Use NetworkClientTest to replace RequestResponseTest to be the example of log4j output [kafka]
KevinZTW commented on code in PR #15596:
URL: https://github.com/apache/kafka/pull/15596#discussion_r1537700914

## README.md:
@@ -56,7 +56,11 @@ Follow instructions in https://kafka.apache.org/quickstart

 ### Running a particular unit/integration test with log4j output ###
 Change the log4j setting in either `clients/src/test/resources/log4j.properties` or `core/src/test/resources/log4j.properties`

-./gradlew clients:test --tests RequestResponseTest
+For example, you can modify the line in `clients/src/test/resources/log4j.properties` to `log4j.logger.org.apache.kafka=INFO` and then run:
+
+./gradlew cleanTest clients:test --tests NetworkClientTest --info

Review Comment:
I see. I added `--info` because I want users to also be able to see the logs directly on the console. I agree it does print out a lot of info, though...
Re: [PR] KAFKA-16406: Splitting consumer integration test [kafka]
lucasbru merged PR #15535:
URL: https://github.com/apache/kafka/pull/15535
Re: [PR] KAFKA-16406: Splitting consumer integration test [kafka]
lucasbru commented on PR #15535:
URL: https://github.com/apache/kafka/pull/15535#issuecomment-2018145845

Two related flaky tests: `kafka.api.PlaintextConsumerTest.testExpandingTopicSubscriptions(String, String)[4]` and `org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest.testDescribeExistingGroupWithNoMembers(String, String)[4]`. They don't seem to be caused by this PR, though, as they have flaked on trunk before.
Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]
chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1537690828

## streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java:
@@ -119,7 +119,8 @@ public Queue<StreamsException> missingSourceTopicExceptions() {
     return new StreamsException(
         new MissingSourceTopicException(String.format(
             "Missing source topics %s for subtopology %d of topology %s",
-            missingSourceTopics, subtopologyId, topologyName)),
+            missingSourceTopics, subtopologyId, topologyName),
+            missingSourceTopics),

Review Comment:
FYI, I wrote the skeleton code below! Does it make sense to you as well?

```java
public class RepartitionTopics {
    ...
    // Add new field (private)
    private final Set<String> missingTopics = new HashSet<>();
    ...
    public Set<String> topologiesWithMissingInputTopics() { ... }

    public Queue<StreamsException> missingSourceTopicExceptions() { ... }

    // Add new method (package-private)
    Set<String> getMissingTopics() {
        return this.missingTopics;
    }
    ...
}
```
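The pattern discussed in the review thread above, carrying the missing topic names on the exception itself rather than only inside the message string, can be sketched as follows. This is a hypothetical, simplified stand-in, not Kafka's actual `MissingSourceTopicException`; the class and accessor names here are illustrative only.

```java
import java.util.Set;

public class MissingTopicsExample {

    // Illustrative stand-in for MissingSourceTopicException: in addition to the
    // formatted message, it exposes the missing topic names as a typed field so
    // callers can inspect them programmatically instead of parsing the message.
    static class MissingSourceTopicException extends RuntimeException {
        private final Set<String> missingTopics;

        MissingSourceTopicException(String message, Set<String> missingTopics) {
            super(message);
            this.missingTopics = missingTopics;
        }

        Set<String> missingTopics() {
            return missingTopics;
        }
    }

    public static void main(String[] args) {
        Set<String> missing = Set.of("orders-repartition", "clicks-repartition");
        MissingSourceTopicException e = new MissingSourceTopicException(
            String.format("Missing source topics %s for subtopology %d of topology %s",
                missing, 0, "my-topology"),
            missing);
        // The caller can read the topic names directly from the exception.
        System.out.println(e.missingTopics().contains("orders-repartition")); // prints "true"
    }
}
```

The design choice mirrors the PR's intent: the message string stays human-readable, while the structured set of topic names becomes available to error-handling code.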
Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]
johnnychhsu commented on PR #15463:
URL: https://github.com/apache/kafka/pull/15463#issuecomment-2018127603

Updated, @showuon. It works on my local machine, thanks for sharing!