[jira] [Commented] (KAFKA-16244) Move code style exceptions from suppressions.xml to the code

2024-02-12 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816873#comment-17816873
 ] 

David Jacot commented on KAFKA-16244:
-

[~ijuma] I actually learnt yesterday that we are already doing it: 
[https://github.com/apache/kafka/pull/15139#discussion_r1451037707.] [~jsancio] 
Did you check the license when you added support for it?
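
For readers unfamiliar with in-code style exceptions, here is a minimal sketch of what moving an exception out of suppressions.xml could look like. It assumes Checkstyle's `SuppressWarningsFilter` and `SuppressWarningsHolder` modules are enabled, which this thread does not confirm; the class, the method, and the choice of the cyclomatic complexity check are hypothetical.

```java
// Hypothetical example: the style exception sits next to the code it covers,
// instead of in a file-level entry in suppressions.xml.
// Assumption: Checkstyle runs with SuppressWarningsFilter and SuppressWarningsHolder.
final class StyleExceptionSketch {

    // Names the single check that is silenced, for this method only.
    @SuppressWarnings("checkstyle:cyclomaticcomplexity")
    static String classify(int code) {
        if (code < 0) return "invalid";
        if (code == 0) return "none";
        if (code < 10) return "low";
        if (code < 100) return "medium";
        return "high";
    }

    public static void main(String[] args) {
        System.out.println(classify(42));
    }
}
```

The annotation is ignored by the compiler itself, so the scope of the exception is visible in review without changing behaviour.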

> Move code style exceptions from suppressions.xml to the code
> 
>
> Key: KAFKA-16244
> URL: https://issues.apache.org/jira/browse/KAFKA-16244
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14589 [1/3] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-02-12 Thread via GitHub


nizhikov commented on PR #15256:
URL: https://github.com/apache/kafka/pull/15256#issuecomment-1940499322

   @showuon Thank you for the review and for merging!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]

2024-02-12 Thread via GitHub


nizhikov commented on PR #15216:
URL: https://github.com/apache/kafka/pull/15216#issuecomment-1940488544

   Hello @philipnee @lucasbru
   
   Why did you remove the GroupState class in this commit?
   This class will be used in the Java version of ConsoleGroupCommand once 
https://github.com/apache/kafka/pull/14471 is fully merged.
   The class was merged to trunk in 
https://github.com/apache/kafka/commit/ff25c350a799f372ce6c56e14bbfe3bc7e088218 
to simplify further reviews.





Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-12 Thread via GitHub


dajac commented on PR #15150:
URL: https://github.com/apache/kafka/pull/15150#issuecomment-1940417533

   @rreddy-22 There are conflicts due to changes made in 
https://github.com/apache/kafka/commit/88c5543ccfb517ceba8dc56d22576efdf2540e0e.
 Could you take a look when you get a chance?





Re: [PR] POC: run group coordinator state machine on request handler pool [kafka]

2024-02-12 Thread via GitHub


github-actions[bot] commented on PR #14728:
URL: https://github.com/apache/kafka/pull/14728#issuecomment-1940369610

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2024-02-12 Thread via GitHub


github-actions[bot] commented on PR #14752:
URL: https://github.com/apache/kafka/pull/14752#issuecomment-1940369577

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-14589 [1/3] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-02-12 Thread via GitHub


showuon merged PR #15256:
URL: https://github.com/apache/kafka/pull/15256





Re: [PR] KAFKA-14589 [1/3] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-02-12 Thread via GitHub


showuon commented on PR #15256:
URL: https://github.com/apache/kafka/pull/15256#issuecomment-1940345746

   > Hello @showuon
   > 
   > I found that `DeleteTopicTest.scala` doesn't close the resources allocated in 
the case of a KRaft run. This was hidden by the removed tests and led to the test failures 
we observed earlier.
   > 
   > I fixed `DeleteTopicTest` and now it seems that CI is OK.
   > 
   > Take a look one more time, please.
   
   Nice find! Thanks for the fix!





Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]

2024-02-12 Thread via GitHub


hgeraldino commented on PR #15316:
URL: https://github.com/apache/kafka/pull/15316#issuecomment-1940100133

   Final batch of the `WorkerSinkTaskTest` migration  🎉 🎉 🎉
   
   This one is probably the largest of the 3 (sorry). 
   We can either do one final commit renaming the test from 
`WorkerSinkTaskTestMockito` back to `WorkerSinkTaskTest`, or just open a new PR 
once this gets merged to do the cleanup.
   
   Tagging @divijvaidya @gharris1727 @C0urante 





Re: [PR] KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR [kafka]

2024-02-12 Thread via GitHub


CalvinConfluent commented on PR #15359:
URL: https://github.com/apache/kafka/pull/15359#issuecomment-1939789669

   @splett2 @jolshan @artemlivshits @cmccabe Following the discussion in the 
previous PR, I changed the logic to require that all the target replicas are 
in the ISR.
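
   To make the difference between the two rules concrete, here is a minimal sketch. It is not the controller code from this PR; the class and method names are hypothetical, and broker IDs are plain integers for illustration.

```java
import java.util.Set;

// Hypothetical helper contrasting the two completion rules; not taken from the PR.
final class ReassignmentCheckSketch {

    // Rule named in the earlier PR title: the ISR is at least min ISR in size.
    static boolean meetsMinIsr(Set<Integer> isr, int minIsr) {
        return isr.size() >= minIsr;
    }

    // Rule described in this comment: every target replica is already in the ISR.
    static boolean allTargetsInIsr(Set<Integer> isr, Set<Integer> targetReplicas) {
        return isr.containsAll(targetReplicas);
    }

    public static void main(String[] args) {
        Set<Integer> isr = Set.of(1, 2, 4);
        Set<Integer> targets = Set.of(4, 5, 6);
        System.out.println("min ISR satisfied: " + meetsMinIsr(isr, 2));
        System.out.println("all targets in ISR: " + allTargetsInIsr(isr, targets));
    }
}
```

   In the example, the size-based rule passes while two of the three target replicas are still out of sync, which is the case the stricter rule is meant to block.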





Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-12 Thread via GitHub


CalvinConfluent commented on PR #14604:
URL: https://github.com/apache/kafka/pull/14604#issuecomment-1939785641

   Opened a separate PR https://github.com/apache/kafka/pull/15359





[jira] [Updated] (KAFKA-15665) Enforce ISR to have all target replicas when complete partition reassignment

2024-02-12 Thread Calvin Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Calvin Liu updated KAFKA-15665:
---
Summary: Enforce ISR to have all target replicas when complete partition 
reassignment  (was: Enforce min ISR when complete partition reassignment)

> Enforce ISR to have all target replicas when complete partition reassignment
> 
>
> Key: KAFKA-15665
> URL: https://issues.apache.org/jira/browse/KAFKA-15665
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Calvin Liu
>Assignee: Calvin Liu
>Priority: Major
>
> Currently, a partition reassignment can complete while the new ISR is under min 
> ISR. We should fix this behavior.





Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-12 Thread via GitHub


CalvinConfluent closed pull request #14604: KAFKA-15665: Enforce min ISR when 
complete partition reassignment.
URL: https://github.com/apache/kafka/pull/14604





Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]

2024-02-12 Thread via GitHub


ableegoldman commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1486914766


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1836,6 +1842,36 @@ public void maybeThrowTaskExceptionsFromProcessingThreads() {
         }
     }
 
+    private boolean transactionBuffersExceedCapacity = false;
+
+    boolean transactionBuffersExceedCapacity() {
+        return transactionBuffersExceedCapacity;
+    }
+
+    boolean transactionBuffersWillExceedCapacity() {
+        final boolean transactionBuffersAreUnbounded = maxUncommittedStateBytes < 0;
+        if (transactionBuffersAreUnbounded) {
+            return false;
+        }
+
+        // force an early commit if the uncommitted bytes exceeds or is *likely to exceed* the configured threshold

Review Comment:
   > Writes in RocksDB are the same size, irrespective of whether they are 
inserts of new keys, or updates to existing keys.
   
   I could be wrong about this, but I thought that writing to an existing key 
in the active memtable (i.e., before it is flushed and/or made immutable) is 
indeed applied as an overwrite and does not grow the write buffer's overall 
size? Of course, this only applies to patterns in which keys are updated 
multiple times between flushes. Still, I feel like this is one of the 
major points of the write buffer: to deduplicate same-key writes as much as 
possible before flushing them, since once they're flushed they have to wait for a 
compaction to be deduplicated further.
   
   That said, I don't feel strongly about this, and any potential performance 
degradation should come to light in the benchmarks. Thanks for explaining your 
thoughts on this.
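
   For reference, the kind of check under discussion can be sketched as below. This is not the code from the PR: only `maxUncommittedStateBytes` appears in the diff, and the other names, along with the "next interval grows like the last one" heuristic, are assumptions made for illustration.

```java
// Hypothetical sketch of an early-commit check. Only maxUncommittedStateBytes
// comes from the diff; everything else is illustrative.
final class UncommittedBytesSketch {
    private final long maxUncommittedStateBytes; // negative means unbounded
    private long uncommittedBytes = 0;
    private long bytesAtLastCheck = 0;

    UncommittedBytesSketch(long maxUncommittedStateBytes) {
        this.maxUncommittedStateBytes = maxUncommittedStateBytes;
    }

    // Counts every write at full size, whether it is an insert or an update.
    void recordWrite(long bytes) {
        uncommittedBytes += bytes;
    }

    // True when the buffers exceed the limit, or would exceed it if the next
    // interval added as many bytes as the previous one (assumed heuristic).
    boolean shouldCommitEarly() {
        if (maxUncommittedStateBytes < 0) {
            return false;
        }
        long growth = uncommittedBytes - bytesAtLastCheck;
        bytesAtLastCheck = uncommittedBytes;
        return uncommittedBytes + growth > maxUncommittedStateBytes;
    }

    void onCommit() {
        uncommittedBytes = 0;
        bytesAtLastCheck = 0;
    }
}
```

   Because `recordWrite` adds the full size of every write, repeated updates to the same key would trigger an early commit sooner than if overwrites were deduplicated, which is the trade-off raised in the comment.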






[jira] [Resolved] (KAFKA-9376) Plugin class loader not found using MM2

2024-02-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-9376.

Fix Version/s: 2.5.0
   Resolution: Fixed

> Plugin class loader not found using MM2
> ---
>
> Key: KAFKA-9376
> URL: https://issues.apache.org/jira/browse/KAFKA-9376
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Sinóros-Szabó Péter
>Priority: Minor
> Fix For: 2.5.0
>
>
> I am using MM2 (release 2.4.0 with Scala 2.12) and I get a bunch of classloader 
> errors. MM2 seems to be working, but I do not know if all of its components 
> are working as expected, as this is the first time I have used MM2.
> I run MM2 with the following command:
> {code:java}
> ./bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
> {code}
> Errors are:
> {code:java}
> [2020-01-07 15:06:17,892] ERROR Plugin class loader for connector: 
> 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found. 
> Returning: 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@6ebf0f36 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> [2020-01-07 15:06:17,889] ERROR Plugin class loader for connector: 
> 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found. 
> Returning: 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@6ebf0f36 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> [2020-01-07 15:06:17,904] INFO ConnectorConfig values:
>  config.action.reload = restart
>  connector.class = org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
>  errors.log.enable = false
>  errors.log.include.messages = false
>  errors.retry.delay.max.ms = 6
>  errors.retry.timeout = 0
>  errors.tolerance = none
>  header.converter = null
>  key.converter = null
>  name = MirrorHeartbeatConnector
>  tasks.max = 1
>  transforms = []
>  value.converter = null
>  (org.apache.kafka.connect.runtime.ConnectorConfig:347)
> [2020-01-07 15:06:17,904] INFO EnrichedConnectorConfig values:
>  config.action.reload = restart
>  connector.class = org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
>  errors.log.enable = false
>  errors.log.include.messages = false
>  errors.retry.delay.max.ms = 6
>  errors.retry.timeout = 0
>  errors.tolerance = none
>  header.converter = null
>  key.converter = null
>  name = MirrorHeartbeatConnector
>  tasks.max = 1
>  transforms = []
>  value.converter = null
>  
> (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347)
> [2020-01-07 15:06:17,905] INFO TaskConfig values:
>  task.class = class org.apache.kafka.connect.mirror.MirrorHeartbeatTask
>  (org.apache.kafka.connect.runtime.TaskConfig:347)
> [2020-01-07 15:06:17,905] INFO Instantiated task MirrorHeartbeatConnector-0 
> with version 1 of type org.apache.kafka.connect.mirror.MirrorHeartbeatTask 
> (org.apache.kafka.connect.runtime.Worker:434){code}
> After a while, these errors are not logged any more.
> Config is:
> {code:java}
> clusters = eucmain, euwbackup
> eucmain.bootstrap.servers = kafka1:9092,kafka2:9092
> euwbackup.bootstrap.servers = 172.30.197.203:9092,172.30.213.104:9092
> eucmain->euwbackup.enabled = true
> eucmain->euwbackup.topics = .*
> eucmain->euwbackup.topics.blacklist = ^(kafka|kmf|__|pricing).*
> eucmain->euwbackup.rename.topics = false
> rename.topics = false
> eucmain->euwbackup.sync.topic.acls.enabled = false
> sync.topic.acls.enabled = false{code}
> Using OpenJDK 8 or 11, I get the same error.
>  





[jira] [Commented] (KAFKA-9376) Plugin class loader not found using MM2

2024-02-12 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816808#comment-17816808
 ] 

Greg Harris commented on KAFKA-9376:


Closing the loop here, this was a log message which was originally only called 
on true error conditions, but due to another change (KAFKA-8340) it was then 
called every time a classpath plugin was used.
Since this was a spurious warning, it was changed to TRACE by a minor PR: 
[https://github.com/apache/kafka/pull/7964] that landed in the next release.
I'm closing this issue as the log message is no longer spuriously printed.

> Plugin class loader not found using MM2
> ---
>
> Key: KAFKA-9376
> URL: https://issues.apache.org/jira/browse/KAFKA-9376
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Sinóros-Szabó Péter
>Priority: Minor
>





[jira] [Commented] (KAFKA-9070) Kafka Connect - Get the wrong offset value comes from Kafka Connect after increase the number of offset storage topic partition to 3

2024-02-12 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816806#comment-17816806
 ] 

Greg Harris commented on KAFKA-9070:


Adding partitions to a topic is generally an unsafe activity, so I'm not 
surprised that Connect has a problem with it.
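
One reason adding partitions is risky for keyed topics can be sketched as follows. The sketch assumes records are placed by hashing the key modulo the partition count; Kafka's default partitioner uses murmur2 on the key bytes, and `String.hashCode()` stands in for it here so the example has no dependencies. The class and method names are hypothetical.

```java
// Illustrative only: String.hashCode() is a stand-in for the murmur2 hash
// that Kafka's default partitioner applies to the serialized key.
final class PartitionMappingSketch {

    // Key-to-partition mapping of the form hash(key) mod numPartitions.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"a", "b", "c"}) {
            System.out.println(key
                + ": 1 partition -> " + partitionFor(key, 1)
                + ", 3 partitions -> " + partitionFor(key, 3));
        }
    }
}
```

With one partition every key lands in partition 0; after growing to three, the same key may map elsewhere, so older and newer records for one key can end up in different partitions with no ordering between them.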

That said, I think the best workaround for this problem is to use 3.6.0's 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 feature to export and re-import the offsets to forcibly re-partition the topic.

1. Stop all of the connectors
2. Export the connector offsets to a file with KIP-875.
3. Stop the Connect cluster
4. Delete the offsets topic, and recreate it with the increased number of 
partitions. Or, create the new topic and reconfigure the workers to use it 
instead.
5. Start the Connect cluster
6. Write the saved offsets back to the cluster via KIP-875
7. Resume all of the connectors.
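
The REST calls behind steps 1, 2, 6 and 7 can be sketched as below, using the endpoints introduced by KIP-875. The base URL, connector name, and class name are hypothetical, the requests are only built and not sent, and the exact offsets JSON should be taken from the export in step 2 rather than written by hand.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;

// Hypothetical helper that builds (but does not send) the KIP-875 REST requests.
final class OffsetMigrationRequestsSketch {

    // Step 1: PUT /connectors/{name}/stop
    static HttpRequest stop(String base, String connector) {
        return HttpRequest.newBuilder(URI.create(base + "/connectors/" + connector + "/stop"))
            .PUT(BodyPublishers.noBody())
            .build();
    }

    // Step 2: GET /connectors/{name}/offsets
    static HttpRequest exportOffsets(String base, String connector) {
        return HttpRequest.newBuilder(URI.create(base + "/connectors/" + connector + "/offsets"))
            .GET()
            .build();
    }

    // Step 6: PATCH /connectors/{name}/offsets with the saved JSON body
    static HttpRequest importOffsets(String base, String connector, String offsetsJson) {
        return HttpRequest.newBuilder(URI.create(base + "/connectors/" + connector + "/offsets"))
            .header("Content-Type", "application/json")
            .method("PATCH", BodyPublishers.ofString(offsetsJson))
            .build();
    }

    // Step 7: PUT /connectors/{name}/resume
    static HttpRequest resume(String base, String connector) {
        return HttpRequest.newBuilder(URI.create(base + "/connectors/" + connector + "/resume"))
            .PUT(BodyPublishers.noBody())
            .build();
    }
}
```

Each request would be sent with `java.net.http.HttpClient`, once per connector. Steps 3 to 5 happen outside the REST API.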

> Kafka Connect - Get the wrong offset value comes from Kafka Connect after 
> increase the number of offset storage topic partition to 3
> 
>
> Key: KAFKA-9070
> URL: https://issues.apache.org/jira/browse/KAFKA-9070
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0
> Environment: debezium/connect: 0.10
> mysql: 5.6
> kafka: 2.3.0
>Reporter: kaikai.hou
>Priority: Major
>  Labels: connect
>
> I'm using the *Debezium* project with distributed mode.
> *Problem:*
> I found a problem: Kafka Connect returns the wrong offset value after the 
> number of partitions of the offset storage topic is increased to 3.
> 1. Cluster mode, two nodes (containers);
> 2. The offset storage topic has only 1 partition;
> 3. Create 3 MySQL connectors;
> 4. Change data; all connectors have their offsets recorded in partition 0.
> 5. *Increase the offset storage topic partition count to 3*.
> 6. Change data; some connectors now store their offset records in partition 1 
> or partition 2.
> 7. *Restart all Connect services; all connectors then read offset records 
> from partition 0*.
> 8. The connectors that stored offset records in partition 1 or partition 
> 2 then get *repeated data* (the offset records in partition 0 are *too old* 
> for these connectors).
> *Debug*
> The Debezium developers checked their code and found that the partition handling 
> is delegated to Kafka Connect. [their 
> reply|https://issues.jboss.org/browse/DBZ-1551?focusedCommentId=13800286&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13800286]
> Then, after they reproduced the problem, they found that [the problem is that 
> the incorrect offset is returned by Kafka 
> Connect|https://issues.jboss.org/browse/DBZ-1551?focusedCommentId=13801400&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13801400]
>  . 





Re: [PR] KAFKA-15421: fix network thread leak in testThreadPoolResize [kafka]

2024-02-12 Thread via GitHub


jolshan commented on PR #14320:
URL: https://github.com/apache/kafka/pull/14320#issuecomment-1939683100

   Hmm I see this flaking again -- 
https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America%2FLos_Angeles&search.values=trunk&tests.container=kafka.server.DynamicBrokerReconfigurationTest&tests.test=testThreadPoolResize()
   
   Any ideas?





Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-02-12 Thread via GitHub


jolshan commented on PR #15354:
URL: https://github.com/apache/kafka/pull/15354#issuecomment-1939677736

   I still see one LogDirFailureTest failure:
   
   ```
   org.apache.kafka.server.fault.FaultHandlerException: 
quorumTestHarnessFaultHandler: Error applying topics delta in MetadataDelta up 
to 70: Log for partition topic-0 is not available on broker 1
   ```
   
   Can we confirm it is not because the log directory is missing?
   
   





Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-02-12 Thread via GitHub


jolshan commented on PR #15354:
URL: https://github.com/apache/kafka/pull/15354#issuecomment-1939674788

   I filed a JIRA for DescribeConsumerGroupTest 
https://issues.apache.org/jira/browse/KAFKA-16245





[jira] [Resolved] (KAFKA-7217) Loading dynamic topic data into kafka connector sink using regex

2024-02-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-7217.

Fix Version/s: 1.1.0
   Resolution: Fixed

> Loading dynamic topic data into kafka connector sink using regex
> 
>
> Key: KAFKA-7217
> URL: https://issues.apache.org/jira/browse/KAFKA-7217
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Affects Versions: 1.1.0
>Reporter: Pratik Gaglani
>Priority: Major
> Fix For: 1.1.0
>
>
> KAFKA-3074 added a new feature to use regex in connectors; however, it seems 
> that data from topics added after the connector has started is not consumed 
> until the connector is restarted. We need to add new topics dynamically and 
> have the connector consume them based on the regex defined in the connector 
> properties. How can this be achieved? Ex: regex: topic-.* topics: topic-1, 
> topic-2. If I introduce a new topic topic-3, how can I make the connector 
> consume the topic data without restarting it?
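
The matching described in the example can be sketched with plain `java.util.regex`. This only shows which topic names the pattern `topic-.*` selects; it does not model when a running connector notices a new topic, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper: filters topic names by a full-match regex.
final class TopicRegexSketch {

    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
            .filter(topic -> pattern.matcher(topic).matches())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of("topic-1", "topic-2", "topic-3", "other");
        System.out.println(matching("topic-.*", topics));
    }
}
```

A newly created `topic-3` matches the same pattern as `topic-1` and `topic-2`, so the open question in the report is only about when the match is re-evaluated.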





[jira] [Created] (KAFKA-16245) DescribeConsumerGroupTest failing

2024-02-12 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-16245:
--

 Summary: DescribeConsumerGroupTest failing
 Key: KAFKA-16245
 URL: https://issues.apache.org/jira/browse/KAFKA-16245
 Project: Kafka
  Issue Type: Task
Reporter: Justine Olshan


The first instances on trunk are in this PR 
[https://github.com/apache/kafka/pull/15275]
And this PR seems to have it failing consistently in the builds when it wasn't 
failing this consistently before.





[jira] [Resolved] (KAFKA-6340) Support of transactions in KafkaConnect

2024-02-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-6340.

Fix Version/s: 3.3.0
   Resolution: Fixed

> Support of transactions in KafkaConnect
> ---
>
> Key: KAFKA-6340
> URL: https://issues.apache.org/jira/browse/KAFKA-6340
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Oleg Kuznetsov
>Priority: Major
> Fix For: 3.3.0
>
>
> Currently, Kafka Connect source connectors commit source offsets in a periodic task.
> The proposed approach is to produce the data record batch and the config update 
> records (related to the batch) within a Kafka transaction, which would prevent 
> publishing duplicate records on connector restart.





[jira] [Resolved] (KAFKA-4961) Mirrormaker crash with org.apache.kafka.common.protocol.types.SchemaException

2024-02-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-4961.

Resolution: Won't Fix

> Mirrormaker crash with org.apache.kafka.common.protocol.types.SchemaException
> -
>
> Key: KAFKA-4961
> URL: https://issues.apache.org/jira/browse/KAFKA-4961
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
>Reporter: Di Shang
>Priority: Major
>  Labels: mirror-maker
>
> We are running a cluster of 3 brokers and using mirrormaker to replicate a 
> topic to a different 3-broker cluster. Occasionally we find that when the 
> source cluster is under heavy load with lots of messages coming in, 
> mirrormaker will crash with SchemaException. 
> {noformat}
> 27 Mar 2017 19:02:22.030 [mirrormaker-thread-0] DEBUG 
> org.apache.kafka.clients.NetworkClient handleTimedOutRequests(line:399) 
> Disconnecting from node 5 due to request timeout.
> 27 Mar 2017 19:02:22.032 [mirrormaker-thread-0] DEBUG 
> org.apache.kafka.clients.NetworkClient handleTimedOutRequests(line:399) 
> Disconnecting from node 7 due to request timeout.
> 27 Mar 2017 19:02:22.033 [mirrormaker-thread-0] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient 
> onComplete(line:376) Cancelled FETCH request 
> ClientRequest(expectResponse=true, 
> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@96db60c9,
>  
> request=RequestSend(header={api_key=1,api_version=1,correlation_id=76978,client_id=dx-stg02-wdc04-0},
>  
> body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=0,fetch_offset=129037541,max_bytes=1048576},{partition=1,fetch_offset=120329329,max_bytes=1048576},{partition=33,fetch_offset=125526115,max_bytes=1048576},{partition=36,fetch_offset=125526627,max_bytes=1048576},{partition=5,fetch_offset=121654333,max_bytes=1048576},{partition=37,fetch_offset=120262628,max_bytes=1048576},{partition=9,fetch_offset=125568321,max_bytes=1048576},{partition=41,fetch_offset=121593740,max_bytes=1048576},{partition=12,fetch_offset=125563836,max_bytes=1048576},{partition=13,fetch_offset=122044962,max_bytes=1048576},{partition=45,fetch_offset=125504213,max_bytes=1048576},{partition=48,fetch_offset=125506892,max_bytes=1048576},{partition=17,fetch_offset=121635934,max_bytes=1048576},{partition=49,fetch_offset=121985309,max_bytes=1048576},{partition=21,fetch_offset=125549718,max_bytes=1048576},{partition=24,fetch_offset=125548506,max_bytes=1048576},{partition=25,fetch_offset=120289719,max_bytes=1048576},{partition=29,fetch_offset=121612535,max_bytes=1048576}]}]}),
>  createdTimeMs=1490641301465, sendTimeMs=1490641301465) with correlation id 
> 76978 due to node 5 being disconnected
> 27 Mar 2017 19:02:22.035 [mirrormaker-thread-0] DEBUG 
> org.apache.kafka.clients.consumer.internals.Fetcher onFailure(line:144) Fetch 
> failed
> org.apache.kafka.common.errors.DisconnectException: null
> 27 Mar 2017 19:02:22.037 [mirrormaker-thread-0] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient 
> onComplete(line:376) Cancelled FETCH request 
> ClientRequest(expectResponse=true, 
> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@fcb8c50d,
>  
> request=RequestSend(header={api_key=1,api_version=1,correlation_id=76980,client_id=dx-stg02-wdc04-0},
>  
> body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=logging,partitions=[{partition=32,fetch_offset=125478125,max_bytes=1048576},{partition=2,fetch_offset=121280695,max_bytes=1048576},{partition=3,fetch_offset=125515146,max_bytes=1048576},{partition=35,fetch_offset=121216188,max_bytes=1048576},{partition=38,fetch_offset=121220634,max_bytes=1048576},{partition=7,fetch_offset=121634123,max_bytes=1048576},{partition=39,fetch_offset=125464566,max_bytes=1048576},{partition=8,fetch_offset=125515210,max_bytes=1048576},{partition=11,fetch_offset=121257359,max_bytes=1048576},{partition=43,fetch_offset=121571984,max_bytes=1048576},{partition=44,fetch_offset=125455538,max_bytes=1048576},{partition=14,fetch_offset=121264791,max_bytes=1048576},{partition=15,fetch_offset=125495034,max_bytes=1048576},{partition=47,fetch_offset=121199057,max_bytes=1048576},{partition=19,fetch_offset=121613792,max_bytes=1048576},{partition=20,fetch_offset=125495807,max_bytes=1048576},{partition=23,fetch_offset=121237155,max_bytes=1048576},{partition=26,fetch_offset=121249178,max_bytes=1048576},{partition=27,fetch_offset=125317927,max_bytes=1048576},{partition=31,fetch_offset=121591702,max_bytes=1048576}]}]}),
>  createdTimeMs=1490641301466, sendTimeMs=1490641301466) with correlation id 
> 76980 due to node 7 being disconnected

Re: [PR] KAFKA-15832: Trigger client reconciliation based on manager poll [kafka]

2024-02-12 Thread via GitHub


jolshan commented on PR #15275:
URL: https://github.com/apache/kafka/pull/15275#issuecomment-1939663048

   Hey folks -- looks like this broke DescribeConsumerGroupTest. Can we take a 
look? cc: @lucasbru 





[jira] [Resolved] (KAFKA-3213) [CONNECT] It looks like we are not backing off properly when reconfiguring tasks

2024-02-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-3213.

Fix Version/s: 3.5.0
   Resolution: Fixed

> [CONNECT] It looks like we are not backing off properly when reconfiguring 
> tasks
> 
>
> Key: KAFKA-3213
> URL: https://issues.apache.org/jira/browse/KAFKA-3213
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Gwen Shapira
>Assignee: Liquan Pei
>Priority: Major
> Fix For: 3.5.0
>
>
> Looking at logs of attempt to reconfigure connector while leader is 
> restarting, I see:
> {code}
> [2016-01-29 20:31:01,799] ERROR IO error forwarding REST request:  
> (org.apache.kafka.connect.runtime.rest.RestServer)
> java.net.ConnectException: Connection refused
> [2016-01-29 20:31:01,802] ERROR Request to leader to reconfigure connector 
> tasks failed (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: IO Error 
> trying to forward REST request: Connection refused
> [2016-01-29 20:31:01,802] ERROR Failed to reconfigure connector's tasks, 
> retrying after backoff: 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: IO Error 
> trying to forward REST request: Connection refused
> [2016-01-29 20:31:01,803] DEBUG Sending POST with input 
> [{"tables":"bar","table.poll.interval.ms":"1000","incrementing.column.name":"id","connection.url":"jdbc:mysql://worker1:3306/testdb?user=root","name":"test-mysql-jdbc","tasks.max":"3","task.class":"io.confluent.connect.jdbc.JdbcSourceTask","poll.interval.ms":"1000","topic.prefix":"test-","connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","mode":"incrementing"},{"tables":"foo","table.poll.interval.ms":"1000","incrementing.column.name":"id","connection.url":"jdbc:mysql://worker1:3306/testdb?user=root","name":"test-mysql-jdbc","tasks.max":"3","task.class":"io.confluent.connect.jdbc.JdbcSourceTask","poll.interval.ms":"1000","topic.prefix":"test-","connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","mode":"incrementing"}]
>  to http://worker2:8083/connectors/test-mysql-jdbc/tasks 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> [2016-01-29 20:31:01,803] ERROR IO error forwarding REST request:  
> (org.apache.kafka.connect.runtime.rest.RestServer)
> java.net.ConnectException: Connection refused
> [2016-01-29 20:31:01,804] ERROR Request to leader to reconfigure connector 
> tasks failed (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: IO Error 
> trying to forward REST request: Connection refused
> {code}
> Note that it looks like we are retrying every 1ms, while I'd expect a retry 
> every 250ms.
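For illustration only, the behaviour the report expects (a pause of 250 ms between attempts rather than an immediate retry) can be sketched as a fixed-backoff loop. This is not the DistributedHerder code: the class `BackoffRetry`, the method `retryWithBackoff`, and the injectable `sleeper` are hypothetical names chosen for this sketch.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

public class BackoffRetry {
    /**
     * Runs the action until it reports success or maxAttempts is reached,
     * pausing backoffMs (via the sleeper) after each failed attempt that
     * will be followed by another one. Returns the number of attempts made.
     */
    public static int retryWithBackoff(BooleanSupplier action, int maxAttempts,
                                       long backoffMs, LongConsumer sleeper) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            if (action.getAsBoolean()) {
                return attempts;
            }
            if (attempts < maxAttempts) {
                // Hypothetical hook: production code might wrap Thread.sleep here.
                sleeper.accept(backoffMs);
            }
        }
        return attempts;
    }
}
```

Injecting the sleeper keeps the sketch testable without real delays; each failed attempt is followed by exactly one backoff of the configured length.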





[jira] [Commented] (KAFKA-13338) kafka upgrade 6.1.1 asking to change log.cleanup.policy from delete to compact

2024-02-12 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816800#comment-17816800
 ] 

Greg Harris commented on KAFKA-13338:
-

Apache Kafka does not have 5.x or 6.x releases, so you must be using a fork of 
Kafka.

The log message and your remediation are as-expected; Connect must use 
compacted internal topics, and enforcement of this was added after-the-fact.
The delay between the upgrade and the connectors failing also indicates that 
the two are unrelated, and compaction shouldn't be able to cause an 
InvalidRecordException.

As this issue is so stale, and there isn't any information to proceed on, I 
have closed it. Feel free to raise a new issue if this problem reappears.

> kafka upgrade 6.1.1 asking to change log.cleanup.policy from delete to compact
> --
>
> Key: KAFKA-13338
> URL: https://issues.apache.org/jira/browse/KAFKA-13338
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Fátima Galera
>Priority: Major
> Attachments: image003.png
>
>
> Hi all,
>  
> During kafka upgrade from 5.3.1 to 6.1.1.1 we get below error starting 
> kafka-connector services
>  
> ERROR [Worker clientId=connect-1, groupId=connect-cluster] Uncaught exception 
> in herder work thread, exiting: 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> 87738-org.apache.kafka.common.config.ConfigException: Topic 'connect-offsets' 
> supplied via the 'offset.storage.topic' property is required to have 
> 'cleanup.policy=compact' to guarantee consistency and durability of source 
> connector offsets, but found the topic currently has 'cleanup.policy=delete'. 
> Continuing would likely result in eventually losing source connector offsets 
> and problems restarting this Connect cluster in the future. Change the 
> 'offset.storage.topic' property in the Connect worker configurations to use a 
> topic with 'cleanup.policy=compact'.
>  
> After that error we added log.cleanup.policy=compact parameter on 
> /etc/kafka/server.properties and kafka-connector service started but a few 
> days later all the connectors are down with attached error.
>  
> Could you please let us know how we can resolve the issue?
>  
> Best regards and thanks
> Fátima
> NGA -Alight
>  
>  





[jira] [Resolved] (KAFKA-16229) Slow expiration of Producer IDs leading to high CPU usage

2024-02-12 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-16229.

Resolution: Fixed

> Slow expiration of Producer IDs leading to high CPU usage
> -
>
> Key: KAFKA-16229
> URL: https://issues.apache.org/jira/browse/KAFKA-16229
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> Expiration of ProducerIds is implemented with a slow removal of map keys:
> ```
>         producers.keySet().removeAll(keys);
> ```
> This unnecessarily goes through all producer IDs in order to throw out the 
> expired keys.
> In the worst case, when most or all keys need to be removed, this leads to 
> super-linear (roughly quadratic) running time:
> ```
> Benchmark                                        (numProducerIds)  Mode  Cnt           Score            Error  Units
> ProducerStateManagerBench.testDeleteExpiringIds               100  avgt    3        9164.043 ±      10647.877  ns/op
> ProducerStateManagerBench.testDeleteExpiringIds              1000  avgt    3      341561.093 ±      20283.211  ns/op
> ProducerStateManagerBench.testDeleteExpiringIds                 1  avgt    3    44957983.550 ±    9389011.290  ns/op
> ProducerStateManagerBench.testDeleteExpiringIds                10  avgt    3  5683374164.167 ± 1446242131.466  ns/op
> ```
> A simple fix is to use map#remove(key) instead, leading to a more linear 
> growth:
> ```
> Benchmark                                        (numProducerIds)  Mode  Cnt        Score         Error  Units
> ProducerStateManagerBench.testDeleteExpiringIds               100  avgt    3     5779.056 ±     651.389  ns/op
> ProducerStateManagerBench.testDeleteExpiringIds              1000  avgt    3    61430.530 ±   21875.644  ns/op
> ProducerStateManagerBench.testDeleteExpiringIds                 1  avgt    3   643887.031 ±  600475.302  ns/op
> ProducerStateManagerBench.testDeleteExpiringIds                10  avgt    3  7741689.539 ± 3218317.079  ns/op
> ```
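A minimal sketch of the per-key removal described above, using only java.util types. The class and method names (`ProducerIdExpiry`, `removeExpired`) are hypothetical, and this is not the ProducerStateManager code. The reasoning rests on an assumption about typical JDK behaviour: when `removeAll` is given a `List`, the key set may iterate over its own keys and call `List.contains` for each one, which is a linear scan, whereas `Map.remove(key)` is a single hash lookup per expired key.

```java
import java.util.List;
import java.util.Map;

public class ProducerIdExpiry {
    /**
     * Removes each expired key with a direct map lookup.
     * Returns how many of the keys were present with a non-null value.
     */
    public static <V> int removeExpired(Map<Long, V> producers, List<Long> expiredKeys) {
        int removed = 0;
        for (Long key : expiredKeys) {
            // One hash-based removal per key; no scan of the expired list.
            if (producers.remove(key) != null) {
                removed++;
            }
        }
        return removed;
    }
}
```

Keys in the expired list that are not in the map are simply skipped, so the cost stays proportional to the number of expired keys.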





[jira] [Resolved] (KAFKA-13338) kafka upgrade 6.1.1 asking to change log.cleanup.policy from delete to compact

2024-02-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-13338.
-
Resolution: Invalid

> kafka upgrade 6.1.1 asking to change log.cleanup.policy from delete to compact
> --
>
> Key: KAFKA-13338
> URL: https://issues.apache.org/jira/browse/KAFKA-13338
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Fátima Galera
>Priority: Major
> Attachments: image003.png
>
>
> Hi all,
>  
> During kafka upgrade from 5.3.1 to 6.1.1.1 we get below error starting 
> kafka-connector services
>  
> ERROR [Worker clientId=connect-1, groupId=connect-cluster] Uncaught exception 
> in herder work thread, exiting: 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> 87738-org.apache.kafka.common.config.ConfigException: Topic 'connect-offsets' 
> supplied via the 'offset.storage.topic' property is required to have 
> 'cleanup.policy=compact' to guarantee consistency and durability of source 
> connector offsets, but found the topic currently has 'cleanup.policy=delete'. 
> Continuing would likely result in eventually losing source connector offsets 
> and problems restarting this Connect cluster in the future. Change the 
> 'offset.storage.topic' property in the Connect worker configurations to use a 
> topic with 'cleanup.policy=compact'.
>  
> After that error we added log.cleanup.policy=compact parameter on 
> /etc/kafka/server.properties and kafka-connector service started but a few 
> days later all the connectors are down with attached error.
>  
> Could you please let us know how we can resolve the issue?
>  
> Best regards and thanks
> Fátima
> NGA -Alight
>  
>  





[jira] [Resolved] (KAFKA-13163) MySQL Sink Connector - JsonConverter - DataException: Unknown schema type: null

2024-02-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-13163.
-
Resolution: Invalid

> MySQL Sink Connector - JsonConverter - DataException: Unknown schema type: 
> null
> ---
>
> Key: KAFKA-13163
> URL: https://issues.apache.org/jira/browse/KAFKA-13163
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Affects Versions: 2.1.1
> Environment: PreProd
>Reporter: Muddam Pullaiah Yadav
>Priority: Major
>
> Please help with the following issue. Really appreciate it! 
>  
> We are using Azure HDInsight Kafka cluster 
> My sink Properties:
>  
> cat mysql-sink-connector
>  {
>  "name":"mysql-sink-connector",
>  "config":
> { "tasks.max":"2", "batch.size":"1000", "batch.max.rows":"1000", 
> "poll.interval.ms":"500", 
> "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", 
> "connection.url":"jdbc:mysql://moddevdb.mysql.database.azure.com:3306/db_test_dev",
>  "table.name":"db_test_dev.tbl_clients_merchants", "topics":"test", 
> "connection.user":"grabmod", "connection.password":"#admin", 
> "auto.create":"true", "auto.evolve":"true", 
> "value.converter":"org.apache.kafka.connect.json.JsonConverter", 
> "value.converter.schemas.enable":"false", 
> "key.converter":"org.apache.kafka.connect.json.JsonConverter", 
> "key.converter.schemas.enable":"true" }
> }
>  
> [2021-08-04 11:18:30,234] ERROR WorkerSinkTask\{id=mysql-sink-connector-0} 
> Task threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:177)
>  org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in 
> error handler
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
>  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema 
> type: null
>  at 
> org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:743)
>  at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>  ... 13 more
>  [2021-08-04 11:18:30,234] ERROR WorkerSinkTask\{id=mysql-sink-connector-0} 
> Task is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:178)
>  [2021-08-04 11:18:30,235] INFO [Consumer clientId=consumer-18, 
> groupId=connect-mysql-sink-connector] Sending LeaveGroup request to 
> coordinator 
> wn2-grabde.fkgw2p1emuqu5d21xcbqrhqqbf.rx.internal.cloudapp.net:9092 (id: 
> 2147482646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782)





[jira] [Updated] (KAFKA-16068) Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors

2024-02-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16068:

Labels: newbie++  (was: )

> Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin 
> scanning errors
> ---
>
> Key: KAFKA-16068
> URL: https://issues.apache.org/jira/browse/KAFKA-16068
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Reporter: Greg Harris
>Priority: Minor
>  Labels: newbie++
>
> The ConnectorValidationIntegrationTest creates test plugins, some with 
> erroneous behavior. In particular:
>  
> {noformat}
> [2023-12-29 10:28:06,548] ERROR Failed to discover Converter in classpath: 
> Unable to instantiate TestConverterWithPrivateConstructor: Plugin class 
> default constructor must be public 
> (org.apache.kafka.connect.runtime.isolation.ReflectionScanner:138) 
> [2023-12-29 10:28:06,550]
> ERROR Failed to discover Converter in classpath: Unable to instantiate 
> TestConverterWithConstructorThatThrowsException: Failed to invoke plugin 
> constructor (org.apache.kafka.connect.runtime.isolation.ReflectionScanner:138)
> java.lang.reflect.InvocationTargetException{noformat}
> These plugins should be eliminated from the classpath, so that the errors do 
> not appear in unrelated tests. Instead, plugins with erroneous behavior 
> should only be present in the TestPlugins, so that tests can opt-in to 
> loading them.
> There are already plugins with private constructors and 
> throwing-exceptions-constructors, so they should be able to be re-used.
>  





[jira] [Updated] (KAFKA-14919) MM2 ForwardingAdmin tests should not conflate admin operations

2024-02-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14919:

Labels: newbie  (was: )

> MM2 ForwardingAdmin tests should not conflate admin operations
> --
>
> Key: KAFKA-14919
> URL: https://issues.apache.org/jira/browse/KAFKA-14919
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Reporter: Greg Harris
>Priority: Minor
>  Labels: newbie
>
> The MirrorConnectorsWithCustomForwardingAdminIntegrationTest uses a special 
> implementation of ForwardingAdmin which records admin operations in a static 
> ConcurrentMap, which is then used to perform assertions.
> This has the problem that one variable (allTopics) is used to perform 
> assertions for multiple different methods (adding topics, adding partitions, 
> and syncing configs), despite these operations each being tested separately. 
> This leads to the confusing behavior where each test appears to assert that a 
> particular operation has taken place, and instead asserts that at least one 
> of the operations has taken place. This allows a regression or timeout in one 
> operation to be hidden by the others, making the behavior of the tests much 
> less predictable.
> These tests and/or the metadata store should be changed so that the tests are 
> isolated from one another, and actually perform the assertions that 
> correspond to their titles.





[jira] [Updated] (KAFKA-15349) ducker-ak should fail fast when gradlew systemTestLibs fails

2024-02-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15349:

Labels: newbie++  (was: )

> ducker-ak should fail fast when gradlew systemTestLibs fails
> 
>
> Key: KAFKA-15349
> URL: https://issues.apache.org/jira/browse/KAFKA-15349
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Greg Harris
>Priority: Minor
>  Labels: newbie++
>
> If you introduce a flaw into the gradle build which causes the systemTestLibs 
> to fail, such as a circular dependency, then the ducker_test function 
> continues to run tests which are invalid.
> Rather than proceeding to run the tests, the script should fail fast and make 
> the user address the error before continuing.





Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-12 Thread via GitHub


hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1486800347


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -127,6 +127,13 @@ public synchronized Cluster fetch() {
 return cache.cluster();
 }
 
+/**
+ * Get the current metadata cache.
+ */
+public synchronized MetadataCache fetchCache() {

Review Comment:
   It makes sense to require exclusive access when building the cache, but here 
we're just accessing the built value. So I don't think the synchronization is 
necessary. 
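The point above can be sketched as follows, assuming the built value is immutable and published through a `volatile` field. Whether the real `Metadata` class publishes its cache this way is an assumption, and `SnapshotHolder`, `update`, and `fetchSnapshot` are hypothetical names used only for this illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SnapshotHolder {
    // volatile: a reader sees a fully built snapshot once update() has published it.
    private volatile Map<String, Integer> snapshot = Collections.emptyMap();

    /** Building the next snapshot is serialized so concurrent writers do not lose updates. */
    public synchronized void update(String key, int value) {
        Map<String, Integer> next = new HashMap<>(snapshot);
        next.put(key, value);
        snapshot = Collections.unmodifiableMap(next);
    }

    /** Reading only dereferences the current immutable snapshot, so no lock is taken. */
    public Map<String, Integer> fetchSnapshot() {
        return snapshot;
    }
}
```

A caller that holds an older snapshot keeps a consistent view; it never observes a half-built map because each update replaces the reference rather than mutating the published value.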






[jira] [Commented] (KAFKA-16084) Simplify and deduplicate StandaloneHerderTest mocking

2024-02-12 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816791#comment-17816791
 ] 

Greg Harris commented on KAFKA-16084:
-

Hi [~high.lee] and [~ardada2468] Thanks for volunteering!

If you see a ticket unassigned, you can assign it to yourself, and then work to 
open a PR. Also if you see that an issue is assigned but has been inactive, you 
can ask the current assignee about the issue, and re-assign it if they are 
unresponsive.

> Simplify and deduplicate StandaloneHerderTest mocking
> -
>
> Key: KAFKA-16084
> URL: https://issues.apache.org/jira/browse/KAFKA-16084
> Project: Kafka
>  Issue Type: Test
>  Components: connect
>Reporter: Greg Harris
>Priority: Minor
>  Labels: newbie++
>
> The StandaloneHerderTest has some cruft that can be cleaned up. What I've 
> found:
> * The `connector` field is written in nearly every test, but only read by one 
> test, and looks to be nearly irrelevant.
> * `expectConfigValidation` has two ways of specifying consecutive 
> validations. 1. The boolean shouldCreateConnector which is true in the first 
> invocation and false in subsequent invocations. 2. by passing multiple 
> configurations via varargs.
> * The class uses a mix of Mock annotations and mock(Class) invocations
> * The test doesn't stop the thread pool created inside the herder and might 
> leak threads
> * Mocking for Worker#startConnector is 6 lines which are duplicated 8 times 
> throughout the test
> * Some waits are 1000 ms and others are 1000 s, and could be pulled out to 
> constants or a util method





Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-12 Thread via GitHub


CalvinConfluent commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1486709516


##
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java:
##
@@ -124,6 +124,8 @@ Optional 
maybeCompleteReassignment(List targetIs
 if (!newTargetIsr.contains(replica)) return Optional.empty();
 }
 
+if (newTargetIsr.size() < Math.min(minIsrCount, 
newTargetReplicas.size())) return Optional.empty();

Review Comment:
   @jolshan Correction: I was wrong. newTargetReplicas is the expected 
target set (previous + adding - removing). The Math.min check mainly prevents 
the reassignment from getting stuck because of a mistakenly high minISR. (We do not 
have any sanity checks when setting the minISR config.)
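
A small sketch of the completion check discussed in this comment, using plain lists instead of the controller's types. `MinIsrCheck` and `meetsMinIsr` are hypothetical names; only the `Math.min` expression is taken from the diff.

```java
import java.util.List;

public class MinIsrCheck {
    /**
     * Returns true when the target ISR is large enough to complete.
     * The threshold is capped at the target replica count, so a minIsrCount
     * larger than the replica set cannot block completion indefinitely.
     */
    public static boolean meetsMinIsr(List<Integer> newTargetIsr,
                                      List<Integer> newTargetReplicas,
                                      int minIsrCount) {
        int required = Math.min(minIsrCount, newTargetReplicas.size());
        return newTargetIsr.size() >= required;
    }
}
```

With three target replicas and a mistaken minIsrCount of 5, the threshold is capped at 3, so a fully caught-up ISR can still complete the reassignment.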






Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-12 Thread via GitHub


CalvinConfluent commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1486709516


##
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java:
##
@@ -124,6 +124,8 @@ Optional 
maybeCompleteReassignment(List targetIs
 if (!newTargetIsr.contains(replica)) return Optional.empty();
 }
 
+if (newTargetIsr.size() < Math.min(minIsrCount, 
newTargetReplicas.size())) return Optional.empty();

Review Comment:
   @jolshan Correction: I was wrong. newTargetReplicas is the expected 
target set. The Math.min check mainly prevents the reassignment from getting stuck 
because of a mistakenly high minISR. (We do not have any sanity checks when 
setting the minISR config.)






Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-12 Thread via GitHub


CalvinConfluent commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1486709516


##
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java:
##
@@ -124,6 +124,8 @@ Optional 
maybeCompleteReassignment(List targetIs
 if (!newTargetIsr.contains(replica)) return Optional.empty();
 }
 
+if (newTargetIsr.size() < Math.min(minIsrCount, 
newTargetReplicas.size())) return Optional.empty();

Review Comment:
   @jolshan The newTargetReplicas is the expected target set. The math.min 
thing mainly prevents the reassignment from getting stuck because of a high minISR by 
mistake. (We do not have any sanity checks when setting the minISR config.)






Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-12 Thread via GitHub


CalvinConfluent commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1486709516


##
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java:
##
@@ -124,6 +124,8 @@ Optional 
maybeCompleteReassignment(List targetIs
 if (!newTargetIsr.contains(replica)) return Optional.empty();
 }
 
+if (newTargetIsr.size() < Math.min(minIsrCount, 
newTargetReplicas.size())) return Optional.empty();

Review Comment:
   @jolshan You are right. The newTargetReplicas is the expected target set. 
The math.min thing mainly prevents the reassignment from getting stuck because of a 
high minISR by mistake. (We do not have any sanity checks when setting the 
minISR config.)






[PR] KAFKA-16167: re-enable PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup [kafka]

2024-02-12 Thread via GitHub


kirktrue opened a new pull request, #15358:
URL: https://github.com/apache/kafka/pull/15358

   This integration test is now passing, presumably based on recent related 
changes. Re-enabling to ensure it is included in the test suite to catch any 
regressions.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-02-12 Thread via GitHub


C0urante commented on code in PR #15349:
URL: https://github.com/apache/kafka/pull/15349#discussion_r1486653428


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java:
##
@@ -31,6 +32,7 @@
  * Integration test for MirrorMaker2 in which source records are emitted with 
a transactional producer,
  * which interleaves transaction commit messages into the source topic which 
are not propagated downstream.
  */
+@Tag("integration")

Review Comment:
   Good call, agree with removing redundant tags and reducing FUD 👍 






Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (2/3) [kafka]

2024-02-12 Thread via GitHub


gharris1727 merged PR #15313:
URL: https://github.com/apache/kafka/pull/15313





Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (2/3) [kafka]

2024-02-12 Thread via GitHub


gharris1727 commented on PR #15313:
URL: https://github.com/apache/kafka/pull/15313#issuecomment-1939373332

   Test failures appear unrelated, and the runtime tests pass locally for me.





Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-12 Thread via GitHub


jolshan commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1486647593


##
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java:
##
@@ -124,6 +124,8 @@ Optional 
maybeCompleteReassignment(List targetIs
 if (!newTargetIsr.contains(replica)) return Optional.empty();
 }
 
+if (newTargetIsr.size() < Math.min(minIsrCount, 
newTargetReplicas.size())) return Optional.empty();

Review Comment:
   Hmm
   
   ```
   newTargetReplicas = new ArrayList<>(replicas.size());
   for (int replica : replicas) {
   if (!removing.contains(replica)) {
   newTargetReplicas.add(replica);
   }
   }
   ```
   
   It seems like it is just the final state -- current replicas minus the 
removing ones. Not the adding ones?   
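
To make the question concrete, here is the quoted loop wrapped in a runnable method. The class name `TargetReplicas` is hypothetical, and the example input assumes that `replicas` already contains the replica being added, which is consistent with the correction elsewhere in this thread (previous + adding - removing).

```java
import java.util.ArrayList;
import java.util.List;

public class TargetReplicas {
    /** Keeps every current replica that is not scheduled for removal. */
    public static List<Integer> newTargetReplicas(List<Integer> replicas, List<Integer> removing) {
        List<Integer> target = new ArrayList<>(replicas.size());
        for (int replica : replicas) {
            if (!removing.contains(replica)) {
                target.add(replica);
            }
        }
        return target;
    }
}
```

For replicas [1, 2, 3, 4] with 1 being removed and 4 being added, the method yields [2, 3, 4]: the adding replica is kept because it is already part of the input list, not because the loop treats it specially.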






Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-12 Thread via GitHub


CalvinConfluent commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1486639919


##
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java:
##
@@ -124,6 +124,8 @@ Optional 
maybeCompleteReassignment(List targetIs
 if (!newTargetIsr.contains(replica)) return Optional.empty();
 }
 
+if (newTargetIsr.size() < Math.min(minIsrCount, 
newTargetReplicas.size())) return Optional.empty();

Review Comment:
   @splett2 Thanks for pointing out the inconsistency. I am not aware of a use 
case where we would want to complete the reassignment before all the replicas 
are in the ISR.
   @jolshan newTargetReplicas holds the "adding" replicas, and targetIsr can be 
under min ISR.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-12 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1486639832


##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -113,14 +116,28 @@ Optional nodeById(int id) {
 return Optional.ofNullable(nodes.get(id));
 }
 
-Cluster cluster() {
+public Cluster cluster() {
 if (clusterInstance == null) {
 throw new IllegalStateException("Cached Cluster instance should 
not be null, but was.");
 } else {
 return clusterInstance;
 }
 }
 
+/**
+ * Get leader-epoch for partition.
+ * @param tp partition
+ * @return leader-epoch if known, else return optional.empty()
+ */
+public Optional leaderEpochFor(TopicPartition tp) {

Review Comment:
   done






Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-02-12 Thread via GitHub


gharris1727 commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1939299392

   Hey all,
   
   I realized thanks to @C0urante's review comment that the integration tag is 
already inherited from parent classes, so the "added" tags were really 
no-ops. I've instead removed all of the duplicated integration tags where the 
class already inherits the tag from a parent class, including existing ones.
   
   So when reviewing:
   * If you see a tag removed, that should be a no-op change, as that test was 
already an integration test.
   * If you see a tag added, that was a test which was previously unitTest, and 
is now integrationTest.
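
   The inheritance behaviour described above can be illustrated with plain Java 
reflection. This is a minimal sketch, not the PR's code: `DemoTag` is a 
hypothetical stand-in for a class-level test tag, and the class names are made 
up. It only assumes that the tag annotation is declared with `@Inherited`, 
which is what makes a tag on a parent class visible on its subclasses.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for a class-level test tag (not the JUnit annotation itself).
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoTag {
    String value();
}

// The parent class carries the tag.
@DemoTag("integration")
abstract class BaseIntegrationTest { }

// No tag is declared here; it is inherited from the parent class.
class ChildIntegrationTest extends BaseIntegrationTest { }

// A class outside the hierarchy stays untagged.
class PlainUnitTest { }

final class TagInheritanceDemo {
    // Returns the effective tag value for a class, or null if it has none.
    static String effectiveTag(Class<?> testClass) {
        DemoTag tag = testClass.getAnnotation(DemoTag.class);
        return tag == null ? null : tag.value();
    }

    public static void main(String[] args) {
        System.out.println(effectiveTag(ChildIntegrationTest.class));
        System.out.println(effectiveTag(PlainUnitTest.class));
    }
}
```

   Under that assumption, repeating the tag on `ChildIntegrationTest` would not 
change the result of `effectiveTag`, which is why adding or removing a 
duplicated tag is a no-op.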





Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-02-12 Thread via GitHub


gharris1727 commented on code in PR #15349:
URL: https://github.com/apache/kafka/pull/15349#discussion_r1486595231


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java:
##
@@ -31,6 +32,7 @@
  * Integration test for MirrorMaker2 in which source records are emitted with 
a transactional producer,
  * which interleaves transaction commit messages into the source topic which 
are not propagated downstream.
  */
+@Tag("integration")

Review Comment:
   I changed this to make it consistent in the other direction: I removed the 
integration tag from all classes which should already inherit it.






Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-12 Thread via GitHub


jolshan commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1486595168


##
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java:
##
@@ -124,6 +124,8 @@ Optional 
maybeCompleteReassignment(List targetIs
 if (!newTargetIsr.contains(replica)) return Optional.empty();
 }
 
+if (newTargetIsr.size() < Math.min(minIsrCount, 
newTargetReplicas.size())) return Optional.empty();

Review Comment:
   One last question from me -- why do we have Math.min(minIsrCount, 
newTargetReplicas.size())?
   Is this covering a case where a non-"adding" replica has somehow fallen out 
of the ISR but is not under min ISR?
   
   (As an aside from your changes, the existing code is a bit strange -- lines 
104 and 105 seem to do nothing.)
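
   To make the question concrete, here is a small sketch of the condition from 
the diff in isolation. `MinIsrCheckDemo` and `blockedByMinIsr` are hypothetical 
names, and the reading in the comments (the cap matters when the target replica 
set is smaller than min ISR) is an assumption, not something confirmed in this 
thread.

```java
import java.util.Arrays;
import java.util.List;

final class MinIsrCheckDemo {
    // Same comparison as in the diff: completion is blocked while the target ISR
    // is smaller than min(minIsrCount, number of target replicas).
    static boolean blockedByMinIsr(List<Integer> newTargetIsr,
                                   List<Integer> newTargetReplicas,
                                   int minIsrCount) {
        return newTargetIsr.size() < Math.min(minIsrCount, newTargetReplicas.size());
    }

    public static void main(String[] args) {
        // Three target replicas, one in the ISR, min ISR of 2: blocked.
        System.out.println(blockedByMinIsr(Arrays.asList(1), Arrays.asList(1, 2, 3), 2));
        // One target replica, already in the ISR, min ISR of 2: the threshold is
        // capped at 1, so the check does not block completion.
        System.out.println(blockedByMinIsr(Arrays.asList(1), Arrays.asList(1), 2));
    }
}
```

   Without the `Math.min` cap, the second case could never satisfy the check, 
because the ISR cannot grow beyond the target replica set.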






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-12 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1486583557


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -879,17 +884,12 @@ private List 
drainBatchesForOneNode(Metadata metadata, Node node,
 // Only proceed if the partition has no in-flight batches.
 if (isMuted(tp))
 continue;
-Metadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
-// Although a small chance, but skip this partition if leader has 
changed since the partition -> node assignment obtained from outside the loop.

Review Comment:
   Correct. Since the snapshot is immutable, a race condition is not possible, 
so I removed the code and the comment.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-12 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1486590491


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -110,7 +110,8 @@ public ProducerBatch(TopicPartition tp, 
MemoryRecordsBuilder recordsBuilder, lon
  * @param latestLeaderEpoch latest leader's epoch.
  */
 void maybeUpdateLeaderEpoch(Optional latestLeaderEpoch) {
-if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+if (latestLeaderEpoch.isPresent()

Review Comment:
   The change is intentional. The optimisation that KIP-951 proposed is that a 
producer batch should skip the backoff period if the "new leader" is known. 
This change corrects the previous logic to update the batch's leader epoch only 
if it's newer. I added a test in `ProducerBatchTest.java` to reflect that.
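
   The "only if it's newer" rule can be sketched as follows. This is an 
illustration under assumptions, not the code from the PR: `LeaderEpochTracker` 
is a hypothetical class, and the epoch is modelled as `Optional<Integer>`.

```java
import java.util.Optional;

final class LeaderEpochTracker {
    private Optional<Integer> currentLeaderEpoch = Optional.empty();

    // Updates the stored epoch only when the latest epoch is known and is newer
    // than the current one (or no epoch is stored yet). Returns true on update.
    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
        boolean newer = latestLeaderEpoch.isPresent()
            && (!currentLeaderEpoch.isPresent()
                || currentLeaderEpoch.get() < latestLeaderEpoch.get());
        if (newer) {
            currentLeaderEpoch = latestLeaderEpoch;
        }
        return newer;
    }

    Optional<Integer> currentLeaderEpoch() {
        return currentLeaderEpoch;
    }

    public static void main(String[] args) {
        LeaderEpochTracker tracker = new LeaderEpochTracker();
        System.out.println(tracker.maybeUpdateLeaderEpoch(Optional.of(5)));   // first known epoch
        System.out.println(tracker.maybeUpdateLeaderEpoch(Optional.of(4)));   // older epoch is ignored
        System.out.println(tracker.maybeUpdateLeaderEpoch(Optional.empty())); // unknown epoch is ignored
        System.out.println(tracker.currentLeaderEpoch());
    }
}
```

   Compared with a plain `!equals` check, this version ignores both an empty 
and an older epoch, so neither is treated as a leader change.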






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-12 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1486583557


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -879,17 +884,12 @@ private List 
drainBatchesForOneNode(Metadata metadata, Node node,
 // Only proceed if the partition has no in-flight batches.
 if (isMuted(tp))
 continue;
-Metadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
-// Although a small chance, but skip this partition if leader has 
changed since the partition -> node assignment obtained from outside the loop.

Review Comment:
   Correct, since it's immutable, this is not possible.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-12 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1486572035


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -127,6 +127,13 @@ public synchronized Cluster fetch() {
 return cache.cluster();
 }
 
+/**
+ * Get the current metadata cache.
+ */
+public synchronized MetadataCache fetchCache() {

Review Comment:
   @hachikuji Interesting. 
   `Metadata.update()` requires mutual exclusion while updating `cache` and 
other internal data structures of `Metadata`. So it makes sense to keep the 
synchronization, what do you think?
   Moreover, `fetchCache` is called once in `Sender::sendProducerData`, so it's 
not a bottleneck in the hot path.
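
   The pattern being discussed (a synchronized accessor that hands out an 
immutable snapshot) can be sketched like this. `SnapshotHolder` and its methods 
are hypothetical and much simpler than `Metadata`/`MetadataCache`; the sketch 
only shows why a reader that fetches the snapshot once does not need further 
locking.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class SnapshotHolder {
    // Maps a partition name to a leader id; replaced wholesale on every update.
    private Map<String, Integer> snapshot = Collections.emptyMap();

    // Writers build a fresh map and swap the reference while holding the lock.
    synchronized void update(String partition, int leaderId) {
        Map<String, Integer> next = new HashMap<>(snapshot);
        next.put(partition, leaderId);
        snapshot = Collections.unmodifiableMap(next);
    }

    // Readers take the lock once to obtain the current snapshot reference.
    synchronized Map<String, Integer> fetchSnapshot() {
        return snapshot;
    }

    public static void main(String[] args) {
        SnapshotHolder holder = new SnapshotHolder();
        holder.update("topic-0", 1);

        // Fetched once; later updates do not affect this view.
        Map<String, Integer> view = holder.fetchSnapshot();
        holder.update("topic-0", 2);

        System.out.println(view.get("topic-0"));
        System.out.println(holder.fetchSnapshot().get("topic-0"));
    }
}
```

   The lock is paid once per fetch, and everything read from `view` afterwards 
is lock-free and cannot change underneath the reader.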






[PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-12 Thread via GitHub


lianetm opened a new pull request, #15357:
URL: https://github.com/apache/kafka/pull/15357

   This PR modifies the commit manager to improve the retry logic and fix bugs:
   - defines high level functions for each of the different types of commit: 
commitSync, commitAsync, autoCommitSync (used from consumer close), 
autoCommitAsync (on interval), autoCommitNow (before revocation).
   - moves retry logic to these caller functions, keeping common response 
error handling that propagates errors, which each caller function retries as 
it needs. 
   
   Fixes the following issues:
   - auto-commit before revocation should retry with latest consumed offsets
   - auto-commit before revocation should only reset the timer once, when the 
rebalance completes
   - StaleMemberEpoch error (fatal) is considered retriable only when 
committing offsets before revocation, where it is retried with backoff if the 
member has a valid epoch. All other commits will fail fatally on stale epoch. 
Note that auto commit on the interval (autoCommitAsync) does not have any 
specific retry logic for the stale epoch, but will effectively retry on the 
next interval (as it does for any other fatal error) 
   - fix duplicate and noisy logs for auto-commit





Re: [PR] KAFKA-14569: Migrate Kafka Streams tests from Zookeeper to KRaft [kafka]

2024-02-12 Thread via GitHub


OmniaGM commented on code in PR #15341:
URL: https://github.com/apache/kafka/pull/15341#discussion_r1486540561


##
streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java:
##
@@ -104,23 +110,14 @@ private Properties effectiveConfigFrom(final Properties 
initialConfig) {
  */
 @SuppressWarnings("WeakerAccess")
 public String brokerList() {
-final EndPoint endPoint = kafka.advertisedListeners().head();
+final EndPoint endPoint = 
kafka.config().effectiveAdvertisedListeners().head();
 return endPoint.host() + ":" + endPoint.port();
 }
 
-
-/**
- * The ZooKeeper connection string aka `zookeeper.connect`.
- */
-@SuppressWarnings("WeakerAccess")
-public String zookeeperConnect() {
-return effectiveConfig.getProperty("zookeeper.connect", 
DEFAULT_ZK_CONNECT);

Review Comment:
   Can we also remove `ZkSessionTimeoutMsProp` from 
`KafkaEmbedded.effectiveConfigFrom` (line 99), as we don't need it anymore?






Re: [PR] KAFKA-14569: Migrate Kafka Streams tests from Zookeeper to KRaft [kafka]

2024-02-12 Thread via GitHub


OmniaGM commented on code in PR #15341:
URL: https://github.com/apache/kafka/pull/15341#discussion_r1486534681


##
streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java:
##
@@ -102,13 +101,14 @@ public EmbeddedKafkaCluster(final int numBrokers,
 /**
  * Creates and starts a Kafka cluster.
  */
-public void start() throws IOException {
+public void start() throws Exception {
 log.debug("Initiating embedded Kafka cluster startup");
-log.debug("Starting a ZooKeeper instance");
-zookeeper = new EmbeddedZookeeper();
-log.debug("ZooKeeper instance is running at {}", zKConnectString());
+final KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(
+new TestKitNodes.Builder()
+.setNumControllerNodes(1)

Review Comment:
   Any reason why we don't set up the cluster with combined nodes and number of 
controllers == number of brokers instead of one controller and one broker? 
Especially since, as far as I can see, all the tests set the number of brokers 
to 1 anyway. This could also let us remove `KafkaEmbedded`, which might 
simplify the setup! WDYT?






Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-02-12 Thread via GitHub


OmniaGM commented on code in PR #15354:
URL: https://github.com/apache/kafka/pull/15354#discussion_r1486527783


##
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##
@@ -70,6 +70,11 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
 if (isNewGroupCoordinatorEnabled()) {
   cfgs.foreach(_.setProperty(KafkaConfig.NewGroupCoordinatorEnableProp, 
"true"))
 }
+
+if(isKRaftTest()) {

Review Comment:
   The test was flaky because when we call `causeLogDirFailure` we sometimes 
impact the first `log.dir`, which is also `KafkaConfig.metadataLogDir` as we 
don't set `metadata.log.dir`. So to fix the flakiness we need to explicitly 
set `metadata.log.dir` to a different log dir than the ones we could 
potentially fail in the tests. Hope this explains it!
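
   The fallback described here can be sketched as a small resolver. This is a 
simplified illustration, not `KafkaConfig` itself: `MetadataLogDirDemo` is a 
hypothetical class, the paths are made up, and only the `metadata.log.dir` and 
`log.dirs` properties are considered.

```java
import java.util.Properties;

final class MetadataLogDirDemo {
    // Returns metadata.log.dir when it is set; otherwise falls back to the
    // first entry of the comma-separated log.dirs list.
    static String effectiveMetadataLogDir(Properties props) {
        String explicit = props.getProperty("metadata.log.dir");
        if (explicit != null && !explicit.isEmpty()) {
            return explicit;
        }
        String logDirs = props.getProperty("log.dirs", "");
        return logDirs.split(",")[0].trim();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("log.dirs", "/tmp/kafka-a,/tmp/kafka-b");
        // Without metadata.log.dir, the metadata log shares the first data dir,
        // so failing that directory also fails the metadata log.
        System.out.println(effectiveMetadataLogDir(props));

        // With an explicit, separate directory, the two are decoupled.
        props.setProperty("metadata.log.dir", "/tmp/kafka-meta");
        System.out.println(effectiveMetadataLogDir(props));
    }
}
```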






Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-02-12 Thread via GitHub


gaurav-narula commented on code in PR #15354:
URL: https://github.com/apache/kafka/pull/15354#discussion_r1486519713


##
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##
@@ -70,6 +70,11 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
 if (isNewGroupCoordinatorEnabled()) {
   cfgs.foreach(_.setProperty(KafkaConfig.NewGroupCoordinatorEnableProp, 
"true"))
 }
+
+if(isKRaftTest()) {

Review Comment:
   To add, #15262 made it apparent because prior to it, the check for metadata 
log dir failure didn't handle relative paths correctly. Refer to 
https://github.com/apache/kafka/pull/15262/files#diff-78812e247ffeae6f8c49b1b22506434701b1e1bafe7f92ef8f8708059e292bf0R2589






Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-02-12 Thread via GitHub


OmniaGM commented on code in PR #15354:
URL: https://github.com/apache/kafka/pull/15354#discussion_r1486512687


##
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##
@@ -70,6 +70,11 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
 if (isNewGroupCoordinatorEnabled()) {
   cfgs.foreach(_.setProperty(KafkaConfig.NewGroupCoordinatorEnableProp, 
"true"))
 }
+
+if(isKRaftTest()) {

Review Comment:
   The issue was that we don't set the metadata dir, in which case we fall back 
to the first log dir. 






Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-02-12 Thread via GitHub


jolshan commented on code in PR #15354:
URL: https://github.com/apache/kafka/pull/15354#discussion_r1486510262


##
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##
@@ -70,6 +70,11 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
 if (isNewGroupCoordinatorEnabled()) {
   cfgs.foreach(_.setProperty(KafkaConfig.NewGroupCoordinatorEnableProp, 
"true"))
 }
+
+if(isKRaftTest()) {

Review Comment:
   nit: space after if
   
   So the issue was that the metadata and other log directories were using the 
same temp directory?






Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-02-12 Thread via GitHub


gharris1727 commented on code in PR #15349:
URL: https://github.com/apache/kafka/pull/15349#discussion_r1486493069


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java:
##
@@ -31,6 +32,7 @@
  * Integration test for MirrorMaker2 in which source records are emitted with 
a transactional producer,
  * which interleaves transaction commit messages into the source topic which 
are not propagated downstream.
  */
+@Tag("integration")

Review Comment:
   It is, so this change is actually a no-op, I didn't realize that.
   It is more consistent with the other MM2 integration tests which also 
inherit the tag, but specify it anyway.






Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-02-12 Thread via GitHub


gharris1727 commented on code in PR #15349:
URL: https://github.com/apache/kafka/pull/15349#discussion_r1486493069


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java:
##
@@ -31,6 +32,7 @@
  * Integration test for MirrorMaker2 in which source records are emitted with 
a transactional producer,
  * which interleaves transaction commit messages into the source topic which 
are not propagated downstream.
  */
+@Tag("integration")

Review Comment:
   It is, so this change is actually a no-op, I didn't realize that.
   It is more consistent with the other MM2 integration tests which also 
inherit the tag.






[jira] [Assigned] (KAFKA-16243) Idle kafka-console-consumer with new consumer group protocol preemptively leaves group

2024-02-12 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16243:
--

Assignee: Lucas Brutschy  (was: Andrew Schofield)

> Idle kafka-console-consumer with new consumer group protocol preemptively 
> leaves group
> --
>
> Key: KAFKA-16243
> URL: https://issues.apache.org/jira/browse/KAFKA-16243
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Lucas Brutschy
>Priority: Critical
>
> Using the new consumer group protocol with kafka-console-consumer.sh, I find 
> that if I leave the consumer with no records to process for 5 minutes 
> (max.poll.interval.ms = 300000ms), the tool logs the following warning 
> message and leaves the group.
> "consumer poll timeout has expired. This means the time between subsequent 
> calls to poll() was longer than the configured max.poll.interval.ms, which 
> typically implies that the poll loop is spending too much time processing 
> messages. You can address this either by increasing max.poll.interval.ms or 
> by reducing the maximum size of batches returned in poll() with 
> max.poll.records."
> With the older consumer, this did not occur.
> The reason is that the consumer keeps a poll timer which is used to ensure 
> liveness of the application thread. With the older consumer, the poll timer 
> automatically updates while the `Consumer.poll(Duration)` method is blocked, 
> whereas the newer consumer only updates the poll timer when a new call to 
> `Consumer.poll(Duration)` is issued. This means that the 
> kafka-console-consumer.sh tool, which uses a very long timeout by default, 
> works differently with the new consumer.





Re: [PR] MINOR: log AssignReplicasToDirsRequest dispatch and handling [kafka]

2024-02-12 Thread via GitHub


gaurav-narula commented on code in PR #15356:
URL: https://github.com/apache/kafka/pull/15356#discussion_r1486475203


##
server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
##
@@ -270,8 +270,8 @@ public void run() throws Exception {
 }
 Map assignment = 
inflight.entrySet().stream()
 .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().dirId));
-if (log.isDebugEnabled()) {
-log.debug("Dispatching {} assignments:  {}", 
assignment.size(), assignment);
+if (log.isInfoEnabled()) {

Review Comment:
   I adhered to the same convention as before. It turns out the checks are 
useful when doing string concatenation instead of parameterised messages 
[0](https://www.slf4j.org/faq.html#logging_performance).
   
   Since we're using parameterised messages, I think we can safely avoid the 
conditionals. Adding a commit for the same. 👍 
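
   The point about parameterised messages can be checked with a small 
experiment. To stay self-contained, this sketch swaps in `java.util.logging` 
for SLF4J, so the placeholders are `{0}` and `{1}` rather than `{}`; 
`LazyLoggingDemo` and its helper types are hypothetical. It counts how often 
the argument's `toString()` runs when the message level is disabled versus 
enabled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

final class LazyLoggingDemo {
    // Argument whose toString() counts how often it is rendered.
    static final class CountingArg {
        int renderCount = 0;

        @Override
        public String toString() {
            renderCount++;
            return "a=1";
        }
    }

    // Handler that formats every record it receives and keeps the text in memory.
    static final class CollectingHandler extends Handler {
        final List<String> messages = new ArrayList<>();
        private final SimpleFormatter formatter = new SimpleFormatter();

        @Override
        public void publish(LogRecord record) {
            messages.add(formatter.formatMessage(record));
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    // Logs one parameterized message at messageLevel through a logger with the
    // given threshold, without any isLoggable guard, and reports how many times
    // the argument was rendered.
    static int renderCount(Level threshold, Level messageLevel) {
        Logger logger = Logger.getAnonymousLogger();
        logger.setUseParentHandlers(false);
        logger.setLevel(threshold);
        logger.addHandler(new CollectingHandler());

        CountingArg arg = new CountingArg();
        logger.log(messageLevel, "Dispatching {0} assignments: {1}", new Object[] {1, arg});
        return arg.renderCount;
    }

    public static void main(String[] args) {
        // FINE is below the INFO threshold: the argument is never rendered.
        System.out.println(renderCount(Level.INFO, Level.FINE));
        // INFO is enabled: the argument is rendered when the handler formats it.
        System.out.println(renderCount(Level.INFO, Level.INFO));
    }
}
```

   Because the logger drops a disabled record before any formatting happens, an 
explicit level guard around a parameterized call saves very little. It mostly 
pays off when the message is built eagerly, for example with string 
concatenation.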






Re: [PR] MINOR: log AssignReplicasToDirsRequest dispatch and handling [kafka]

2024-02-12 Thread via GitHub


OmniaGM commented on code in PR #15356:
URL: https://github.com/apache/kafka/pull/15356#discussion_r1486462098


##
server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
##
@@ -270,8 +270,8 @@ public void run() throws Exception {
 }
 Map assignment = 
inflight.entrySet().stream()
 .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().dirId));
-if (log.isDebugEnabled()) {
-log.debug("Dispatching {} assignments:  {}", 
assignment.size(), assignment);
+if (log.isInfoEnabled()) {

Review Comment:
   Is `log.isInfoEnabled` necessary here? I think we need this log at any log 
level.






Re: [PR] KAFKA-14589 [1/3] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-02-12 Thread via GitHub


nizhikov commented on PR #15256:
URL: https://github.com/apache/kafka/pull/15256#issuecomment-1939084618

   Hello, @mimaison @jolshan 
   
   Could you please take a look?
   It looks like this PR is ready to merge.





[jira] [Updated] (KAFKA-16243) Idle kafka-console-consumer with new consumer group protocol preemptively leaves group

2024-02-12 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16243:
--
Priority: Critical  (was: Major)

> Idle kafka-console-consumer with new consumer group protocol preemptively 
> leaves group
> --
>
> Key: KAFKA-16243
> URL: https://issues.apache.org/jira/browse/KAFKA-16243
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Critical
>
> Using the new consumer group protocol with kafka-console-consumer.sh, I find 
> that if I leave the consumer with no records to process for 5 minutes 
> (max.poll.interval.ms = 300000ms), the tool logs the following warning 
> message and leaves the group.
> "consumer poll timeout has expired. This means the time between subsequent 
> calls to poll() was longer than the configured max.poll.interval.ms, which 
> typically implies that the poll loop is spending too much time processing 
> messages. You can address this either by increasing max.poll.interval.ms or 
> by reducing the maximum size of batches returned in poll() with 
> max.poll.records."
> With the older consumer, this did not occur.
> The reason is that the consumer keeps a poll timer which is used to ensure 
> liveness of the application thread. With the older consumer, the poll timer 
> automatically updates while the `Consumer.poll(Duration)` method is blocked, 
> whereas the newer consumer only updates the poll timer when a new call to 
> `Consumer.poll(Duration)` is issued. This means that the 
> kafka-console-consumer.sh tool, which uses a very long timeout by default, 
> works differently with the new consumer.





Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]

2024-02-12 Thread via GitHub


OmniaGM commented on code in PR #15348:
URL: https://github.com/apache/kafka/pull/15348#discussion_r1486411242


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupBuilder.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.RecordHelpers;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ConsumerGroupBuilder {
+private final String groupId;
+private final int groupEpoch;
+private int assignmentEpoch;
+private final Map members = new HashMap<>();
+private final Map assignments = new HashMap<>();
+private Map subscriptionMetadata;
+
+public ConsumerGroupBuilder(String groupId, int groupEpoch) {
+this.groupId = groupId;
+this.groupEpoch = groupEpoch;
+this.assignmentEpoch = 0;
+}
+
+public ConsumerGroupBuilder withMember(ConsumerGroupMember member) {
+this.members.put(member.memberId(), member);
+return this;
+}
+
+public ConsumerGroupBuilder withSubscriptionMetadata(Map subscriptionMetadata) {
+this.subscriptionMetadata = subscriptionMetadata;
+return this;
+}
+
+public ConsumerGroupBuilder withAssignment(String memberId, Map> assignment) {
+this.assignments.put(memberId, new Assignment(assignment));
+return this;
+}
+
+public ConsumerGroupBuilder withAssignmentEpoch(int assignmentEpoch) {
+this.assignmentEpoch = assignmentEpoch;
+return this;
+}
+
+public List build(TopicsImage topicsImage) {
+List records = new ArrayList<>();
+
+// Add subscription records for members.
+members.forEach((memberId, member) -> {

Review Comment:
   same



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupBuilder.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.RecordHelpers;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ConsumerGroupBuilder {
+private final String groupId;
+private final int groupEpoch;
+private int assignmentEpoch;
+private final Map<String, ConsumerGroupMember> members = new HashMap<>();
+private final Map<String, Assignment> assignments = new HashMap<>();
+private Map<String, TopicMetadata> subscriptionMetadata;
+
+public ConsumerGroupBuilder(String groupId, int groupEpoch) {
+this.groupId = groupId;
+this.groupEpoch = groupEpoch;
+this.assignmentEpoch = 0;
+}
+
+public ConsumerGroupBuilder withMember(ConsumerGroupMember member) {
+this.members.put(member.memberId(), member);
+return this;
+}
+
+public ConsumerGroupBuilder withSubscriptionMetadata(Map subscripti

[PR] MINOR: log AssignReplicasToDirsRequest dispatch and handling [kafka]

2024-02-12 Thread via GitHub


gaurav-narula opened a new pull request, #15356:
URL: https://github.com/apache/kafka/pull/15356

   It's hard to ascertain at the moment if AssignmentsManager dispatched an 
`AssignReplicasToDirsRequest` to the controller and if the controller handled 
it. This PR adds info logs for observability.





Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-02-12 Thread via GitHub


satishd commented on PR #14034:
URL: https://github.com/apache/kafka/pull/14034#issuecomment-1938978627

   Resolved the recent conflicts caused by trunk changes in `LocalLog` by 
rebasing.





[PR] ignore [kafka]

2024-02-12 Thread via GitHub


dajac opened a new pull request, #15355:
URL: https://github.com/apache/kafka/pull/15355

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-02-12 Thread via GitHub


OmniaGM commented on PR #15354:
URL: https://github.com/apache/kafka/pull/15354#issuecomment-1938862746

   @jolshan, @gharris1727, @gaurav-narula can you have a look please?





Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]

2024-02-12 Thread via GitHub


OmniaGM commented on PR #15262:
URL: https://github.com/apache/kafka/pull/15262#issuecomment-1938859808

   > The issues started for 3.7 on the same day so it is one of the 3 commits 
backported feb 2 
https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America%2FLos_Angeles&search.values=3.7&tests.container=kafka.server.LogDirFailureTest
   
   Hi @jolshan, I just introduced a fix here: #15354





[PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-02-12 Thread via GitHub


OmniaGM opened a new pull request, #15354:
URL: https://github.com/apache/kafka/pull/15354

   Fix the flakiness of LogDirFailureTest
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Assigned] (KAFKA-16225) Flaky test suite LogDirFailureTest

2024-02-12 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim reassigned KAFKA-16225:
-

Assignee: Omnia Ibrahim

> Flaky test suite LogDirFailureTest
> --
>
> Key: KAFKA-16225
> URL: https://issues.apache.org/jira/browse/KAFKA-16225
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Reporter: Greg Harris
>Assignee: Omnia Ibrahim
>Priority: Major
>  Labels: flaky-test
>
> I see this failure on trunk and in PR builds for multiple methods in this 
> test suite:
> {noformat}
> org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
> at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
> at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179)
> at kafka.utils.TestUtils$.causeLogDirFailure(TestUtils.scala:1715)
> at kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:186)
> at kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:70)
> {noformat}
> It appears this assertion is failing
> [https://github.com/apache/kafka/blob/f54975c33135140351c50370282e86c49c81bbdd/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L1715]
> The other error which is appearing is this:
> {noformat}
> org.opentest4j.AssertionFailedError: Unexpected exception type thrown, expected:  but was: 
> at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:67)
> at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)
> at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111)
> at kafka.server.LogDirFailureTest.testProduceErrorsFromLogDirFailureOnLeader(LogDirFailureTest.scala:164)
> at kafka.server.LogDirFailureTest.testProduceErrorFromFailureOnLogRoll(LogDirFailureTest.scala:64)
> {noformat}
> Failures appear to have started in this commit, but this does not indicate 
> that this commit is at fault: 
> [https://github.com/apache/kafka/tree/3d95a69a28c2d16e96618cfa9a1eb69180fb66ea]
>  





[jira] [Commented] (KAFKA-16244) Move code style exceptions from suppressions.xml to the code

2024-02-12 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816631#comment-17816631
 ] 

Ismael Juma commented on KAFKA-16244:
-

This implies adding the checkstyle annotations to the code - worth making sure 
the license is ok. There was an issue with the annotations for findBugs, but 
checkstyle may be OK.
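
For illustration only, a minimal sketch of what an in-code exception could 
look like. It uses only the JDK's own `java.lang.SuppressWarnings` annotation. 
The class and method names are hypothetical, and the `checkstyle:` prefixed 
check name is an assumption that only takes effect if the build's Checkstyle 
configuration enables `SuppressWarningsHolder` and `SuppressWarningsFilter`.

```java
// Hypothetical example class; not part of the Kafka code base.
final class InlineSuppressionSketch {

    // Assumption: Checkstyle is configured with SuppressWarningsHolder and
    // SuppressWarningsFilter, so this annotation replaces an entry in
    // suppressions.xml for this one method. javac ignores the unknown token.
    @SuppressWarnings("checkstyle:CyclomaticComplexity")
    static String classify(int n) {
        if (n < 0) {
            return "negative";
        } else if (n == 0) {
            return "zero";
        } else if (n < 10) {
            return "small";
        }
        return "large";
    }
}
```

The suppression then lives next to the code it covers, so it goes away with 
the method instead of lingering in a separate file.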

> Move code style exceptions from suppressions.xml to the code
> 
>
> Key: KAFKA-16244
> URL: https://issues.apache.org/jira/browse/KAFKA-16244
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>






Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-02-12 Thread via GitHub


ijuma commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1938829826

   @gharris1727 Would you be willing to add a brief comment stating the reason 
for the test being marked integration if it's one of "slow" or "flaky"? For the 
ones that make sense as integration tests more generally, we don't need to add 
a comment.





Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-02-12 Thread via GitHub


ijuma commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1938826340

   > @ijuma I don't disagree. However, I think that they are tests that we 
should rather fix vs classifying them as integration tests.
   
   As I said, they can be fixed - but they should be fixed separately versus 
blocking progress towards a useful `unitTest`.





Re: [PR] KAFKA-15832: Trigger client reconciliation based on manager poll [kafka]

2024-02-12 Thread via GitHub


lucasbru merged PR #15275:
URL: https://github.com/apache/kafka/pull/15275





Re: [PR] KAFKA-15832: Trigger client reconciliation based on manager poll [kafka]

2024-02-12 Thread via GitHub


lianetm commented on code in PR #15275:
URL: https://github.com/apache/kafka/pull/15275#discussion_r1486257781


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1392,4 +1356,16 @@ public void registerStateListener(MemberStateListener listener) {
         }
         this.stateUpdatesListeners.add(listener);
     }
+
+    /**
+     * If either a new target assignment or new metadata is available that we have not yet attempted
+     * to reconcile, and we are currently in state RECONCILING, trigger reconciliation.
+     */
+    @Override
+    public PollResult poll(final long currentTimeMs) {
+        if (state == MemberState.RECONCILING) {
+            maybeReconcile();

Review Comment:
   Just for the record, you are both right. Requesting a metadata update from 
the reconciliation only sets a flag in the metadata object indicating that it 
needs a full update; effectively, a single metadata request is issued on the 
next network client poll while that flag is on. By the way, we have a related 
Jira for a potential future improvement regarding partial metadata updates 
when reconciling (https://issues.apache.org/jira/browse/KAFKA-15847), since 
right now we request metadata for all topics, even if we only need the ones 
involved in the assignment. 
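
   A rough sketch of the flag behaviour described above, under simplifying 
assumptions: `MetadataFlagSketch` and its methods are hypothetical names, not 
the real client classes, and the flag is cleared as soon as the request is 
sent rather than when a response arrives.

```java
// Hypothetical stand-in for the metadata object; not a Kafka class.
final class MetadataFlagSketch {
    private boolean needsFullUpdate = false;
    private int requestsSent = 0;

    // Called from reconciliation: only records that an update is needed.
    void requestUpdate() {
        needsFullUpdate = true;
    }

    // Called on each network client poll: sends at most one request
    // for any number of requestUpdate() calls made since the last poll.
    void poll() {
        if (needsFullUpdate) {
            requestsSent++;
            needsFullUpdate = false;
        }
    }

    int requestsSent() {
        return requestsSent;
    }
}
```

   With this shape, several reconciliation attempts between two polls still 
result in a single metadata request.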






[jira] [Updated] (KAFKA-16165) Consumer invalid transition on expired poll interval

2024-02-12 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16165:
---
Description: 
Running system tests with the new async consumer revealed an invalid state 
transition that occurs when the consumer is not polled within the poll interval 
(possibly related to consumer close, as the transition is leaving->stale)

Log trace:

[2024-01-17 19:45:07,379] WARN [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
[2024-01-17 19:45:07,379] ERROR [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] Unexpected error caught in consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
at org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)


We should review the poll expiration logic that triggers a leave group 
operation. It is currently applied in the HeartbeatRequestManager poll without 
any validation and, because it depends on the consumer poll timer, it can 
happen at any time, regardless of the state of the member. For example, the 
poll timer could expire while the member is leaving, leading to this invalid 
leaving->stale transition. This proactive leave should probably only apply 
when the consumer is in the group (not leaving, unsubscribed or fatal).
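
One possible shape for such a guard, as a sketch only: the enum below is a 
simplified, hypothetical subset of member states and `onPollTimerExpired` is an 
invented helper, not the actual MembershipManagerImpl or 
HeartbeatRequestManager code.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch; not the real membership manager.
final class PollTimerGuardSketch {
    // Simplified subset of states, for illustration only.
    enum State { STABLE, RECONCILING, LEAVING, UNSUBSCRIBED, FATAL, STALE }

    // States in which the member is not (or no longer) active in the group,
    // so an expired poll timer should not trigger a proactive leave.
    private static final Set<State> NOT_IN_GROUP =
        EnumSet.of(State.LEAVING, State.UNSUBSCRIBED, State.FATAL, State.STALE);

    // Returns the state the member should be in after the poll timer expires.
    static State onPollTimerExpired(State current) {
        if (NOT_IN_GROUP.contains(current)) {
            return current; // ignore the expiration, no invalid transition
        }
        return State.STALE;
    }
}
```

With a check like this, an expiration observed while LEAVING is ignored 
instead of raising an IllegalStateException.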

  was:
Running system tests with the new async consumer revealed an invalid transition 
related to the consumer not being polled on the interval in some kind of 
scenario (maybe relates to consumer close, as the transition is leaving->stale)

Log trace:

[2024-01-17 19:45:07,379] WARN [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
the time between subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is spending 
too much time processing messages. You can address this either by increasing 
max.poll.interval.ms or by reducing the maximum size of batches returned in 
poll() with max.poll.records. 
(org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
[2024-01-17 19:45:07,379] ERROR [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
at 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
at 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
at 
org.apache.kafka.clients.cons

Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]

2024-02-12 Thread via GitHub


dajac commented on PR #15348:
URL: https://github.com/apache/kafka/pull/15348#issuecomment-1938702499

   @OmniaGM @jolshan Thanks for your comments. I have addressed them.





Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]

2024-02-12 Thread via GitHub


dajac commented on code in PR #15348:
URL: https://github.com/apache/kafka/pull/15348#discussion_r1486196044


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java:
##
@@ -0,0 +1,1485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.classic.ClassicGroup;
+import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.opentest4j.AssertionFailedError;
+
+import java.net.InetAddress;
+import java.util.ArrayList;

Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]

2024-02-12 Thread via GitHub


dajac commented on code in PR #15348:
URL: https://github.com/apache/kafka/pull/15348#discussion_r1486183604


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java:
##
@@ -0,0 +1,1485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.classic.ClassicGroup;
+import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.opentest4j.AssertionFailedError;
+
+import java.net.InetAddress;
+import java.util.ArrayList;

[jira] [Created] (KAFKA-16244) Move code style exceptions from suppressions.xml to the code

2024-02-12 Thread David Jacot (Jira)
David Jacot created KAFKA-16244:
---

 Summary: Move code style exceptions from suppressions.xml to the 
code
 Key: KAFKA-16244
 URL: https://issues.apache.org/jira/browse/KAFKA-16244
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot








Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]

2024-02-12 Thread via GitHub


dajac commented on code in PR #15348:
URL: https://github.com/apache/kafka/pull/15348#discussion_r1486181764


##
checkstyle/suppressions.xml:
##
@@ -336,11 +336,11 @@
 
 
+  
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GeneralUniformAssignmentBuilder).java"/>

Review Comment:
   I was actually not aware of this. I think that it makes sense. I filed 
https://issues.apache.org/jira/browse/KAFKA-16244 to give it a try. In this PR, 
I would prefer to keep the `suppressions.xml` exception though.






[PR] MINOR: Fix package name for FetchFromFollowerIntegrationTest [kafka]

2024-02-12 Thread via GitHub


divijvaidya opened a new pull request, #15353:
URL: https://github.com/apache/kafka/pull/15353

   The change will ensure that this test is included when running:
   ```
   ./gradlew :core:unitTest --tests "kafka.server.*"
   ``` 





Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]

2024-02-12 Thread via GitHub


divijvaidya commented on PR #15316:
URL: https://github.com/apache/kafka/pull/15316#issuecomment-1938598989

   As part of this last batch, please also remove this test from the list 
here: 
https://github.com/apache/kafka/blob/5cfcc52fb3fce4a43ca77df311382d7a02a40ed2/build.gradle#L424





[jira] [Resolved] (KAFKA-16239) Clean up references to non-existent IntegrationTestHelper

2024-02-12 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-16239.
--
Resolution: Fixed

> Clean up references to non-existent IntegrationTestHelper
> -
>
> Key: KAFKA-16239
> URL: https://issues.apache.org/jira/browse/KAFKA-16239
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Minor
>  Labels: newbie
> Fix For: 3.8.0
>
>
> A bunch of places in the code javadocs and READ docs refer to a class called 
> IntegrationTestHelper. Such a class does not exist. 
> This task will clean up all references to IntegrationTestHelper from Kafka 
> code base.





[jira] [Updated] (KAFKA-16239) Clean up references to non-existent IntegrationTestHelper

2024-02-12 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16239:
-
Component/s: unit tests

> Clean up references to non-existent IntegrationTestHelper
> -
>
> Key: KAFKA-16239
> URL: https://issues.apache.org/jira/browse/KAFKA-16239
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Divij Vaidya
>Priority: Minor
>  Labels: newbie
> Fix For: 3.8.0
>
>
> A bunch of places in the code javadocs and READ docs refer to a class called 
> IntegrationTestHelper. Such a class does not exist. 
> This task will clean up all references to IntegrationTestHelper from Kafka 
> code base.





[jira] [Updated] (KAFKA-16239) Clean up references to non-existent IntegrationTestHelper

2024-02-12 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16239:
-
Fix Version/s: 3.8.0

> Clean up references to non-existent IntegrationTestHelper
> -
>
> Key: KAFKA-16239
> URL: https://issues.apache.org/jira/browse/KAFKA-16239
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Minor
>  Labels: newbie
> Fix For: 3.8.0
>
>
> A bunch of places in the code javadocs and READ docs refer to a class called 
> IntegrationTestHelper. Such a class does not exist. 
> This task will clean up all references to IntegrationTestHelper from Kafka 
> code base.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16239: Clean up references to non-existent IntegrationTestHelper [kafka]

2024-02-12 Thread via GitHub


divijvaidya merged PR #15352:
URL: https://github.com/apache/kafka/pull/15352





Re: [PR] MINOR Removed unused CommittedOffsetsFile class. [kafka]

2024-02-12 Thread via GitHub


satishd merged PR #15209:
URL: https://github.com/apache/kafka/pull/15209





Re: [PR] fix syntax error in release.py [kafka]

2024-02-12 Thread via GitHub


divijvaidya merged PR #15350:
URL: https://github.com/apache/kafka/pull/15350





[jira] [Resolved] (KAFKA-14041) Avoid the keyword var for a variable declaration in ConfigTransformer

2024-02-12 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-14041.
--
Resolution: Fixed

> Avoid the keyword var for a variable declaration in ConfigTransformer
> -
>
> Key: KAFKA-14041
> URL: https://issues.apache.org/jira/browse/KAFKA-14041
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: QualiteSys QualiteSys
>Assignee: Andrew Schofield
>Priority: Major
> Fix For: 3.8.0
>
>
> In the file 
> clients\src\main\java\org\apache\kafka\common\config\ConfigTransformer.java a 
> variable named var is declared :
> line 84 : for (ConfigVariable var : vars) {
> Since it is a java keyword, could the variable name be changed ?
> Thanks
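
As a side note on the report above: since Java 10, `var` is a reserved type name rather than a keyword, so the original loop still compiles; the rename is about readability. The sketch below illustrates this with a hypothetical class that uses `String` elements in place of `ConfigVariable`; it is not the actual `ConfigTransformer` code.

```java
import java.util.List;

// Hypothetical demo class; ConfigTransformer itself is not reproduced here.
class VarNameDemo {

    // Compiles on Java 10+: `var` is a reserved type name, not a keyword,
    // so it is still legal as a variable name, although it reads confusingly.
    static String joinWithVar(List<String> vars) {
        StringBuilder out = new StringBuilder();
        for (String var : vars) {
            out.append(var);
        }
        return out.toString();
    }

    // Same loop with a descriptive variable name; behavior is unchanged.
    static String joinRenamed(List<String> vars) {
        StringBuilder out = new StringBuilder();
        for (String variable : vars) {
            out.append(variable);
        }
        return out.toString();
    }
}
```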





[jira] [Updated] (KAFKA-14041) Avoid the keyword var for a variable declaration in ConfigTransformer

2024-02-12 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14041:
-
Priority: Minor  (was: Major)

> Avoid the keyword var for a variable declaration in ConfigTransformer
> -
>
> Key: KAFKA-14041
> URL: https://issues.apache.org/jira/browse/KAFKA-14041
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: QualiteSys QualiteSys
>Assignee: Andrew Schofield
>Priority: Minor
> Fix For: 3.8.0
>
>
> In the file 
> clients\src\main\java\org\apache\kafka\common\config\ConfigTransformer.java a 
> variable named var is declared :
> line 84 : for (ConfigVariable var : vars) {
> Since it is a java keyword, could the variable name be changed ?
> Thanks





[jira] [Updated] (KAFKA-14041) Avoid the keyword var for a variable declaration in ConfigTransformer

2024-02-12 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14041:
-
Fix Version/s: 3.8.0

> Avoid the keyword var for a variable declaration in ConfigTransformer
> -
>
> Key: KAFKA-14041
> URL: https://issues.apache.org/jira/browse/KAFKA-14041
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: QualiteSys QualiteSys
>Assignee: Andrew Schofield
>Priority: Major
> Fix For: 3.8.0
>
>
> In the file 
> clients\src\main\java\org\apache\kafka\common\config\ConfigTransformer.java a 
> variable named var is declared :
> line 84 : for (ConfigVariable var : vars) {
> Since it is a java keyword, could the variable name be changed ?
> Thanks





Re: [PR] KAFKA-14041: Avoid the keyword var for a variable declaration [kafka]

2024-02-12 Thread via GitHub


divijvaidya merged PR #15351:
URL: https://github.com/apache/kafka/pull/15351





Re: [PR] KAFKA-16116: Rebalance Metrics for AsyncKafkaConsumer [kafka]

2024-02-12 Thread via GitHub


lucasbru commented on code in PR #15339:
URL: https://github.com/apache/kafka/pull/15339#discussion_r1485988931


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -95,6 +98,8 @@ public class MembershipManagerImplTest {
 private BlockingQueue backgroundEventQueue;
 private BackgroundEventHandler backgroundEventHandler;
 private Time time;
+private Metrics metrics;
+private RebalanceMetricsManager rebalanceMetricsManager = 
mock(RebalanceMetricsManager.class);

Review Comment:
   In this class, you sometimes mock the `RebalanceMetricsManager`, and sometimes 
you use the real one. I think it's easier to stick to one of the two, so either 
the metrics manager is part of the unit-under-test or not. You can probably 
just use the "real one" everywhere?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -242,7 +234,7 @@ public void testTransitionToFailedWhenTryingToJoin() {
 MembershipManagerImpl membershipManager = new MembershipManagerImpl(
 GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, 
Optional.empty(),
 subscriptionState, commitRequestManager, metadata, logContext, 
Optional.empty(),
-backgroundEventHandler, time);
+backgroundEventHandler, time, 
mock(RebalanceMetricsManager.class));

Review Comment:
   Why not use your member `rebalanceMetricsManager` here?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -163,7 +155,7 @@ public void 
testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin()
 MembershipManagerImpl manager = new MembershipManagerImpl(
 GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, 
Optional.empty(),
 subscriptionState, commitRequestManager, metadata, logContext, 
Optional.empty(),
-backgroundEventHandler, time);
+backgroundEventHandler, time, 
mock(RebalanceMetricsManager.class));

Review Comment:
   Why not use your member `rebalanceMetricsManager` here?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -75,6 +75,11 @@ public interface MembershipManager {
  */
 void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData 
response);
 
+/**
+ * Notify the member that an error heartbeat response was received.
+ */
+void onHeartbeatError();

Review Comment:
   I wonder if it would make sense to rename `onHeartbeatResponseReceived` in 
this change, since an error response is also a response that we receive -- it 
should be clear that it's only called in the successful case. Maybe we could 
call the two functions `onHeartbeatSuccess` and `onHeartbeatFailure`, going 
along with the convention in `RequestFutureListener` etc.?
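
A minimal sketch of the suggested naming, assuming a simplified listener: the interface and the counting implementation below are hypothetical and are not the actual `MembershipManager` API. A generic type parameter stands in for `ConsumerGroupHeartbeatResponseData`.

```java
// Hypothetical listener mirroring the suggested names; not the real interface.
interface HeartbeatOutcomeListener<T> {

    // Called only when a heartbeat response without an error was received.
    void onHeartbeatSuccess(T response);

    // Called when the heartbeat response carried an error.
    void onHeartbeatFailure();
}

// Hypothetical implementation that simply counts each outcome.
class CountingHeartbeatListener implements HeartbeatOutcomeListener<String> {
    int successes = 0;
    int failures = 0;

    @Override
    public void onHeartbeatSuccess(String response) {
        successes++;
    }

    @Override
    public void onHeartbeatFailure() {
        failures++;
    }
}
```

With paired names, a reader can tell from the call site alone which path is being exercised.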



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -1610,12 +1605,152 @@ public void testHeartbeatSentOnStaledMember() {
 
 @Test
 public void 
testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {
-MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup(null);
+MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
 assertEquals(MemberState.JOINING, membershipManager.state());
 receiveEmptyAssignment(membershipManager);
 assertEquals(MemberState.STABLE, membershipManager.state());
 }
 
+@Test
+public void testMetricsWhenHeartbeatFailed() {
+rebalanceMetricsManager = new RebalanceMetricsManager(metrics);
+MembershipManagerImpl membershipManager = createMemberInStableState();
+membershipManager.onHeartbeatError();
+
+// Not expecting rebalance failures without assignments being 
reconciled
+assertEquals(0.0d, getMetricValue(metrics, 
rebalanceMetricsManager.rebalanceTotal));
+assertEquals(0.0d, getMetricValue(metrics, 
rebalanceMetricsManager.failedRebalanceTotal));
+}
+
+@Test
+public void testRebalanceMetricsOnSuccessfulRebalance() {
+rebalanceMetricsManager = new RebalanceMetricsManager(metrics);
+MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
+ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
+
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
+mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1");
+
+CompletableFuture commitResult = mockRevocationNoCallbacks(true);
+
+receiveEmptyAssignment(membershipManager);
+long reconciliationDurationMs = sleepRandomMs(time, 2000);
+
+// Complete commit request to complete the callback invocation
+   

Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]

2024-02-12 Thread via GitHub


nicktelford commented on code in PR #15264:
URL: https://github.com/apache/kafka/pull/15264#discussion_r1485925149


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1836,6 +1842,36 @@ public void 
maybeThrowTaskExceptionsFromProcessingThreads() {
 }
 }
 
+private boolean transactionBuffersExceedCapacity = false;
+
+boolean transactionBuffersExceedCapacity() {
+return transactionBuffersExceedCapacity;
+}
+
+boolean transactionBuffersWillExceedCapacity() {
+final boolean transactionBuffersAreUnbounded = 
maxUncommittedStateBytes < 0;
+if (transactionBuffersAreUnbounded) {
+return false;
+}
+
+// force an early commit if the uncommitted bytes exceeds or is 
*likely to exceed* the configured threshold

Review Comment:
   > First, IIUC, the idea is to use the number of additional uncommitted bytes 
since the last commit as a heuristic to estimate how many more uncommitted 
bytes will be added if we don't commit right now, and thus try to commit "just 
before" we go over the limit.
   
   Correct, this is the intent. One correction though: we're calculating the 
delta since the last _iteration_ of the StreamThread, not since the last commit.
   
   This behaviour was requested on the mailing list, IIRC by @cadonna, to 
reduce the risk of OOM between commits. The thinking was that users will 
expect `statestore.uncommitted.max.bytes` to be an upper-bound, and likely set 
it with little headroom. If the commit interval is fairly long (e.g. 30 seconds 
under ALOS), then there's a good chance we would exceed this limit by a 
considerable margin.
   
   > [...] so we expect to see updates to the same keys. [...] and see fewer 
and fewer new keys vs in-place updates.
   
   Writes in RocksDB are the same size, irrespective of whether they are 
inserts of new keys, or updates to existing keys. The only case I can think of 
where this may be different is when migrating an existing record from a 
non-timestamped to a timestamped column family; in which case, the updates 
would actually be slightly larger, due to the additional tombstone written.
   
   Ultimately, any heuristic we choose will be imperfect. If we track an 
average, even since the last commit, then it would be less responsive to sudden 
increases in throughput, potentially allowing an OOM before the average has had 
a chance to catch up. The current approach attempts to err on the side of 
caution: we may trigger a commit unnecessarily early on occasion, but we should 
be less likely to exceed the configured buffer size.
   
   I'm not against changing this heuristic to a running average, or anything 
else; I'm just not sufficiently convinced it would be better :grin: 
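
To make the heuristic described above concrete, here is a rough sketch. It assumes the check runs once per StreamThread iteration and projects the next iteration's growth from the last observed delta. The class, field, and method names are hypothetical, and the exact comparison used in the PR is not shown in this thread.

```java
// Hypothetical helper; names and the exact comparison are assumptions.
class UncommittedBytesHeuristic {
    private final long maxUncommittedBytes; // negative means unbounded
    private long bytesAtLastIteration = 0L;

    UncommittedBytesHeuristic(long maxUncommittedBytes) {
        this.maxUncommittedBytes = maxUncommittedBytes;
    }

    // Call once per iteration with the current total of uncommitted bytes.
    // Returns true if the limit is exceeded now, or would be exceeded if the
    // next iteration adds as many bytes as this one did.
    boolean shouldCommitEarly(long uncommittedBytes) {
        if (maxUncommittedBytes < 0) {
            return false; // unbounded buffers never force a commit
        }
        long delta = Math.max(0L, uncommittedBytes - bytesAtLastIteration);
        bytesAtLastIteration = uncommittedBytes;
        return uncommittedBytes + delta > maxUncommittedBytes;
    }

    // Call after a commit, when the uncommitted bytes drop back to zero.
    void onCommit() {
        bytesAtLastIteration = 0L;
    }
}
```

Using the per-iteration delta rather than a running average reacts immediately to a burst of writes, at the cost of occasionally committing earlier than strictly necessary.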





