[jira] [Commented] (KAFKA-17101) Mirror maker internal topics cleanup policy changes to 'delete' from 'compact'

2024-07-10 Thread kaushik srinivas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864935#comment-17864935
 ] 

kaushik srinivas commented on KAFKA-17101:
--

[~gharris1727] Below is the configuration. 

site1.ssl.keystore.filename=/etc/mirror-maker/secrets/ssl/site1/server.jks
site1.ssl.truststore.filename=/etc/mirror-maker/secrets/ssl/site1/trustchain.jks
site2.security.protocol=SSL
site1->site2.heartbeats.topic.replication.factor=3
site1.ssl.enabled.protocols=TLSv1.2,TLSv1.3
syslog=false
site2.ssl.keystore.filename=/etc/mirror-maker/secrets/ssl/site2/server.jks
site2.consumer.ssl.cipher.suites=
site1->site2.sync.topic.configs.interval.seconds=300
site1->site2.topics=.*_ALARM$,.*_INTERNAL_Intent_Changes$,product_INTERNAL_HAM_UPDATE$
site1->site2.replication.factor=3
site1->site2.emit.checkpoints.enabled=true
site2.ssl.key.password=productkeystore
tasks.max=1
site1.ssl.keystore.password=productkeystore
site1.ssl.truststore.location=/etc/kafka/shared/site1_truststore
site1.status.storage.replication.factor=3
site1.ssl.truststore.password=productkeystore
site1->site2.sync.topic.acls.enabled=false
site1->site2.refresh.topics.interval.seconds=300
site2.ssl.truststore.location=/etc/kafka/shared/site2_truststore
site2.ssl.truststore.filename=/etc/mirror-maker/secrets/ssl/site2/trustchain.jks
site1.ssl.protocol=TLSv1.2
site1->site2.replication.policy.class=RetainTopicNameReplicationPolicy
site1->site2.groups=.*
site1.security.protocol=SSL
site1->site2.groups.blacklist=console-consumer-.*, connect-.*, __.*
site1.config.storage.replication.factor=3
site1->site2.emit.heartbeats.enabled=true
site2.ssl.truststore.password=productkeystore
site1->site2.offset-syncs.topic.replication.factor=3
site1.ssl.keystore.location=/etc/kafka/shared/site1_keystore
site1.offset.storage.replication.factor=3
clusters=site1,site2
site1.bootstrap.servers=product-kafka-headless:9092
site2.ssl.protocol=TLSv1.2
site1->site2.refresh.groups.interval.seconds=300
site2.ssl.enabled=true
site2.ssl.enabled.protocols=TLSv1.2,TLSv1.3
site2.ssl.endpoint.identification.algorithm=
site1.ssl.cipher.suites=
site1->site2.checkpoints.topic.replication.factor=3
site1.producer.ssl.cipher.suites=
site2.ssl.keystore.location=/etc/kafka/shared/site2_keystore
site2.config.storage.replication.factor=3
site2.status.storage.replication.factor=3
site1.ssl.enabled=true
site1->site2.enabled=true
site1.ssl.endpoint.identification.algorithm=
site1.admin.ssl.cipher.suites=
site2.producer.ssl.cipher.suites=
site2.ssl.keystore.password=productkeystore
site2.ssl.cipher.suites=
site2.offset.storage.replication.factor=3
site1.ssl.key.password=productkeystore
site1.consumer.ssl.cipher.suites=
site2.bootstrap.servers=product-kafka-headless:9097
site1->site2.sync.topic.configs.enabled=true
site2.admin.ssl.cipher.suites=
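
(Side note for anyone else hitting this: the workaround described in the ticket 
– altering the topic back to cleanup.policy=compact – can be scripted. Below is 
a minimal sketch using the Java AdminClient, assuming the broker address from 
the config above and the topic name from the exception; it is illustrative 
only, not part of the MM2 codebase.)

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class FixOffsetsTopicPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "product-kafka-headless:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "mm2-offsets.site1.internal");
            // Inspect the current cleanup.policy of the Connect offsets topic.
            Config current = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
            System.out.println("cleanup.policy = " + current.get("cleanup.policy").value());
            // Restore the compact policy that the Connect worker requires.
            AlterConfigOp setCompact = new AlterConfigOp(
                new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                Collections.singletonMap(topic, Collections.singletonList(setCompact))).all().get();
        }
    }
}
{code}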

> Mirror maker internal topics cleanup policy changes to 'delete' from 
> 'compact' 
> ---
>
> Key: KAFKA-17101
> URL: https://issues.apache.org/jira/browse/KAFKA-17101
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.1, 3.5.1, 3.6.1
>Reporter: kaushik srinivas
>Priority: Major
>
> Scenario/Setup details
> Kafka cluster 1: 3 replicas
> Kafka cluster 2: 3 replicas
> MM1 moving data from cluster 1 to cluster 2
> MM2 moving data from cluster 2 to cluster 1
> Sometimes after a reboot of Kafka cluster 1 and the MM1 instance, we observe 
> MM failing to come up with the below exception:
> {code:java}
> {"message":"DistributedHerder-connect-1-1 - 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder - [Worker 
> clientId=connect-1, groupId=site1-mm2] Uncaught exception in herder work 
> thread, exiting: "}}
> org.apache.kafka.common.config.ConfigException: Topic 
> 'mm2-offsets.site1.internal' supplied via the 'offset.storage.topic' property 
> is required to have 'cleanup.policy=compact' to guarantee consistency and 
> durability of source connector offsets, but found the topic currently has 
> 'cleanup.policy=delete'. Continuing would likely result in eventually losing 
> source connector offsets and problems restarting this Connect cluster in the 
> future. Change the 'offset.storage.topic' property in the Connect worker 
> configurations to use a topic with 'cleanup.policy=compact'. {code}
> Once the topic is altered to a cleanup policy of compact, MM works just fine.
> This is happening sporadically on our setups and across a variety of 
> scenarios. We have not been successful in identifying the exact reproduction 
> steps as of now.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17111: Explicitly Register Afterburner Module in JsonSerializer and JsonDeserializer [kafka]

2024-07-10 Thread via GitHub


vbalani002 commented on PR #16565:
URL: https://github.com/apache/kafka/pull/16565#issuecomment-021575

   Thanks for looking into this promptly, @gharris1727!
   I was waiting for a green CI build before marking the PR as ready. Since 
there are no test failures in the pipeline, I'm opening it for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-6197) Difficult to get to the Kafka Streams javadocs

2024-07-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6197:
---
Labels: beginner newbie  (was: )

> Difficult to get to the Kafka Streams javadocs
> --
>
> Key: KAFKA-6197
> URL: https://issues.apache.org/jira/browse/KAFKA-6197
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 0.11.0.2, 1.0.0
>Reporter: James Cheng
>Priority: Major
>  Labels: beginner, newbie
>
> In order to get to the javadocs for the Kafka producer/consumer/streams, I 
> typically go to http://kafka.apache.org/documentation/ and click on either 
> 2.1, 2.2, or 2.3 in the table of contents to go right to the appropriate section.
> The link for "Streams API" now goes to the (very nice) 
> http://kafka.apache.org/10/documentation/streams/. That page doesn't have a 
> direct link to the Javadocs anywhere. The examples and guides actually 
> frequently mention "See javadocs for details", but there are no direct links 
> to them.
> If I instead go back to the main page and scroll directly to section 2.3, 
> there is still the link to get to the javadocs. But it's harder to jump 
> immediately to it. And it's a little confusing that section 2.3 in the table 
> of contents does not link you to section 2.3 of the page.
> It would be nice if the link to the Streams javadocs was easier to get to.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16661: Added a lower `log.initial.task.delay.ms` value [kafka]

2024-07-10 Thread via GitHub


showuon commented on PR #16221:
URL: https://github.com/apache/kafka/pull/16221#issuecomment-2221970784

   Retriggering CI: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-16221/4/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-8629) Kafka Streams Apps to support small native images through GraalVM

2024-07-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8629.

Resolution: Fixed

GraalVM should work with Kafka Streams now.

> Kafka Streams Apps to support small native images through GraalVM
> -
>
> Key: KAFKA-8629
> URL: https://issues.apache.org/jira/browse/KAFKA-8629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
> Environment: OSX
> Linux on Docker
>Reporter: Andy Muir
>Assignee: Andy Muir
>Priority: Minor
>
> I'm investigating using [GraalVM|https://www.graalvm.org/] to help reduce the 
> docker image size and required resources for a simple Kafka Streams 
> microservice. To this end, I'm looking at running a microservice which:
> 1) consumes from a Kafka topic (XML)
> 2) transforms the data into JSON
> 3) produces to a new Kafka topic.
> The Kafka Streams app running in the JVM works fine.
> When I attempt to build it into a GraalVM native image (a binary executable 
> which does not require the JVM, hence a smaller image size and fewer 
> resources), I encountered a few 
> [incompatibilities|https://github.com/oracle/graal/blob/master/substratevm/LIMITATIONS.md]
>  with the source code in Kafka.
> I've implemented a workaround for each of these in a fork (link to follow) to 
> help establish whether it is feasible. I don't intend (at this stage) for the 
> changes to be applied to the broker - I'm only after Kafka Streams for now. 
> I'm not sure whether it'd be a good idea for the broker itself to run as a 
> native image!
> There were 2 issues getting the native image to build with Kafka Streams:
> 1) some reflection use cases using MethodHandle
> 2) anything JMX
> To work around these issues, I have:
> 1) replaced the use of MethodHandle with alternatives
> 2) commented out the JMX code in a few places
> While the first may be sustainable, I'd expect the second to be put behind a 
> configuration switch, allowing the existing code to be used by default and 
> JMX to be turned off if configured.
> *I haven't created a PR for now, as I'd like feedback to decide if it is 
> going to be feasible to carry this forwards.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-9738) Add Generics Type Parameters to forwarded() in MockProcessorContext

2024-07-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864883#comment-17864883
 ] 

Matthias J. Sax commented on KAFKA-9738:


[~cadonna] – I believe this is fixed with the new `api.MockProcessorContext`? 
Can we close this ticket?
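
If it is, here is a minimal sketch of what the typed capture looks like with 
the new `api.MockProcessorContext` (the types and the pass-through processor 
are made up for illustration):

{code:java}
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class TypedForwardedExample {
    public static void main(String[] args) {
        final MockProcessorContext<String, Long> context = new MockProcessorContext<>();

        // A trivial pass-through processor, just to have something to capture.
        final Processor<String, Long, String, Long> passThrough =
            new Processor<String, Long, String, Long>() {
                private ProcessorContext<String, Long> ctx;

                @Override
                public void init(final ProcessorContext<String, Long> ctx) {
                    this.ctx = ctx;
                }

                @Override
                public void process(final Record<String, Long> record) {
                    ctx.forward(record);
                }
            };
        passThrough.init(context);
        passThrough.process(new Record<>("a", 1L, 0L));

        // forwarded() now carries the generic types, so the captured records
        // are type-checked at compile time.
        for (final MockProcessorContext.CapturedForward<? extends String, ? extends Long> cf
                : context.forwarded()) {
            final Record<? extends String, ? extends Long> r = cf.record();
            System.out.println(r.key() + " -> " + r.value());
        }
    }
}
{code}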

> Add Generics Type Parameters to forwarded() in MockProcessorContext 
> 
>
> Key: KAFKA-9738
> URL: https://issues.apache.org/jira/browse/KAFKA-9738
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Priority: Minor
>
> The method {{forwarded()}} to capture the forwarded records in 
> {{MockProcessorContext}} does not have any type parameters although the 
> corresponding {{forward()}} does have them. To enable type checking at 
> compile time in tests, generic type parameters should be added to the 
> {{forwarded()}} method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13326) Add multi-cluster support to Kafka Streams

2024-07-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13326.
-
Resolution: Duplicate

> Add multi-cluster support to Kafka Streams
> --
>
> Key: KAFKA-13326
> URL: https://issues.apache.org/jira/browse/KAFKA-13326
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guangyuan Wang
>Priority: Major
>  Labels: needs-kip
>
> Dear Kafka Team,
> According to 
> https://kafka.apache.org/28/documentation/streams/developer-guide/config-streams.html#bootstrap-servers,
> Kafka Streams applications can only communicate with a single Kafka cluster 
> specified by this config value, and future versions of Kafka Streams will 
> support connecting to different Kafka clusters for reading input streams and 
> writing output streams.
> In which version will this feature be added to Kafka Streams? This is really 
> a very good feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10356) Handle accidental deletion of sink-topics as exceptional failure

2024-07-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864879#comment-17864879
 ] 

Matthias J. Sax commented on KAFKA-10356:
-

Is the description of this ticket still correct? Based on 
https://issues.apache.org/jira/browse/KAFKA-16508, which we addressed recently, 
we originally ended up with an infinite retry loop (at least in newer releases) 
and did not call the ProductionExceptionHandler.

The new behavior with K16508 is to call the handler and allow the user to drop 
the record on the floor. However, I would not consider this a "lost silently" 
case, because the user's custom handler made this decision.

I don't think that triggering a rebalance and shutting down if a sink topic 
does not exist is the right thing any longer – maybe it was a good idea back in 
the day, when we did not have the "dynamic routing" feature. But with "dynamic 
routing", we should just rely on the production exception handler IMHO, and 
allow dropping a potential poison-pill record.

Related is https://issues.apache.org/jira/browse/KAFKA-17057, which proposes 
adding a RETRY option to the handler.

Any objections to just closing this ticket?
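
For reference, a minimal sketch of the kind of handler this would rely on (the 
class name is mine; the interface is the existing two-option 
ProductionExceptionHandler contract):

{code:java}
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class DropMissingSinkTopicHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // A deleted sink topic surfaces as UnknownTopicOrPartitionException;
        // drop the record instead of killing the stream thread.
        if (exception instanceof UnknownTopicOrPartitionException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure in this sketch
    }
}
{code}

It would be registered via `default.production.exception.handler` in the 
application's StreamsConfig.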

> Handle accidental deletion of sink-topics as exceptional failure
> 
>
> Key: KAFKA-10356
> URL: https://issues.apache.org/jira/browse/KAFKA-10356
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>
> Today when sink topics are deleted, the producer's send callback would 
> eventually return the UnknownTopicOrPartition exception after the configured 
> delivery.timeout.ms, whose default is 2 minutes if EOS is not enabled 
> (otherwise it is Integer.MAX_VALUE). Then in the Streams implementation the 
> exception would be handled by the ProductionExceptionHandler, which by default 
> would treat it as `FAIL` and would cause the thread to die. If it treats it as 
> CONTINUE, the exception would be silently ignored and the sent records would 
> be lost silently. 
> We should improve this situation in Streams by special-handling the 
> {{UnknownTopicOrPartition}} exception and triggering a rebalance as well; 
> then on the leader we can also check whether the sink topic metadata exists, 
> just like for source topics, and then follow the same logic as in 
> https://issues.apache.org/jira/browse/KAFKA-10355.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: add clean up command to tests README [kafka]

2024-07-10 Thread via GitHub


gongxuanzhang commented on PR #16560:
URL: https://github.com/apache/kafka/pull/16560#issuecomment-2221896315

   I updated it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: add clean up command to tests README [kafka]

2024-07-10 Thread via GitHub


gongxuanzhang commented on code in PR #16560:
URL: https://github.com/apache/kafka/pull/16560#discussion_r1673328940


##
tests/README.md:
##
@@ -47,6 +47,10 @@ 
TC_PATHS="tests/kafkatest/tests/streams/streams_upgrade_test.py::StreamsUpgradeT
 ```
 bash tests/docker/ducker-ak up -j 'openjdk:11'; tests/docker/run_tests.sh
 ```
+* Clean Docker Image
+```
+docker rm -f $(docker ps -aq --filter "name=ducker")

Review Comment:
   You are right, this is even better!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17096:Fix kafka_log4j_appender.py [kafka]

2024-07-10 Thread via GitHub


gongxuanzhang commented on PR #16559:
URL: https://github.com/apache/kafka/pull/16559#issuecomment-2221884905

   I updated it, please take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in policy, that user still can use describe.

2024-07-10 Thread StarBoy1005 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

StarBoy1005 updated KAFKA-17119:

Description: 
*env*

java:jdk1.8.0_271

os:centos 7

kafka: 2.4.1 and 2.8.2

ranger:2.1.0

 

 

After enabling the kafka-ranger-plugin and banning the user from using describe 
in a policy, that user can still use describe.

What's more, this applies not only to describe but also to list; even the 
create-topic command behaves abnormally.

Below are some screenshots of my policy and the results of the commands.

 

!image-2024-07-11-10-33-51-587.png!

 

!image-2024-07-11-10-35-10-358.png!

 

!image-2024-07-11-10-34-15-910.png!

  was:
After enabling the kafka-ranger-plugin and banning the user from using describe 
in a policy, that user can still use describe.

What's more, this applies not only to describe but also to list; even the 
create-topic command behaves abnormally.

Below are some screenshots of my policy and the results of the commands.

 

!image-2024-07-11-10-33-51-587.png!

 

!image-2024-07-11-10-35-10-358.png!

 

!image-2024-07-11-10-34-15-910.png!


> After enabling kafka-ranger-plugin and banning the user from using describe 
> in policy, that user still can use describe.
> 
>
> Key: KAFKA-17119
> URL: https://issues.apache.org/jira/browse/KAFKA-17119
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.4.0
>Reporter: StarBoy1005
>Priority: Major
> Attachments: image-2024-07-11-10-33-51-587.png, 
> image-2024-07-11-10-34-15-910.png, image-2024-07-11-10-35-10-358.png
>
>
> *env*
> java:jdk1.8.0_271
> os:centos 7
> kafka: 2.4.1 and 2.8.2
> ranger:2.1.0
>  
>  
> After enabling the kafka-ranger-plugin and banning the user from using 
> describe in a policy, that user can still use describe.
> What's more, this applies not only to describe but also to list; even the 
> create-topic command behaves abnormally.
>  
> Below are some screenshots of my policy and the results of the commands.
>  
> !image-2024-07-11-10-33-51-587.png!
>  
> !image-2024-07-11-10-35-10-358.png!
>  
> !image-2024-07-11-10-34-15-910.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in policy, that user still can use describe.

2024-07-10 Thread StarBoy1005 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

StarBoy1005 updated KAFKA-17119:

Attachment: image-2024-07-11-10-35-10-358.png

> After enabling kafka-ranger-plugin and banning the user from using describe 
> in policy, that user still can use describe.
> 
>
> Key: KAFKA-17119
> URL: https://issues.apache.org/jira/browse/KAFKA-17119
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.4.0
>Reporter: StarBoy1005
>Priority: Major
> Attachments: image-2024-07-11-10-33-51-587.png, 
> image-2024-07-11-10-34-15-910.png, image-2024-07-11-10-35-10-358.png
>
>
> After enabling the kafka-ranger-plugin and banning the user from using 
> describe in a policy, that user can still use describe.
> What's more, this applies not only to describe but also to list; even the 
> create-topic command behaves abnormally.
>  
> Below are some screenshots of my policy and the results of the commands.
>  
> !image-2024-07-11-10-33-51-587.png!
>  
>  
>  
> !image-2024-07-11-10-34-15-910.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in policy, that user still can use describe.

2024-07-10 Thread StarBoy1005 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

StarBoy1005 updated KAFKA-17119:

Description: 
After enabling the kafka-ranger-plugin and banning the user from using describe 
in a policy, that user can still use describe.

What's more, this applies not only to describe but also to list; even the 
create-topic command behaves abnormally.

Below are some screenshots of my policy and the results of the commands.

 

!image-2024-07-11-10-33-51-587.png!

 

!image-2024-07-11-10-35-10-358.png!

 

!image-2024-07-11-10-34-15-910.png!

  was:
After enabling the kafka-ranger-plugin and banning the user from using describe 
in a policy, that user can still use describe.

What's more, this applies not only to describe but also to list; even the 
create-topic command behaves abnormally.

Below are some screenshots of my policy and the results of the commands.

 

!image-2024-07-11-10-33-51-587.png!

 

 

 

!image-2024-07-11-10-34-15-910.png!


> After enabling kafka-ranger-plugin and banning the user from using describe 
> in policy, that user still can use describe.
> 
>
> Key: KAFKA-17119
> URL: https://issues.apache.org/jira/browse/KAFKA-17119
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.4.0
>Reporter: StarBoy1005
>Priority: Major
> Attachments: image-2024-07-11-10-33-51-587.png, 
> image-2024-07-11-10-34-15-910.png, image-2024-07-11-10-35-10-358.png
>
>
> After enabling the kafka-ranger-plugin and banning the user from using 
> describe in a policy, that user can still use describe.
> What's more, this applies not only to describe but also to list; even the 
> create-topic command behaves abnormally.
>  
> Below are some screenshots of my policy and the results of the commands.
>  
> !image-2024-07-11-10-33-51-587.png!
>  
> !image-2024-07-11-10-35-10-358.png!
>  
> !image-2024-07-11-10-34-15-910.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in policy, that user still can use describe.

2024-07-10 Thread StarBoy1005 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

StarBoy1005 updated KAFKA-17119:

Attachment: image-2024-07-11-10-33-51-587.png

> After enabling kafka-ranger-plugin and banning the user from using describe 
> in policy, that user still can use describe.
> 
>
> Key: KAFKA-17119
> URL: https://issues.apache.org/jira/browse/KAFKA-17119
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.4.0
>Reporter: StarBoy1005
>Priority: Major
> Attachments: image-2024-07-11-10-33-51-587.png, 
> image-2024-07-11-10-34-15-910.png
>
>
> After enabling the kafka-ranger-plugin and banning the user from using 
> describe in a policy, that user can still use describe.
> What's more, this applies not only to describe but also to list; even the 
> create-topic command behaves abnormally.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in policy, that user still can use describe.

2024-07-10 Thread StarBoy1005 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

StarBoy1005 updated KAFKA-17119:

Attachment: image-2024-07-11-10-34-15-910.png

> After enabling kafka-ranger-plugin and banning the user from using describe 
> in policy, that user still can use describe.
> 
>
> Key: KAFKA-17119
> URL: https://issues.apache.org/jira/browse/KAFKA-17119
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.4.0
>Reporter: StarBoy1005
>Priority: Major
> Attachments: image-2024-07-11-10-33-51-587.png, 
> image-2024-07-11-10-34-15-910.png
>
>
> After enabling the kafka-ranger-plugin and banning the user from using 
> describe in a policy, that user can still use describe.
> What's more, this applies not only to describe but also to list; even the 
> create-topic command behaves abnormally.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in policy, that user still can use describe.

2024-07-10 Thread StarBoy1005 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

StarBoy1005 updated KAFKA-17119:

Description: 
After enabling the kafka-ranger-plugin and banning the user from using describe 
in a policy, that user can still use describe.

What's more, this applies not only to describe but also to list; even the 
create-topic command behaves abnormally.

Below are some screenshots of my policy and the results of the commands.

 

!image-2024-07-11-10-33-51-587.png!

 

 

 

!image-2024-07-11-10-34-15-910.png!

  was:
After enabling the kafka-ranger-plugin and banning the user from using describe 
in a policy, that user can still use describe.

What's more, this applies not only to describe but also to list; even the 
create-topic command behaves abnormally.


> After enabling kafka-ranger-plugin and banning the user from using describe 
> in policy, that user still can use describe.
> 
>
> Key: KAFKA-17119
> URL: https://issues.apache.org/jira/browse/KAFKA-17119
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.4.0
>Reporter: StarBoy1005
>Priority: Major
> Attachments: image-2024-07-11-10-33-51-587.png, 
> image-2024-07-11-10-34-15-910.png
>
>
> After enabling the kafka-ranger-plugin and banning the user from using 
> describe in a policy, that user can still use describe.
> What's more, this applies not only to describe but also to list; even the 
> create-topic command behaves abnormally.
>  
> Below are some screenshots of my policy and the results of the commands.
>  
> !image-2024-07-11-10-33-51-587.png!
>  
>  
>  
> !image-2024-07-11-10-34-15-910.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in policy, that user still can use describe.

2024-07-10 Thread StarBoy1005 (Jira)
StarBoy1005 created KAFKA-17119:
---

 Summary: After enabling kafka-ranger-plugin and banning the user from 
using describe in policy, that user still can use describe.
 Key: KAFKA-17119
 URL: https://issues.apache.org/jira/browse/KAFKA-17119
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 2.4.0
Reporter: StarBoy1005


After enabling the kafka-ranger-plugin and banning the user from using describe 
in a policy, that user can still use describe.

What's more, this applies not only to describe but also to list; even the 
create-topic command behaves abnormally.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16855) KRaft - Wire replaying a TopicRecord

2024-07-10 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864871#comment-17864871
 ] 

Luke Chen commented on KAFKA-16855:
---

[~muralibasani], the [PR|https://github.com/apache/kafka/pull/16502] for 
KAFKA-16853 is open and has received a first round of review. I think it's in 
good shape, and you can start working on this ticket based on that branch when 
you have time. FYI.

> KRaft - Wire replaying a TopicRecord
> 
>
> Key: KAFKA-16855
> URL: https://issues.apache.org/jira/browse/KAFKA-16855
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Muralidhar Basani
>Priority: Major
>
> *Summary*
> Replaying a TopicRecord containing a new TieredEpoch and TieredState needs to 
> interact with the two thread pools in the RemoteLogManager to add/remove the 
> correct tasks from each



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17115) Closing newly-created consumers during rebalance can cause rebalances to hang

2024-07-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-17115:
--
Description: 
When a dynamic consumer (i.e., one with no group instance ID configured) first 
tries to join a group, the group coordinator normally responds with the 
MEMBER_ID_REQUIRED error, under the assumption that the member will retry soon 
after. During this step, the group coordinator will also generate a new member 
ID for the consumer, include it in the error response for the initial join 
group request, and expect that a member with that ID will participate in future 
rebalances.

If a consumer is closed in between the time that it sends the JoinGroup request 
and the time that it receives the response from the group coordinator, it will 
not attempt to leave the group, since it doesn't have a member ID to include in 
that request.

This will cause future rebalances to hang, since the group coordinator will 
still expect a member with the ID for the now-closed consumer to join. 
Eventually, the group coordinator may remove the closed consumer from the 
group, but with default configuration settings, this could take as long as five 
minutes.

One possible fix is to send a LeaveGroup request with the member ID if the 
consumer receives a JoinGroup response with a member ID after it has been 
closed.

This ticket applies only to the legacy consumer. There is a similar issue with 
the new consumer that is tracked separately in KAFKA-17116.

  was:
When a dynamic consumer (i.e., one with no group instance ID configured) first 
tries to join a group, the group coordinator normally responds with the 
MEMBER_ID_REQUIRED error, under the assumption that the member will retry soon 
after. During this step, the group coordinator will also generate a new member 
ID for the consumer, include it in the error response for the initial join 
group request, and expect that a member with that ID will participate in future 
rebalances.

If a consumer is closed in between the time that it sends the JoinGroup request 
and the time that it receives the response from the group coordinator, it will 
not attempt to leave the group, since it doesn't have a member ID to include in 
that request.

This will cause future rebalances to hang, since the group coordinator will 
still expect a member with the ID for the now-closed consumer to join. 
Eventually, the group coordinator may remove the closed consumer from the 
group, but with default configuration settings, this could take as long as five 
minutes.

One possible fix is to send a LeaveGroup request with the member ID if the 
consumer receives a JoinGroup response with a member ID after it has been 
closed.

 

This applies to the legacy consumer; I have not verified yet with the new async 
consumer.
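
A hypothetical sketch of the proposed fix (the names are mine, not the actual 
client code):

{code:java}
// Hypothetical sketch: on receiving a JoinGroup response after close() completed.
void onJoinGroupResponse(final String memberId) {
    if (closed && memberId != null && !memberId.isEmpty()) {
        // We were assigned a member id after closing: leave explicitly so the
        // group coordinator does not wait for this member in the current rebalance.
        sendLeaveGroupRequest(memberId); // hypothetical helper
    }
}
{code}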


> Closing newly-created consumers during rebalance can cause rebalances to hang
> -
>
> Key: KAFKA-17115
> URL: https://issues.apache.org/jira/browse/KAFKA-17115
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.9.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> When a dynamic consumer (i.e., one with no group instance ID configured) 
> first tries to join a group, the group coordinator normally responds with the 
> MEMBER_ID_REQUIRED error, under the assumption that the member will retry 
> soon after. During this step, the group coordinator will also generate a new 
> member ID for the consumer, include it in the error response for the initial 
> join group request, and expect that a member with that ID will participate in 
> future rebalances.
> If a consumer is closed in between the time that it sends the JoinGroup 
> request and the time that it receives the response from the group 
> coordinator, it will not attempt to leave the group, since it doesn't have a 
> member ID to include in that request.
> This will cause future rebalances to hang, since the group coordinator will 
> still expect a member with the ID for the now-closed consumer to join. 
> Eventually, the group coordinator may remove the closed consumer from the 
> group, but with default configuration settings, this could take as long as 
> five minutes.
> One possible fix is to send a LeaveGroup request with the member ID if the 
> consumer receives a JoinGroup response with a member ID after it has been 
> closed.
> This ticket applies only to the legacy consumer. There is a similar issue 
> with the new consumer that is tracked separately in KAFKA-17116.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17115) Closing newly-created legacy consumers during rebalance can cause rebalances to hang

2024-07-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-17115:
--
Summary: Closing newly-created legacy consumers during rebalance can cause 
rebalances to hang  (was: Closing newly-created consumers during rebalance can 
cause rebalances to hang)

> Closing newly-created legacy consumers during rebalance can cause rebalances 
> to hang
> 
>
> Key: KAFKA-17115
> URL: https://issues.apache.org/jira/browse/KAFKA-17115
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.9.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> When a dynamic consumer (i.e., one with no group instance ID configured) 
> first tries to join a group, the group coordinator normally responds with the 
> MEMBER_ID_REQUIRED error, under the assumption that the member will retry 
> soon after. During this step, the group coordinator will also generate a new 
> member ID for the consumer, include it in the error response for the initial 
> join group request, and expect that a member with that ID will participate in 
> future rebalances.
> If a consumer is closed in between the time that it sends the JoinGroup 
> request and the time that it receives the response from the group 
> coordinator, it will not attempt to leave the group, since it doesn't have a 
> member ID to include in that request.
> This will cause future rebalances to hang, since the group coordinator will 
> still expect a member with the ID for the now-closed consumer to join. 
> Eventually, the group coordinator may remove the closed consumer from the 
> group, but with default configuration settings, this could take as long as 
> five minutes.
> One possible fix is to send a LeaveGroup request with the member ID if the 
> consumer receives a JoinGroup response with a member ID after it has been 
> closed.
> This ticket applies only to the legacy consumer. There is a similar issue 
> with the new consumer that is tracked separately in KAFKA-17116.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17115) Closing newly-created consumers during rebalance can cause rebalances to hang

2024-07-10 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864870#comment-17864870
 ] 

Chris Egerton commented on KAFKA-17115:
---

Damn, that was fast! Thanks for the follow-up [~lianetm], glad this helped with 
development of the new consumer.

> Closing newly-created consumers during rebalance can cause rebalances to hang
> -
>
> Key: KAFKA-17115
> URL: https://issues.apache.org/jira/browse/KAFKA-17115
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.9.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> When a dynamic consumer (i.e., one with no group instance ID configured) 
> first tries to join a group, the group coordinator normally responds with the 
> MEMBER_ID_REQUIRED error, under the assumption that the member will retry 
> soon after. During this step, the group coordinator will also generate a new 
> member ID for the consumer, include it in the error response for the initial 
> join group request, and expect that a member with that ID will participate in 
> future rebalances.
> If a consumer is closed in between the time that it sends the JoinGroup 
> request and the time that it receives the response from the group 
> coordinator, it will not attempt to leave the group, since it doesn't have a 
> member ID to include in that request.
> This will cause future rebalances to hang, since the group coordinator will 
> still expect a member with the ID for the now-closed consumer to join. 
> Eventually, the group coordinator may remove the closed consumer from the 
> group, but with default configuration settings, this could take as long as 
> five minutes.
> One possible fix is to send a LeaveGroup request with the member ID if the 
> consumer receives a JoinGroup response with a member ID after it has been 
> closed.
>  
> This applies to the legacy consumer; I have not verified yet with the new 
> async consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16412) Uncreated topics are considered created topics

2024-07-10 Thread mutu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mutu updated KAFKA-16412:
-
Affects Version/s: 3.6.0

> Uncreated topics are considered created topics
> 
>
> Key: KAFKA-16412
> URL: https://issues.apache.org/jira/browse/KAFKA-16412
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.2, 3.6.0
>Reporter: mutu
>Priority: Major
> Attachments: AnotherClientOutput.txt, client1-3.6.png, 
> client2-3.6.png, kafkaServer1.out, kafkaServer2.out, kafkaServer3.out, 
> system1-3.6.log, system2-3.6.log, system3-3.6.log
>
>
> A client sends a topic creation request to a broker.
> Another client sends the same topic creation request to the broker.
> The former request has not finished, yet the second client gets a 
> TopicExistsException.
> The root cause may be that the topic is registered in ZooKeeper, but the data 
> is not yet persisted and the topic is not yet transferred to the partitions. 
> At this time, the other client sends the same topic creation request, which 
> checks the status in ZooKeeper; after finding the topic's znode, the creation 
> fails.
> System logs are attached.
> Are there any comments to help figure out this issue? I would very much 
> appreciate them.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16412) Uncreated topics are considered created topics

2024-07-10 Thread mutu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864869#comment-17864869
 ] 

mutu commented on KAFKA-16412:
--

[~chia7712] Hi, we have added system logs from 3.6.0. Could you take a look?
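
For readers of this ticket, the reported sequence boils down to the following 
sketch (the bootstrap address and topic name are placeholders):

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class TopicCreationRace {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        try (Admin clientA = Admin.create(props); Admin clientB = Admin.create(props)) {
            NewTopic topic = new NewTopic("race-topic", 3, (short) 3);
            // Client A's request registers the topic's znode but may still be in flight...
            clientA.createTopics(Collections.singleton(topic));
            try {
                // ...so client B's identical request can already fail, even though
                // the topic is not yet fully created.
                clientB.createTopics(Collections.singleton(topic)).all().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof TopicExistsException) {
                    System.out.println("second creation rejected: " + e.getCause().getMessage());
                }
            }
        }
    }
}
{code}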

> Uncreated topics are considered created topics
> 
>
> Key: KAFKA-16412
> URL: https://issues.apache.org/jira/browse/KAFKA-16412
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.2
>Reporter: mutu
>Priority: Major
> Attachments: AnotherClientOutput.txt, client1-3.6.png, 
> client2-3.6.png, kafkaServer1.out, kafkaServer2.out, kafkaServer3.out, 
> system1-3.6.log, system2-3.6.log, system3-3.6.log
>
>
> A client sends a topic creation request to a broker.
> Another client sends the same topic creation request to the broker.
> The former request has not finished, yet the second client gets a 
> TopicExistsException.
> The root cause may be that the topic is registered in ZooKeeper, but the data 
> is not yet persisted and the topic is not yet transferred to the partitions. 
> At this time, the other client sends the same topic creation request, which 
> checks the status in ZooKeeper; after finding the topic's znode, the creation 
> fails.
> System logs are attached.
> Are there any comments to help figure out this issue? I would very much 
> appreciate them.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17112) StreamThread shutdown calls completeShutdown only in CREATED state

2024-07-10 Thread Ao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ao Li updated KAFKA-17112:
--
Component/s: unit tests

> StreamThread shutdown calls completeShutdown only in CREATED state
> --
>
> Key: KAFKA-17112
> URL: https://issues.apache.org/jira/browse/KAFKA-17112
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 3.9.0
>Reporter: Ao Li
>Priority: Minor
>
> While running tests in `StreamThreadTest.java` in kafka/streams, I noticed 
> that the tests leave many lingering threads. Though the class runs `shutdown` 
> after each test, the shutdown only executes `completeShutdown` if the 
> StreamThread is in the CREATED state. See 
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L231]
>  and 
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1435]
>  
> For example, you may run the test 
> org.apache.kafka.streams.processor.internals.StreamThreadTest#shouldNotCloseTaskProducerWhenSuspending
>  with commit 0b11971f2c94f7aadc3fab2c51d94642065a72e5. When the test calls 
> `thread.shutdown()`, the thread is in the `PARTITIONS_REVOKED` state. Thus, 
> `completeShutdown` is not called. The test creates three lingering threads: 
> two `StateUpdater` threads and one `TaskExecutor` thread.
>  
> This means that calls to `thread.shutdown()` have no effect in 
> `StreamThreadTest.java`. 
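
Simplified, the pattern described above is roughly the following (a sketch, not 
the actual StreamThread code):

{code:java}
public void shutdown() {
    if (state == State.CREATED) {
        completeShutdown(true); // cleans up StateUpdater / TaskExecutor threads
    }
    // In any other state, cleanup is deferred to the run loop -- which never
    // runs in these unit tests, so the helper threads linger.
}
{code}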



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15595) Session window aggregate drops records headers

2024-07-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15595:
---

Assignee: (was: Hao Li)

> Session window aggregate drops records headers
> --
>
> Key: KAFKA-15595
> URL: https://issues.apache.org/jira/browse/KAFKA-15595
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Abdullah alkhawatrah
>Priority: Major
>
> Hey,
> While upgrading from 3.2.x to 3.5.1 I noticed a change in the session window 
> aggregate behaviour: it seems that custom headers added before the aggregate 
> are now dropped.
> I could reproduce the behaviour with the following test topology:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> builder.stream(inputTopic, Consumed.with(EARLIEST))
>     .process(() -> new Processor() {
>         private ProcessorContext context;
> 
>         @Override
>         public void init(final ProcessorContext context) {
>             this.context = context;
>         }
> 
>         @Override
>         public void process(Record record) {
>             record.headers().add("key1", record.value().toString().getBytes());
>             context.forward(record);
>         }
>     })
>     .groupByKey()
>     .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofDays(1L), Duration.ofDays(1L)))
>     .aggregate(() -> 1,
>         (key, value, aggregate) -> aggregate,
>         (aggKey, aggOne, aggTwo) -> aggTwo)
>     .toStream()
>     .map((key, value) -> new KeyValue<>(key.key(), value))
>     .to(outputTopic);
> {code}
> Checking events in the `outputTopic` shows that the headers are empty. With 
> 3.2.* the same topology would have propagated the headers.
>  
> I can see here: 
> [https://github.com/apache/kafka/blob/2c6fb6c54472e90ae17439e62540ef3cb0426fe3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L205]
>  that a new record is now created ignoring the headers, while in 3.2.2 the 
> same record was forwarded after changing the key and value while keeping the 
> headers: 
> [https://github.com/apache/kafka/blob/3.2.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L196]
>  
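
In other words (a simplified sketch in terms of the new Processor API's Record 
class – not the actual KStreamSessionWindowAggregate code; `windowedKey`, 
`newAgg`, `record`, and `context` stand in for the processor's local state):

{code:java}
// 3.2.x behaviour: the output record is derived from the input record,
// so the input headers are carried along.
context.forward(record.withKey(windowedKey).withValue(newAgg));

// 3.5.x behaviour (per the first link above): a fresh Record is constructed,
// which starts with empty headers, so the input headers are dropped.
context.forward(new Record<>(windowedKey, newAgg, record.timestamp()));
{code}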



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-13183) Dropping null key/value records upstream to repartition topic not tracked via metrics

2024-07-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-13183:
-

> Dropping null key/value records upstream to repartition topic not tracked 
> via metrics
> ---
>
> Key: KAFKA-13183
> URL: https://issues.apache.org/jira/browse/KAFKA-13183
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> For joins and aggregations, we consider records with a null key or value as 
> invalid, and drop them. Inside the aggregate and join processors, we record 
> dropped records with a corresponding metric (cf. `droppedRecordsSensor`).
> However, we also apply an upstream optimization if we need to repartition 
> data. As we know that the downstream aggregation / join will drop those 
> records anyway, we drop them _before_ we write them into the repartition 
> topic (we still need the drop logic in the processor for the case where we 
> don't have a repartition topic).
> We add a `KStreamFilter` (cf. `KStreamImpl#createRepartitionedSource()`) 
> upstream, but this filter does not update the corresponding metric to record 
> dropped records.
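
A rough sketch of the missing piece (hypothetical; `droppedRecordsSensor` here 
stands in for the same sensor the downstream processors use):

{code:java}
// The upstream repartition filter drops null-key/value records silently today;
// it should record them like the aggregate/join processors do:
stream.filter((key, value) -> {
    if (key == null || value == null) {
        droppedRecordsSensor.record(); // currently missing in KStreamFilter
        return false;
    }
    return true;
});
{code}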



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-13183) Dropping null key/value records upstream to repartition topic not tracked via metrics

2024-07-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-13183:
---

Assignee: (was: Alex Sorokoumov)

> Dropping null key/value records upstream to repartition topic not tracked 
> via metrics
> ---
>
> Key: KAFKA-13183
> URL: https://issues.apache.org/jira/browse/KAFKA-13183
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> For joins and aggregations, we consider records with a null key or value as 
> invalid, and drop them. Inside the aggregate and join processors, we record 
> dropped records with a corresponding metric (cf. `droppedRecordsSensor`).
> However, we also apply an upstream optimization if we need to repartition 
> data. As we know that the downstream aggregation / join will drop those 
> records anyway, we drop them _before_ we write them into the repartition 
> topic (we still need the drop logic in the processor for the case where we 
> don't have a repartition topic).
> We add a `KStreamFilter` (cf. `KStreamImpl#createRepartitionedSource()`) 
> upstream, but this filter does not update the corresponding metric to record 
> dropped records.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13183) Dropping null key/value records upstream to repartition topic not tracked via metrics

2024-07-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13183.
-
Resolution: Fixed

> Dropping null key/value records upstream to repartition topic not tracked 
> via metrics
> ---
>
> Key: KAFKA-13183
> URL: https://issues.apache.org/jira/browse/KAFKA-13183
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> For joins and aggregations, we consider records with a null key or value as 
> invalid, and drop them. Inside the aggregate and join processors, we record 
> dropped records with a corresponding metric (cf. `droppedRecordsSensor`).
> However, we also apply an upstream optimization if we need to repartition 
> data. As we know that the downstream aggregation / join will drop those 
> records anyway, we drop them _before_ we write them into the repartition 
> topic (we still need the drop logic in the processor for the case where we 
> don't have a repartition topic).
> We add a `KStreamFilter` (cf. `KStreamImpl#createRepartitionedSource()`) 
> upstream, but this filter does not update the corresponding metric to record 
> dropped records.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16412) Uncreated topics are considered created topics

2024-07-10 Thread mutu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mutu updated KAFKA-16412:
-
Description: 
A client sends a topic creation request to a broker.

Another client sends the same topic creation request to the broker.

The former request has not finished, yet the second client gets a 
TopicExistsException.

The root cause may be that the topic is registered in ZooKeeper, but the data 
is not yet persisted and the topic is not yet transferred to the partitions. At 
this time, the other client sends the same topic creation request, which checks 
the status in ZooKeeper; after finding the topic's znode, the creation fails.

System logs are attached.

Are there any comments to help figure out this issue? I would very much 
appreciate them.

 

  was:
A client sends a topic creation request to a broker.

Another client sends the same topic creation request to the broker.

The former request has not finished, yet the second client gets a 
TopicExistsException.

The root cause may be that the topic is registered in ZooKeeper, but the data 
is not yet persisted and the topic is not yet transferred to the partitions. At 
this time, the other client sends the same topic creation request, which checks 
the status in ZooKeeper; after finding the topic's znode, the creation fails.

 


> Uncreated topics are considered created topics
> 
>
> Key: KAFKA-16412
> URL: https://issues.apache.org/jira/browse/KAFKA-16412
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.2
>Reporter: mutu
>Priority: Major
> Attachments: AnotherClientOutput.txt, client1-3.6.png, 
> client2-3.6.png, kafkaServer1.out, kafkaServer2.out, kafkaServer3.out, 
> system1-3.6.log, system2-3.6.log, system3-3.6.log
>
>
> A client sends a topic creation request to a broker.
> Another client sends the same topic creation request to the broker.
> The former request has not finished, yet the second client gets a 
> TopicExistsException.
> The root cause may be that the topic is registered in ZooKeeper, but the data 
> is not yet persisted and the topic is not yet transferred to the partitions. 
> At this time, the other client sends the same topic creation request, which 
> checks the status in ZooKeeper; after finding the topic's znode, the creation 
> fails.
> System logs are attached.
> Are there any comments to help figure out this issue? I would very much 
> appreciate them.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16412) Uncreated topics are considered created topics

2024-07-10 Thread mutu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mutu updated KAFKA-16412:
-
Description: 
A client sends a topic creation request to a broker.

Another client sends the same topic creation request to the broker.

The former request has not finished, yet the second client gets a 
TopicExistsException.

The root cause may be that the topic is registered in ZooKeeper, but the data 
is not yet persisted and the topic is not yet transferred to the partitions. At 
this time, the other client sends the same topic creation request, which checks 
the status in ZooKeeper; after finding the topic's znode, the creation fails.

 

  was:
A client sends a topic creation request to a broker.

Another client sends the same topic creation request to the broker.

The former request has not finished, yet the second client gets a 
TopicExistsException.

 

 


> Uncreated topics are considered created topics
> 
>
> Key: KAFKA-16412
> URL: https://issues.apache.org/jira/browse/KAFKA-16412
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.2
>Reporter: mutu
>Priority: Major
> Attachments: AnotherClientOutput.txt, client1-3.6.png, 
> client2-3.6.png, kafkaServer1.out, kafkaServer2.out, kafkaServer3.out, 
> system1-3.6.log, system2-3.6.log, system3-3.6.log
>
>
> A client sends a topic creation request to a broker.
> Another client sends the same topic creation request to the broker.
> The former request has not finished, yet the second client gets a 
> TopicExistsException.
> The root cause may be that the topic is registered in ZooKeeper, but the data 
> is not yet persisted and the topic is not yet transferred to the partitions. 
> At this time, the other client sends the same topic creation request, which 
> checks the status in ZooKeeper; after finding the topic's znode, the creation 
> fails.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16412) Uncreated topics are considered created topics

2024-07-10 Thread mutu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mutu updated KAFKA-16412:
-
Attachment: client1-3.6.png
client2-3.6.png
system1-3.6.log
system2-3.6.log
system3-3.6.log

> Uncreated topics are considered created topics
> 
>
> Key: KAFKA-16412
> URL: https://issues.apache.org/jira/browse/KAFKA-16412
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.2
>Reporter: mutu
>Priority: Major
> Attachments: AnotherClientOutput.txt, client1-3.6.png, 
> client2-3.6.png, kafkaServer1.out, kafkaServer2.out, kafkaServer3.out, 
> system1-3.6.log, system2-3.6.log, system3-3.6.log
>
>
> A client sends a topic creation request to a broker.
> Another client sends the same topic creation request to the broker.
> The former request has not finished, yet the second client gets a 
> TopicExistsException.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17055) Use random replica ids in KafkaRaftClientTest and KafkaRaftClientSnapshotTest

2024-07-10 Thread José Armando García Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864849#comment-17864849
 ] 

José Armando García Sancio commented on KAFKA-17055:


[~masonyc] Interesting. Jira says that the user cannot be assigned issues. 
Let's leave it unassigned, I guess.

> Use random replica ids in KafkaRaftClientTest and KafkaRaftClientSnapshotTest
> -
>
> Key: KAFKA-17055
> URL: https://issues.apache.org/jira/browse/KAFKA-17055
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Priority: Major
>
> All of the tests in KafkaRaftClientTest and KafkaRaftClientSnapshotTest use 
> well-known ids like 0, 1, etc. Because of this, those tests were not able to 
> catch a bug in the BeginQuorumEpoch schema where the default value for VoterId 
> was 0 instead of -1.
> Improve those tests by using random positive numbers to lower the probability 
> that they will match the default value of a schema.
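
A minimal sketch of the suggested id selection (hypothetical, not the actual 
test code):

{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Random positive ids make an accidental match with a schema default
// (-1, or the buggy default of 0) very unlikely.
int localId = ThreadLocalRandom.current().nextInt(1, Integer.MAX_VALUE - 1);
int voterId = localId + 1; // keep related ids distinct
{code}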



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17118) Remove StorageTool#buildMetadataProperties

2024-07-10 Thread kangning.li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864847#comment-17864847
 ] 

kangning.li commented on KAFKA-17118:
-

Hey [~chia7712], I'd like to look into this. Can you assign this to me?

> Remove StorageTool#buildMetadataProperties
> --
>
> Key: KAFKA-17118
> URL: https://issues.apache.org/jira/browse/KAFKA-17118
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> It is useless in production after 
> https://github.com/apache/kafka/commit/7060c08d6f9b0408e7f40a90499caf2e636fac61



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators [kafka]

2024-07-10 Thread via GitHub


jolshan commented on code in PR #16183:
URL: https://github.com/apache/kafka/pull/16183#discussion_r1673242703


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -217,8 +217,11 @@ public enum MetadataVersion {
 // Add ELR related supports (KIP-966).
 IBP_3_9_IV1(22, "3.9", "IV1", true),
 
+IBP_3_9_IV2(23, "3.9", "IV2", false),

Review Comment:
   Sounds good. I also need to make a few other changes when I rebase, so be on 
the lookout for more soon :) 
   
   I planned to make the update where we don't send features as part of the 
registration/api versions if the version range is 0-0. I need to do this 
because Colin's fix only changes the min version. It is safe because a version 
range 0-0 is essentially the same as not supporting the feature. We will have 
it here until the MV is marked as production ready.
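   To sketch what that update could look like (all names here are assumed for
   illustration, not the actual Kafka code): skip any feature whose supported
   range is exactly 0-0 when building the registration, since that range is
   equivalent to not supporting the feature at all.
   
   ```java
   // Hypothetical filtering step; a VersionRange type with min()/max() is assumed.
   Map<String, VersionRange> advertised = supportedFeatures.entrySet().stream()
       .filter(e -> !(e.getValue().min() == 0 && e.getValue().max() == 0))
       .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   ```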



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17114) DefaultStateUpdater::handleRuntimeException should update isRunning before calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks`

2024-07-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-17114:

Component/s: unit tests

> DefaultStateUpdater::handleRuntimeException should update isRunning before 
> calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks`
> -
>
> Key: KAFKA-17114
> URL: https://issues.apache.org/jira/browse/KAFKA-17114
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 3.9.0
>Reporter: Ao Li
>Priority: Minor
>
> I saw a flaky test in 
> DefaultStateUpdaterTest::shouldThrowIfAddingStandbyAndActiveTaskWithSameId 
> recently.
> {code}
> org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at org.junit.jupiter.api.AssertFalse.failNotFalse(AssertFalse.java:63)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:36)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:31)
>   at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:231)
>   at 
> org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingTasksWithSameId(DefaultStateUpdaterTest.java:294)
>   at 
> org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingStandbyAndActiveTaskWithSameId(DefaultStateUpdaterTest.java:285)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>   at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
>   at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> {code}
> To make the bug more reproducible, you may add `Thread.sleep(5)` after 
> `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);`
>  in DefaultStateUpdater::handleRuntimeException.
> The test is flaky because 
> `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);`
>  will unblock the `verifyFailedTasks(IllegalStateException.class, task1);` 
> statement in DefaultStateUpdaterTest::shouldThrowIfAddingTasksWithSameId. 
> If `assertFalse(stateUpdater.isRunning());` is executed before 
> `isRunning.set(false);`, the test will fail.
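In other words, the fix suggested by the title is an ordering change; a minimal sketch (assumed shape, not the actual DefaultStateUpdater code):

{code:java}
private void handleRuntimeException(final RuntimeException runtimeException) {
    log.error("An unexpected error occurred within the state updater", runtimeException);
    isRunning.set(false); // flip the flag first ...
    // ... then publish the failed tasks; a test thread unblocked by this call
    // can no longer observe isRunning == true.
    addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);
}
{code}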



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17113) Flaky Test in GlobalStreamThreadTest#shouldThrowStreamsExceptionOnStartupIfExceptionOccurred

2024-07-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-17113:

Component/s: unit tests

> Flaky Test in 
> GlobalStreamThreadTest#shouldThrowStreamsExceptionOnStartupIfExceptionOccurred
> 
>
> Key: KAFKA-17113
> URL: https://issues.apache.org/jira/browse/KAFKA-17113
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 3.9.0
>Reporter: Ao Li
>Priority: Minor
>
> The `shouldThrowStreamsExceptionOnStartupIfExceptionOccurred` test expects 
> `globalStreamThread.start` to throw `startupException` when startup fails. This 
> may not be true on some slow machines. 
>  
> {code:java}
> class GlobalStreamThread {
>   Exception startupException;
>   void initialize() {
> try {
>   ...
> } catch (Exception e) {
>   startupException = e;
> }
> ...
> setState(State.DEAD);
>   }
>   void start() {
> super.start();
> while (stillInitializing()) {
>   Utils.sleep(1);
>   if (startupException != null) {
> throw startupException;
>   }
> }
> if (inErrorState()) {
>   throw new IllegalStateException("Initialization for the global stream 
> thread failed");
> }
>   }
> }
> {code}
> Consider the following schedule:
> {code}
> main:start:`startupException != null`
> GlobalStreamThread:initialize:`startupException = e;`
> GlobalStreamThread:initialize:`setState(State.DEAD);`
> main:start:`inErrorState()`
> main:start:`throw new IllegalStateException`
> {code}
>  
> The function throws `IllegalStateException("Initialization for the global 
> stream thread failed")` instead of `startupException`.
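One possible hardening, sketched under the assumption that the pseudocode above matches the real control flow, is to re-check the recorded startup exception so the real cause wins over the generic error:

{code:java}
void start() {
    super.start();
    while (stillInitializing()) {
        Utils.sleep(1);
        if (startupException != null) {
            throw startupException;
        }
    }
    // Re-check before reporting the generic failure: the DEAD state may have
    // been observed before startupException was visible to this thread.
    if (startupException != null) {
        throw startupException;
    }
    if (inErrorState()) {
        throw new IllegalStateException("Initialization for the global stream thread failed");
    }
}
{code}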



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17116) New consumer may not send effective leave group if member ID received after close

2024-07-10 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864841#comment-17864841
 ] 

TengYao Chi commented on KAFKA-17116:
-

Hi [~lianetm] 

I would like to investigate this issue. May I have this one?

Many thanks!

> New consumer may not send effective leave group if member ID received after 
> close 
> --
>
> Key: KAFKA-17116
> URL: https://issues.apache.org/jira/browse/KAFKA-17116
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.9.0
>
>
> If the new consumer is closed after sending a HB to join, but before 
> receiving the response to it, it will send a leave group request without a 
> member ID (which will simply fail with UNKNOWN_MEMBER_ID). As a result, the 
> broker ends up with a newly registered member for which it will never 
> receive a leave request.
>  # consumer.subscribe -> sends HB to join, transitions to JOINING
>  # consumer.close -> will transition to LEAVING and send HB with epoch -1 
> (without waiting for in-flight requests)
>  # consumer receives response to initial HB, containing the assigned member 
> ID. It will simply ignore it because it's not in the group anymore 
> (UNSUBSCRIBED)
> With the current logic, the expected behavior and its main downsides are:
>  # If the member received partitions on the first HB, those partitions won't 
> be re-assigned (the broker waits for the closed consumer to reconcile them) 
> until the rebalance timeout expires.
>  # Even if no partitions were assigned to it, the member will remain in the 
> group from the broker's point of view (but not from the client POV). The 
> member will eventually be kicked out for not sending HBs, but only when its 
> session timeout expires.
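The problematic sequence is just a subscribe immediately followed by close; a minimal sketch (configuration values are assumed):

{code:java}
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // new KIP-848 protocol
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singleton("some-topic")); // sends the first HB to join
} // close() can race the HB response, leaving a member registered broker-side
{code}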



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]

2024-07-10 Thread via GitHub


TaiJuWu commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1673195870


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -191,7 +191,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
 controllerNode.metadataDirectory());
 }
 
 props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
-    "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+    "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");

Review Comment:
   I will check again and try to remove it.
   Thanks for your review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]

2024-07-10 Thread via GitHub


TaiJuWu commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1673195870


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -191,7 +191,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
 controllerNode.metadataDirectory());
 }
 
 props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
-    "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+    "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");

Review Comment:
   I will check again.
   Thanks for your review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]

2024-07-10 Thread via GitHub


TaiJuWu commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1673194862


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -706,4 +706,9 @@ public static ApiVersionsResponse createApiVersionsResponse(
 ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
 zkMigrationEnabled);
 }
+
+public static String getCurrentFunctionName() {

Review Comment:
   Sorry, I missed the newest comment.
   Will do ASAP.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-17055) Use random replica ids in KafkaRaftClientTest and KafkaRaftClientSnapshotTest

2024-07-10 Thread Mason C (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864838#comment-17864838
 ] 

Mason C commented on KAFKA-17055:
-

Hi [~jsancio], could you please assign it to me? It seems I don't have 
access to assign it to myself.

> Use random replica ids in KafkaRaftClientTest and KafkaRaftClientSnapshotTest
> -
>
> Key: KAFKA-17055
> URL: https://issues.apache.org/jira/browse/KAFKA-17055
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Priority: Major
>
> All of the tests in KafkaRaftClientTest and KafkaRaftClientSnapshotTest use 
> well-known ids like 0, 1, etc. Because of this, those tests were not able to 
> catch a bug in the BeginQuorumEpoch schema where the default value for VoterId 
> was 0 instead of -1.
> Improve those tests by using random positive numbers to lower the probability 
> that they will match the default value of a schema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators [kafka]

2024-07-10 Thread via GitHub


junrao commented on code in PR #16183:
URL: https://github.com/apache/kafka/pull/16183#discussion_r1673162688


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -217,8 +217,11 @@ public enum MetadataVersion {
 // Add ELR related supports (KIP-966).
 IBP_3_9_IV1(22, "3.9", "IV1", true),
 
+IBP_3_9_IV2(23, "3.9", "IV2", false),

Review Comment:
   Could we add a comment that this is for bootstrapping TV?
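   For illustration, the requested comment might look like this (wording
   assumed, not the committed text):
   
   ```java
   +// Dummy metadata version used to bootstrap transaction.version (TV); not
   +// production ready until TV itself is.
   +IBP_3_9_IV2(23, "3.9", "IV2", false),
   ```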



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16260: Deprecate window.size.ms in StreamsConfig.java and index.html [kafka]

2024-07-10 Thread via GitHub


mjsax commented on code in PR #16391:
URL: https://github.com/apache/kafka/pull/16391#discussion_r1673180484


##
docs/streams/developer-guide/config-streams.html:
##
@@ -689,7 +689,7 @@ default.value.serde
-  default.windowed.key.serde.inner (Deprecated.)
+  default.windowed.key.serde.inner(Deprecated.)

Review Comment:
   Why remove the space?



##
docs/streams/developer-guide/config-streams.html:
##
@@ -243,19 +243,19 @@ num.standby.replicas
-  acceptable.recovery.lag
+  acceptable.recovery.lag

Review Comment:
   Why do we change this? It seems the top row with "Parameter Name" is `odd` 
already, so `even` seems to be correct here?



##
docs/streams/developer-guide/config-streams.html:
##
@@ -1039,7 +1045,7 @@ topology.optimizationdefault.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization.
 
 
+  
+windowed.inner.class.serde(Deprecated.)

Review Comment:
   ```suggestion
   windowed.inner.class.serde 
(Deprecated.)
   ```



##
docs/streams/developer-guide/config-streams.html:
##
@@ -713,6 +713,12 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization.
 
 
+  
+windowed.inner.class.serde(Deprecated.)
+
+  The default Serializer/Deserializer class for the inner 
class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde 
 interface. Note that setting this config in KafkaStreams 
application would result in an error as it is meant to be used only from Plain 
consumer client.

Review Comment:
   ```suggestion
 The default Serializer/Deserializer class for the inner 
class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde 
interface. Note that setting this config in KafkaStreams application would 
result in an error as it is meant to be used only from Plain consumer 
client.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17055: Change RaftClientTestContext.java nodes to use random positive number id [kafka]

2024-07-10 Thread via GitHub


masonyc commented on PR #16563:
URL: https://github.com/apache/kafka/pull/16563#issuecomment-2221709715

   > Thanks for the changes @masonyc.
   > 
   > Can you confirm that some of these tests fail if you remove `"default": 
"-1",` from these two schemas?
   > 
   > 1. 
https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/VoteRequest.json#L27
   > 2. 
https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json#L27
   > 
   > While verifying the above, can you also confirm that the test failures 
provide enough information to reproduce the failure. I am concerned that since 
the replica ids are random the test failures won't provide enough information 
to know which replica id value caused the failure.
   > 
   > Let's make similar changes to `KafkaRaftClientSnapshotTest`.
   
   Hi @jsancio, thanks for the review. I confirmed that I can see test failures 
when removing the `-1` default in both schemas. I have added the leader id passed 
from the test case and the leader id from the Raft response to the assertion 
message, so failing runs can be reproduced.
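   For instance, a lazy assertion message of the following shape (names assumed;
   the actual assertion lives in the Raft test context) makes a random-id failure
   reproducible from the log alone:
   
   ```java
   assertEquals(expectedLeaderId, response.leaderId(),
       () -> "expected leaderId=" + expectedLeaderId + " but got=" + response.leaderId());
   ```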


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17055: Change RaftClientTestContext.java nodes to use random positive number id [kafka]

2024-07-10 Thread via GitHub


masonyc commented on code in PR #16563:
URL: https://github.com/apache/kafka/pull/16563#discussion_r1673181139


##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##
@@ -3854,4 +3859,13 @@ private static ReplicaKey replicaKey(int id, boolean withDirectoryId) {
     Uuid directoryId = withDirectoryId ? Uuid.randomUuid() : ReplicaKey.NO_DIRECTORY_ID;
     return ReplicaKey.of(id, directoryId);
 }
+
+private static Integer getRandomPort() {
+    int minPort = 1024;
+    int mockAddressPrefix = 9990;
+    // Number of nodes we can set up if we got a random number that is maximum in the range
+    int reservedNumberOfPorts = 50;
+    int maxPort = 65535 - mockAddressPrefix - reservedNumberOfPorts;
+    return ThreadLocalRandom.current().nextInt((maxPort - minPort) + 1) + minPort;
+}

Review Comment:
   Renamed it and removed minPort so ids can be generated from 0.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17055: Change RaftClientTestContext.java nodes to use random positive number id [kafka]

2024-07-10 Thread via GitHub


masonyc commented on code in PR #16563:
URL: https://github.com/apache/kafka/pull/16563#discussion_r1673180531


##
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##
@@ -1233,7 +1238,7 @@ static void verifyLeaderChangeMessage(
 LeaderChangeMessage leaderChangeMessage = ControlRecordUtils.deserializeLeaderChangeMessage(recordValue);
 assertEquals(leaderId, leaderChangeMessage.leaderId());
 assertEquals(voters.stream().map(voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toList()),
-    leaderChangeMessage.voters());
+    leaderChangeMessage.voters().stream().sorted(Comparator.comparingInt(Voter::voterId)).collect(Collectors.toList()));

Review Comment:
   Thanks for the feedback. We still need to collect to a List, otherwise the 
assertion will fail. Updated the format in the latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17055: Change RaftClientTestContext.java nodes to use random positive number id [kafka]

2024-07-10 Thread via GitHub


masonyc commented on code in PR #16563:
URL: https://github.com/apache/kafka/pull/16563#discussion_r1673179696


##
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##
@@ -576,6 +577,10 @@ void assertSentDescribeQuorumResponse(
     partitionData,
     nodes
 );
 
+List<ReplicaState> sortedVoters = response.topics().get(0).partitions().get(0).currentVoters().stream()
+    .sorted(Comparator.comparingInt(ReplicaState::replicaId)).collect(Collectors.toList());
+response.topics().get(0).partitions().get(0).setCurrentVoters(sortedVoters);

Review Comment:
   Thanks for the feedback, updated in the latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators [kafka]

2024-07-10 Thread via GitHub


jolshan commented on PR #16183:
URL: https://github.com/apache/kafka/pull/16183#issuecomment-2221692444

   I think I also need to fix the case where max version is 0 as well. 🤔 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators [kafka]

2024-07-10 Thread via GitHub


jolshan commented on PR #16183:
URL: https://github.com/apache/kafka/pull/16183#issuecomment-2221688829

   Yeehaw time to rebase
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-17011) SupportedFeatures.MinVersion incorrectly blocks v0

2024-07-10 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-17011.

Resolution: Fixed

> SupportedFeatures.MinVersion incorrectly blocks v0
> --
>
> Key: KAFKA-17011
> URL: https://issues.apache.org/jira/browse/KAFKA-17011
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.8.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Critical
> Fix For: 3.9.0
>
>
> SupportedFeatures.MinVersion incorrectly blocks v0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17011: Fix a bug preventing features from supporting v0 [kafka]

2024-07-10 Thread via GitHub


jolshan merged PR #16421:
URL: https://github.com/apache/kafka/pull/16421


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16959: ConfigCommand should allow to define both `entity-default` and `entity-name` [kafka]

2024-07-10 Thread via GitHub


chia7712 commented on code in PR #16381:
URL: https://github.com/apache/kafka/pull/16381#discussion_r1673154305


##
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##
@@ -354,128 +354,150 @@ object ConfigCommand extends Logging {
     }
   }
 
-  @nowarn("cat=deprecation")
   def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
     val entityTypes = opts.entityTypes
     val entityNames = opts.entityNames
     val entityTypeHead = entityTypes.head
-    val entityNameHead = entityNames.head
     val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no need for mutability
     val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new ConfigEntry(k, v)) }
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
     entityTypeHead match {
       case ConfigType.TOPIC =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
+        alterTopicConfig(adminClient, entityTypeHead, entityNames, configsToBeAdded, configsToBeDeleted)
+      case ConfigType.BROKER =>
+        alterBrokerConfig(adminClient, entityTypeHead, entityNames, configsToBeAdded, configsToBeDeleted)
+      case BrokerLoggerConfigType =>
+        alterBrokerLoggingConfig(adminClient, entityTypeHead, entityNames, configsToBeAdded, configsToBeDeleted)
+      case ConfigType.USER | ConfigType.CLIENT =>
+        alterUserOrClientConfig(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeAdded, configsToBeDeleted)
+      case ConfigType.IP =>
+        alterIpConfig(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeAdded, configsToBeDeleted)
+      case ConfigType.CLIENT_METRICS =>
+        alterClientMetricsConfig(adminClient, entityTypeHead, entityNames, configsToBeAdded, configsToBeDeleted)
+      case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
+    }
 
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
+    if (entityNames.nonEmpty) {
+      entityNames.foreach(
+        entityName => println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityName.")
+      )
+    } else
+      println(s"Completed updating default config for $entityTypeHead in the cluster.")
+  }
 
-        val configResource = new ConfigResource(ConfigResource.Type.TOPIC, entityNameHead)
-        val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
-          ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-        ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+  private def alterTopicConfig(adminClient: Admin, entityTypeHead: String, entityNames: List[String], configsToBeAdded: Predef.Map[String, ConfigEntry], configsToBeDeleted: Seq[String]) = {
+    entityNames.foreach { entityName =>
+      getOldConfig(adminClient, entityTypeHead, configsToBeDeleted, entityName)
+    }
+    val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+    val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
+      ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
+      ).asJavaCollection
+    adminClient.incrementalAlterConfigs(entityNames.map(new ConfigResource(ConfigResource.Type.TOPIC, _))
+        .map(_ -> alterEntries).toMap.asJava, alterOptions)
+      .all().get(60, TimeUnit.SECONDS)
+  }
 
-      case ConfigType.BROKER =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
+  private def getOldConfig(adminClient: Admin, entityTypeHead: String, configsToBeDeleted: Seq[String], entityName: String): Map[String, ConfigEntry] = {
+    val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityName, includeSynonyms = false, describeAll = false)
+      .map { entry => (entry.name, entry) }.toMap
 
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
+    // fail the command if any of the configs to b

Re: [PR] KAFKA-17011: Fix a bug preventing features from supporting v0 [kafka]

2024-07-10 Thread via GitHub


jolshan commented on PR #16421:
URL: https://github.com/apache/kafka/pull/16421#issuecomment-2221672097

   Test failures are unrelated. I will merge. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]

2024-07-10 Thread via GitHub


chia7712 commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1673151480


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -191,7 +191,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
 controllerNode.metadataDirectory());
 }
 
props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
-"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+
"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");

Review Comment:
   Do we still need those changes?



##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -706,4 +706,9 @@ public static ApiVersionsResponse createApiVersionsResponse(
 ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
 zkMigrationEnabled);
 }
+
+public static String getCurrentFunctionName() {

Review Comment:
   IIRC, we had a discussion about the topic name. What is the updates?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-17118) Remove StorageTool#buildMetadataProperties

2024-07-10 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17118:
--

 Summary: Remove StorageTool#buildMetadataProperties
 Key: KAFKA-17118
 URL: https://issues.apache.org/jira/browse/KAFKA-17118
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


It is useless in production after 
https://github.com/apache/kafka/commit/7060c08d6f9b0408e7f40a90499caf2e636fac61



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17102: FetchRequest#forgottenTopics would return incorrect data [kafka]

2024-07-10 Thread via GitHub


chia7712 commented on code in PR #16557:
URL: https://github.com/apache/kafka/pull/16557#discussion_r1673143180


##
clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java:
##
@@ -243,4 +245,44 @@ public void testPartitionDataEquals() {
 new FetchRequest.PartitionData(Uuid.randomUuid(), 300, 0L, 300, Optional.of(300)));
 }
 
+@Test
+public void testFetchRequestNoCacheData() {
+    short version = 13;
+    Uuid topicId = Uuid.randomUuid();
+    int partition = 0;
+    TopicIdPartition tp = new TopicIdPartition(topicId, partition, "topic");
+
+    Map<TopicPartition, FetchRequest.PartitionData> partitionData = Collections.singletonMap(tp.topicPartition(),
+        new FetchRequest.PartitionData(topicId, 0, 0, 0, Optional.empty()));
+    List<TopicIdPartition> toReplace = Collections.singletonList(tp);
+
+    FetchRequest fetchRequest = FetchRequest.Builder
+        .forReplica(version, 0, 1, 1, 1, partitionData)
+        .removed(Collections.emptyList())
+        .replaced(toReplace)
+        .metadata(FetchMetadata.newIncremental(123)).build(version);
+
+    HashMap<Uuid, String> topicNames = new HashMap<>();
+    topicNames.put(topicId, tp.topic());
+
+    List<TopicIdPartition> requestsWithTopicsName = fetchRequest.forgottenTopics(topicNames);

Review Comment:
   `Collections.singletonMap(topicId, tp.topic())`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16863) Consider removing `default.` prefix for exception handler config

2024-07-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16863:

Fix Version/s: 4.0.0

> Consider removing `default.` prefix for exception handler config
> 
>
> Key: KAFKA-16863
> URL: https://issues.apache.org/jira/browse/KAFKA-16863
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Muralidhar Basani
>Priority: Trivial
>  Labels: kip
> Fix For: 4.0.0
>
>
> Kafka Streams has a set of configs with `default.` prefix. The intent for the 
> default-prefix is to make a distinction between, well the default, and 
> in-place overwrites in the code. Eg, users can specify ts-extractors on a 
> per-topic basis.
> However, for the deserialization- and production-exception handlers, no such 
> overwrites are possible, and thus, `default.` does not really make sense, 
> because there is just one handler overall. Via KIP-1033 we added a new 
> processing-exception handler w/o a default-prefix, too.
> Thus, we should consider deprecating the two existing config names and adding 
> them back w/o the `default.` prefix.
> KIP-1056: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig]
>  
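The inconsistency is visible directly when configuring a Streams app; a sketch (the KIP-1033 config name and the custom handler class are assumptions for illustration):

{code:java}
Properties props = new Properties();
// Pre-KIP-1056: "default." prefix, even though no per-topic override exists.
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class.getName());
// KIP-1033 handler: no prefix (config name assumed).
props.put("processing.exception.handler", MyProcessingExceptionHandler.class.getName());
{code}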



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16863) Consider removing `default.` prefix for exception handler config

2024-07-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16863:

Labels: kip  (was: need-kip)

> Consider removing `default.` prefix for exception handler config
> 
>
> Key: KAFKA-16863
> URL: https://issues.apache.org/jira/browse/KAFKA-16863
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Muralidhar Basani
>Priority: Trivial
>  Labels: kip
>
> Kafka Streams has a set of configs with `default.` prefix. The intent for the 
> default-prefix is to make a distinction between, well the default, and 
> in-place overwrites in the code. Eg, users can specify ts-extractors on a 
> per-topic basis.
> However, for the deserialization- and production-exception handlers, no such 
> overwrites are possible, and thus, `default.` does not really make sense, 
> because there is just one handler overall. Via KIP-1033 we added a new 
> processing-exception handler w/o a default-prefix, too.
> Thus, we should consider deprecating the two existing config names and adding 
> them back w/o the `default.` prefix.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16863) Consider removing `default.` prefix for exception handler config

2024-07-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16863:

Description: 
Kafka Streams has a set of configs with `default.` prefix. The intent for the 
default-prefix is to make a distinction between, well the default, and in-place 
overwrites in the code. Eg, users can specify ts-extractors on a per-topic 
basis.

However, for the deserialization- and production-exception handlers, no such 
overwrites are possible, and thus, `default.` does not really make sense, 
because there is just one handler overall. Via KIP-1033 we added a new 
processing-exception handler w/o a default-prefix, too.

Thus, we should consider deprecating the two existing config names and adding 
them back w/o the `default.` prefix.

KIP-1056: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig]
 

  was:
Kafka Streams has a set of configs with `default.` prefix. The intent for the 
default-prefix is to make a distinction between, well the default, and in-place 
overwrites in the code. Eg, users can specify ts-extractors on a per-topic 
basis.

However, for the deserialization- and production-exception handlers, no such 
overwrites are possible, and thus, `default.` does not really make sense, 
because there is just one handler overall. Via KIP-1033 we added a new 
processing-exception handler w/o a default-prefix, too.

Thus, we should consider deprecating the two existing config names and adding 
them back w/o the `default.` prefix.


> Consider removing `default.` prefix for exception handler config
> 
>
> Key: KAFKA-16863
> URL: https://issues.apache.org/jira/browse/KAFKA-16863
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Muralidhar Basani
>Priority: Trivial
>  Labels: kip
>
> Kafka Streams has a set of configs with `default.` prefix. The intent for the 
> default-prefix is to make a distinction between, well the default, and 
> in-place overwrites in the code. Eg, users can specify ts-extractors on a 
> per-topic basis.
> However, for the deserialization- and production-exception handlers, no such 
> overwrites are possible, and thus, `default.` does not really make sense, 
> because there is just one handler overall. Via KIP-1033 we added a new 
> processing-exception handler w/o a default-prefix, too.
> Thus, we should consider deprecating the two existing config names and adding 
> them back w/o the `default.` prefix.
> KIP-1056: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16106: revert classic state transitions if deletion fails [kafka]

2024-07-10 Thread via GitHub


jeffkbkim commented on code in PR #16511:
URL: https://github.com/apache/kafka/pull/16511#discussion_r1673128140


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##
@@ -993,13 +998,50 @@ public void testCleanupGroupMetadata() {
 verify(groupMetadataManager, times(0)).maybeDeleteGroup(eq("other-group-id"), any());
 }
 
+@Test
+public void testCleanupGroupMetadataWhenAppendFails() {
+    GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+    OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+    Time mockTime = new MockTime();
+    MockCoordinatorTimer timer = new MockCoordinatorTimer<>(mockTime);
+    GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
+    GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+        new LogContext(),
+        groupMetadataManager,
+        offsetMetadataManager,
+        mockTime,
+        timer,
+        mock(GroupCoordinatorConfig.class),
+        mock(CoordinatorMetrics.class),
+        metricsShard
+    );
+
+    when(groupMetadataManager.groupIds()).thenReturn(mkSet("group-id", "other-group-id"));
+    when(offsetMetadataManager.cleanupExpiredOffsets(eq("group-id"), eq(new ArrayList<>()))).thenReturn(true);
+    when(groupMetadataManager.maybeDeleteGroup(eq("group-id"), eq(new ArrayList<>()))).thenReturn(true);

Review Comment:
   the test above, testCleanupGroupMetadata 
(https://github.com/apache/kafka/pull/16511/files/b73af9c786d4ad29259d0bfeb7c16db3324eff4b#diff-3a0b9cad0253e0f6d4665efd0d6f7efd5bd5dd96d3ba31005cab06fa728aad8fR990),
   
   tests that the records we add are reflected. Would this be sufficient?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16106: revert classic state transitions if deletion fails [kafka]

2024-07-10 Thread via GitHub


jeffkbkim commented on code in PR #16511:
URL: https://github.com/apache/kafka/pull/16511#discussion_r1673126130


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -575,18 +581,32 @@ public CoordinatorResult deleteOffs
 public CoordinatorResult cleanupGroupMetadata() {
 long startMs = time.milliseconds();
 List records = new ArrayList<>();
+AtomicInteger deletedClassicGroupCount = new AtomicInteger(0);

Review Comment:
   If you mean using it to conform to the lambda expression, then yes.
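   For readers following along: a local `int` captured by a Java lambda must be
   effectively final, so a mutable holder is needed. A minimal illustration
   (assumed shape, not the actual coordinator code):
   
   ```java
   AtomicInteger deletedClassicGroupCount = new AtomicInteger(0);
   groupIds.forEach(groupId -> {
       // "count++" on a captured local int would not compile here.
       if (groupMetadataManager.maybeDeleteGroup(groupId, records)) {
           deletedClassicGroupCount.incrementAndGet();
       }
   });
   ```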



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: add clean up command to tests README [kafka]

2024-07-10 Thread via GitHub


chia7712 commented on code in PR #16560:
URL: https://github.com/apache/kafka/pull/16560#discussion_r1673108881


##
tests/README.md:
##
@@ -47,6 +47,10 @@ 
TC_PATHS="tests/kafkatest/tests/streams/streams_upgrade_test.py::StreamsUpgradeT
 ```
 bash tests/docker/ducker-ak up -j 'openjdk:11'; tests/docker/run_tests.sh
 ```
+* Clean Docker Image
+```
+docker rm -f $(docker ps -aq --filter "name=ducker")

Review Comment:
   Have you considered using `./tests/docker/ducker-ak down -f` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-17098) Error Opening RocksDBStore

2024-07-10 Thread Eduwer Camacaro (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864822#comment-17864822
 ] 

Eduwer Camacaro edited comment on KAFKA-17098 at 7/10/24 10:02 PM:
---

I've recently uploaded a fresh log ([^server.log.txt]) file with all of the log 
entries from the instance's start until the exception.


was (Author: JIRAUSER306126):
I've recently uploaded a fresh log ([^server0.log.txt]) file with all of the 
log entries from the instance's start until the exception.

> Error Opening RocksDBStore
> --
>
> Key: KAFKA-17098
> URL: https://issues.apache.org/jira/browse/KAFKA-17098
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Eduwer Camacaro
>Priority: Minor
> Attachments: server.log.txt, server0.log.txt
>
>
> I'm getting this exception on many of my state stores:
>  
> {code:java}
>  03:35:33 ERROR [LH] KafkaStreamsServerImpl - Uncaught exception 
> for Error opening store core-repartition-store at location 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store 03:35:33 
> ERROR [KAFKA] KafkaStreams - stream-client [lh-0-core] Encountered 
> the following exception during processing and the registered exception 
> handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down 
> now. org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store core-repartition-store at location 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:291)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:158)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:158)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1008)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:995)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
>  [server.jar:?] Caused by: org.rocksdb.RocksDBException: lock hold by current 
> process, acquire time 1720323060 acquiring thread 139820785784576: 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store/LOCK: No locks 
> available at org.rocksdb.RocksDB.open(Native Method) ~[server.jar:?] at 
> org.rocksdb.RocksDB.open(RocksDB.java:307) ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>  ~[server.jar:?] ... 19 more   {code}
>  
> Some other details:
> - State updater is enabled
> - I'm using 5 stream threads
> - This usually happens during either normal processing or during state 
> store restoration



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17098) Error Opening RocksDBStore

2024-07-10 Thread Eduwer Camacaro (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduwer Camacaro updated KAFKA-17098:

Attachment: (was: server0.log.txt)

> Error Opening RocksDBStore
> --
>
> Key: KAFKA-17098
> URL: https://issues.apache.org/jira/browse/KAFKA-17098
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Eduwer Camacaro
>Priority: Minor
> Attachments: server.log.txt
>
>
> I'm getting this exception on many of my state stores:
>  
> {code:java}
>  03:35:33 ERROR [LH] KafkaStreamsServerImpl - Uncaught exception 
> for Error opening store core-repartition-store at location 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store 03:35:33 
> ERROR [KAFKA] KafkaStreams - stream-client [lh-0-core] Encountered 
> the following exception during processing and the registered exception 
> handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down 
> now. org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store core-repartition-store at location 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:291)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:158)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:158)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1008)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:995)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
>  [server.jar:?] Caused by: org.rocksdb.RocksDBException: lock hold by current 
> process, acquire time 1720323060 acquiring thread 139820785784576: 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store/LOCK: No locks 
> available at org.rocksdb.RocksDB.open(Native Method) ~[server.jar:?] at 
> org.rocksdb.RocksDB.open(RocksDB.java:307) ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>  ~[server.jar:?] ... 19 more   {code}
>  
> Some other details:
> - State updater is enabled
> - I'm using 5 stream threads
> - This usually happens during either normal processing or during state 
> store restoration



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17098) Error Opening RocksDBStore

2024-07-10 Thread Eduwer Camacaro (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduwer Camacaro updated KAFKA-17098:

Attachment: server.log.txt

> Error Opening RocksDBStore
> --
>
> Key: KAFKA-17098
> URL: https://issues.apache.org/jira/browse/KAFKA-17098
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Eduwer Camacaro
>Priority: Minor
> Attachments: server.log.txt, server0.log.txt
>
>
> I'm getting this exception on many of my state stores:
>  
> {code:java}
>  03:35:33 ERROR [LH] KafkaStreamsServerImpl - Uncaught exception 
> for Error opening store core-repartition-store at location 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store 03:35:33 
> ERROR [KAFKA] KafkaStreams - stream-client [lh-0-core] Encountered 
> the following exception during processing and the registered exception 
> handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down 
> now. org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store core-repartition-store at location 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:291)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:158)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:158)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1008)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:995)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
>  [server.jar:?] Caused by: org.rocksdb.RocksDBException: lock hold by current 
> process, acquire time 1720323060 acquiring thread 139820785784576: 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store/LOCK: No locks 
> available at org.rocksdb.RocksDB.open(Native Method) ~[server.jar:?] at 
> org.rocksdb.RocksDB.open(RocksDB.java:307) ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>  ~[server.jar:?] ... 19 more   {code}
>  
> Some other details:
> - State updater is enabled
> - I'm using 5 stream threads
> - This usually happens during either normal processing or during state 
> store restoration



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17098) Error Opening RocksDBStore

2024-07-10 Thread Eduwer Camacaro (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864822#comment-17864822
 ] 

Eduwer Camacaro commented on KAFKA-17098:
-

I've recently uploaded a fresh log ([^server0.log.txt]) file with all of the 
log entries from the instance's start until the exception.

> Error Opening RocksDBStore
> --
>
> Key: KAFKA-17098
> URL: https://issues.apache.org/jira/browse/KAFKA-17098
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Eduwer Camacaro
>Priority: Minor
> Attachments: server0.log.txt
>
>
> I'm getting this exception on many of my state stores:
>  
> {code:java}
>  03:35:33 ERROR [LH] KafkaStreamsServerImpl - Uncaught exception 
> for Error opening store core-repartition-store at location 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store 03:35:33 
> ERROR [KAFKA] KafkaStreams - stream-client [lh-0-core] Encountered 
> the following exception during processing and the registered exception 
> handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down 
> now. org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store core-repartition-store at location 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:291)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:158)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:158)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1008)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:995)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
>  [server.jar:?] Caused by: org.rocksdb.RocksDBException: lock hold by current 
> process, acquire time 1720323060 acquiring thread 139820785784576: 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store/LOCK: No locks 
> available at org.rocksdb.RocksDB.open(Native Method) ~[server.jar:?] at 
> org.rocksdb.RocksDB.open(RocksDB.java:307) ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>  ~[server.jar:?] ... 19 more   {code}
>  
> Some other details:
> - State updater is enabled
> - I'm using 5 stream threads
> - This usually happens either during normal processing or during state 
> store restoration
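
For context, the "No locks available" failure is RocksDB's generic behaviour when a second open races on a store directory whose LOCK file is already held. A minimal sketch that reproduces a similar RocksDBException outside Kafka Streams (assuming the org.rocksdb:rocksdbjni artifact is on the classpath; the path and class names are illustrative only):

{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbLockDemo {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB first = RocksDB.open(options, "/tmp/rocksdb-lock-demo")) {
            // A second open of the same directory fails on the LOCK file,
            // much like two tasks/threads racing on one state store dir.
            try (RocksDB second = RocksDB.open(options, "/tmp/rocksdb-lock-demo")) {
                // never reached: the open above throws a RocksDBException
            }
        }
    }
}
{code}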



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17098) Error Opening RocksDBStore

2024-07-10 Thread Eduwer Camacaro (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduwer Camacaro updated KAFKA-17098:

Attachment: (was: server.log)

> Error Opening RocksDBStore
> --
>
> Key: KAFKA-17098
> URL: https://issues.apache.org/jira/browse/KAFKA-17098
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Eduwer Camacaro
>Priority: Minor
> Attachments: server0.log.txt
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17098) Error Opening RocksDBStore

2024-07-10 Thread Eduwer Camacaro (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduwer Camacaro updated KAFKA-17098:

Attachment: server0.log.txt

> Error Opening RocksDBStore
> --
>
> Key: KAFKA-17098
> URL: https://issues.apache.org/jira/browse/KAFKA-17098
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Eduwer Camacaro
>Priority: Minor
> Attachments: server0.log.txt
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-07-10 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-17066:
-

Assignee: Kirk True

> New consumer updateFetchPositions should perform all operations in background 
> thread
> 
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.9.0
>
>
> The updateFetchPositions function in the new consumer performs several actions 
> based on the assigned partitions from the subscriptionState. The way it's 
> currently implemented, it fetches committed offsets for partitions that 
> require a position (retrieved from the subscription state in the app thread), 
> and then resets positions for the partitions still needing one (retrieved 
> from the subscription state, but in the background thread). 
> This is problematic, given that the assignment/subscriptionState may change 
> in the background thread at any time (ex. new partitions reconciled), so we 
> could end up resetting positions to the partition offsets for a partition for 
> which we never even attempted to retrieve committed offsets.  
> Consider this sequence for a consumer that owns partition tp0:
>  * consumer owns tp0
>  * app thread -> updateFetchPositions triggers 
> initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned 
> partitions requiring a position (taking them from 
> subscriptions.initializingPartitions()). This will fetch committed offsets 
> for tp0 only.
>  * background thread -> receives new partition tp1 and completes 
> reconciliation (adds it to the subscription state as INITIALIZING, requires a 
> position)
>  * app thread -> updateFetchPositions resets positions for all partitions 
> that still don't have a valid position after initWithCommittedOffsetsIfNeeded 
> (taking them from subscriptionState.partitionsNeedingReset). This will 
> mistakenly consider that it should reset tp1 to the partition offsets, when 
> in reality it never even tried fetching the committed offsets for it because 
> it wasn't assigned when initWithCommittedOffsetsIfNeeded happened. 
> We should consider moving updateFetchPositions to the background thread as a 
> single event, which would safely use the subscriptionState object and apply 
> all actions involved in updateFetchPositions to the same consistent set of 
> partitions assigned at that moment.
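
For illustration, a self-contained sketch of the race described above (the names and structure are simplified stand-ins for the consumer internals, not the real Kafka classes):

{code:java}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class UpdateFetchPositionsRace {
    enum State { INITIALIZING, HAS_POSITION }

    static final Map<String, State> subscriptionState = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        subscriptionState.put("tp0", State.INITIALIZING);

        // App thread, step 1: fetch committed offsets only for the partitions
        // needing a position right now -- tp0 only at this point.
        Set<String> initializing = new HashSet<>(subscriptionState.keySet());
        initializing.forEach(tp -> subscriptionState.put(tp, State.HAS_POSITION));

        // Background thread: reconciles tp1 after step 1 has already run.
        Thread background = new Thread(() -> subscriptionState.put("tp1", State.INITIALIZING));
        background.start();
        background.join();

        // App thread, step 2: reset whatever still needs a position. tp1 gets
        // reset to the partition offsets even though its committed offsets
        // were never fetched -- the gap described in this ticket.
        subscriptionState.forEach((tp, state) -> {
            if (state == State.INITIALIZING)
                System.out.println("resetting " + tp + " without fetching its committed offset");
        });
    }
}
{code}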



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16984) New consumer should not complete leave operation until it gets a response

2024-07-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16984:
---
Description: 
When the new consumer attempts to leave a group, it sends a leave group request 
in fire-and-forget mode, so, as soon as the request is generated, it will:

1. transition to UNSUBSCRIBED
2. complete the leaveGroup operation future 

This task focuses on point 2, which has the undesired side effect that whatever 
might have been waiting for the leave in order to do something else will carry 
on (ex. consumer close, leading to the responses to disconnected clients we've 
seen when running stress tests).

When leaving a group while closing a consumer, the member sends the leave 
request and moves on to the next operation, which is closing the network 
thread, so we end up with disconnected clients receiving responses from the 
server. We should send the leave group heartbeat and transition to 
UNSUBSCRIBED, but only complete the leave operation when we get a response for 
it, which is a much more accurate confirmation that the consumer left the group 
and can move on with other operations.

Note that the legacy consumer does wait for a leave response before closing 
down the coordinator (see 
[AbstractCoordinator|https://github.com/apache/kafka/blob/25230b538841a5e7256b1b51725361dd59435901/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1135-L1140]),
 and we are looking to have the same behaviour in the new consumer. 

Note that with this task we'll only focus on changing the behaviour for the 
leave operation completion (point 2 above) to tidy up the close flow. We are 
not changing the transition to UNSUBSCRIBED, as it would require further 
consideration if ever needed. 

 

This is also a building block for future improvements around error handling for 
the leave request, which we don't have at the moment (related Jira linked).

  was:
When the new consumer attempts to leave a group, it sends a leave group request 
in a fire-and-forget mode, so, as soon as the request is generated, it will:

1. transitions to UNSUBSCRIBED
2. complete the leaveGroup operation future 

This task focus on point 2, which has the undesired side-effect that whatever 
might have been waiting for the leave to do something else, will carry on, ex. 
consumer close, leading to responses to disconnected clients we've seen when 
running stress tests)

When leaving a group while closing a consumer, the member sends the leave 
request and moves on to next operation, which is closing the network thread, so 
we end up with disconnected client receiving responses from the server. We 
should send leave group heartbeat, and transition to UNSUBSCRIBE, but only 
complete the leave operation when we get a response for it, which is a much 
more accurate confirmation that the consumer left the group and can move on 
with other operations.

Note that the legacy consumer does wait for a leave response before closing 
down the coordinator (see 
[AbstractCoordinator|https://github.com/apache/kafka/blob/25230b538841a5e7256b1b51725361dd59435901/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1135-L1140]),
 we we are looking to have the same behaviour on the new consumer. 

Note that with this task we'll only focus on changing the behaviour for the 
leave operation completion (point 2 above) to tidy up the close flow. We are 
not changing the transition to UNSUBSCRIBED, as it would require further 
consideration if ever needed. 


> New consumer should not complete leave operation until it gets a response
> -
>
> Key: KAFKA-16984
> URL: https://issues.apache.org/jira/browse/KAFKA-16984
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848
> Fix For: 3.9.0
>
>
> When the new consumer attempts to leave a group, it sends a leave group 
> request in a fire-and-forget mode, so, as soon as the request is generated, 
> it will:
> 1. transition to UNSUBSCRIBED
> 2. complete the leaveGroup operation future 
> This task focuses on point 2, which has the undesired side effect that 
> whatever might have been waiting for the leave in order to do something else 
> will carry on (ex. consumer close, leading to the responses to disconnected 
> clients we've seen when running stress tests).
> When leaving a group while closing a consumer, the member sends the leave 
> request and moves on to the next operation, which is closing the network 
> thread, so we end up with disconnected clients receiving responses from the 
> server. We should send the leave group heartbeat and transition to UNSUBSC

[PR] KAFKA-16984: Complete consumer leave on response to leave request [kafka]

2024-07-10 Thread via GitHub


lianetm opened a new pull request, #16569:
URL: https://github.com/apache/kafka/pull/16569

   Improvement to ensure that, even though the leave group request is sent out 
once, the leave group operation is considered complete only when it receives a 
response to the HB to leave (successful or failed). 
   
   The motivation is to avoid undesired interactions with operations triggered 
after the unsubscribe (ex. consumer close triggers a leave group operation, and 
shuts down the network thread when it completes, which before this PR could 
lead to responses to disconnected clients). 
   
   This is the same behaviour on close that the legacy consumer has (waits for 
leave responses).
   
   Note that this PR does not change the transitions of the state machine on 
leave group, only the completion of the leave group future. 
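
   As a rough self-contained sketch of the completion semantics this PR aims for (hypothetical names, not the actual consumer code):

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;

   public class LeaveOnResponseSketch {
       // Stand-in for the network layer: the future completes when the
       // response to the final heartbeat arrives.
       static CompletableFuture<String> sendLeaveHeartbeat() {
           return CompletableFuture.supplyAsync(() -> "leave-response",
                   CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS));
       }

       public static void main(String[] args) {
           CompletableFuture<Void> leaveOperation = new CompletableFuture<>();
           // Fire the leave heartbeat once, but only complete the leave
           // operation when a response (successful or failed) comes back.
           sendLeaveHeartbeat().whenComplete((resp, err) -> leaveOperation.complete(null));
           leaveOperation.join(); // close() would block here before shutting down the network thread
           System.out.println("leave confirmed; safe to shut down the network thread");
       }
   }
   ```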
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-17117) Avoid instantiating classpath plugins when service loading plugins

2024-07-10 Thread Greg Harris (Jira)
Greg Harris created KAFKA-17117:
---

 Summary: Avoid instantiating classpath plugins when service 
loading plugins
 Key: KAFKA-17117
 URL: https://issues.apache.org/jira/browse/KAFKA-17117
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Affects Versions: 3.6.0
Reporter: Greg Harris


In KAFKA-14789 modifications were made to allow PluginClassLoaders to see all 
resource files of the parent DelegatingClassLoader and classpath, rather than 
selectively hiding some resources that were for ServiceLoader manifests.

This has the effect that the ServiceLoader finds classpath plugins when 
searching in plugin locations, and the PluginScanner filters these plugins out 
by checking for classloader equality.

This has some side-effects that are undesirable:
 * Classpath plugins may be instantiated with the thread context classloader 
set to a plugin classloader
 * Classpath plugins are instantiated multiple times, once for each plugin 
location
 * Exceptions from classpath plugins show up multiple times in the logs: 
KAFKA-17111

This change may require us to fork the ServiceLoader implementation, which is 
itself undesirable.
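
For reference, a simplified sketch of the classloader-equality filtering described above (not the actual PluginScanner code; the Plugin interface stands in for any Connect plugin type):

{code:java}
import java.util.ServiceLoader;

public class PluginFilterSketch {
    // Stand-in plugin interface; the real scanner handles Connect plugin types.
    public interface Plugin { }

    // Service-load plugins visible to a plugin location's classloader, but keep
    // only those the plugin classloader itself defined. Classpath plugins are
    // rediscovered (and instantiated) here before being filtered out -- the
    // side-effect this ticket wants to avoid.
    static void scanLocation(ClassLoader pluginLoader) {
        for (Plugin plugin : ServiceLoader.load(Plugin.class, pluginLoader)) {
            if (plugin.getClass().getClassLoader() != pluginLoader)
                continue; // classpath plugin seen through the plugin location
            System.out.println("plugin from this location: " + plugin.getClass().getName());
        }
    }

    public static void main(String[] args) {
        scanLocation(Thread.currentThread().getContextClassLoader());
    }
}
{code}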



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17116) New consumer may not send effective leave group if member ID received after close

2024-07-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-17116:
---
Summary: New consumer may not send effective leave group if member ID 
received after close   (was: New consumer may not send leave group if member ID 
received after close )

> New consumer may not send effective leave group if member ID received after 
> close 
> --
>
> Key: KAFKA-17116
> URL: https://issues.apache.org/jira/browse/KAFKA-17116
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.9.0
>
>
> If the new consumer is closed after sending an HB to join, but before 
> receiving the response to it, it will send a leave group request without a 
> member ID (which will simply fail with UNKNOWN_MEMBER_ID). As a result, the 
> broker will have registered a new member for which it will never receive a 
> leave request.
>  # consumer.subscribe -> sends HB to join, transitions to JOINING
>  # consumer.close -> will transition to LEAVING and send HB with epoch -1 
> (without waiting for in-flight requests)
>  # consumer receives response to initial HB, containing the assigned member 
> ID. It will simply ignore it because it's not in the group anymore 
> (UNSUBSCRIBED)
> Note that, with the current logic, the expected behaviour and the main 
> downsides of this are:
>  # If the member received partitions on the first HB, those partitions won't 
> be re-assigned (the broker waits for the closed consumer to reconcile them) 
> until the rebalance timeout expires. 
>  # Even if no partitions were assigned to it, the member will remain in the 
> group from the broker's point of view (but not from the client's POV). The 
> member will eventually be kicked out for not sending HBs, but only when its 
> session timeout expires.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17116) New consumer may not send leave group if member ID received after close

2024-07-10 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-17116:
--

 Summary: New consumer may not send leave group if member ID 
received after close 
 Key: KAFKA-17116
 URL: https://issues.apache.org/jira/browse/KAFKA-17116
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.8.0
Reporter: Lianet Magrans
 Fix For: 3.9.0


If the new consumer is closed after sending an HB to join, but before receiving 
the response to it, it will send a leave group request without a member ID 
(which will simply fail with UNKNOWN_MEMBER_ID). As a result, the broker will 
have registered a new member for which it will never receive a leave request.
 # consumer.subscribe -> sends HB to join, transitions to JOINING
 # consumer.close -> will transition to LEAVING and send HB with epoch -1 
(without waiting for in-flight requests)
 # consumer receives response to initial HB, containing the assigned member ID. 
It will simply ignore it because it's not in the group anymore (UNSUBSCRIBED)

Note that, with the current logic, the expected behaviour and the main 
downsides of this are:
 # If the member received partitions on the first HB, those partitions won't be 
re-assigned (the broker waits for the closed consumer to reconcile them) until 
the rebalance timeout expires. 
 # Even if no partitions were assigned to it, the member will remain in the 
group from the broker's point of view (but not from the client's POV). The 
member will eventually be kicked out for not sending HBs, but only when its 
session timeout expires.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Don't swallow validateReconfiguration exceptions [kafka]

2024-07-10 Thread via GitHub


ahuang98 commented on code in PR #16346:
URL: https://github.com/apache/kafka/pull/16346#discussion_r1672981769


##
clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java:
##
@@ -190,9 +191,13 @@ public Set<String> reconfigurableConfigs() {
     }
 
     @Override
-    public void validateReconfiguration(Map<String, ?> configs) {
+    public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
         if (this.securityProtocol == SecurityProtocol.SASL_SSL)
-            sslFactory.validateReconfiguration(configs);
+            try {
+                sslFactory.validateReconfiguration(configs);
+            } catch (IllegalStateException e) {
+                throw new ConfigException("SASL reconfiguration failed due to " + e);

Review Comment:
   I'm concatenating the "SASL reconfiguration failed" message with the 
underlying exception string now since it looks like `ConfigException` will 
print errors in a very specific way 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-17115) Closing newly-created consumers during rebalance can cause rebalances to hang

2024-07-10 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864812#comment-17864812
 ] 

Lianet Magrans commented on KAFKA-17115:


Hey [~ChrisEgerton], even though on the new consumer we don't have this same 
flow (no member ID required to join), I believe we would have the same gap you 
pointed out:


 # new consumer joins -> sends HB with epoch 0 (no member ID)
 # consumer closed -> this will still generate the HB to leave (epoch -1), but 
with no member ID because it does not have one yet, so it cannot be processed 
correctly by the broker anyway
 # consumer receives response to initial HB to join (response containing the 
member ID)

So yes, we would end up in a similar situation (less bad only because, with the 
new protocol and no global barrier, we wouldn't have a blocked rebalance, just 
a member that is registered in the group and may have received partitions that 
won't be re-assigned until the rebalance timeout expires and the broker gives 
the partitions to someone else). The member would be kicked out of the group 
when its session expires. 

I will file a separate Jira to review and fix this edge case with the new 
consumer. Thanks!

> Closing newly-created consumers during rebalance can cause rebalances to hang
> -
>
> Key: KAFKA-17115
> URL: https://issues.apache.org/jira/browse/KAFKA-17115
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.9.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> When a dynamic consumer (i.e., one with no group instance ID configured) 
> first tries to join a group, the group coordinator normally responds with the 
> MEMBER_ID_REQUIRED error, under the assumption that the member will retry 
> soon after. During this step, the group coordinator will also generate a new 
> member ID for the consumer, include it in the error response for the initial 
> join group request, and expect that a member with that ID will participate in 
> future rebalances.
> If a consumer is closed in between the time that it sends the JoinGroup 
> request and the time that it receives the response from the group 
> coordinator, it will not attempt to leave the group, since it doesn't have a 
> member ID to include in that request.
> This will cause future rebalances to hang, since the group coordinator will 
> still expect a member with the ID for the now-closed consumer to join. 
> Eventually, the group coordinator may remove the closed consumer from the 
> group, but with default configuration settings, this could take as long as 
> five minutes.
> One possible fix is to send a LeaveGroup request with the member ID if the 
> consumer receives a JoinGroup response with a member ID after it has been 
> closed.
>  
> This applies to the legacy consumer; I have not verified yet with the new 
> async consumer.
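
As an illustrative sketch of that possible fix (hypothetical names in a self-contained model, not the real AbstractCoordinator code):

{code:java}
public class LeaveAfterCloseSketch {
    static volatile boolean closed = false;

    static void sendLeaveGroup(String memberId) {
        System.out.println("LeaveGroup request sent for member " + memberId);
    }

    // If the JoinGroup response (carrying the generated member ID) arrives
    // after close(), use the ID to send the leave we could not send earlier.
    static void onJoinGroupResponse(String assignedMemberId) {
        if (closed) {
            sendLeaveGroup(assignedMemberId);
            return;
        }
        // normal join handling would continue here
    }

    public static void main(String[] args) {
        closed = true;                      // close() raced ahead of the response
        onJoinGroupResponse("member-123");  // late response still triggers the leave
    }
}
{code}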



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-17111: Revert "KAFKA-15996: Improve JsonConverter performance (#14992)" [kafka]

2024-07-10 Thread via GitHub


gharris1727 opened a new pull request, #16568:
URL: https://github.com/apache/kafka/pull/16568

   This reverts commit 314de9f23c7e8d574eb3c03e345f8cf504266831.
   
   The original change causes ERROR logs to be printed on worker startup, and 
should be reverted in advance of the 3.8.0 release. This is an alternative to 
#16565.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17111: Explicitly Register Afterburner Module in JsonSerializer and JsonDeserializer [kafka]

2024-07-10 Thread via GitHub


gharris1727 commented on PR #16565:
URL: https://github.com/apache/kafka/pull/16565#issuecomment-2221423742

   And for anyone reviewing this, it's easy to verify the change by checking 
the behavior of the following commands:
   
   ```
   ./gradlew jar
   ./bin/connect-plugin-path.sh list --plugin-location 
~/.m2/repository/com/fasterxml/jackson
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17111: Explicitly Register Afterburner Module in JsonSerializer and JsonDeserializer [kafka]

2024-07-10 Thread via GitHub


gharris1727 commented on PR #16565:
URL: https://github.com/apache/kafka/pull/16565#issuecomment-2221414024

   cc @divijvaidya and @mimaison who reviewed the original performance PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add logs when metadata update is not successful [kafka]

2024-07-10 Thread via GitHub


jolshan commented on PR #16496:
URL: https://github.com/apache/kafka/pull/16496#issuecomment-2221397981

   cc: @C0urante I know we had some discussion on this issue with connect. 
Seems like this won't fix the issue but could provide logs to indicate when it 
is happening. (We likely will still need to do a KIP if we want to handle this 
case differently)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Await consumer group membership before verifying/modifying sink connector offsets in OffsetsApiIntegrationTest [kafka]

2024-07-10 Thread via GitHub


C0urante commented on PR #16519:
URL: https://github.com/apache/kafka/pull/16519#issuecomment-2221387274

   It seems like there's a real consumer bug that's been causing this 
flakiness: https://issues.apache.org/jira/browse/KAFKA-17115
   
   Closing this PR in favor of the bug fix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Await consumer group membership before verifying/modifying sink connector offsets in OffsetsApiIntegrationTest [kafka]

2024-07-10 Thread via GitHub


C0urante closed pull request #16519: MINOR: Await consumer group membership 
before verifying/modifying sink connector offsets in OffsetsApiIntegrationTest
URL: https://github.com/apache/kafka/pull/16519


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17111: Explicitly Register Afterburner Module in JsonSerializer and JsonDeserializer [kafka]

2024-07-10 Thread via GitHub


gharris1727 commented on PR #16565:
URL: https://github.com/apache/kafka/pull/16565#issuecomment-2221387172

   Thanks @vbalani002 for the bug report and prompt fix! I agree with this 
approach, let me know when you want me to review this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17111) ServiceConfigurationError in JsonSerializer/Deserializer during Plugin Discovery

2024-07-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-17111:

Priority: Blocker  (was: Major)

> ServiceConfigurationError in JsonSerializer/Deserializer during Plugin 
> Discovery
> 
>
> Key: KAFKA-17111
> URL: https://issues.apache.org/jira/browse/KAFKA-17111
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.8.0
>Reporter: Vikas Balani
>Assignee: Vikas Balani
>Priority: Blocker
>
> h3. Problem:
> JsonSerializer and JsonDeserializer use 
> objectMapper.findAndRegisterModules(), which attempts to register all Jackson 
> modules implementing com.fasterxml.jackson.databind.Module. This can cause a 
> ServiceConfigurationError when incompatible modules are present in the 
> classpath.
>  
> {code:java}
> java.util.ServiceConfigurationError: 
> org.apache.kafka.connect.storage.Converter: Provider 
> org.apache.kafka.connect.json.JsonConverter could not be instantiated
>   at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:586)
>  at 
> java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:813)
>  at java.base/java.util.ServiceLoader$ProviderImpl.get(ServiceLoader.java:729)
>  at java.base/java.util.ServiceLoader$3.next(ServiceLoader.java:1403)
>  at 
> org.apache.kafka.connect.runtime.isolation.PluginScanner.handleLinkageError(PluginScanner.java:176)
>  at 
> org.apache.kafka.connect.runtime.isolation.PluginScanner.getServiceLoaderPluginDesc(PluginScanner.java:136)
>  at 
> org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner.scanPlugins(ServiceLoaderScanner.java:61)
>  at 
> org.apache.kafka.connect.runtime.isolation.PluginScanner.scanUrlsAndAddPlugins(PluginScanner.java:79)
>  at 
> org.apache.kafka.connect.runtime.isolation.PluginScanner.discoverPlugins(PluginScanner.java:67)
>  at 
> org.apache.kafka.connect.runtime.isolation.Plugins.initLoaders(Plugins.java:99)
>  at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:90)
>  at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:78)
>  at 
> org.apache.kafka.connect.cli.AbstractConnectCli.startConnect(AbstractConnectCli.java:128)
>  at 
> org.apache.kafka.connect.cli.AbstractConnectCli.run(AbstractConnectCli.java:101)
>  at 
> org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:113)
>  Caused by: java.util.ServiceConfigurationError: 
> com.fasterxml.jackson.databind.Module: 
> com.fasterxml.jackson.datatype.jsr310.JavaTimeModule not a subtype
>  at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:593)
>  at 
> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1244)
>  at 
> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1273)
>  at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1309)
>  at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1393)
>  at 
> com.fasterxml.jackson.databind.ObjectMapper.findModules(ObjectMapper.java:1158)
>  at 
> com.fasterxml.jackson.databind.ObjectMapper.findModules(ObjectMapper.java:1142)
>  at 
> com.fasterxml.jackson.databind.ObjectMapper.findAndRegisterModules(ObjectMapper.java:1192)
>  at 
> org.apache.kafka.connect.json.JsonSerializer.(JsonSerializer.java:58)
>  at org.apache.kafka.connect.json.JsonConverter.(JsonConverter.java:250)
>  at org.apache.kafka.connect.json.JsonConverter.(JsonConverter.java:238)
>  at 
> java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
>  at 
> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
>  at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
>  at 
> java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:789)
>  ... 13 more{code}
>  
> h3. Steps to Reproduce:
> 1. Start a connect worker with Service loading enabled and with certain 
> connector plugins in plugin path (e.g. AzureBlobSource & BigQuerySink)
> 2. Observe ServiceConfigurationError during plugin discovery
> h3. Current Behavior:
> ServiceConfigurationError is thrown with the message 
> "com.fasterxml.jackson.databind.Module: <module> not a subtype", 
> where <module> can be one of:
>  * com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule
>  * com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
>  * com.fasterxml.jackson.datatype.guava.GuavaModule
>  * com.fasterxml.jackson.datatype.joda.JodaModule
> h3. Proposed Solution:
> Explicitly register the Afterburner module instead of using 
> findAndRegisterModules().
> h3. Potential Impact:
>  - Resolves compatibility issues with certain Jackson modules
>  - Maintains performance improvement
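
For illustration, the proposed direction could look roughly like this (a sketch, not the exact Kafka patch; assumes jackson-module-afterburner is on the classpath):

{code:java}
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;

public class ExplicitModuleRegistration {
    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        // Register only the module we need, instead of findAndRegisterModules(),
        // which service-loads every Module on the classpath and can fail on
        // incompatible ones.
        mapper.registerModule(new AfterburnerModule());
        System.out.println("Afterburner registered explicitly; no classpath module scan performed");
    }
}
{code}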

[jira] [Assigned] (KAFKA-17111) ServiceConfigurationError in JsonSerializer/Deserializer during Plugin Discovery

2024-07-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-17111:
---

Assignee: Vikas Balani  (was: Greg Harris)

> ServiceConfigurationError in JsonSerializer/Deserializer during Plugin 
> Discovery
> 
>
> Key: KAFKA-17111
> URL: https://issues.apache.org/jira/browse/KAFKA-17111
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.8.0
>Reporter: Vikas Balani
>Assignee: Vikas Balani
>Priority: Major
>

[jira] [Commented] (KAFKA-17111) ServiceConfigurationError in JsonSerializer/Deserializer during Plugin Discovery

2024-07-10 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864804#comment-17864804
 ] 

Greg Harris commented on KAFKA-17111:
-

Hi [~vbalani], thank you for the bug report! I can reproduce it locally.

I believe this should be a cosmetic error, as the error is thrown when the 
classpath JsonConverter is found via each plugin.path location. These later get 
excluded to avoid duplicates: 
[https://github.com/apache/kafka/blob/25d775b742406477a0ff678b9990ed149d2157cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java#L144-L148]
 so even if the error were not thrown, these exceptions would be ignored.

I tried to figure out a way to prevent plugin scanning from finding classpath 
plugins; I'll have to take another look at that to see if we can prevent this 
sort of behavior in the future.
For now, I agree with your recommended course of action. I'll raise this in the 
release thread to see if we can include it in 3.8.0.

> ServiceConfigurationError in JsonSerializer/Deserializer during Plugin 
> Discovery
> 
>
> Key: KAFKA-17111
> URL: https://issues.apache.org/jira/browse/KAFKA-17111
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.8.0
>Reporter: Vikas Balani
>Assignee: Vikas Balani
>Priority: Major
>

[jira] [Assigned] (KAFKA-17111) ServiceConfigurationError in JsonSerializer/Deserializer during Plugin Discovery

2024-07-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-17111:
---

Assignee: Greg Harris  (was: Vikas Balani)

> ServiceConfigurationError in JsonSerializer/Deserializer during Plugin 
> Discovery
> 
>
> Key: KAFKA-17111
> URL: https://issues.apache.org/jira/browse/KAFKA-17111
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.8.0
>Reporter: Vikas Balani
>Assignee: Greg Harris
>Priority: Major
>

[jira] [Updated] (KAFKA-15954) Review minimal effort approach on consumer last heartbeat on unsubscribe

2024-07-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15954:
---
Description: 
Currently the legacy and new consumers follow a minimal-effort approach when 
sending a leave group (legacy) or last heartbeat request (new consumer). The 
request is sent without waiting for/handling any response. This behaviour 
applies when the consumer is being closed or when it unsubscribes.

For the case when the consumer is being closed (which is a "terminal" state), 
it probably makes sense to just follow a minimal-effort approach to "properly" 
leaving the group (no retry logic). But for the case of unsubscribe, we could 
consider whether it is valuable to put a little more effort into making sure 
that the last heartbeat is sent and received by the broker (ex. what if the 
coordinator is not known/available when sending the last HB). Note that 
unsubscribe can be a temporary state, where the consumer might want to re-join 
the group at any time. 

  was:
Currently, both the legacy and new consumers follow a minimal effort approach when 
sending a leave group (legacy) or last heartbeat request (new consumer). The 
request is sent without waiting for or handling any response. This behaviour 
applies when the consumer is being closed or when it unsubscribes.

For the case when the consumer is being closed (which is a "terminal" state), 
it makes sense to just follow a minimal effort approach for "properly" leaving 
the group. But for the case of unsubscribe, it might make sense to put a little 
more effort into making sure that the last heartbeat is sent and received by 
the broker. Note that unsubscribe could be a temporary state, where the 
consumer might want to re-join the group at any time. 


> Review minimal effort approach on consumer last heartbeat on unsubscribe
> 
>
> Key: KAFKA-15954
> URL: https://issues.apache.org/jira/browse/KAFKA-15954
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support, reconciliation
> Fix For: 3.9.0
>
>
> Currently, both the legacy and new consumers follow a minimal effort approach 
> when sending a leave group (legacy) or last heartbeat request (new consumer). 
> The request is sent without waiting for or handling any response. This 
> behaviour applies when the consumer is being closed or when it unsubscribes.
> For the case when the consumer is being closed (which is a "terminal" state), 
> it probably makes sense to just follow a minimal effort approach for 
> "properly" leaving the group (no retry logic). But for the case of 
> unsubscribe, we could consider whether it is valuable to put a little more 
> effort into making sure that the last heartbeat is sent and received by the 
> broker (e.g., what if the coordinator is not known/available when sending the 
> last HB?). Note that unsubscribe could be a temporary state, where the 
> consumer might want to re-join the group at any time. 
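
For illustration, a minimal sketch of the two approaches under discussion, written 
against a toy coordinator interface rather than Kafka's actual client internals 
(all names below are hypothetical): close() stays fire-and-forget, while 
unsubscribe() spends a bounded number of retries on the last heartbeat.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Toy stand-in for the network layer; not Kafka's actual API.
public class LastHeartbeatSketch {
    interface Coordinator {
        CompletableFuture<Boolean> sendLeaveHeartbeat(String memberId);
    }

    // close(): terminal state, fire and forget, ignore the outcome.
    static void leaveOnClose(Coordinator c, String memberId) {
        c.sendLeaveHeartbeat(memberId);
    }

    // unsubscribe(): temporary state, so spend a bounded effort making sure
    // the broker actually receives the last heartbeat.
    static boolean leaveOnUnsubscribe(Coordinator c, String memberId, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (c.sendLeaveHeartbeat(memberId).get(1, TimeUnit.SECONDS)) {
                    return true; // broker acknowledged the leave
                }
            } catch (Exception e) {
                // e.g. coordinator not known/available; retry
            }
        }
        return false; // best effort exhausted; unsubscribe proceeds anyway
    }

    public static void main(String[] args) {
        Coordinator flaky = id -> CompletableFuture.supplyAsync(
                () -> ThreadLocalRandom.current().nextInt(3) == 0);
        leaveOnClose(flaky, "member-1");
        System.out.println("acked = " + leaveOnUnsubscribe(flaky, "member-1", 5));
    }
}
{code}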



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17114) DefaultStateUpdater::handleRuntimeException should update isRunning before calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks`

2024-07-10 Thread Ao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ao Li updated KAFKA-17114:
--
Affects Version/s: 3.9.0

> DefaultStateUpdater::handleRuntimeException should update isRunning before 
> calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks`
> -
>
> Key: KAFKA-17114
> URL: https://issues.apache.org/jira/browse/KAFKA-17114
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.9.0
>Reporter: Ao Li
>Priority: Minor
>
> I saw a flaky test in 
> DefaultStateUpdaterTest::shouldThrowIfAddingStandbyAndActiveTaskWithSameId 
> recently.
> {code}
> org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at org.junit.jupiter.api.AssertFalse.failNotFalse(AssertFalse.java:63)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:36)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:31)
>   at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:231)
>   at 
> org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingTasksWithSameId(DefaultStateUpdaterTest.java:294)
>   at 
> org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingStandbyAndActiveTaskWithSameId(DefaultStateUpdaterTest.java:285)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>   at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
>   at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> {code}
> To make the bug more reproducible, you may add `Thread.sleep(5)` after 
> `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);`
>  in  DefaultStateUpdater::handleRuntimeException
> The test is flaky because 
> `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);`
>  will unblock the `verifyFailedTasks(IllegalStateException.class, task1);` 
> statement in DefaultStateUpdaterTest::shouldThrowIfAddingTasksWithSameId. 
> If `assertFalse(stateUpdater.isRunning());` is executed before 
> `isRunning.set(false);`, the test will fail.
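
For illustration, a minimal sketch of the reordering the summary suggests 
(hypothetical names; the real DefaultStateUpdater carries more state):

{code:java}
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model of the race; not the actual Kafka Streams code.
class StateUpdaterSketch {
    final AtomicBoolean isRunning = new AtomicBoolean(true);
    final List<RuntimeException> failedTasks = new CopyOnWriteArrayList<>();

    // Reported ordering: a test thread unblocked by the failed-task update
    // can still observe isRunning == true.
    void handleRuntimeExceptionBuggy(RuntimeException e) {
        failedTasks.add(e);   // unblocks verifyFailedTasks(...) in the test
        isRunning.set(false); // may run after the test's assertFalse(...)
    }

    // Suggested ordering: flip the flag first, so any thread that observes
    // the failed task also observes isRunning == false.
    void handleRuntimeExceptionFixed(RuntimeException e) {
        isRunning.set(false);
        failedTasks.add(e);
    }
}
{code}

With the fixed ordering, the test's assertion can never observe a failed task 
while isRunning is still true.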



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17115: Send LeaveGroup request if consumer receives JoinGroup response with member ID after being closed [kafka]

2024-07-10 Thread via GitHub


C0urante commented on PR #16567:
URL: https://github.com/apache/kafka/pull/16567#issuecomment-2221307929

   Waiting to see if CI likes this before marking ready for review. I was able 
to run the `AbstractCoordinatorTest` suite locally and verify that the clients 
module builds, but nothing else.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17114) DefaultStateUpdater::handleRuntimeException should update isRunning before calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks`

2024-07-10 Thread Ao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ao Li updated KAFKA-17114:
--
Component/s: streams
Description: 
I saw a flaky test in 
DefaultStateUpdaterTest::shouldThrowIfAddingStandbyAndActiveTaskWithSameId 
recently.

{code}
org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at org.junit.jupiter.api.AssertFalse.failNotFalse(AssertFalse.java:63)
at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:36)
at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:31)
at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:231)
at 
org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingTasksWithSameId(DefaultStateUpdaterTest.java:294)
at 
org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingStandbyAndActiveTaskWithSameId(DefaultStateUpdaterTest.java:285)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
{code}

To make the bug more reproducible, you may add `Thread.sleep(5)` after 
`addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);`
 in  DefaultStateUpdater::handleRuntimeException

The test is flaky because 
`addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);`
 will unblock the `verifyFailedTasks(IllegalStateException.class, task1);` 
statement in DefaultStateUpdaterTest::shouldThrowIfAddingTasksWithSameId. 

If `assertFalse(stateUpdater.isRunning());` is executed before 
`isRunning.set(false);`, the test will fail.


   Priority: Minor  (was: Major)
Summary: DefaultStateUpdater::handleRuntimeException should update 
isRunning before calling 
`addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks`  (was: 
handleRuntimeException)

> DefaultStateUpdater::handleRuntimeException should update isRunning before 
> calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks`
> -
>
> Key: KAFKA-17114
> URL: https://issues.apache.org/jira/browse/KAFKA-17114
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ao Li
>Priority: Minor
>
> I saw a flaky test in 
> DefaultStateUpdaterTest::shouldThrowIfAddingStandbyAndActiveTaskWithSameId 
> recently.
> {code}
> org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at org.junit.jupiter.api.AssertFalse.failNotFalse(AssertFalse.java:63)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:36)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:31)
>   at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:231)
>   at 
> org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingTasksWithSameId(DefaultStateUpdaterTest.java:294)
>   at 
> org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingStandbyAndActiveTaskWithSameId(DefaultStateUpdaterTest.java:285)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>   at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
>   at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> {code}
> To make the bug more reproducible, you may add `Thread.sleep(5)` after 
> `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);`
>  in  DefaultStateUpdater::handleRuntimeException
> The test is flaky because 
> `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);`
>  will unblock the `verifyFailedTasks(IllegalStateException.class, task1);` 
> statement in DefaultStateUpdaterTest::shouldThrowIfAddingTasksWithSameId. 
> If `assertFalse(stateUpdater.isRunning());` is executed before 
> `isRunning.set(false);`, the test will fail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-17115: Send LeaveGroup request if consumer receives JoinGroup response with member ID after being closed [kafka]

2024-07-10 Thread via GitHub


C0urante opened a new pull request, #16567:
URL: https://github.com/apache/kafka/pull/16567

   [Jira](https://issues.apache.org/jira/browse/KAFKA-17115)
   
   This change implements a best-effort attempt to notify the group coordinator 
that the member ID of a closed consumer can be discarded, which can prevent 
rebalances from getting stuck if a dynamic consumer receives an initial 
JoinGroup response with the `MEMBER_ID_REQUIRED` error after it has been closed.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-17115) Closing newly-created consumers during rebalance can cause rebalances to hang

2024-07-10 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-17115:
-

 Summary: Closing newly-created consumers during rebalance can 
cause rebalances to hang
 Key: KAFKA-17115
 URL: https://issues.apache.org/jira/browse/KAFKA-17115
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.9.0
Reporter: Chris Egerton
Assignee: Chris Egerton


When a dynamic consumer (i.e., one with no group instance ID configured) first 
tries to join a group, the group coordinator normally responds with the 
MEMBER_ID_REQUIRED error, under the assumption that the member will retry soon 
after. During this step, the group coordinator will also generate a new member 
ID for the consumer, include it in the error response for the initial join 
group request, and expect that a member with that ID will participate in future 
rebalances.

If a consumer is closed in between the time that it sends the JoinGroup request 
and the time that it receives the response from the group coordinator, it will 
not attempt to leave the group, since it doesn't have a member ID to include in 
that request.

This will cause future rebalances to hang, since the group coordinator will 
still expect a member with the ID for the now-closed consumer to join. 
Eventually, the group coordinator may remove the closed consumer from the 
group, but with default configuration settings, this could take as long as five 
minutes.

One possible fix is to send a LeaveGroup request with the member ID if the 
consumer receives a JoinGroup response containing a member ID after it has been 
closed.

 

This applies to the legacy consumer; I have not verified yet with the new async 
consumer.
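
A minimal sketch of that idea (hypothetical names; not the actual 
AbstractCoordinator code):

{code:java}
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model of the proposed fix: if a JoinGroup response carrying a member ID
// arrives after close(), fire a best-effort LeaveGroup so the group
// coordinator can forget the member instead of waiting for it to rejoin.
public class JoinGroupAfterCloseSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private volatile Optional<String> memberId = Optional.empty();

    void close() {
        closed.set(true);
        // memberId may still be empty here, so no LeaveGroup can be sent yet.
    }

    void onJoinGroupResponse(String assignedMemberId) {
        memberId = Optional.of(assignedMemberId);
        if (closed.get()) {
            sendLeaveGroup(assignedMemberId); // best effort, fire and forget
        }
    }

    private void sendLeaveGroup(String id) {
        System.out.println("LeaveGroup(memberId=" + id + ")");
    }

    public static void main(String[] args) {
        JoinGroupAfterCloseSketch consumer = new JoinGroupAfterCloseSketch();
        consumer.close();                     // closed before the response
        consumer.onJoinGroupResponse("m-42"); // late response triggers leave
    }
}
{code}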



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17114) handleRuntimeException

2024-07-10 Thread Ao Li (Jira)
Ao Li created KAFKA-17114:
-

 Summary: handleRuntimeException
 Key: KAFKA-17114
 URL: https://issues.apache.org/jira/browse/KAFKA-17114
 Project: Kafka
  Issue Type: Bug
Reporter: Ao Li






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-07-10 Thread via GitHub


muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1672845320


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -698,6 +663,42 @@ object StorageTool extends Logging {
     voterSet
   }
 
+  def createStandaloneVoterMap(config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = {
+    val advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners
+    val listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap()
+    advertisedListenerEndpoints.foreach(endpoint => {
+      val host: String = endpoint.host
+      listeners.put(endpoint.listenerName, new InetSocketAddress(host, endpoint.port))
+    })
+    listeners
+  }
+
+  private def parseControllerQuorumVotersMap(controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress],
+                                             metaProperties: MetaProperties,
+                                             config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = {

Review Comment:
   From the KIP description:
   "When the format command is executed with this option it will read the 
node.id configured in the properties file specified by the --config option and 
compare it against the <replica-id> specified in --controller-quorum-voters. If 
there is a match, it will write the <directory-id> specified to the 
directory.id property in the meta.properties for the metadata.log.dir 
directory."
   
   I tried adding the if condition 
   `if (metaProperties.nodeId().getAsInt == replicaId)` 
   in the method.
   
   Maybe I am wrong. Can you please suggest code?
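
   For illustration only (not the PR's code), a minimal sketch of the node-id 
match described in the KIP, assuming the voter string form 
`<replica-id>[-<directory-id>]@<host>:<port>`; the class, helper, and regex 
here are hypothetical.
   
   ```java
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;
   
   // Hypothetical sketch of the KIP's rule: when the configured node.id matches
   // the <replica-id> in --controller-quorum-voters, emit the <directory-id>.
   public class VoterMatchSketch {
       private static final Pattern VOTER =
               Pattern.compile("(\\d+)(?:-([^@]+))?@([^:]+):(\\d+)");
   
       static void maybeWriteDirectoryId(int nodeId, String voter) {
           Matcher m = VOTER.matcher(voter);
           if (!m.matches()) {
               throw new IllegalArgumentException("Bad voter string: " + voter);
           }
           int replicaId = Integer.parseInt(m.group(1));
           String directoryId = m.group(2); // null when no -<directory-id> part
           if (replicaId == nodeId && directoryId != null) {
               // The real tool would write this to meta.properties.
               System.out.println("directory.id=" + directoryId);
           }
       }
   
       public static void main(String[] args) {
           maybeWriteDirectoryId(1, "1-abc123XYZ@localhost:9093");
       }
   }
   ```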



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-07-10 Thread via GitHub


muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1672833818


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -112,20 +112,21 @@ object StorageTool extends Logging {
       setNodeId(config.nodeId).
       build()
     val standaloneMode = namespace.getBoolean("standalone")
-    var advertisedListenerEndpoints: collection.Seq[kafka.cluster.EndPoint] = List()
 
     val controllersQuorumVoters = namespace.getString("controller_quorum_voters")
     if(standaloneMode && controllersQuorumVoters != null) {
       throw new TerseFailure("Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.")
     }
 
+    var listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap()
     if (standaloneMode) {
-      advertisedListenerEndpoints = config.effectiveAdvertisedBrokerListeners
+      listeners = createStandaloneVoterMap(config)
     } else if(controllersQuorumVoters != null) {
       if (!validateControllerQuorumVoters(controllersQuorumVoters)) {
         throw new TerseFailure("Expected schema for --controller-quorum-voters is <replica-id>[-<directory-id>]@<host>:<port>")
       }
-      advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners
+      val controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress] = parseVoterConnections(Collections.singletonList(controllersQuorumVoters))
+      listeners = parseControllerQuorumVotersMap(controllerQuorumVoterMap, metaProperties, config)

Review Comment:
   This means we would have to update 
https://github.com/apache/kafka/blob/25d775b742406477a0ff678b9990ed149d2157cc/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java#L178
 to return Uuid too?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17111) ServiceConfigurationError in JsonSerializer/Deserializer during Plugin Discovery

2024-07-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-17111:

Description: 
h3. Problem:

JsonSerializer and JsonDeserializer use objectMapper.findAndRegisterModules(), 
which attempts to register all Jackson modules implementing 
com.fasterxml.jackson.databind.Module. This can cause a 
ServiceConfigurationError when incompatible modules are present in the 
classpath.

 
{code:java}
java.util.ServiceConfigurationError: 
org.apache.kafka.connect.storage.Converter: Provider 
org.apache.kafka.connect.json.JsonConverter could not be instantiated
  at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:586)
 at 
java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:813)
 at java.base/java.util.ServiceLoader$ProviderImpl.get(ServiceLoader.java:729)
 at java.base/java.util.ServiceLoader$3.next(ServiceLoader.java:1403)
 at 
org.apache.kafka.connect.runtime.isolation.PluginScanner.handleLinkageError(PluginScanner.java:176)
 at 
org.apache.kafka.connect.runtime.isolation.PluginScanner.getServiceLoaderPluginDesc(PluginScanner.java:136)
 at 
org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner.scanPlugins(ServiceLoaderScanner.java:61)
 at 
org.apache.kafka.connect.runtime.isolation.PluginScanner.scanUrlsAndAddPlugins(PluginScanner.java:79)
 at 
org.apache.kafka.connect.runtime.isolation.PluginScanner.discoverPlugins(PluginScanner.java:67)
 at 
org.apache.kafka.connect.runtime.isolation.Plugins.initLoaders(Plugins.java:99)
 at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:90)
 at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:78)
 at 
org.apache.kafka.connect.cli.AbstractConnectCli.startConnect(AbstractConnectCli.java:128)
 at 
org.apache.kafka.connect.cli.AbstractConnectCli.run(AbstractConnectCli.java:101)
 at 
org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:113)
 Caused by: java.util.ServiceConfigurationError: 
com.fasterxml.jackson.databind.Module: 
com.fasterxml.jackson.datatype.jsr310.JavaTimeModule not a subtype
 at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:593)
 at 
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1244)
 at 
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1273)
 at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1309)
 at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1393)
 at 
com.fasterxml.jackson.databind.ObjectMapper.findModules(ObjectMapper.java:1158)
 at 
com.fasterxml.jackson.databind.ObjectMapper.findModules(ObjectMapper.java:1142)
 at 
com.fasterxml.jackson.databind.ObjectMapper.findAndRegisterModules(ObjectMapper.java:1192)
 at org.apache.kafka.connect.json.JsonSerializer.<init>(JsonSerializer.java:58)
 at org.apache.kafka.connect.json.JsonConverter.<init>(JsonConverter.java:250)
 at org.apache.kafka.connect.json.JsonConverter.<init>(JsonConverter.java:238)
 at 
java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
 at 
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
 at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
 at 
java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:789)
 ... 13 more{code}
 
h3. Steps to Reproduce:

1. Start a connect worker with Service loading enabled and with certain 
connector plugins in plugin path (e.g. AzureBlobSource & BigQuerySink)
2. Observe ServiceConfigurationError during plugin discovery
h3. Current Behavior:

ServiceConfigurationError is thrown with message 
"com.fasterxml.jackson.databind.Module: <module> not a subtype"

Where <module> can be one of:
 * com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule
 * com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
 * com.fasterxml.jackson.datatype.guava.GuavaModule
 * com.fasterxml.jackson.datatype.joda.JodaModule

h3. Proposed Solution:

Explicitly register the Afterburner module instead of using 
findAndRegisterModules().
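
A minimal sketch of the proposed change (assumes jackson-module-afterburner is 
on the classpath; the exact constructor touched in Kafka may differ):
{code:java}
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;

public class ExplicitAfterburner {
    public static void main(String[] args) {
        // Before: scans the classpath via ServiceLoader and can throw
        // ServiceConfigurationError when incompatible modules are present.
        // ObjectMapper mapper = new ObjectMapper().findAndRegisterModules();

        // After: register only the module that is actually required.
        ObjectMapper mapper = new ObjectMapper()
                .registerModule(new AfterburnerModule());
        System.out.println(mapper.getRegisteredModuleIds());
    }
}
{code}
This keeps the Afterburner speed-up while avoiding ServiceLoader scanning of 
unrelated Jackson modules.
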
h3. Potential Impact:
 - Resolves compatibility issues with certain Jackson modules
 - Maintains performance improvements from Afterburner module
 - May slightly change behavior for users relying on auto-registration of other 
Jackson modules

  was:
h3. Problem:

JsonSerializer and JsonDeserializer use objectMapper.findAndRegisterModules(), 
which attempts to register all Jackson modules implementing 
com.fasterxml.jackson.databind.Module. This can cause a 
ServiceConfigurationError when incompatible modules are present in the 
classpath.

 
{code:java}
java.util.ServiceConfigurationError: 
org.apache.kafka.connect.storage.Converter: Provider 
org.apache.kafka.connect.json.JsonConverter could not be instantiated  at 
java.base/java.util.ServiceLoader.fail(ServiceLoader.java:586) at 
java

[jira] [Commented] (KAFKA-17097) Add replace.null.with.default configuration to ValueToKey and ReplaceField (KIP-1040)

2024-07-10 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864782#comment-17864782
 ] 

Greg Harris commented on KAFKA-17097:
-

[~mimaison] Sorry, I was the one to close the previous issue, since the 
attached PR had a larger scope than the issue description and a smaller scope 
than the KIP. I wanted to avoid the risk that a single issue straddles a 
release, in case the follow-up PR is delayed.

> Add replace.null.with.default configuration to ValueToKey and ReplaceField 
> (KIP-1040)
> -
>
> Key: KAFKA-17097
> URL: https://issues.apache.org/jira/browse/KAFKA-17097
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Reporter: Greg Harris
>Assignee: PoAn Yang
>Priority: Major
>  Labels: newbie
>
> {color:#172b4d}See 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=303794677] 
> for motivation and design.
> {color}These are the final remaining transformations which still need this 
> configuration added.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]

2024-07-10 Thread via GitHub


gharris1727 commented on code in PR #16486:
URL: https://github.com/apache/kafka/pull/16486#discussion_r1672817593


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -793,6 +796,46 @@ protected void assignConnectors(List<WorkerLoad> workerAssignment, Collection<String> connectors) {
+    private static class BalancedIterator<T, K> implements Iterator<T> {
+
+        private final Map<K, Iterator<T>> grouped;
+        private final List<K> keys;
+
+        private int k;
+
+        public BalancedIterator(Collection<T> collection, Function<T, K> allocationGrouper) {
+            this.k = 0;
+            this.grouped = collection.stream().collect(Collectors.groupingBy(
+                    allocationGrouper,
+                    Collectors.collectingAndThen(
+                            Collectors.toList(),
+                            List::iterator
+                    )
+            ));
+            this.keys = collection.stream()
+                    .map(allocationGrouper)
+                    .distinct()
+                    .collect(Collectors.toList());

Review Comment:
   Maybe I don't understand, but I don't think this changed anything. The 
incoming Collection may still have an over-representation of a single connector 
first, leading that connector to be preferentially revoked.
   
   For example, consider this situation
   
   ```
   W1: C1 C2 C3 C4
   W2: C1 C5 C6 C7
   W3: C1 C8 C9 C10
   ```
   
   If a new worker joins, C1 could be revoked because it appears the same 
number of times as all of the other connectors, but that would violate local 
balance later:
   
   ```
   W1: C2 C3 C4
   W2: C5 C6 C7
   W3: C8 C9 C10
   W4: C1 C1 C1
   ```
   
   The BalancedIterator isn't fairly tie-breaking when two connectors have the 
same number of jobs assigned to the current worker. Picking a single job to 
revoke depends on the entire rest of the state, and some degree of predicting 
how the jobs will be distributed afterwards.
   
   This is what I think the "ideal" state should be after that initial state:
   ```
   W1: C1 C2 C3
   W2: C1 C5 C6
   W3: C1 C8 C9
   W4: C4 C7 C10
   ```
   At most one C1 should be revoked overall, because revoking two to put on W4 
would break local balance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   3   >