[jira] [Commented] (KAFKA-16244) Move code style exceptions from suppressions.xml to the code
[ https://issues.apache.org/jira/browse/KAFKA-16244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816873#comment-17816873 ] David Jacot commented on KAFKA-16244: - [~ijuma] I actually learnt yesterday that we are already doing it: [https://github.com/apache/kafka/pull/15139#discussion_r1451037707.] [~jsancio] Did you check the license when you added support for it? > Move code style exceptions from suppressions.xml to the code > > > Key: KAFKA-16244 > URL: https://issues.apache.org/jira/browse/KAFKA-16244 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14589 [1/3] Tests of ConsoleGroupCommand rewritten in java [kafka]
nizhikov commented on PR #15256: URL: https://github.com/apache/kafka/pull/15256#issuecomment-1940499322 @showuon Thank you for review and merging! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]
nizhikov commented on PR #15216: URL: https://github.com/apache/kafka/pull/15216#issuecomment-1940488544 Hello @philipnee @lucasbru Why did you remove the GroupState class in this commit? This class will be used in the Java version of ConsoleGroupCommand once https://github.com/apache/kafka/pull/14471 is fully merged. The class was merged to trunk in https://github.com/apache/kafka/commit/ff25c350a799f372ce6c56e14bbfe3bc7e088218 to simplify further reviews.
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
dajac commented on PR #15150: URL: https://github.com/apache/kafka/pull/15150#issuecomment-1940417533 @rreddy-22 There are conflicts due to changes made in https://github.com/apache/kafka/commit/88c5543ccfb517ceba8dc56d22576efdf2540e0e. Could you take a look when you get a chance?
Re: [PR] POC: run group coordinator state machine on request handler pool [kafka]
github-actions[bot] commented on PR #14728: URL: https://github.com/apache/kafka/pull/14728#issuecomment-1940369610 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
github-actions[bot] commented on PR #14752: URL: https://github.com/apache/kafka/pull/14752#issuecomment-1940369577 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-14589 [1/3] Tests of ConsoleGroupCommand rewritten in java [kafka]
showuon merged PR #15256: URL: https://github.com/apache/kafka/pull/15256
Re: [PR] KAFKA-14589 [1/3] Tests of ConsoleGroupCommand rewritten in java [kafka]
showuon commented on PR #15256: URL: https://github.com/apache/kafka/pull/15256#issuecomment-1940345746 > Hello @showuon > > I found that `DeleteTopicTest.scala` doesn't close the resources it allocates when run in KRaft mode. This was hidden by the removed tests and led to the test failures we observed earlier. > > I fixed `DeleteTopicTest` and now it seems that CI is OK. > > Take a look one more time, please. Nice find! Thanks for the fix!
Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]
hgeraldino commented on PR #15316: URL: https://github.com/apache/kafka/pull/15316#issuecomment-1940100133 Final batch of the `WorkerSinkTaskTest` migration 🎉 🎉 🎉 This one is probably the largest of the 3 (sorry). We can either do one final commit renaming the test from `WorkerSinkTaskTestMockito` back to `WorkerSinkTaskTest`, or just open a new PR once this gets merged to do the cleanup. Tagging @divijvaidya @gharris1727 @C0urante
Re: [PR] KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR [kafka]
CalvinConfluent commented on PR #15359: URL: https://github.com/apache/kafka/pull/15359#issuecomment-1939789669 @splett2 @jolshan @artemlivshits @cmccabe Following the discussion on the previous PR, I changed the logic to require that all target replicas be in the ISR.
Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]
CalvinConfluent commented on PR #14604: URL: https://github.com/apache/kafka/pull/14604#issuecomment-1939785641 Opened a separate PR https://github.com/apache/kafka/pull/15359
[jira] [Updated] (KAFKA-15665) Enforce ISR to have all target replicas when complete partition reassignment
[ https://issues.apache.org/jira/browse/KAFKA-15665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-15665: --- Summary: Enforce ISR to have all target replicas when complete partition reassignment (was: Enforce min ISR when complete partition reassignment) > Enforce ISR to have all target replicas when complete partition reassignment > > > Key: KAFKA-15665 > URL: https://issues.apache.org/jira/browse/KAFKA-15665 > Project: Kafka > Issue Type: Sub-task >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > Current partition reassignment can be completed when the new ISR is under min > ISR. We should fix this behavior.
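The summary change above swaps one completion condition for another. A minimal sketch of the two conditions side by side, assuming replicas are identified by integer broker ids; `ReassignmentCheck` and both method names are hypothetical and are not taken from the Kafka controller code:

```java
import java.util.List;
import java.util.Set;

/** Hypothetical helper for illustration only; not Kafka's controller logic. */
final class ReassignmentCheck {
    private ReassignmentCheck() {}

    /** Condition from the old summary: the ISR is at least as large as min ISR. */
    static boolean meetsMinIsr(Set<Integer> isr, int minInSyncReplicas) {
        return isr.size() >= minInSyncReplicas;
    }

    /** Condition from the new summary: every target replica is in the ISR. */
    static boolean allTargetsInIsr(Set<Integer> isr, List<Integer> targetReplicas) {
        return isr.containsAll(targetReplicas);
    }
}
```

With target replicas 4, 5, 6, an ISR of {4, 5} and a min ISR of 2, the first condition holds while the second does not, which is the gap the new summary describes.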
Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]
CalvinConfluent closed pull request #14604: KAFKA-15665: Enforce min ISR when complete partition reassignment. URL: https://github.com/apache/kafka/pull/14604
Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]
ableegoldman commented on code in PR #15264: URL: https://github.com/apache/kafka/pull/15264#discussion_r1486914766 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() { } } +private boolean transactionBuffersExceedCapacity = false; + +boolean transactionBuffersExceedCapacity() { +return transactionBuffersExceedCapacity; +} + +boolean transactionBuffersWillExceedCapacity() { +final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0; +if (transactionBuffersAreUnbounded) { +return false; +} + +// force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold Review Comment: > Writes in RocksDB are the same size, irrespective of whether they are inserts of new keys, or updates to existing keys. I could be wrong about this, but I thought that writing to an existing key in the active memtable -- ie before it is flushed and/or made immutable -- is indeed applied as an overwrite, and does not grow the write buffer's overall size? Of course this only applies to patterns in which keys are updated multiple times between flushes, sure. Still, I feel like this is one of the major points of the write buffer: to deduplicate same-key writes as much as possible before flushing them, since once they're flushed it has to wait for a compaction to deduplicate further. That said, I don't feel strongly about this, and any potential performance degradation should come to light in the benchmarks. Thanks for explaining your thoughts on this
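The diff quoted above is cut off before the "likely to exceed" estimate, so the following is only a sketch of one way such a check could work, not the PR's code. It keeps the convention visible in the diff that a negative `maxUncommittedStateBytes` means unbounded; the class name, the `lastObservedBytes` field and the "assume the next interval grows as much as the last one" heuristic are all assumptions made for illustration:

```java
/** Hypothetical sketch of an early-commit check; not the code under review. */
final class UncommittedBytesTracker {
    // Negative means the buffers are unbounded, as in the quoted diff.
    private final long maxUncommittedStateBytes;
    // Assumption: remember the previous observation to estimate growth.
    private long lastObservedBytes = 0L;

    UncommittedBytesTracker(long maxUncommittedStateBytes) {
        this.maxUncommittedStateBytes = maxUncommittedStateBytes;
    }

    /** Returns true if the threshold is reached or is projected to be reached next time. */
    boolean willExceedCapacity(long uncommittedBytes) {
        if (maxUncommittedStateBytes < 0) {
            return false; // unbounded: never force an early commit
        }
        // Assumed heuristic: the next interval adds as many bytes as this one did.
        long growth = Math.max(0L, uncommittedBytes - lastObservedBytes);
        lastObservedBytes = uncommittedBytes;
        return uncommittedBytes + growth >= maxUncommittedStateBytes;
    }

    /** Reset the baseline after a commit empties the buffers. */
    void onCommit() {
        lastObservedBytes = 0L;
    }
}
```

Whether overwrites of an existing key grow the buffer, the question raised in the review comment, would change how `uncommittedBytes` is measured but not the shape of this check.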
[jira] [Resolved] (KAFKA-9376) Plugin class loader not found using MM2
[ https://issues.apache.org/jira/browse/KAFKA-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-9376. Fix Version/s: 2.5.0 Resolution: Fixed > Plugin class loader not found using MM2 > --- > > Key: KAFKA-9376 > URL: https://issues.apache.org/jira/browse/KAFKA-9376 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Sinóros-Szabó Péter >Priority: Minor > Fix For: 2.5.0 > > > I am using MM2 (release 2.4.0 with Scala 2.12) and I get a bunch of classloader > errors. MM2 seems to be working, but I do not know if all of its components > are working as expected, as this is the first time I have used MM2. > I run MM2 with the following command: > {code:java} > ./bin/connect-mirror-maker.sh config/connect-mirror-maker.properties > {code} > Errors are: > {code:java} > [2020-01-07 15:06:17,892] ERROR Plugin class loader for connector: > 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found. > Returning: > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@6ebf0f36 > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165) > [2020-01-07 15:06:17,889] ERROR Plugin class loader for connector: > 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found.
> Returning: > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@6ebf0f36 > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165) > [2020-01-07 15:06:17,904] INFO ConnectorConfig values: > config.action.reload = restart > connector.class = org.apache.kafka.connect.mirror.MirrorHeartbeatConnector > errors.log.enable = false > errors.log.include.messages = false > errors.retry.delay.max.ms = 6 > errors.retry.timeout = 0 > errors.tolerance = none > header.converter = null > key.converter = null > name = MirrorHeartbeatConnector > tasks.max = 1 > transforms = [] > value.converter = null > (org.apache.kafka.connect.runtime.ConnectorConfig:347) > [2020-01-07 15:06:17,904] INFO EnrichedConnectorConfig values: > config.action.reload = restart > connector.class = org.apache.kafka.connect.mirror.MirrorHeartbeatConnector > errors.log.enable = false > errors.log.include.messages = false > errors.retry.delay.max.ms = 6 > errors.retry.timeout = 0 > errors.tolerance = none > header.converter = null > key.converter = null > name = MirrorHeartbeatConnector > tasks.max = 1 > transforms = [] > value.converter = null > > (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347) > [2020-01-07 15:06:17,905] INFO TaskConfig values: > task.class = class org.apache.kafka.connect.mirror.MirrorHeartbeatTask > (org.apache.kafka.connect.runtime.TaskConfig:347) > [2020-01-07 15:06:17,905] INFO Instantiated task MirrorHeartbeatConnector-0 > with version 1 of type org.apache.kafka.connect.mirror.MirrorHeartbeatTask > (org.apache.kafka.connect.runtime.Worker:434){code} > After a while, these errors are not logged any more. 
> Config is: > {code:java} > clusters = eucmain, euwbackup > eucmain.bootstrap.servers = kafka1:9092,kafka2:9092 > euwbackup.bootstrap.servers = 172.30.197.203:9092,172.30.213.104:9092 > eucmain->euwbackup.enabled = true > eucmain->euwbackup.topics = .* > eucmain->euwbackup.topics.blacklist = ^(kafka|kmf|__|pricing).* > eucmain->euwbackup.rename.topics = false > rename.topics = false > eucmain->euwbackup.sync.topic.acls.enabled = false > sync.topic.acls.enabled = false{code} > Using OpenJDK 8 or 11, I get the same error.
[jira] [Commented] (KAFKA-9376) Plugin class loader not found using MM2
[ https://issues.apache.org/jira/browse/KAFKA-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816808#comment-17816808 ] Greg Harris commented on KAFKA-9376: Closing the loop here, this was a log message which was originally only called on true error conditions, but due to another change (KAFKA-8340) it was then called every time a classpath plugin was used. Since this was a spurious warning, it was changed to TRACE by a minor PR: [https://github.com/apache/kafka/pull/7964] that landed in the next release. I'm closing this issue as the log message is no longer spuriously printed.
[jira] [Commented] (KAFKA-9070) Kafka Connect - Get the wrong offset value comes from Kafka Connect after increase the number of offset storage topic partition to 3
[ https://issues.apache.org/jira/browse/KAFKA-9070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816806#comment-17816806 ]

Greg Harris commented on KAFKA-9070:

Adding partitions to a topic is generally an unsafe activity, so I'm not surprised that Connect has a problem with it. That said, I think the best workaround for this problem is to use 3.6.0's [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] feature to export and re-import the offsets to forcibly re-partition the topic.

1. Stop all of the connectors.
2. Export the connector offsets to a file with KIP-875.
3. Stop the Connect cluster.
4. Delete the offsets topic, and recreate it with the increased number of partitions. Or, create the new topic and reconfigure the workers to use it instead.
5. Start the Connect cluster.
6. Write the saved offsets back to the cluster via KIP-875.
7. Resume all of the connectors.

> Kafka Connect - Get the wrong offset value comes from Kafka Connect after increase the number of offset storage topic partition to 3
>
> Key: KAFKA-9070
> URL: https://issues.apache.org/jira/browse/KAFKA-9070
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Affects Versions: 2.3.0
> Environment: debezium/connect: 0.10
> mysql: 5.6
> kafka: 2.3.0
> Reporter: kaikai.hou
> Priority: Major
> Labels: connect
>
> I'm using the *Debezium* project in distributed mode.
> *Problem:*
> Kafka Connect returns the wrong offset value after the number of partitions of the offset storage topic is increased to 3.
> 1. Cluster mode, two nodes (containers).
> 2. The offset storage topic has only 1 partition.
> 3. Create 3 MySQL connectors.
> 4. Change data; all connectors have their offsets recorded in partition 0.
> 5. *Increase the offset storage topic partition count to 3*.
> 6. Change data; some connectors now store their offset records in partition 1 or partition 2.
> 7. *Restart all Connect services; all connectors then read their offset records from partition 0*.
> 8. The connectors that stored offset records in partition 1 or partition 2 then get *repeated data* (the offset records in partition 0 are *too old* for these connectors).
> *Debug*
> The Debezium developers checked their code and found that the partition handling is delegated to Kafka Connect. [their reply|https://issues.jboss.org/browse/DBZ-1551?focusedCommentId=13800286&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13800286]
> Then, after they reproduced the problem, they found that [the problem is that the incorrect offset is returned by Kafka Connect|https://issues.jboss.org/browse/DBZ-1551?focusedCommentId=13801400&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13801400].
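The per-connector REST calls behind the stop, export, re-import and resume steps of the workaround can be sketched as follows. This is a sketch under assumptions rather than a tested procedure: it assumes a Connect worker of version 3.6.0 or later reachable at a base URL such as `http://localhost:8083`, connector names that need no URL encoding, and Java 11+ for `java.net.http`. The class `ConnectOffsetsRequests` is hypothetical; the paths are the connector `stop`, `offsets` and `resume` endpoints of the Connect REST API. Stopping the cluster and recreating the offsets topic happen outside the REST API and are not shown.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;

/** Hypothetical request builder for the offsets workflow; it only builds requests. */
final class ConnectOffsetsRequests {
    private final String baseUrl; // assumption: e.g. "http://localhost:8083"

    ConnectOffsetsRequests(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    private URI uri(String connector, String suffix) {
        return URI.create(baseUrl + "/connectors/" + connector + suffix);
    }

    /** Stop the connector before touching its offsets. */
    HttpRequest stop(String connector) {
        return HttpRequest.newBuilder(uri(connector, "/stop"))
                .PUT(BodyPublishers.noBody()).build();
    }

    /** Export: read the connector's current offsets as JSON. */
    HttpRequest exportOffsets(String connector) {
        return HttpRequest.newBuilder(uri(connector, "/offsets")).GET().build();
    }

    /** Re-import: write previously saved offsets JSON back (connector must be stopped). */
    HttpRequest importOffsets(String connector, String offsetsJson) {
        return HttpRequest.newBuilder(uri(connector, "/offsets"))
                .header("Content-Type", "application/json")
                .method("PATCH", BodyPublishers.ofString(offsetsJson))
                .build();
    }

    /** Resume the connector once its offsets are restored. */
    HttpRequest resume(String connector) {
        return HttpRequest.newBuilder(uri(connector, "/resume"))
                .PUT(BodyPublishers.noBody()).build();
    }
}
```

Each request would be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, saving the body returned by `exportOffsets` to a file and passing it back to `importOffsets` after the topic has been recreated.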
Re: [PR] KAFKA-15421: fix network thread leak in testThreadPoolResize [kafka]
jolshan commented on PR #14320: URL: https://github.com/apache/kafka/pull/14320#issuecomment-1939683100 Hmm I see this flaking again -- https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America%2FLos_Angeles&search.values=trunk&tests.container=kafka.server.DynamicBrokerReconfigurationTest&tests.test=testThreadPoolResize() Any ideas?
Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
jolshan commented on PR #15354: URL: https://github.com/apache/kafka/pull/15354#issuecomment-1939677736 I still see one LogDirFailureTest failure: ``` org.apache.kafka.server.fault.FaultHandlerException: quorumTestHarnessFaultHandler: Error applying topics delta in MetadataDelta up to 70: Log for partition topic-0 is not available on broker 1 ``` Can we confirm it is not because the log directory is missing?
Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
jolshan commented on PR #15354: URL: https://github.com/apache/kafka/pull/15354#issuecomment-1939674788 I filed a JIRA for DescribeConsumerGroupTest https://issues.apache.org/jira/browse/KAFKA-16245
[jira] [Resolved] (KAFKA-7217) Loading dynamic topic data into kafka connector sink using regex
[ https://issues.apache.org/jira/browse/KAFKA-7217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-7217. Fix Version/s: 1.1.0 Resolution: Fixed > Loading dynamic topic data into kafka connector sink using regex > > > Key: KAFKA-7217 > URL: https://issues.apache.org/jira/browse/KAFKA-7217 > Project: Kafka > Issue Type: Improvement > Components: connect >Affects Versions: 1.1.0 >Reporter: Pratik Gaglani >Priority: Major > Fix For: 1.1.0 > > > The new feature (KAFKA-3074) allows connectors to use a regex; however, it seems that data from topics added after the connector has started is not consumed until the connector is restarted. We need to add new topics dynamically and have the connector consume them based on the regex defined in the connector's properties. How can this be achieved? Example: regex: topic-.* topics: topic-1, topic-2. If I introduce a new topic, topic-3, how can I make the connector consume its data without restarting the connector?
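For reference, sink connectors accept a `topics.regex` property in place of a fixed `topics` list. The sketch below only illustrates which topic names the reporter's example pattern selects, using plain `java.util.regex`; the `TopicRegexDemo` class is hypothetical, and the sketch does not show how or when a running connector picks up a newly created topic.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical demo of full-name regex matching against topic names. */
final class TopicRegexDemo {
    private TopicRegexDemo() {}

    /** Returns the topics whose whole name matches the regex, in input order. */
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }
}
```

With the pattern `topic-.*`, a later `topic-3` matches just as `topic-1` and `topic-2` do, so no change to the connector configuration is needed for it to qualify.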
[jira] [Created] (KAFKA-16245) DescribeConsumerGroupTest failing
Justine Olshan created KAFKA-16245: -- Summary: DescribeConsumerGroupTest failing Key: KAFKA-16245 URL: https://issues.apache.org/jira/browse/KAFKA-16245 Project: Kafka Issue Type: Task Reporter: Justine Olshan The first instances on trunk are in this PR [https://github.com/apache/kafka/pull/15275] And this PR seems to have it failing consistently in the builds when it wasn't failing this consistently before.
[jira] [Resolved] (KAFKA-6340) Support of transactions in KafkaConnect
[ https://issues.apache.org/jira/browse/KAFKA-6340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-6340. Fix Version/s: 3.3.0 Resolution: Fixed > Support of transactions in KafkaConnect > --- > > Key: KAFKA-6340 > URL: https://issues.apache.org/jira/browse/KAFKA-6340 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Oleg Kuznetsov >Priority: Major > Fix For: 3.3.0 > > > Now KafkaConnect source connectors commit source offset in periodic task. > Proposed approach is to produce data record batch and the config update > records (related to the batch) within Kafka transaction, that would prevent > publishing duplicate records on connector restart.
[jira] [Resolved] (KAFKA-4961) Mirrormaker crash with org.apache.kafka.common.protocol.types.SchemaException
[ https://issues.apache.org/jira/browse/KAFKA-4961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-4961. Resolution: Won't Fix > Mirrormaker crash with org.apache.kafka.common.protocol.types.SchemaException > - > > Key: KAFKA-4961 > URL: https://issues.apache.org/jira/browse/KAFKA-4961 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.1 >Reporter: Di Shang >Priority: Major > Labels: mirror-maker > > We are running a cluster of 3 brokers and using mirrormaker to replicate a > topic to a different 3-broker cluster. Occasionally we find that when the > source cluster is under heavy load with lots of messages coming in, > mirrormaker will crash with SchemaException. > {noformat} > 27 Mar 2017 19:02:22.030 [mirrormaker-thread-0] DEBUG > org.apache.kafka.clients.NetworkClient handleTimedOutRequests(line:399) > Disconnecting from node 5 due to request timeout. > 27 Mar 2017 19:02:22.032 [mirrormaker-thread-0] DEBUG > org.apache.kafka.clients.NetworkClient handleTimedOutRequests(line:399) > Disconnecting from node 7 due to request timeout. 
> 27 Mar 2017 19:02:22.033 [mirrormaker-thread-0] DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient > onComplete(line:376) Cancelled FETCH request > ClientRequest(expectResponse=true, > callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@96db60c9, > > request=RequestSend(header={api_key=1,api_version=1,correlation_id=76978,client_id=dx-stg02-wdc04-0}, > > body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=0,fetch_offset=129037541,max_bytes=1048576},{partition=1,fetch_offset=120329329,max_bytes=1048576},{partition=33,fetch_offset=125526115,max_bytes=1048576},{partition=36,fetch_offset=125526627,max_bytes=1048576},{partition=5,fetch_offset=121654333,max_bytes=1048576},{partition=37,fetch_offset=120262628,max_bytes=1048576},{partition=9,fetch_offset=125568321,max_bytes=1048576},{partition=41,fetch_offset=121593740,max_bytes=1048576},{partition=12,fetch_offset=125563836,max_bytes=1048576},{partition=13,fetch_offset=122044962,max_bytes=1048576},{partition=45,fetch_offset=125504213,max_bytes=1048576},{partition=48,fetch_offset=125506892,max_bytes=1048576},{partition=17,fetch_offset=121635934,max_bytes=1048576},{partition=49,fetch_offset=121985309,max_bytes=1048576},{partition=21,fetch_offset=125549718,max_bytes=1048576},{partition=24,fetch_offset=125548506,max_bytes=1048576},{partition=25,fetch_offset=120289719,max_bytes=1048576},{partition=29,fetch_offset=121612535,max_bytes=1048576}]}]}), > createdTimeMs=1490641301465, sendTimeMs=1490641301465) with correlation id > 76978 due to node 5 being disconnected > 27 Mar 2017 19:02:22.035 [mirrormaker-thread-0] DEBUG > org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch > failed > org.apache.kafka.common.errors.DisconnectException: null > 27 Mar 2017 19:02:22.037 [mirrormaker-thread-0] DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient > onComplete(line:376) 
Cancelled FETCH request > ClientRequest(expectResponse=true, > callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@fcb8c50d, > > request=RequestSend(header={api_key=1,api_version=1,correlation_id=76980,client_id=dx-stg02-wdc04-0}, > > body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=32,fetch_offset=125478125,max_bytes=1048576},{partition=2,fetch_offset=121280695,max_bytes=1048576},{partition=3,fetch_offset=125515146,max_bytes=1048576},{partition=35,fetch_offset=121216188,max_bytes=1048576},{partition=38,fetch_offset=121220634,max_bytes=1048576},{partition=7,fetch_offset=121634123,max_bytes=1048576},{partition=39,fetch_offset=125464566,max_bytes=1048576},{partition=8,fetch_offset=125515210,max_bytes=1048576},{partition=11,fetch_offset=121257359,max_bytes=1048576},{partition=43,fetch_offset=121571984,max_bytes=1048576},{partition=44,fetch_offset=125455538,max_bytes=1048576},{partition=14,fetch_offset=121264791,max_bytes=1048576},{partition=15,fetch_offset=125495034,max_bytes=1048576},{partition=47,fetch_offset=121199057,max_bytes=1048576},{partition=19,fetch_offset=121613792,max_bytes=1048576},{partition=20,fetch_offset=125495807,max_bytes=1048576},{partition=23,fetch_offset=121237155,max_bytes=1048576},{partition=26,fetch_offset=121249178,max_bytes=1048576},{partition=27,fetch_offset=125317927,max_bytes=1048576},{partition=31,fetch_offset=121591702,max_bytes=1048576}]}]}), > createdTimeMs=1490641301466, sendTimeMs=1490641301466) with correlation id > 76980 due to node 7 being disconnected
Re: [PR] KAFKA-15832: Trigger client reconciliation based on manager poll [kafka]
jolshan commented on PR #15275: URL: https://github.com/apache/kafka/pull/15275#issuecomment-1939663048 Hey folks -- looks like this broke DescribeConsumerGroupTest. Can we take a look? cc: @lucasbru
[jira] [Resolved] (KAFKA-3213) [CONNECT] It looks like we are not backing off properly when reconfiguring tasks
[ https://issues.apache.org/jira/browse/KAFKA-3213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-3213. Fix Version/s: 3.5.0 Resolution: Fixed > [CONNECT] It looks like we are not backing off properly when reconfiguring > tasks > > > Key: KAFKA-3213 > URL: https://issues.apache.org/jira/browse/KAFKA-3213 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Gwen Shapira >Assignee: Liquan Pei >Priority: Major > Fix For: 3.5.0 > > > Looking at logs of attempt to reconfigure connector while leader is > restarting, I see: > {code} > [2016-01-29 20:31:01,799] ERROR IO error forwarding REST request: > (org.apache.kafka.connect.runtime.rest.RestServer) > java.net.ConnectException: Connection refused > [2016-01-29 20:31:01,802] ERROR Request to leader to reconfigure connector > tasks failed (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: IO Error > trying to forward REST request: Connection refused > [2016-01-29 20:31:01,802] ERROR Failed to reconfigure connector's tasks, > retrying after backoff: > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: IO Error > trying to forward REST request: Connection refused > [2016-01-29 20:31:01,803] DEBUG Sending POST with input > 
[{"tables":"bar","table.poll.interval.ms":"1000","incrementing.column.name":"id","connection.url":"jdbc:mysql://worker1:3306/testdb?user=root","name":"test-mysql-jdbc","tasks.max":"3","task.class":"io.confluent.connect.jdbc.JdbcSourceTask","poll.interval.ms":"1000","topic.prefix":"test-","connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","mode":"incrementing"},{"tables":"foo","table.poll.interval.ms":"1000","incrementing.column.name":"id","connection.url":"jdbc:mysql://worker1:3306/testdb?user=root","name":"test-mysql-jdbc","tasks.max":"3","task.class":"io.confluent.connect.jdbc.JdbcSourceTask","poll.interval.ms":"1000","topic.prefix":"test-","connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","mode":"incrementing"}] > to http://worker2:8083/connectors/test-mysql-jdbc/tasks > (org.apache.kafka.connect.runtime.rest.RestServer) > [2016-01-29 20:31:01,803] ERROR IO error forwarding REST request: > (org.apache.kafka.connect.runtime.rest.RestServer) > java.net.ConnectException: Connection refused > [2016-01-29 20:31:01,804] ERROR Request to leader to reconfigure connector > tasks failed (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: IO Error > trying to forward REST request: Connection refused > {code} > Note that it looks like we are retrying every 1ms, while I'd expect a retry > every 250ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13338) kafka upgrade 6.1.1 asking to change log.cleanup.policy from delete to compact
[ https://issues.apache.org/jira/browse/KAFKA-13338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816800#comment-17816800 ]

Greg Harris commented on KAFKA-13338:
-------------------------------------

Apache Kafka does not have 5.x or 6.x releases, so you must be using a fork of Kafka.
The log message and your remediation are as expected; Connect must use compacted internal topics, and enforcement of this was added after the fact.
The delay between the upgrade and the connectors failing also indicates that the two are unrelated, and compaction shouldn't be able to cause an InvalidRecordException.
As this issue is so stale, and there isn't any information to proceed on, I have closed it. Feel free to raise a new issue if this problem reappears.

> kafka upgrade 6.1.1 asking to change log.cleanup.policy from delete to compact
> ------------------------------------------------------------------------------
>
> Key: KAFKA-13338
> URL: https://issues.apache.org/jira/browse/KAFKA-13338
> Project: Kafka
> Issue Type: Bug
> Components: log
> Reporter: Fátima Galera
> Priority: Major
> Attachments: image003.png
>
> Hi all,
>
> During kafka upgrade from 5.3.1 to 6.1.1.1 we get the below error when starting kafka-connector services:
>
> ERROR [Worker clientId=connect-1, groupId=connect-cluster] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> 87738-org.apache.kafka.common.config.ConfigException: Topic 'connect-offsets' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing source connector offsets and problems restarting this Connect cluster in the future. Change the 'offset.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'.
>
> After that error we added the log.cleanup.policy=compact parameter to /etc/kafka/server.properties and the kafka-connector service started, but a few days later all the connectors are down with the attached error.
>
> Could you please let us know how we can resolve the issue?
>
> Best regards and thanks
> Fátima
> NGA -Alight
[jira] [Resolved] (KAFKA-16229) Slow expiration of Producer IDs leading to high CPU usage
[ https://issues.apache.org/jira/browse/KAFKA-16229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Justine Olshan resolved KAFKA-16229.
-----------------------------------
    Resolution: Fixed

> Slow expiration of Producer IDs leading to high CPU usage
> ---------------------------------------------------------
>
> Key: KAFKA-16229
> URL: https://issues.apache.org/jira/browse/KAFKA-16229
> Project: Kafka
> Issue Type: Bug
> Reporter: Jorge Esteban Quilcate Otoya
> Assignee: Jorge Esteban Quilcate Otoya
> Priority: Major
>
> Expiration of ProducerIds is implemented with a slow removal of map keys:
> ```
> producers.keySet().removeAll(keys);
> ```
> This unnecessarily goes through all producer ids and, for each one, through all expired keys to be removed.
> This leads to roughly quadratic time in the worst case, when most/all keys need to be removed:
> ```
> Benchmark                                        (numProducerIds)  Mode  Cnt           Score            Error  Units
> ProducerStateManagerBench.testDeleteExpiringIds               100  avgt    3        9164.043 ±      10647.877  ns/op
> ProducerStateManagerBench.testDeleteExpiringIds              1000  avgt    3      341561.093 ±      20283.211  ns/op
> ProducerStateManagerBench.testDeleteExpiringIds                 1  avgt    3    44957983.550 ±    9389011.290  ns/op
> ProducerStateManagerBench.testDeleteExpiringIds                10  avgt    3  5683374164.167 ± 1446242131.466  ns/op
> ```
> A simple fix is to use map#remove(key) instead, leading to a more linear growth:
> ```
> Benchmark                                        (numProducerIds)  Mode  Cnt           Score            Error  Units
> ProducerStateManagerBench.testDeleteExpiringIds               100  avgt    3        5779.056 ±        651.389  ns/op
> ProducerStateManagerBench.testDeleteExpiringIds              1000  avgt    3       61430.530 ±      21875.644  ns/op
> ProducerStateManagerBench.testDeleteExpiringIds                 1  avgt    3      643887.031 ±     600475.302  ns/op
> ProducerStateManagerBench.testDeleteExpiringIds                10  avgt    3     7741689.539 ±    3218317.079  ns/op
> ```
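For anyone following the fix, here is a minimal sketch of the two removal strategies. It is not the ProducerStateManager code: it assumes `producers` is a `HashMap` and that the expired keys sit in a `List`, and the names `ProducerIdExpiry`, `expireWithRemoveAll`, `expireOneByOne` and `producersUpTo` are made up for illustration. Under those assumptions, `AbstractSet.removeAll` iterates the key set and calls `List.contains` for every entry whenever the list is at least as large as the map, which is where the roughly quadratic cost comes from.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: class and method names are hypothetical, not taken from Kafka.
class ProducerIdExpiry {

    // Slow path: when keys.size() >= producers.size(), AbstractSet.removeAll
    // walks the key set and calls keys.contains(id), a linear list scan each time.
    static void expireWithRemoveAll(Map<Long, String> producers, List<Long> keys) {
        producers.keySet().removeAll(keys);
    }

    // Fast path: one hash lookup per expired key.
    static void expireOneByOne(Map<Long, String> producers, List<Long> keys) {
        for (Long key : keys) {
            producers.remove(key);
        }
    }

    // Builds a map holding producer ids 0..n-1.
    static Map<Long, String> producersUpTo(int n) {
        Map<Long, String> producers = new HashMap<>();
        for (long id = 0; id < n; id++) {
            producers.put(id, "state-" + id);
        }
        return producers;
    }

    public static void main(String[] args) {
        int n = 20_000;

        // Worst case from the ticket: every id has expired.
        Map<Long, String> a = producersUpTo(n);
        List<Long> expired = new ArrayList<>(a.keySet());
        long t0 = System.nanoTime();
        expireWithRemoveAll(a, expired);
        long slowNanos = System.nanoTime() - t0;

        Map<Long, String> b = producersUpTo(n);
        long t1 = System.nanoTime();
        expireOneByOne(b, expired);
        long fastNanos = System.nanoTime() - t1;

        System.out.printf("removeAll: %d ms, remove(key): %d ms%n",
                slowNanos / 1_000_000, fastNanos / 1_000_000);
    }
}
```

Both methods leave the map in the same state; only the cost differs. The timings printed by `main` are a rough indication and no substitute for the JMH numbers quoted above.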
[jira] [Resolved] (KAFKA-13338) kafka upgrade 6.1.1 asking to change log.cleanup.policy from delete to compact
[ https://issues.apache.org/jira/browse/KAFKA-13338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Harris resolved KAFKA-13338.
--------------------------------
    Resolution: Invalid

> kafka upgrade 6.1.1 asking to change log.cleanup.policy from delete to compact
> ------------------------------------------------------------------------------
>
> Key: KAFKA-13338
> URL: https://issues.apache.org/jira/browse/KAFKA-13338
> Project: Kafka
> Issue Type: Bug
> Components: log
> Reporter: Fátima Galera
> Priority: Major
> Attachments: image003.png
>
> Hi all,
>
> During kafka upgrade from 5.3.1 to 6.1.1.1 we get the below error when starting kafka-connector services:
>
> ERROR [Worker clientId=connect-1, groupId=connect-cluster] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> 87738-org.apache.kafka.common.config.ConfigException: Topic 'connect-offsets' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing source connector offsets and problems restarting this Connect cluster in the future. Change the 'offset.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'.
>
> After that error we added the log.cleanup.policy=compact parameter to /etc/kafka/server.properties and the kafka-connector service started, but a few days later all the connectors are down with the attached error.
>
> Could you please let us know how we can resolve the issue?
>
> Best regards and thanks
> Fátima
> NGA -Alight
[jira] [Resolved] (KAFKA-13163) MySQL Sink Connector - JsonConverter - DataException: Unknown schema type: null
[ https://issues.apache.org/jira/browse/KAFKA-13163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-13163. - Resolution: Invalid > MySQL Sink Connector - JsonConverter - DataException: Unknown schema type: > null > --- > > Key: KAFKA-13163 > URL: https://issues.apache.org/jira/browse/KAFKA-13163 > Project: Kafka > Issue Type: Task > Components: connect >Affects Versions: 2.1.1 > Environment: PreProd >Reporter: Muddam Pullaiah Yadav >Priority: Major > > Please help with the following issue. Really appreciate it! > > We are using Azure HDInsight Kafka cluster > My sink Properties: > > cat mysql-sink-connector > { > "name":"mysql-sink-connector", > "config": > { "tasks.max":"2", "batch.size":"1000", "batch.max.rows":"1000", > "poll.interval.ms":"500", > "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", > "connection.url":"jdbc:mysql://moddevdb.mysql.database.azure.com:3306/db_test_dev", > "table.name":"db_test_dev.tbl_clients_merchants", "topics":"test", > "connection.user":"grabmod", "connection.password":"#admin", > "auto.create":"true", "auto.evolve":"true", > "value.converter":"org.apache.kafka.connect.json.JsonConverter", > "value.converter.schemas.enable":"false", > "key.converter":"org.apache.kafka.connect.json.JsonConverter", > "key.converter.schemas.enable":"true" } > } > > [2021-08-04 11:18:30,234] ERROR WorkerSinkTask\{id=mysql-sink-connector-0} > Task threw an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:177) > org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in > error handler > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514) > 
at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema > type: null > at > org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:743) > at > org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) > ... 
13 more > [2021-08-04 11:18:30,234] ERROR WorkerSinkTask\{id=mysql-sink-connector-0} > Task is being killed and will not recover until manually restarted > (org.apache.kafka.connect.runtime.WorkerTask:178) > [2021-08-04 11:18:30,235] INFO [Consumer clientId=consumer-18, > groupId=connect-mysql-sink-connector] Sending LeaveGroup request to > coordinator > wn2-grabde.fkgw2p1emuqu5d21xcbqrhqqbf.rx.internal.cloudapp.net:9092 (id: > 2147482646 rack: null) > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16068) Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors
[ https://issues.apache.org/jira/browse/KAFKA-16068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16068: Labels: newbie++ (was: ) > Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin > scanning errors > --- > > Key: KAFKA-16068 > URL: https://issues.apache.org/jira/browse/KAFKA-16068 > Project: Kafka > Issue Type: Task > Components: connect >Reporter: Greg Harris >Priority: Minor > Labels: newbie++ > > The ConnectorValidationIntegrationTest creates test plugins, some with > erroneous behavior. In particular: > > {noformat} > [2023-12-29 10:28:06,548] ERROR Failed to discover Converter in classpath: > Unable to instantiate TestConverterWithPrivateConstructor: Plugin class > default constructor must be public > (org.apache.kafka.connect.runtime.isolation.ReflectionScanner:138) > [2023-12-29 10:28:06,550] > ERROR Failed to discover Converter in classpath: Unable to instantiate > TestConverterWithConstructorThatThrowsException: Failed to invoke plugin > constructor (org.apache.kafka.connect.runtime.isolation.ReflectionScanner:138) > java.lang.reflect.InvocationTargetException{noformat} > These plugins should be eliminated from the classpath, so that the errors do > not appear in unrelated tests. Instead, plugins with erroneous behavior > should only be present in the TestPlugins, so that tests can opt-in to > loading them. > There are already plugins with private constructors and > throwing-exceptions-constructors, so they should be able to be re-used. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14919) MM2 ForwardingAdmin tests should not conflate admin operations
[ https://issues.apache.org/jira/browse/KAFKA-14919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14919: Labels: newbie (was: ) > MM2 ForwardingAdmin tests should not conflate admin operations > -- > > Key: KAFKA-14919 > URL: https://issues.apache.org/jira/browse/KAFKA-14919 > Project: Kafka > Issue Type: Test > Components: mirrormaker >Reporter: Greg Harris >Priority: Minor > Labels: newbie > > The MirrorConnectorsWithCustomForwardingAdminIntegrationTest uses a special > implementation of ForwardingAdmin which records admin operations in a static > ConcurrentMap, which is then used to perform assertions. > This has the problem that one variable (allTopics) is used to perform > assertions for multiple different methods (adding topics, adding partitions, > and syncing configs), despite these operations each being tested separately. > This leads to the confusing behavior where each test appears to assert that a > particular operation has taken place, and instead asserts that at least one > of the operations has taken place. This allows a regression or timeout in one > operation to be hidden by the others, making the behavior of the tests much > less predictable. > These tests and/or the metadata store should be changed so that the tests are > isolated from one another, and actually perform the assertions that > correspond to their titles. -- This message was sent by Atlassian Jira (v8.20.10#820010)
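One possible shape for the isolation asked for in this ticket, as a sketch only: record each admin operation under its own key, so that an assertion about one operation cannot be satisfied by another. `AdminOperationLog`, its `Operation` enum and its methods are hypothetical names, not the existing test code, and the sketch uses instance state rather than the static map described above.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for illustration; not the existing MM2 test code.
class AdminOperationLog {
    enum Operation { CREATE_TOPICS, CREATE_PARTITIONS, ALTER_CONFIGS }

    // One set of topic names per operation, instead of a single shared set.
    private final Map<Operation, Set<String>> topicsByOperation = new ConcurrentHashMap<>();

    // Called by the recording admin implementation whenever an operation touches a topic.
    void record(Operation op, String topic) {
        topicsByOperation.computeIfAbsent(op, k -> ConcurrentHashMap.newKeySet()).add(topic);
    }

    // True only if this specific operation was recorded for the topic.
    boolean saw(Operation op, String topic) {
        return topicsByOperation.getOrDefault(op, Collections.emptySet()).contains(topic);
    }
}
```

With this layout, a test for partition creation would assert `saw(Operation.CREATE_PARTITIONS, topic)` and would fail if only topic creation had happened.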
[jira] [Updated] (KAFKA-15349) ducker-ak should fail fast when gradlew systemTestLibs fails
[ https://issues.apache.org/jira/browse/KAFKA-15349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15349: Labels: newbie++ (was: ) > ducker-ak should fail fast when gradlew systemTestLibs fails > > > Key: KAFKA-15349 > URL: https://issues.apache.org/jira/browse/KAFKA-15349 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Greg Harris >Priority: Minor > Labels: newbie++ > > If you introduce a flaw into the gradle build which causes the systemTestLibs > to fail, such as a circular dependency, then the ducker_test function > continues to run tests which are invalid. > Rather than proceeding to run the tests, the script should fail fast and make > the user address the error before continuing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1486800347 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -127,6 +127,13 @@ public synchronized Cluster fetch() { return cache.cluster(); } +/** + * Get the current metadata cache. + */ +public synchronized MetadataCache fetchCache() { Review Comment: It makes sense to require exclusive access when building the cache, but here we're just accessing the built value. So I don't think the synchronization is necessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
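To illustrate the point about the read path not needing the lock, here is a minimal sketch. It assumes the cache object is immutable once built and that the field holding it is declared `volatile`; neither is confirmed in this thread. `Snapshot` and `SnapshotHolder` are hypothetical stand-ins, not Kafka's `MetadataCache` or `Metadata`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for an immutable cache; not a Kafka class.
final class Snapshot {
    private final Map<String, Integer> leaderEpochs;

    Snapshot(Map<String, Integer> leaderEpochs) {
        // Defensive copy so the snapshot can never change after construction.
        this.leaderEpochs = Collections.unmodifiableMap(new HashMap<>(leaderEpochs));
    }

    Optional<Integer> leaderEpoch(String partition) {
        return Optional.ofNullable(leaderEpochs.get(partition));
    }

    // Returns a new snapshot; the receiver is left untouched.
    Snapshot with(String partition, int epoch) {
        Map<String, Integer> copy = new HashMap<>(leaderEpochs);
        copy.put(partition, epoch);
        return new Snapshot(copy);
    }
}

// Hypothetical holder showing an exclusive writer and a lock-free reader.
final class SnapshotHolder {
    // volatile publishes each newly built snapshot to reader threads.
    private volatile Snapshot current = new Snapshot(Collections.emptyMap());

    // Building the next snapshot is a read-modify-write, so it stays exclusive.
    synchronized void update(String partition, int epoch) {
        current = current.with(partition, epoch);
    }

    // Reading the already-built value needs no lock.
    Snapshot fetch() {
        return current;
    }
}
```

A reader that calls `fetch()` keeps working on the snapshot it obtained even if an update lands afterwards, which is the property that makes the unsynchronized accessor safe in this sketch.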
[jira] [Commented] (KAFKA-16084) Simplify and deduplicate StandaloneHerderTest mocking
[ https://issues.apache.org/jira/browse/KAFKA-16084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816791#comment-17816791 ]

Greg Harris commented on KAFKA-16084:
-------------------------------------

Hi [~high.lee] and [~ardada2468]

Thanks for volunteering! If you see that a ticket is unassigned, you can assign it to yourself and then work to open a PR. Also, if you see that an issue is assigned but has been inactive, you can ask the current assignee about the issue, and re-assign it if they are unresponsive.

> Simplify and deduplicate StandaloneHerderTest mocking
> -----------------------------------------------------
>
> Key: KAFKA-16084
> URL: https://issues.apache.org/jira/browse/KAFKA-16084
> Project: Kafka
> Issue Type: Test
> Components: connect
> Reporter: Greg Harris
> Priority: Minor
> Labels: newbie++
>
> The StandaloneHerderTest has some cruft that can be cleaned up. What I've found:
> * The `connector` field is written in nearly every test, but only read by one test, and looks to be nearly irrelevant.
> * `expectConfigValidation` has two ways of specifying consecutive validations: (1) the boolean shouldCreateConnector, which is true in the first invocation and false in subsequent invocations, and (2) passing multiple configurations via varargs.
> * The class uses a mix of Mock annotations and mock(Class) invocations.
> * The test doesn't stop the thread pool created inside the herder and might leak threads.
> * Mocking for Worker#startConnector is 6 lines which are duplicated 8 times throughout the test.
> * Some waits are 1000 ms and others are 1000 s, and could be pulled out to constants or a util method.
Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]
CalvinConfluent commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1486709516

## metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java: ##
@@ -124,6 +124,8 @@ Optional maybeCompleteReassignment(List targetIs
         if (!newTargetIsr.contains(replica)) return Optional.empty();
     }
+    if (newTargetIsr.size() < Math.min(minIsrCount, newTargetReplicas.size())) return Optional.empty();

Review Comment:
@jolshan Correction: I was wrong. newTargetReplicas is the expected target set (previous + adding - removing). The Math.min mainly prevents the reassignment from getting stuck when minISR has been set too high by mistake. (We do not have any sanity checks when setting the minISR config.)
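The effect of the `Math.min` clamp can be shown in isolation. This is a standalone sketch, not the controller code: `MinIsrGuard.canComplete` is a hypothetical helper, and the loop over `adding` is an assumption based on the discussion rather than something the hunk above shows in full.

```java
import java.util.Arrays;
import java.util.List;

// Standalone illustration; canComplete is a hypothetical helper, not Kafka code.
class MinIsrGuard {
    static boolean canComplete(List<Integer> targetIsr, List<Integer> targetReplicas,
                               List<Integer> adding, int minIsrCount) {
        // Assumed from the discussion: every replica being added must already be in the ISR.
        for (int replica : adding) {
            if (!targetIsr.contains(replica)) return false;
        }
        // Clamp the requirement so a min ISR above the replica count cannot block completion forever.
        int required = Math.min(minIsrCount, targetReplicas.size());
        return targetIsr.size() >= required;
    }

    public static void main(String[] args) {
        List<Integer> replicas = Arrays.asList(1, 2, 3);
        List<Integer> adding = Arrays.asList(3);
        // ISR of one replica with min ISR 2: completion is refused.
        System.out.println(canComplete(Arrays.asList(3), replicas, adding, 2));       // false
        // ISR of two replicas with min ISR 2: completion is allowed.
        System.out.println(canComplete(Arrays.asList(1, 3), replicas, adding, 2));    // true
        // Min ISR of 5 on a 3-replica partition: the clamp lowers the requirement to 3.
        System.out.println(canComplete(Arrays.asList(1, 2, 3), replicas, adding, 5)); // true
    }
}
```

Without the clamp, the third case could never complete, since the ISR cannot grow beyond the replica set.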
[PR] KAFKA-16167: re-enable PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup [kafka]
kirktrue opened a new pull request, #15358: URL: https://github.com/apache/kafka/pull/15358 This integration test is now passing, presumably based on recent related changes. Re-enabling to ensure it is included in the test suite to catch any regressions. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
C0urante commented on code in PR #15349: URL: https://github.com/apache/kafka/pull/15349#discussion_r1486653428 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java: ## @@ -31,6 +32,7 @@ * Integration test for MirrorMaker2 in which source records are emitted with a transactional producer, * which interleaves transaction commit messages into the source topic which are not propagated downstream. */ +@Tag("integration") Review Comment: Good call, agree with removing redundant tags and reducing FUD 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (2/3) [kafka]
gharris1727 merged PR #15313: URL: https://github.com/apache/kafka/pull/15313 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (2/3) [kafka]
gharris1727 commented on PR #15313: URL: https://github.com/apache/kafka/pull/15313#issuecomment-1939373332 Test failures appear unrelated, and the runtime tests pass locally for me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]
jolshan commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1486647593

## metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java: ##
@@ -124,6 +124,8 @@ Optional maybeCompleteReassignment(List targetIs
         if (!newTargetIsr.contains(replica)) return Optional.empty();
     }
+    if (newTargetIsr.size() < Math.min(minIsrCount, newTargetReplicas.size())) return Optional.empty();

Review Comment:
Hmm
```
newTargetReplicas = new ArrayList<>(replicas.size());
for (int replica : replicas) {
    if (!removing.contains(replica)) {
        newTargetReplicas.add(replica);
    }
}
```
It seems like it is just the final state -- current replicas minus the removing ones. Not the adding ones?
Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]
CalvinConfluent commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1486639919

## metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java: ##
@@ -124,6 +124,8 @@ Optional maybeCompleteReassignment(List targetIs
         if (!newTargetIsr.contains(replica)) return Optional.empty();
     }
+    if (newTargetIsr.size() < Math.min(minIsrCount, newTargetReplicas.size())) return Optional.empty();

Review Comment:
@splett2 Thanks for pointing out the inconsistency. I am not aware of a use case where we would want to complete the reassignment before all the replicas are in the ISR.
@jolshan newTargetReplicas is the set of "adding" replicas, and targetIsr can be under min ISR.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1486639832 ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -113,14 +116,28 @@ Optional nodeById(int id) { return Optional.ofNullable(nodes.get(id)); } -Cluster cluster() { +public Cluster cluster() { if (clusterInstance == null) { throw new IllegalStateException("Cached Cluster instance should not be null, but was."); } else { return clusterInstance; } } +/** + * Get leader-epoch for partition. + * @param tp partition + * @return leader-epoch if known, else return optional.empty() + */ +public Optional leaderEpochFor(TopicPartition tp) { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
gharris1727 commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1939299392

Hey all, thanks to @C0urante's review comment I realized that the integration tag is already inherited from parent classes, so the "added" tags were really no-ops.
I've instead removed all of the duplicated integration tags where the class already inherits the tag from a parent class, including existing ones.
So when reviewing:
* If you see a tag removed, that should be a no-op change, as that test was already an integration test.
* If you see a tag added, that was a test which was previously unitTest, and is now integrationTest.
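The inheritance described here is ordinary Java annotation behaviour: an annotation type marked `@Inherited` is visible on subclasses through reflection. The sketch below uses a stand-in `@Marker` annotation and made-up class names rather than JUnit's `@Tag`, so that it runs without JUnit on the classpath. That `@Tag` is itself declared `@Inherited` is my reading of the JUnit 5 Javadoc and worth confirming for the version in use.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Stand-in for a tag annotation; hypothetical, not JUnit's @Tag.
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Marker {
    String value();
}

// Parent class carrying the marker.
@Marker("integration")
abstract class BaseIntegrationTest { }

// No annotation of its own; it picks up the parent's marker.
class ChildTest extends BaseIntegrationTest { }

// Unrelated class with no marker anywhere in its hierarchy.
class PlainTest { }

class TagInheritanceDemo {
    // Returns the marker value visible on the class, or "none".
    static String markerOf(Class<?> c) {
        Marker m = c.getAnnotation(Marker.class);
        return m == null ? "none" : m.value();
    }

    public static void main(String[] args) {
        System.out.println(markerOf(ChildTest.class)); // prints "integration"
        System.out.println(markerOf(PlainTest.class)); // prints "none"
    }
}
```

This is why re-declaring the marker on `ChildTest` would change nothing, matching the "no-op" observation in the comment above.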
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
gharris1727 commented on code in PR #15349: URL: https://github.com/apache/kafka/pull/15349#discussion_r1486595231 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java: ## @@ -31,6 +32,7 @@ * Integration test for MirrorMaker2 in which source records are emitted with a transactional producer, * which interleaves transaction commit messages into the source topic which are not propagated downstream. */ +@Tag("integration") Review Comment: I changed this to make it consistent in the other direction: I removed the integration tag from all classes which should already inherit it.
Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]
jolshan commented on code in PR #14604: URL: https://github.com/apache/kafka/pull/14604#discussion_r1486595168 ## metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java: ## @@ -124,6 +124,8 @@ Optional maybeCompleteReassignment(List targetIs if (!newTargetIsr.contains(replica)) return Optional.empty(); } +if (newTargetIsr.size() < Math.min(minIsrCount, newTargetReplicas.size())) return Optional.empty(); Review Comment: One last question from me: why do we have `Math.min(minIsrCount, newTargetReplicas.size())`? Is this covering a case where a non-"adding" replica has somehow fallen out of the ISR but the partition is not under min ISR? (As an aside from your changes, the existing code is a bit strange: lines 104 and 105 seem to do nothing.)
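One possible reading of the `Math.min` guard, sketched under assumptions: if the target replica set is smaller than the configured min ISR, a plain `size < minIsrCount` check could never pass, so the bound is capped at the number of target replicas. The thread itself does not confirm this rationale. The class and method names below are hypothetical, and this is not the controller's actual code.

```java
import java.util.Arrays;
import java.util.List;

final class MinIsrCheckDemo {

    // Returns true when the target ISR is large enough under the capped bound.
    static boolean meetsMinIsr(List<Integer> newTargetIsr,
                               List<Integer> newTargetReplicas,
                               int minIsrCount) {
        // Cap the requirement: it can never exceed the number of target replicas.
        int required = Math.min(minIsrCount, newTargetReplicas.size());
        return newTargetIsr.size() >= required;
    }

    public static void main(String[] args) {
        // One target replica with min ISR 2: the capped bound is 1, so this passes.
        System.out.println(meetsMinIsr(Arrays.asList(1), Arrays.asList(1), 2));
        // Three target replicas with min ISR 2 and one in-sync replica: this fails.
        System.out.println(meetsMinIsr(Arrays.asList(1), Arrays.asList(1, 2, 3), 2));
    }
}
```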
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1486583557 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -879,17 +884,12 @@ private List drainBatchesForOneNode(Metadata metadata, Node node, // Only proceed if the partition has no in-flight batches. if (isMuted(tp)) continue; -Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); -// Although a small chance, but skip this partition if leader has changed since the partition -> node assignment obtained from outside the loop. Review Comment: Correct. Since the snapshot is immutable, a race condition is not possible, so I removed the code and the comment.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1486590491 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ## @@ -110,7 +110,8 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon * @param latestLeaderEpoch latest leader's epoch. */ void maybeUpdateLeaderEpoch(Optional latestLeaderEpoch) { -if (!currentLeaderEpoch.equals(latestLeaderEpoch)) { +if (latestLeaderEpoch.isPresent() Review Comment: The change is intentional. The optimisation that KIP-951 proposed is that a producer batch should skip the backoff period if a new leader is known. This change corrects the previous logic so that the batch's leader epoch is updated only if the latest epoch is newer. I added a test in `ProducerBatchTest.java` to reflect that.
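The "update only if newer" rule from this comment can be sketched as follows. This is a simplified illustration, not the code in `ProducerBatch`: the class name `LeaderEpochTracker` and the boolean return value are hypothetical, and `Optional<Integer>` is an assumption, since the element type is not visible in the quoted diff.

```java
import java.util.Optional;

final class LeaderEpochTracker {
    private Optional<Integer> currentLeaderEpoch = Optional.empty();

    // Advances the stored epoch only when a strictly newer one is supplied.
    // Returns true if the stored epoch changed (hypothetical convenience).
    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
        if (latestLeaderEpoch.isPresent()
                && (!currentLeaderEpoch.isPresent()
                    || currentLeaderEpoch.get() < latestLeaderEpoch.get())) {
            currentLeaderEpoch = latestLeaderEpoch;
            return true;
        }
        // Unknown or older epochs leave the current value untouched.
        return false;
    }

    Optional<Integer> currentLeaderEpoch() {
        return currentLeaderEpoch;
    }

    public static void main(String[] args) {
        LeaderEpochTracker tracker = new LeaderEpochTracker();
        tracker.maybeUpdateLeaderEpoch(Optional.of(5));
        tracker.maybeUpdateLeaderEpoch(Optional.of(3)); // older epoch is ignored
        System.out.println(tracker.currentLeaderEpoch());
    }
}
```

By contrast, a `!currentLeaderEpoch.equals(latestLeaderEpoch)` check would also react to an empty or older epoch, which is the behaviour the comment says was corrected.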
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1486583557 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -879,17 +884,12 @@ private List drainBatchesForOneNode(Metadata metadata, Node node, // Only proceed if the partition has no in-flight batches. if (isMuted(tp)) continue; -Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); -// Although a small chance, but skip this partition if leader has changed since the partition -> node assignment obtained from outside the loop. Review Comment: Correct, since it's immutable, this is not possible.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1486572035 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -127,6 +127,13 @@ public synchronized Cluster fetch() { return cache.cluster(); } +/** + * Get the current metadata cache. + */ +public synchronized MetadataCache fetchCache() { Review Comment: @hachikuji Interesting. `Metadata.update()` requires mutual exclusion while updating `cache` and the other internal data structures of `Metadata`, so it makes sense to keep the synchronization. What do you think? Moreover, `fetchCache` is called once in `Sender::sendProducerData`, so it's not a bottleneck in the hot path.
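The pattern discussed here, a synchronized accessor that hands out an immutable snapshot, can be sketched as below. All names (`MetadataSnapshotDemo`, `Snapshot`, the `String` partition keys) are hypothetical simplifications and not Kafka's `Metadata` or `MetadataCache` classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class MetadataSnapshotDemo {

    // Immutable view of leader epochs at one point in time.
    static final class Snapshot {
        private final Map<String, Integer> leaderEpochs;

        Snapshot(Map<String, Integer> leaderEpochs) {
            // Defensive copy so later changes by the caller cannot leak in.
            this.leaderEpochs = Collections.unmodifiableMap(new HashMap<>(leaderEpochs));
        }

        Integer leaderEpochFor(String partition) {
            return leaderEpochs.get(partition);
        }
    }

    private Snapshot cache = new Snapshot(Collections.<String, Integer>emptyMap());

    // Writers swap in a whole new snapshot under the lock.
    synchronized void update(Map<String, Integer> latest) {
        cache = new Snapshot(latest);
    }

    // Readers take the lock once, then use the snapshot without further locking.
    synchronized Snapshot fetchCache() {
        return cache;
    }

    public static void main(String[] args) {
        MetadataSnapshotDemo metadata = new MetadataSnapshotDemo();
        Map<String, Integer> epochs = new HashMap<>();
        epochs.put("topic-0", 5);
        metadata.update(epochs);
        Snapshot snapshot = metadata.fetchCache();
        epochs.put("topic-0", 9);
        metadata.update(epochs);
        // The earlier snapshot still reports the epoch it was created with.
        System.out.println(snapshot.leaderEpochFor("topic-0"));
    }
}
```

Because a reader keeps working on the snapshot it fetched, a concurrent `update` cannot change what that reader sees mid-loop, which matches the reasoning given for dropping the leader-changed check in `RecordAccumulator`.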
[PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm opened a new pull request, #15357: URL: https://github.com/apache/kafka/pull/15357

This PR modifies the commit manager to improve the retry logic and fix bugs:
- defines high-level functions for each of the different types of commit: commitSync, commitAsync, autoCommitSync (used from consumer close), autoCommitAsync (on interval), autoCommitNow (before revocation).
- moves retry logic to these caller functions, keeping common response error handling that propagates errors, which each caller function retries as needed.

Fixes the following issues:
- auto-commit before revocation should retry with the latest consumed offsets
- auto-commit before revocation should only reset the timer once, when the rebalance completes
- StaleMemberEpoch error (fatal) is considered retriable only when committing offsets before revocation, where it is retried with backoff if the member has a valid epoch. All other commits will fail fatally on stale epoch. Note that auto-commit on the interval (autoCommitAsync) does not have any specific retry logic for the stale epoch, but will effectively retry on the next interval (as it does for any other fatal error)
- fix duplicate and noisy logs for auto-commit
Re: [PR] KAFKA-14569: Migrate Kafka Streams tests from Zookeeper to KRaft [kafka]
OmniaGM commented on code in PR #15341: URL: https://github.com/apache/kafka/pull/15341#discussion_r1486540561 ## streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java: ## @@ -104,23 +110,14 @@ private Properties effectiveConfigFrom(final Properties initialConfig) { */ @SuppressWarnings("WeakerAccess") public String brokerList() { -final EndPoint endPoint = kafka.advertisedListeners().head(); +final EndPoint endPoint = kafka.config().effectiveAdvertisedListeners().head(); return endPoint.host() + ":" + endPoint.port(); } - -/** - * The ZooKeeper connection string aka `zookeeper.connect`. - */ -@SuppressWarnings("WeakerAccess") -public String zookeeperConnect() { -return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT); Review Comment: Can we also remove `ZkSessionTimeoutMsProp` from `KafkaEmbedded.effectiveConfigFrom` (line 99), as we don't need it anymore?
Re: [PR] KAFKA-14569: Migrate Kafka Streams tests from Zookeeper to KRaft [kafka]
OmniaGM commented on code in PR #15341: URL: https://github.com/apache/kafka/pull/15341#discussion_r1486534681 ## streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java: ## @@ -102,13 +101,14 @@ public EmbeddedKafkaCluster(final int numBrokers, /** * Creates and starts a Kafka cluster. */ -public void start() throws IOException { +public void start() throws Exception { log.debug("Initiating embedded Kafka cluster startup"); -log.debug("Starting a ZooKeeper instance"); -zookeeper = new EmbeddedZookeeper(); -log.debug("ZooKeeper instance is running at {}", zKConnectString()); +final KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( +new TestKitNodes.Builder() +.setNumControllerNodes(1) Review Comment: Is there any reason why we don't set up the cluster with combined nodes, with the number of controllers equal to the number of brokers, instead of one controller and one broker? Especially since, as far as I can see, all the tests set the number of brokers to 1 anyway. This could also let us remove `KafkaEmbedded`, which might simplify the setup. WDYT?
Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
OmniaGM commented on code in PR #15354: URL: https://github.com/apache/kafka/pull/15354#discussion_r1486527783 ## core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala: ## @@ -70,6 +70,11 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { if (isNewGroupCoordinatorEnabled()) { cfgs.foreach(_.setProperty(KafkaConfig.NewGroupCoordinatorEnableProp, "true")) } + +if(isKRaftTest()) { Review Comment: The test was flaky because when we call `causeLogDirFailure` we sometimes impact the first `log.dir`, which is also `KafkaConfig.metadataLogDir` because we don't set `metadata.log.dir`. So to fix the flakiness we need to explicitly set `metadata.log.dir` to a different log dir than the ones we could potentially fail in the tests. Hope this explains it!
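The fallback described in this comment can be modelled in a few lines. This is a simplified sketch and not `KafkaConfig`'s implementation: `log.dirs` and `metadata.log.dir` are the real broker config keys, while the class name, the helper `effectiveMetadataLogDir`, and the paths are hypothetical.

```java
import java.util.Properties;

final class MetadataLogDirDemo {

    // Simplified model: use metadata.log.dir when set, else the first entry of log.dirs.
    static String effectiveMetadataLogDir(Properties props) {
        String explicit = props.getProperty("metadata.log.dir");
        if (explicit != null && !explicit.isEmpty()) {
            return explicit;
        }
        String logDirs = props.getProperty("log.dirs", "");
        return logDirs.split(",")[0].trim();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("log.dirs", "/tmp/kafka-a,/tmp/kafka-b");
        // Without an explicit setting, the metadata log shares the first data dir.
        System.out.println(effectiveMetadataLogDir(props));

        // Setting it explicitly keeps the metadata log out of dirs a test may fail.
        props.setProperty("metadata.log.dir", "/tmp/kafka-meta");
        System.out.println(effectiveMetadataLogDir(props));
    }
}
```

In this model, a test that deliberately fails `/tmp/kafka-a` only takes down the metadata log in the first configuration, which is the source of flakiness the comment describes.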
Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
gaurav-narula commented on code in PR #15354: URL: https://github.com/apache/kafka/pull/15354#discussion_r1486519713 ## core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala: ## @@ -70,6 +70,11 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { if (isNewGroupCoordinatorEnabled()) { cfgs.foreach(_.setProperty(KafkaConfig.NewGroupCoordinatorEnableProp, "true")) } + +if(isKRaftTest()) { Review Comment: To add, #15262 made it apparent because prior to it, the check for metadata log dir failure didn't handle relative paths correctly. Refer https://github.com/apache/kafka/pull/15262/files#diff-78812e247ffeae6f8c49b1b22506434701b1e1bafe7f92ef8f8708059e292bf0R2589 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
OmniaGM commented on code in PR #15354: URL: https://github.com/apache/kafka/pull/15354#discussion_r1486512687 ## core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala: ## @@ -70,6 +70,11 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { if (isNewGroupCoordinatorEnabled()) { cfgs.foreach(_.setProperty(KafkaConfig.NewGroupCoordinatorEnableProp, "true")) } + +if(isKRaftTest()) { Review Comment: The issue was that we don't set the metadata dir, in which case we fall back to the first log dir.
Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
jolshan commented on code in PR #15354: URL: https://github.com/apache/kafka/pull/15354#discussion_r1486510262 ## core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala: ## @@ -70,6 +70,11 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { if (isNewGroupCoordinatorEnabled()) { cfgs.foreach(_.setProperty(KafkaConfig.NewGroupCoordinatorEnableProp, "true")) } + +if(isKRaftTest()) { Review Comment: nit: space after `if`. So the issue was that the metadata and other log directories were using the same temp directory?
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
gharris1727 commented on code in PR #15349: URL: https://github.com/apache/kafka/pull/15349#discussion_r1486493069 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java: ## @@ -31,6 +32,7 @@ * Integration test for MirrorMaker2 in which source records are emitted with a transactional producer, * which interleaves transaction commit messages into the source topic which are not propagated downstream. */ +@Tag("integration") Review Comment: It is, so this change is actually a no-op, I didn't realize that. It is more consistent with the other MM2 integration tests which also inherit the tag, but specify it anyway. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
gharris1727 commented on code in PR #15349: URL: https://github.com/apache/kafka/pull/15349#discussion_r1486493069 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java: ## @@ -31,6 +32,7 @@ * Integration test for MirrorMaker2 in which source records are emitted with a transactional producer, * which interleaves transaction commit messages into the source topic which are not propagated downstream. */ +@Tag("integration") Review Comment: It is, so this change is actually a no-op, I didn't realize that. It is more consistent with the other MM2 integration tests which also inherit the tag. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16243) Idle kafka-console-consumer with new consumer group protocol preemptively leaves group
[ https://issues.apache.org/jira/browse/KAFKA-16243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16243: -- Assignee: Lucas Brutschy (was: Andrew Schofield) > Idle kafka-console-consumer with new consumer group protocol preemptively > leaves group > -- > > Key: KAFKA-16243 > URL: https://issues.apache.org/jira/browse/KAFKA-16243 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Lucas Brutschy >Priority: Critical > > Using the new consumer group protocol with kafka-console-consumer.sh, I find > that if I leave the consumer with no records to process for 5 minutes > (max.poll.interval.ms = 300000ms), the tool logs the following warning > message and leaves the group. > "consumer poll timeout has expired. This means the time between subsequent > calls to poll() was longer than the configured max.poll.interval.ms, which > typically implies that the poll loop is spending too much time processing > messages. You can address this either by increasing max.poll.interval.ms or > by reducing the maximum size of batches returned in poll() with > max.poll.records." > With the older consumer, this did not occur. > The reason is that the consumer keeps a poll timer which is used to ensure > liveness of the application thread. With the older consumer, the poll timer > automatically updates while the `Consumer.poll(Duration)` method is blocked, > whereas the newer consumer only updates the poll timer when a new call to > `Consumer.poll(Duration)` is issued. This means that the > kafka-console-consumer.sh tool, which uses a very long timeout by default, > works differently with the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: log AssignReplicasToDirsRequest dispatch and handling [kafka]
gaurav-narula commented on code in PR #15356: URL: https://github.com/apache/kafka/pull/15356#discussion_r1486475203 ## server/src/main/java/org/apache/kafka/server/AssignmentsManager.java: ## @@ -270,8 +270,8 @@ public void run() throws Exception { } Map assignment = inflight.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().dirId)); -if (log.isDebugEnabled()) { -log.debug("Dispatching {} assignments: {}", assignment.size(), assignment); +if (log.isInfoEnabled()) { Review Comment: I adhered to the same convention as was before. It turns out the checks are useful when doing string concatenation instead of parameterised messages [0](https://www.slf4j.org/faq.html#logging_performance). Since we're using parameterised messages, I think we can safely avoid the conditionals. Adding a commit for the same. 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
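Why the level guard matters for string concatenation but not for parameterised messages can be shown with a small model. `TinyLogger` and `Expensive` are hypothetical stand-ins written for this sketch; they imitate only the deferred formatting of SLF4J's `{}` placeholders and are not SLF4J's API.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyLoggingDemo {
    // Counts how often the argument is actually converted to a string.
    static final AtomicInteger toStringCalls = new AtomicInteger();

    // Argument whose string conversion we would like to avoid when the level is off.
    static final class Expensive {
        @Override
        public String toString() {
            toStringCalls.incrementAndGet();
            return "expensive";
        }
    }

    static final class TinyLogger {
        private final boolean debugEnabled;

        TinyLogger(boolean debugEnabled) {
            this.debugEnabled = debugEnabled;
        }

        // Parameterised style: the argument is formatted only if the level is enabled.
        // Returns the formatted message, or null when nothing is logged.
        String debug(String format, Object arg) {
            if (!debugEnabled) {
                return null;
            }
            return format.replace("{}", String.valueOf(arg));
        }
    }

    public static void main(String[] args) {
        TinyLogger disabled = new TinyLogger(false);

        // No guard needed: toString() is never invoked while debug is off.
        disabled.debug("Dispatching {}", new Expensive());
        System.out.println("after parameterised call: " + toStringCalls.get());

        // Concatenation builds the string before the logger is even called.
        disabled.debug("Dispatching " + new Expensive(), null);
        System.out.println("after concatenated call: " + toStringCalls.get());
    }
}
```

The guard still pays off when computing the argument itself is costly, since the argument expression is evaluated before the call in either style.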
Re: [PR] MINOR: log AssignReplicasToDirsRequest dispatch and handling [kafka]
OmniaGM commented on code in PR #15356: URL: https://github.com/apache/kafka/pull/15356#discussion_r1486462098 ## server/src/main/java/org/apache/kafka/server/AssignmentsManager.java: ## @@ -270,8 +270,8 @@ public void run() throws Exception { } Map assignment = inflight.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().dirId)); -if (log.isDebugEnabled()) { -log.debug("Dispatching {} assignments: {}", assignment.size(), assignment); +if (log.isInfoEnabled()) { Review Comment: Is `log.isInfoEnabled` necessary here? I think we need this log in any log level -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14589 [1/3] Tests of ConsoleGroupCommand rewritten in java [kafka]
nizhikov commented on PR #15256: URL: https://github.com/apache/kafka/pull/15256#issuecomment-1939084618 Hello, @mimaison @jolshan Can you please take a look? It looks like this PR is ready to merge.
[jira] [Updated] (KAFKA-16243) Idle kafka-console-consumer with new consumer group protocol preemptively leaves group
[ https://issues.apache.org/jira/browse/KAFKA-16243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16243: -- Priority: Critical (was: Major) > Idle kafka-console-consumer with new consumer group protocol preemptively > leaves group > -- > > Key: KAFKA-16243 > URL: https://issues.apache.org/jira/browse/KAFKA-16243 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Critical > > Using the new consumer group protocol with kafka-console-consumer.sh, I find > that if I leave the consumer with no records to process for 5 minutes > (max.poll.interval.ms = 300000ms), the tool logs the following warning > message and leaves the group. > "consumer poll timeout has expired. This means the time between subsequent > calls to poll() was longer than the configured max.poll.interval.ms, which > typically implies that the poll loop is spending too much time processing > messages. You can address this either by increasing max.poll.interval.ms or > by reducing the maximum size of batches returned in poll() with > max.poll.records." > With the older consumer, this did not occur. > The reason is that the consumer keeps a poll timer which is used to ensure > liveness of the application thread. With the older consumer, the poll timer > automatically updates while the `Consumer.poll(Duration)` method is blocked, > whereas the newer consumer only updates the poll timer when a new call to > `Consumer.poll(Duration)` is issued. This means that the > kafka-console-consumer.sh tool, which uses a very long timeout by default, > works differently with the new consumer.
Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]
OmniaGM commented on code in PR #15348: URL: https://github.com/apache/kafka/pull/15348#discussion_r1486411242 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupBuilder.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.RecordHelpers; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class ConsumerGroupBuilder { +private final String groupId; +private final int groupEpoch; +private int assignmentEpoch; +private final Map members = new HashMap<>(); +private final Map assignments = new HashMap<>(); +private Map subscriptionMetadata; + +public ConsumerGroupBuilder(String groupId, int groupEpoch) { +this.groupId = groupId; +this.groupEpoch = groupEpoch; +this.assignmentEpoch = 0; +} + +public ConsumerGroupBuilder withMember(ConsumerGroupMember member) { +this.members.put(member.memberId(), member); +return this; +} + +public ConsumerGroupBuilder withSubscriptionMetadata(Map subscriptionMetadata) { +this.subscriptionMetadata = subscriptionMetadata; +return this; +} + +public ConsumerGroupBuilder withAssignment(String memberId, Map> assignment) { +this.assignments.put(memberId, new Assignment(assignment)); +return this; +} + +public ConsumerGroupBuilder withAssignmentEpoch(int assignmentEpoch) { +this.assignmentEpoch = assignmentEpoch; +return this; +} + +public List build(TopicsImage topicsImage) { +List records = new ArrayList<>(); + +// Add subscription records for members. +members.forEach((memberId, member) -> { Review Comment: same
[PR] MINOR: log AssignReplicasToDirsRequest dispatch and handling [kafka]
gaurav-narula opened a new pull request, #15356: URL: https://github.com/apache/kafka/pull/15356 It's hard to ascertain at the moment if AssignmentsManager dispatched an `AssignReplicasToDirsRequest` to the controller and if the controller handled it. This PR adds info logs for observability. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
satishd commented on PR #14034: URL: https://github.com/apache/kafka/pull/14034#issuecomment-1938978627 Resolved the recent conflicts because of trunk changes in `LocalLog` by rebasing them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] ignore [kafka]
dajac opened a new pull request, #15355: URL: https://github.com/apache/kafka/pull/15355 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
OmniaGM commented on PR #15354: URL: https://github.com/apache/kafka/pull/15354#issuecomment-1938862746 @jolshan, @gharris1727, @gaurav-narula can you have a look please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]
OmniaGM commented on PR #15262: URL: https://github.com/apache/kafka/pull/15262#issuecomment-1938859808 > The issues started for 3.7 on the same day so it is one of the 3 commits backported feb 2 https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America%2FLos_Angeles&search.values=3.7&tests.container=kafka.server.LogDirFailureTest Hi @jolshan just introduced a fix here #15354 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
OmniaGM opened a new pull request, #15354: URL: https://github.com/apache/kafka/pull/15354 Fix the flakiness of LogDirFailureTest
[jira] [Assigned] (KAFKA-16225) Flaky test suite LogDirFailureTest
[ https://issues.apache.org/jira/browse/KAFKA-16225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim reassigned KAFKA-16225: - Assignee: Omnia Ibrahim > Flaky test suite LogDirFailureTest > -- > > Key: KAFKA-16225 > URL: https://issues.apache.org/jira/browse/KAFKA-16225 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Reporter: Greg Harris >Assignee: Omnia Ibrahim >Priority: Major > Labels: flaky-test > > I see this failure on trunk and in PR builds for multiple methods in this > test suite: > {noformat} > org.opentest4j.AssertionFailedError: expected: but was: > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31) > at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179) > at kafka.utils.TestUtils$.causeLogDirFailure(TestUtils.scala:1715) > at > kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:186) > > at > kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:70){noformat} > It appears this assertion is failing > [https://github.com/apache/kafka/blob/f54975c33135140351c50370282e86c49c81bbdd/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L1715] > The other error which is appearing is this: > {noformat} > org.opentest4j.AssertionFailedError: Unexpected exception type thrown, > expected: but was: > > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > > at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:67) > at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) > at 
org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111) > at > kafka.server.LogDirFailureTest.testProduceErrorsFromLogDirFailureOnLeader(LogDirFailureTest.scala:164) > > at > kafka.server.LogDirFailureTest.testProduceErrorFromFailureOnLogRoll(LogDirFailureTest.scala:64){noformat} > Failures appear to have started in this commit, but this does not indicate > that this commit is at fault: > [https://github.com/apache/kafka/tree/3d95a69a28c2d16e96618cfa9a1eb69180fb66ea] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16244) Move code style exceptions from suppressions.xml to the code
[ https://issues.apache.org/jira/browse/KAFKA-16244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816631#comment-17816631 ] Ismael Juma commented on KAFKA-16244: - This implies adding the checkstyle annotations to the code - worth making sure the license is ok. There was an issue with the annotations for findBugs, but checkstyle may be OK. > Move code style exceptions from suppressions.xml to the code > > > Key: KAFKA-16244 > URL: https://issues.apache.org/jira/browse/KAFKA-16244 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major >
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
ijuma commented on PR #15349: URL: https://github.com/apache/kafka/pull/15349#issuecomment-1938829826 @gharris1727 Would you be willing to add a brief comment stating the reason for the test being marked integration if it's one of "slow" or "flaky"? For the ones that make sense as integration tests more generally, we don't need to add a comment.
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
ijuma commented on PR #15349: URL: https://github.com/apache/kafka/pull/15349#issuecomment-1938826340 > @ijuma I don't disagree. However, I think that they are tests that we should rather fix vs classifying them as integration tests. As I said, they can be fixed - but they should be fixed separately versus blocking progress towards a useful `unitTest`.
Re: [PR] KAFKA-15832: Trigger client reconciliation based on manager poll [kafka]
lucasbru merged PR #15275: URL: https://github.com/apache/kafka/pull/15275
Re: [PR] KAFKA-15832: Trigger client reconciliation based on manager poll [kafka]
lianetm commented on code in PR #15275: URL: https://github.com/apache/kafka/pull/15275#discussion_r1486257781 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1392,4 +1356,16 @@ public void registerStateListener(MemberStateListener listener) { } this.stateUpdatesListeners.add(listener); } + +/** + * If either a new target assignment or new metadata is available that we have not yet attempted + * to reconcile, and we are currently in state RECONCILING, trigger reconciliation. + */ +@Override +public PollResult poll(final long currentTimeMs) { +if (state == MemberState.RECONCILING) { +maybeReconcile(); Review Comment: Just for the record, you are both right: requesting a metadata update from the reconciliation only sets a flag in the metadata object indicating that it needs a full update, so effectively a single metadata request is issued on the next network client poll while that flag is set. By the way, we have a related Jira for a potential future improvement, partial metadata updates when reconciling (https://issues.apache.org/jira/browse/KAFKA-15847), since right now we request metadata for all topics, even if we only need the ones involved in the assignment.
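The flag-based behaviour described in the review comment above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Kafka's actual metadata or network client code: the class `MetadataSketch` and its methods `requestFullUpdate` and `pollNetwork` are hypothetical names, and clearing the flag at send time is a simplification made for the example.

```java
/** Hypothetical stand-in for a metadata object; not Kafka's real Metadata class. */
public class MetadataSketch {
    private boolean needsFullUpdate = false;
    private int requestsSent = 0;

    /** Called by reconciliation: only sets a flag, sends nothing itself. */
    public void requestFullUpdate() {
        needsFullUpdate = true;
    }

    /** Called on each network poll: issues at most one request if the flag is set. */
    public void pollNetwork() {
        if (needsFullUpdate) {
            requestsSent++;          // stands in for sending one metadata request
            needsFullUpdate = false; // simplification: cleared as soon as it is sent
        }
    }

    public int requestsSent() {
        return requestsSent;
    }

    public static void main(String[] args) {
        MetadataSketch metadata = new MetadataSketch();
        // Several reconciliation attempts ask for an update before the next poll.
        metadata.requestFullUpdate();
        metadata.requestFullUpdate();
        metadata.requestFullUpdate();
        metadata.pollNetwork();
        metadata.pollNetwork();
        System.out.println(metadata.requestsSent()); // prints 1
    }
}
```

Because the requests are coalesced into one boolean, calling the reconciliation path on every poll does not multiply metadata requests in this sketch.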
[jira] [Updated] (KAFKA-16165) Consumer invalid transition on expired poll interval
[ https://issues.apache.org/jira/browse/KAFKA-16165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16165: --- Description: Running system tests with the new async consumer revealed an invalid transition related to the consumer not being polled on the interval in some kind of scenario (maybe relates to consumer close, as the transition is leaving->stale) Log trace: [2024-01-17 19:45:07,379] WARN [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) [2024-01-17 19:45:07,379] ERROR [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] Unexpected error caught in consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91) java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303) at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739) at org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88) We should review the poll expiration logic that triggers a leave group operation. That is currently applied in the HB Manager poll, without any validation, and given it depends on the consumer poll timer, it could happen at any time, regardless of the state of the member. Ex. poll timer could expire when the member is leaving, leading to this leaving->stale invalid transition. We should probably consider that this pro-active leave should only apply when the consumer is in the group (not leaving, unsubscribed or fatal) was: Running system tests with the new async consumer revealed an invalid transition related to the consumer not being polled on the interval in some kind of scenario (maybe relates to consumer close, as the transition is leaving->stale) Log trace: [2024-01-17 19:45:07,379] WARN [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. 
You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) [2024-01-17 19:45:07,379] ERROR [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] Unexpected error caught in consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91) java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303) at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739) at org.apache.kafka.clients.cons
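The guard proposed in the KAFKA-16165 description (apply the proactive leave only while the member is in the group) could look roughly like the sketch below. It is an illustration under assumptions, not the actual `MembershipManagerImpl` or `HeartbeatRequestManager` logic: the class `PollTimerGuardSketch`, the method `onPollTimerExpired`, and the reduced set of states are hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;

public class PollTimerGuardSketch {
    /** Hypothetical subset of member states, named after those mentioned in the ticket. */
    public enum MemberState { STABLE, RECONCILING, LEAVING, UNSUBSCRIBED, FATAL, STALE }

    /** States in which the member is not in the group, so a proactive leave does not apply. */
    private static final Set<MemberState> NOT_IN_GROUP =
        EnumSet.of(MemberState.LEAVING, MemberState.UNSUBSCRIBED, MemberState.FATAL);

    private MemberState state;

    public PollTimerGuardSketch(MemberState initial) {
        this.state = initial;
    }

    public MemberState state() {
        return state;
    }

    /** Returns true if the member moved to STALE, false if the expiry was ignored. */
    public boolean onPollTimerExpired() {
        if (NOT_IN_GROUP.contains(state) || state == MemberState.STALE) {
            return false; // avoids the invalid LEAVING -> STALE transition
        }
        state = MemberState.STALE;
        return true;
    }

    public static void main(String[] args) {
        PollTimerGuardSketch leaving = new PollTimerGuardSketch(MemberState.LEAVING);
        System.out.println(leaving.onPollTimerExpired() + " " + leaving.state()); // false LEAVING

        PollTimerGuardSketch stable = new PollTimerGuardSketch(MemberState.STABLE);
        System.out.println(stable.onPollTimerExpired() + " " + stable.state());   // true STALE
    }
}
```

Checking the state before transitioning, rather than catching the `IllegalStateException` afterwards, keeps the expiry check safe to run at any time from the network thread in this sketch.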
Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]
dajac commented on PR #15348: URL: https://github.com/apache/kafka/pull/15348#issuecomment-1938702499 @OmniaGM @jolshan Thanks for your comments. I have addressed them.
Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]
dajac commented on code in PR #15348: URL: https://github.com/apache/kafka/pull/15348#discussion_r1486196044 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java: ## @@ -0,0 +1,1485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupRequestData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.classic.ClassicGroup; +import 
org.apache.kafka.coordinator.group.classic.ClassicGroupState; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.opentest4j.AssertionFailedError; + +import java.net.InetAddress; +import java.util.ArrayList;
Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]
dajac commented on code in PR #15348: URL: https://github.com/apache/kafka/pull/15348#discussion_r1486183604 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java: ## @@ -0,0 +1,1485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupRequestData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.classic.ClassicGroup; +import 
org.apache.kafka.coordinator.group.classic.ClassicGroupState; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.opentest4j.AssertionFailedError; + +import java.net.InetAddress; +import java.util.ArrayList;
[jira] [Created] (KAFKA-16244) Move code style exceptions from suppressions.xml to the code
David Jacot created KAFKA-16244: --- Summary: Move code style exceptions from suppressions.xml to the code Key: KAFKA-16244 URL: https://issues.apache.org/jira/browse/KAFKA-16244 Project: Kafka Issue Type: Sub-task Reporter: David Jacot Assignee: David Jacot
Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]
dajac commented on code in PR #15348: URL: https://github.com/apache/kafka/pull/15348#discussion_r1486181764 ## checkstyle/suppressions.xml: ## @@ -336,11 +336,11 @@ + files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GeneralUniformAssignmentBuilder).java"/> Review Comment: I was actually not aware of this. I think that it makes sense. I filed https://issues.apache.org/jira/browse/KAFKA-16244 to give it a try. In this PR, I would prefer to keep the `suppressions.xml` exception though.
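One in-code mechanism that checkstyle supports is the standard `java.lang.SuppressWarnings` annotation, which ships with the JDK. The sketch below shows the general shape only. It assumes a checkstyle configuration with `SuppressWarningsHolder` and `SuppressWarningsFilter` enabled, which is not shown in this thread, and the class name, method, and suppressed check are illustrative rather than taken from the Kafka code base.

```java
public class SuppressionSketch {

    /**
     * The annotation moves the exception next to the code it covers.
     * Assumption: checkstyle is configured to honour @SuppressWarnings;
     * the check name below is only an example.
     */
    @SuppressWarnings("checkstyle:CyclomaticComplexity")
    public static String classify(int value) {
        if (value < 0) {
            return "negative";
        }
        if (value == 0) {
            return "zero";
        }
        if (value < 10) {
            return "small";
        }
        return "large";
    }

    public static void main(String[] args) {
        System.out.println(classify(5)); // prints small
    }
}
```

Compared with a `files="..."` entry in `suppressions.xml`, the suppression in this form is scoped to a single method and travels with the code when a class is renamed or split.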
[PR] MINOR: Fix package name for FetchFromFollowerIntegrationTest [kafka]
divijvaidya opened a new pull request, #15353: URL: https://github.com/apache/kafka/pull/15353 The change will ensure that this test is included when running: ``` ./gradlew :core:unitTest --tests "kafka.server.*" ```
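Why the package name matters for a filter such as `kafka.server.*` can be illustrated with a small wildcard matcher over fully qualified class names. This is only an approximation written for the example, not Gradle's actual test-filter implementation, and `other.pkg` is a hypothetical placeholder because the message does not state the previous package name.

```java
import java.util.regex.Pattern;

public class TestFilterSketch {

    /** Matches a fully qualified class name against a pattern in which '*' means any characters. */
    public static boolean matches(String pattern, String className) {
        String[] parts = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");                  // each '*' becomes "any characters"
            }
            regex.append(Pattern.quote(parts[i]));   // everything else is matched literally
        }
        return Pattern.matches(regex.toString(), className);
    }

    public static void main(String[] args) {
        String filter = "kafka.server.*";
        // Test declared in the expected package: selected by the filter.
        System.out.println(matches(filter, "kafka.server.FetchFromFollowerIntegrationTest"));  // true
        // Same class under a hypothetical different package: silently skipped.
        System.out.println(matches(filter, "other.pkg.FetchFromFollowerIntegrationTest"));     // false
    }
}
```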
Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]
divijvaidya commented on PR #15316: URL: https://github.com/apache/kafka/pull/15316#issuecomment-1938598989 As part of this last batch, please also remove this test from the list here: https://github.com/apache/kafka/blob/5cfcc52fb3fce4a43ca77df311382d7a02a40ed2/build.gradle#L424
[jira] [Resolved] (KAFKA-16239) Clean up references to non-existent IntegrationTestHelper
[ https://issues.apache.org/jira/browse/KAFKA-16239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya resolved KAFKA-16239. -- Resolution: Fixed > Clean up references to non-existent IntegrationTestHelper > - > > Key: KAFKA-16239 > URL: https://issues.apache.org/jira/browse/KAFKA-16239 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Minor > Labels: newbie > Fix For: 3.8.0 > > > A bunch of places in the code javadocs and READ docs refer to a class called > IntegrationTestHelper. Such a class does not exist. > This task will clean up all references to IntegrationTestHelper from the Kafka > code base.
[jira] [Updated] (KAFKA-16239) Clean up references to non-existent IntegrationTestHelper
[ https://issues.apache.org/jira/browse/KAFKA-16239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-16239: - Component/s: unit tests > Clean up references to non-existent IntegrationTestHelper > - > > Key: KAFKA-16239 > URL: https://issues.apache.org/jira/browse/KAFKA-16239 > Project: Kafka > Issue Type: Improvement > Components: unit tests >Reporter: Divij Vaidya >Priority: Minor > Labels: newbie > Fix For: 3.8.0 > > > A bunch of places in the code javadocs and READ docs refer to a class called > IntegrationTestHelper. Such a class does not exist. > This task will clean up all references to IntegrationTestHelper from the Kafka > code base.
[jira] [Updated] (KAFKA-16239) Clean up references to non-existent IntegrationTestHelper
[ https://issues.apache.org/jira/browse/KAFKA-16239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-16239: - Fix Version/s: 3.8.0 > Clean up references to non-existent IntegrationTestHelper > - > > Key: KAFKA-16239 > URL: https://issues.apache.org/jira/browse/KAFKA-16239 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Minor > Labels: newbie > Fix For: 3.8.0 > > > A bunch of places in the code javadocs and READ docs refer to a class called > IntegrationTestHelper. Such a class does not exist. > This task will clean up all references to IntegrationTestHelper from the Kafka > code base.
Re: [PR] KAFKA-16239: Clean up references to non-existent IntegrationTestHelper [kafka]
divijvaidya merged PR #15352: URL: https://github.com/apache/kafka/pull/15352
Re: [PR] MINOR Removed unused CommittedOffsetsFile class. [kafka]
satishd merged PR #15209: URL: https://github.com/apache/kafka/pull/15209
Re: [PR] fix syntax error in release.py [kafka]
divijvaidya merged PR #15350: URL: https://github.com/apache/kafka/pull/15350
[jira] [Resolved] (KAFKA-14041) Avoid the keyword var for a variable declaration in ConfigTransformer
[ https://issues.apache.org/jira/browse/KAFKA-14041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya resolved KAFKA-14041. -- Resolution: Fixed > Avoid the keyword var for a variable declaration in ConfigTransformer > - > > Key: KAFKA-14041 > URL: https://issues.apache.org/jira/browse/KAFKA-14041 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: QualiteSys QualiteSys >Assignee: Andrew Schofield >Priority: Major > Fix For: 3.8.0 > > > In the file > clients\src\main\java\org\apache\kafka\common\config\ConfigTransformer.java a > variable named var is declared: > line 84: for (ConfigVariable var : vars) { > Since it is a Java keyword, could the variable name be changed? > Thanks
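For context, `var` is not a keyword in the strict sense: since Java 10 it is a reserved type name used for local variable type inference, so it remains legal as a variable identifier, which is why the original loop compiles. A minimal sketch of the two usages follows, where `ConfigVariableSketch` is a hypothetical stand-in for the `ConfigVariable` type named in the ticket and `configVar` is just one possible replacement name, not necessarily the one chosen in the fix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class VarNameSketch {

    /** Hypothetical stand-in for the ConfigVariable type named in the ticket. */
    public static final class ConfigVariableSketch {
        final String name;

        public ConfigVariableSketch(String name) {
            this.name = name;
        }
    }

    /** Compiles on Java 10+, but 'var' as an identifier reads confusingly. */
    public static List<String> namesWithVarIdentifier(List<ConfigVariableSketch> vars) {
        List<String> names = new ArrayList<>();
        for (ConfigVariableSketch var : vars) {   // 'var' used as a variable name
            names.add(var.name);
        }
        return names;
    }

    /** Same loop with a descriptive name; here 'var' is used for type inference instead. */
    public static List<String> namesWithClearIdentifier(List<ConfigVariableSketch> vars) {
        List<String> names = new ArrayList<>();
        for (var configVar : vars) {              // 'var' used as the inferred type
            names.add(configVar.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<ConfigVariableSketch> vars =
            Arrays.asList(new ConfigVariableSketch("a"), new ConfigVariableSketch("b"));
        System.out.println(namesWithVarIdentifier(vars).equals(namesWithClearIdentifier(vars))); // prints true
    }
}
```

Both methods behave identically, so the rename is purely a readability change.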
[jira] [Updated] (KAFKA-14041) Avoid the keyword var for a variable declaration in ConfigTransformer
[ https://issues.apache.org/jira/browse/KAFKA-14041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14041: - Priority: Minor (was: Major) > Avoid the keyword var for a variable declaration in ConfigTransformer > - > > Key: KAFKA-14041 > URL: https://issues.apache.org/jira/browse/KAFKA-14041 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: QualiteSys QualiteSys >Assignee: Andrew Schofield >Priority: Minor > Fix For: 3.8.0 > > > In the file > clients\src\main\java\org\apache\kafka\common\config\ConfigTransformer.java a > variable named var is declared: > line 84: for (ConfigVariable var : vars) { > Since it is a Java keyword, could the variable name be changed? > Thanks
[jira] [Updated] (KAFKA-14041) Avoid the keyword var for a variable declaration in ConfigTransformer
[ https://issues.apache.org/jira/browse/KAFKA-14041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14041: - Fix Version/s: 3.8.0 > Avoid the keyword var for a variable declaration in ConfigTransformer > - > > Key: KAFKA-14041 > URL: https://issues.apache.org/jira/browse/KAFKA-14041 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: QualiteSys QualiteSys >Assignee: Andrew Schofield >Priority: Major > Fix For: 3.8.0 > > > In the file > clients\src\main\java\org\apache\kafka\common\config\ConfigTransformer.java a > variable named var is declared: > line 84: for (ConfigVariable var : vars) { > Since it is a Java keyword, could the variable name be changed? > Thanks
Re: [PR] KAFKA-14041: Avoid the keyword var for a variable declaration [kafka]
divijvaidya merged PR #15351: URL: https://github.com/apache/kafka/pull/15351 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16116: Rebalance Metrics for AsyncKafkaConsumer [kafka]
lucasbru commented on code in PR #15339:
URL: https://github.com/apache/kafka/pull/15339#discussion_r1485988931

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -95,6 +98,8 @@ public class MembershipManagerImplTest {
     private BlockingQueue backgroundEventQueue;
     private BackgroundEventHandler backgroundEventHandler;
     private Time time;
+    private Metrics metrics;
+    private RebalanceMetricsManager rebalanceMetricsManager = mock(RebalanceMetricsManager.class);

Review Comment:
In this class, you sometimes mock the `RebalanceMetricsManager` and sometimes use the real one. I think it's easier to stick to one of the two, so that the metrics manager is either part of the unit under test or not. You can probably just use the "real one" everywhere?

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -242,7 +234,7 @@ public void testTransitionToFailedWhenTryingToJoin() {
         MembershipManagerImpl membershipManager = new MembershipManagerImpl(
                 GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
                 subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
-                backgroundEventHandler, time);
+                backgroundEventHandler, time, mock(RebalanceMetricsManager.class));

Review Comment:
Why not use your member `rebalanceMetricsManager` here?

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -163,7 +155,7 @@ public void testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin()
         MembershipManagerImpl manager = new MembershipManagerImpl(
                 GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
                 subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
-                backgroundEventHandler, time);
+                backgroundEventHandler, time, mock(RebalanceMetricsManager.class));

Review Comment:
Why not use your member `rebalanceMetricsManager` here?
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ##
@@ -75,6 +75,11 @@ public interface MembershipManager {
      */
     void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData response);

+    /**
+     * Notify the member that an error heartbeat response was received.
+     */
+    void onHeartbeatError();

Review Comment:
I wonder if it would make sense to rename `onHeartbeatResponseReceived` in this change, since an error response is also a response that we receive; it should be clear that the method is only called in the successful case. Maybe we could call the two functions `onHeartbeatSuccess` and `onHeartbeatFailure`, following the convention in `RequestFutureListener` and similar classes?

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -1610,12 +1605,152 @@ public void testHeartbeatSentOnStaledMember() {
     @Test
     public void testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {
-        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(null);
+        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
         assertEquals(MemberState.JOINING, membershipManager.state());
         receiveEmptyAssignment(membershipManager);
         assertEquals(MemberState.STABLE, membershipManager.state());
     }

+    @Test
+    public void testMetricsWhenHeartbeatFailed() {
+        rebalanceMetricsManager = new RebalanceMetricsManager(metrics);
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        membershipManager.onHeartbeatError();
+
+        // Not expecting rebalance failures without assignments being reconciled
+        assertEquals(0.0d, getMetricValue(metrics, rebalanceMetricsManager.rebalanceTotal));
+        assertEquals(0.0d, getMetricValue(metrics, rebalanceMetricsManager.failedRebalanceTotal));
+    }
+
+    @Test
+    public void testRebalanceMetricsOnSuccessfulRebalance() {
+        rebalanceMetricsManager = new RebalanceMetricsManager(metrics);
+        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
+        ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null);
+        membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+        mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1");
+
+        CompletableFuture commitResult = mockRevocationNoCallbacks(true);
+
+        receiveEmptyAssignment(membershipManager);
+        long reconciliationDurationMs = sleepRandomMs(time, 2000);
+
+        // Complete commit request to complete the callback invocation
+
Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]
nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1485925149

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }

+    private boolean transactionBuffersExceedCapacity = false;
+
+    boolean transactionBuffersExceedCapacity() {
+        return transactionBuffersExceedCapacity;
+    }
+
+    boolean transactionBuffersWillExceedCapacity() {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
+        if (transactionBuffersAreUnbounded) {
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold

Review Comment:
> First, IIUC, the idea is to use the number of additional uncommitted bytes since the last commit as a heuristic to estimate how many more uncommitted bytes will be added if we don't commit right now, and thus try to commit "just before" we go over the limit.

Correct, this is the intent. One correction though: we're calculating the delta since the last _iteration_ of the StreamThread, not since the last commit.

This behaviour was requested on the mailing list, IIRC by @cadonna, to reduce the risk of OOM between commits. The thinking was that users will expect `statestore.uncommitted.max.bytes` to be an upper-bound, and likely set it with little headroom. If the commit interval is fairly long (e.g. 30 seconds under ALOS), then there's a good chance we would exceed this limit by a considerable margin.

> [...] so we expect to see updates to the same keys. [...] and see fewer and fewer new keys vs in-place updates.

Writes in RocksDB are the same size, irrespective of whether they are inserts of new keys, or updates to existing keys. The only case I can think of where this may be different is when migrating an existing record from a non-timestamped to a timestamped column family; in which case, the updates would actually be slightly larger, due to the additional tombstone written.

Ultimately, any heuristic we choose will be imperfect. If we track an average, even since the last commit, then it would be less responsive to sudden increases in throughput, potentially allowing an OOM before the average has had a chance to catch up. The current approach attempts to err on the side of caution: we may trigger a commit unnecessarily early on occasion, but we should be less likely to exceed the configured buffer size.

I'm not against changing this heuristic to a running average, or anything else; I'm just not sufficiently convinced it would be better :grin:
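For readers following the thread, the heuristic described in the comment above can be sketched roughly as follows. This is an illustration only and not the code from PR #15264: the class name `UncommittedBytesEstimator`, its field names, and the methods `shouldCommitEarly` and `onCommit` are hypothetical. The only parts taken from the discussion are the use of the delta since the previous StreamThread iteration as the estimate of growth, and a negative limit meaning "unbounded". Whether the real comparison is strict is also an assumption.

```java
// Hypothetical sketch of the "delta since last iteration" heuristic; not the PR's implementation.
final class UncommittedBytesEstimator {
    // Configured limit in bytes; a negative value is treated as unbounded (assumption mirrors the diff).
    private final long maxUncommittedBytes;
    // Uncommitted byte count observed at the previous iteration of the processing loop.
    private long bytesAtLastIteration = 0L;

    UncommittedBytesEstimator(final long maxUncommittedBytes) {
        this.maxUncommittedBytes = maxUncommittedBytes;
    }

    // Call once per loop iteration with the current number of uncommitted bytes.
    boolean shouldCommitEarly(final long uncommittedBytes) {
        if (maxUncommittedBytes < 0) {
            return false; // unbounded buffers never force a commit
        }
        // Growth observed during the last iteration, used as the estimate for the next one.
        final long delta = Math.max(0L, uncommittedBytes - bytesAtLastIteration);
        bytesAtLastIteration = uncommittedBytes;
        // Commit now if one more iteration of similar size would push us past the limit.
        return uncommittedBytes + delta > maxUncommittedBytes;
    }

    // Reset the baseline after a commit has flushed the buffers.
    void onCommit() {
        bytesAtLastIteration = 0L;
    }
}
```

With a limit of 100 bytes, observations of 30 and then 60 bytes would not trigger a commit, while a following observation of 85 bytes would, because the last delta of 25 bytes projects to 110. A running average would replace `delta` with a smoothed value, which is the trade-off in responsiveness that the comment describes.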