[jira] [Commented] (KAFKA-17101) Mirror maker internal topics cleanup policy changes to 'delete' from 'compact'
[ https://issues.apache.org/jira/browse/KAFKA-17101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864935#comment-17864935 ] kaushik srinivas commented on KAFKA-17101: -- [~gharris1727] Below is the configuration.
site1.ssl.keystore.filename=/etc/mirror-maker/secrets/ssl/site1/server.jks
site1.ssl.truststore.filename=/etc/mirror-maker/secrets/ssl/site1/trustchain.jks
site2.security.protocol=SSL
site1->site2.heartbeats.topic.replication.factor=3
site1.ssl.enabled.protocols=TLSv1.2,TLSv1.3
syslog=false
site2.ssl.keystore.filename=/etc/mirror-maker/secrets/ssl/site2/server.jks
site2.consumer.ssl.cipher.suites=
site1->site2.sync.topic.configs.interval.seconds=300
site1->site2.topics=.*_ALARM$,.*_INTERNAL_Intent_Changes$,product_INTERNAL_HAM_UPDATE$
site1->site2.replication.factor=3
site1->site2.emit.checkpoints.enabled=true
site2.ssl.key.password=productkeystore
tasks.max=1
site1.ssl.keystore.password=productkeystore
site1.ssl.truststore.location=/etc/kafka/shared/site1_truststore
site1.status.storage.replication.factor=3
site1.ssl.truststore.password=productkeystore
site1->site2.sync.topic.acls.enabled=false
site1->site2.refresh.topics.interval.seconds=300
site2.ssl.truststore.location=/etc/kafka/shared/site2_truststore
site2.ssl.truststore.filename=/etc/mirror-maker/secrets/ssl/site2/trustchain.jks
site1.ssl.protocol=TLSv1.2
site1->site2.replication.policy.class=RetainTopicNameReplicationPolicy
site1->site2.groups=.*
site1.security.protocol=SSL
site1->site2.groups.blacklist=console-consumer-.*, connect-.*, __.*
site1.config.storage.replication.factor=3
site1->site2.emit.hearbeats.enabled=true
site2.ssl.truststore.password=productkeystore
site1->site2.offset-syncs.topic.replication.factor=3
site1.ssl.keystore.location=/etc/kafka/shared/site1_keystore
site1.offset.storage.replication.factor=3
clusters=site1,site2
site1.bootstrap.servers=product-kafka-headless:9092
site2.ssl.protocol=TLSv1.2
site1->site2.refresh.groups.interval.seconds=300
site2.ssl.enabled=true
site2.ssl.enabled.protocols=TLSv1.2,TLSv1.3
site2.ssl.endpoint.identification.algorithm=
site1.ssl.cipher.suites=
site1->site2.checkpoints.topic.replication.factor=3
site1.producer.ssl.cipher.suites=
site2.ssl.keystore.location=/etc/kafka/shared/site2_keystore
site2.config.storage.replication.factor=3
site2.status.storage.replication.factor=3
site1.ssl.enabled=true
site1->site2.enabled=true
site1.ssl.endpoint.identification.algorithm=
site1.admin.ssl.cipher.suites=
site2.producer.ssl.cipher.suites=
site2.ssl.keystore.password=productkeystore
site2.ssl.cipher.suites=
site2.offset.storage.replication.factor=3
site1.ssl.key.password=productkeystore
site1.consumer.ssl.cipher.suites=
site2.bootstrap.servers=product-kafka-headless:9097
site1->site2.sync.topic.configs.enabled=true
site2.admin.ssl.cipher.suites=
> Mirror maker internal topics cleanup policy changes to 'delete' from 'compact'
> ---
>
> Key: KAFKA-17101
> URL: https://issues.apache.org/jira/browse/KAFKA-17101
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.4.1, 3.5.1, 3.6.1
> Reporter: kaushik srinivas
> Priority: Major
>
> Scenario/Setup details
> Kafka cluster 1: 3 replicas
> Kafka cluster 2: 3 replicas
> MM1 moving data from cluster 1 to cluster 2
> MM2 moving data from cluster 2 to cluster 1
> Sometimes with a reboot of the kafka cluster 1 and MM1 instance, we observe MM failing to come up with the below exception,
> {code:java}
> {"message":"DistributedHerder-connect-1-1 - org.apache.kafka.connect.runtime.distributed.DistributedHerder - [Worker clientId=connect-1, groupId=site1-mm2] Uncaught exception in herder work thread, exiting: "}}
> org.apache.kafka.common.config.ConfigException: Topic 'mm2-offsets.site1.internal' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has
'cleanup.policy=delete'. Continuing would likely result in eventually losing source connector offsets and problems restarting this Connect cluster in the future. Change the 'offset.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'. {code} Once the topic is altered to 'cleanup.policy=compact', MM works just fine. This happens sporadically on our setups and across a variety of scenarios. We have not yet been able to identify exact reproduction steps. -- This message was sent by Atlassian Jira (v8.20.10#820010)
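For reference, the startup check that produces this exception can be modeled roughly as follows (a simplified Python sketch, not the actual Connect code; the function name and dict-shaped config are ours):

```python
def check_offset_topic_policy(topic: str, topic_configs: dict) -> None:
    """Reject an offset storage topic whose cleanup policy is not exactly 'compact'.

    Simplified model of the validation behind the exception above: with
    'cleanup.policy=delete' (or a mixed policy), retention could remove
    committed source-connector offsets, so the worker refuses to start.
    """
    policy = topic_configs.get("cleanup.policy", "delete")
    if {p.strip() for p in policy.split(",")} != {"compact"}:
        raise ValueError(
            f"Topic '{topic}' supplied via the 'offset.storage.topic' property "
            f"is required to have 'cleanup.policy=compact', "
            f"but found 'cleanup.policy={policy}'"
        )

# Passes once the topic has been altered back to a compacted policy:
check_offset_topic_policy("mm2-offsets.site1.internal", {"cleanup.policy": "compact"})
```

The interesting question in this report is therefore not the check itself, but what changed the internal topic's policy back to 'delete' in the first place.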
Re: [PR] KAFKA-17111: Explicitly Register Afterburner Module in JsonSerializer and JsonDeserializer [kafka]
vbalani002 commented on PR #16565: URL: https://github.com/apache/kafka/pull/16565#issuecomment-021575 Thanks for looking into this promptly, @gharris1727! I was waiting for a green CI build before marking the PR as ready. Since there are no test failures in the pipeline, I'm opening it for review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-6197) Difficult to get to the Kafka Streams javadocs
[ https://issues.apache.org/jira/browse/KAFKA-6197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6197: --- Labels: beginner newbie (was: )
> Difficult to get to the Kafka Streams javadocs
> --
>
> Key: KAFKA-6197
> URL: https://issues.apache.org/jira/browse/KAFKA-6197
> Project: Kafka
> Issue Type: Improvement
> Components: documentation, streams
> Affects Versions: 0.11.0.2, 1.0.0
> Reporter: James Cheng
> Priority: Major
> Labels: beginner, newbie
>
> In order to get to the javadocs for the Kafka producer/consumer/streams, I typically go to http://kafka.apache.org/documentation/ and click on either 2.1, 2.2, or 2.3 in the table of contents to go right to the appropriate section.
> The link for "Streams API" now goes to the (very nice) http://kafka.apache.org/10/documentation/streams/. That page doesn't have a direct link to the Javadocs anywhere. The examples and guides frequently say "See javadocs for details", but there are no direct links to it.
> If I instead go back to the main page and scroll directly to section 2.3, there is still the link to get to the javadocs. But it's harder to jump immediately to it. And it's a little confusing that section 2.3 in the table of contents does not link you to section 2.3 of the page.
> It would be nice if the link to the Streams javadocs was easier to get to. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16661: Added a lower `log.initial.task.delay.ms` value [kafka]
showuon commented on PR #16221: URL: https://github.com/apache/kafka/pull/16221#issuecomment-2221970784 Retriggering CI: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-16221/4/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-8629) Kafka Streams Apps to support small native images through GraalVM
[ https://issues.apache.org/jira/browse/KAFKA-8629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-8629. Resolution: Fixed GraalVM should work with KS now. > Kafka Streams Apps to support small native images through GraalVM > - > > Key: KAFKA-8629 > URL: https://issues.apache.org/jira/browse/KAFKA-8629 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 > Environment: OSX > Linux on Docker >Reporter: Andy Muir >Assignee: Andy Muir >Priority: Minor > > I'm investigating using [GraalVM|http://example.com/] to help with reducing > docker image size and required resources for a simple Kafka Streams > microservice. To this end, I'm looking at running a microservice which: > 1) consumes from a Kafka topic (XML) > 2) Transforms into JSON > 3) Produces to a new Kafka topic. > The Kafka Streams app running in the JVM works fine. > When I attempt to build it to a GraalVM native image (binary executable which > does not require the JVM, hence smaller image size and less resources), I > encountered a few > [incompatibilities|https://github.com/oracle/graal/blob/master/substratevm/LIMITATIONS.md] > with the source code in Kafka. > I've implemented a workaround for each of these in a fork (link to follow) to > help establish if it is feasible. I don't intend (at this stage) for the > changes to be applied to the broker - I'm only after Kafka Streams for now. > I'm not sure whether it'd be a good idea for the broker itself to run as a > native image! 
> There were 2 issues getting the native image with kafka streams:
> 1) Some Reflection use cases using MethodHandle
> 2) Anything JMX
> To work around these issues, I have:
> 1) Replaced use of MethodHandle with alternatives
> 2) Commented out the JMX code in a few places
> While the first may be sustainable, I'd expect the 2nd option to be put behind a configuration switch, so that the existing code is used by default and JMX is only disabled if configured.
> *I haven't created a PR for now, as I'd like feedback to decide if it is going to be feasible to carry this forwards.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-9738) Add Generics Type Parameters to forwarded() in MockProcessorContext
[ https://issues.apache.org/jira/browse/KAFKA-9738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864883#comment-17864883 ] Matthias J. Sax commented on KAFKA-9738: [~cadonna] – I believe this is fixed with the new `api.MockProcessorContext`? Can we close this ticket? > Add Generics Type Parameters to forwarded() in MockProcessorContext > > > Key: KAFKA-9738 > URL: https://issues.apache.org/jira/browse/KAFKA-9738 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Priority: Minor > > The method {{forwarded()}} to capture the forwarded records in > {{MockProcessorContext}} does not have any type parameters although the > corresponding {{forward()}} does have them. To enable type checking at > compile time in tests, generics parameters shall be added to the > {{forwarded()}} method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
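The benefit requested above (compile-time type checking on the captured forwards) can be illustrated outside Java; here is a small Python analogue using generics (purely illustrative; the real request concerns type parameters on the Java `MockProcessorContext.forwarded()` API, and all names below are ours):

```python
from dataclasses import dataclass
from typing import Generic, List, TypeVar

K = TypeVar("K")
V = TypeVar("V")


@dataclass
class CapturedForward(Generic[K, V]):
    """One record captured by the mock context, with typed key and value."""
    key: K
    value: V


class TypedMockContext(Generic[K, V]):
    """Toy stand-in for a processor test context with a typed capture list."""

    def __init__(self) -> None:
        self._forwarded: List[CapturedForward[K, V]] = []

    def forward(self, key: K, value: V) -> None:
        self._forwarded.append(CapturedForward(key, value))

    def forwarded(self) -> List[CapturedForward[K, V]]:
        # Because the return type carries K and V, a type checker can verify
        # assertions on captured records without casts, which is the point
        # of adding generics to the untyped forwarded() method.
        return list(self._forwarded)


ctx: TypedMockContext[str, int] = TypedMockContext()
ctx.forward("a", 1)
assert ctx.forwarded()[0].value + 1 == 2  # no cast needed on the value
```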
[jira] [Resolved] (KAFKA-13326) Add multi-cluster support to Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-13326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-13326. - Resolution: Duplicate
> Add multi-cluster support to Kafka Streams
> --
>
> Key: KAFKA-13326
> URL: https://issues.apache.org/jira/browse/KAFKA-13326
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Guangyuan Wang
> Priority: Major
> Labels: needs-kip
>
> Dear Kafka Team,
> According to the link https://kafka.apache.org/28/documentation/streams/developer-guide/config-streams.html#bootstrap-servers, Kafka Streams applications can only communicate with a single Kafka cluster specified by this config value. Future versions of Kafka Streams will support connecting to different Kafka clusters for reading input streams and writing output streams.
> In which version will this feature be added to Kafka Streams? It would really be a very good feature. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-10356) Handle accidental deletion of sink-topics as exceptional failure
[ https://issues.apache.org/jira/browse/KAFKA-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864879#comment-17864879 ] Matthias J. Sax commented on KAFKA-10356: - Is the description of this ticket still correct? Based on https://issues.apache.org/jira/browse/KAFKA-16508, which we addressed recently, we originally ended up with an infinite retry loop (at least in newer releases) and did not call the ProductionExceptionHandler. The new behavior with K16508 is to call the handler and allow the user to drop the record on the floor. However, I would not consider this a "lost silently" case, because the user's custom handler made that decision. I don't think that triggering a rebalance and shutting down if a sink topic does not exist is the right thing any longer; maybe it was a good idea back in the day, when we did not have the "dynamic routing" feature. But with "dynamic routing", we should just rely on the production exception handler IMHO, and allow dropping a potential poison-pill record. Related is https://issues.apache.org/jira/browse/KAFKA-17057, which proposes adding a RETRY option to the handler. Any objections to just closing this ticket?
> Handle accidental deletion of sink-topics as exceptional failure
> --
>
> Key: KAFKA-10356
> URL: https://issues.apache.org/jira/browse/KAFKA-10356
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Bruno Cadonna
> Priority: Major
>
> Today when sink topics are deleted, the producer's send callback would eventually return the UnknownTopicOrPartition exception after the configured delivery.timeout.ms, whose default is 2min if EOS is not enabled (otherwise it is Integer.MAX_VALUE). Then in the Streams implementation the exception would be handled by the ProductionExceptionHandler, which by default treats it as `FAIL` and causes the thread to die. If it treats it as CONTINUE, the exception is ignored and the sent records are lost silently.
> We should improve this situation in Streams by special-handling the {{UnknownTopicOrPartition}} exception and triggering a rebalance as well; then the leader can also check whether the sink topic metadata exists, just like for source topics, and follow the same logic as in https://issues.apache.org/jira/browse/KAFKA-10355. -- This message was sent by Atlassian Jira (v8.20.10#820010)
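The default FAIL/CONTINUE semantics discussed in this ticket can be modeled roughly like this (an illustrative Python sketch; the real API is Streams' Java ProductionExceptionHandler, and the names below are ours):

```python
from enum import Enum


class HandlerResponse(Enum):
    FAIL = "fail"          # stop processing: the stream thread dies
    CONTINUE = "continue"  # drop the record and keep processing


def on_send_failure(exc: Exception, handler) -> bool:
    """Model of Streams reacting to a producer send error.

    Returns True if the record was dropped and processing continues;
    re-raises (modeling thread death) if the handler says FAIL.
    """
    if handler(exc) is HandlerResponse.FAIL:
        raise exc
    # With CONTINUE, the record is lost silently from Streams' point of view.
    return True


# Default behavior is FAIL: an UnknownTopicOrPartition-style error kills the thread.
def default_handler(exc: Exception) -> HandlerResponse:
    return HandlerResponse.FAIL
```

This makes the trade-off in the comment above concrete: only a user-supplied handler returning CONTINUE causes records to be dropped, so the drop is an explicit decision rather than a silent loss.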
Re: [PR] MINOR: add clean up command to tests README [kafka]
gongxuanzhang commented on PR #16560: URL: https://github.com/apache/kafka/pull/16560#issuecomment-2221896315 I updated it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: add clean up command to tests README [kafka]
gongxuanzhang commented on code in PR #16560: URL: https://github.com/apache/kafka/pull/16560#discussion_r1673328940 ## tests/README.md: ## @@ -47,6 +47,10 @@ TC_PATHS="tests/kafkatest/tests/streams/streams_upgrade_test.py::StreamsUpgradeT ``` bash tests/docker/ducker-ak up -j 'openjdk:11'; tests/docker/run_tests.sh ``` +* Clean Docker Image +``` +docker rm -f $(docker ps -aq --filter "name=ducker") Review Comment: you are right, this is even better! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17096: Fix kafka_log4j_appender.py [kafka]
gongxuanzhang commented on PR #16559: URL: https://github.com/apache/kafka/pull/16559#issuecomment-2221884905 I updated it; please take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
[ https://issues.apache.org/jira/browse/KAFKA-17119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] StarBoy1005 updated KAFKA-17119: Description:
*env*
java: jdk1.8.0_271
os: centos 7
kafka: 2.4.1 and 2.8.2
ranger: 2.1.0
After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe. Not only describe, but also list; even the create-topic command behaves abnormally. Attached are some screenshots of my policy and the command results.
!image-2024-07-11-10-33-51-587.png!
!image-2024-07-11-10-35-10-358.png!
!image-2024-07-11-10-34-15-910.png!
was: After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe. Not only describe, but also list; even the create-topic command behaves abnormally. Attached are some screenshots of my policy and the command results.
!image-2024-07-11-10-33-51-587.png!
!image-2024-07-11-10-35-10-358.png!
!image-2024-07-11-10-34-15-910.png!
> After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
> ----
>
> Key: KAFKA-17119
> URL: https://issues.apache.org/jira/browse/KAFKA-17119
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 2.4.0
> Reporter: StarBoy1005
> Priority: Major
> Attachments: image-2024-07-11-10-33-51-587.png, image-2024-07-11-10-34-15-910.png, image-2024-07-11-10-35-10-358.png
>
> *env*
> java: jdk1.8.0_271
> os: centos 7
> kafka: 2.4.1 and 2.8.2
> ranger: 2.1.0
> After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
> Not only describe, but also list; even the create-topic command behaves abnormally.
> Attached are some screenshots of my policy and the command results.
> !image-2024-07-11-10-33-51-587.png!
> !image-2024-07-11-10-35-10-358.png!
> !image-2024-07-11-10-34-15-910.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
[ https://issues.apache.org/jira/browse/KAFKA-17119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] StarBoy1005 updated KAFKA-17119: Attachment: image-2024-07-11-10-35-10-358.png
> After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
> ----
>
> Key: KAFKA-17119
> URL: https://issues.apache.org/jira/browse/KAFKA-17119
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 2.4.0
> Reporter: StarBoy1005
> Priority: Major
> Attachments: image-2024-07-11-10-33-51-587.png, image-2024-07-11-10-34-15-910.png, image-2024-07-11-10-35-10-358.png
>
> After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
> Not only describe, but also list; even the create-topic command behaves abnormally.
> Attached are some screenshots of my policy and the command results.
> !image-2024-07-11-10-33-51-587.png!
> !image-2024-07-11-10-34-15-910.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
[ https://issues.apache.org/jira/browse/KAFKA-17119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] StarBoy1005 updated KAFKA-17119: Description:
After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe. Not only describe, but also list; even the create-topic command behaves abnormally. Attached are some screenshots of my policy and the command results.
!image-2024-07-11-10-33-51-587.png!
!image-2024-07-11-10-35-10-358.png!
!image-2024-07-11-10-34-15-910.png!
was: After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe. Not only describe, but also list; even the create-topic command behaves abnormally. Attached are some screenshots of my policy and the command results.
!image-2024-07-11-10-33-51-587.png!
!image-2024-07-11-10-34-15-910.png!
> After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
> ----
>
> Key: KAFKA-17119
> URL: https://issues.apache.org/jira/browse/KAFKA-17119
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 2.4.0
> Reporter: StarBoy1005
> Priority: Major
> Attachments: image-2024-07-11-10-33-51-587.png, image-2024-07-11-10-34-15-910.png, image-2024-07-11-10-35-10-358.png
>
> After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
> Not only describe, but also list; even the create-topic command behaves abnormally.
> Attached are some screenshots of my policy and the command results.
> !image-2024-07-11-10-33-51-587.png!
> !image-2024-07-11-10-35-10-358.png!
> !image-2024-07-11-10-34-15-910.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
[ https://issues.apache.org/jira/browse/KAFKA-17119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] StarBoy1005 updated KAFKA-17119: Attachment: image-2024-07-11-10-33-51-587.png
> After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
> ----
>
> Key: KAFKA-17119
> URL: https://issues.apache.org/jira/browse/KAFKA-17119
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 2.4.0
> Reporter: StarBoy1005
> Priority: Major
> Attachments: image-2024-07-11-10-33-51-587.png, image-2024-07-11-10-34-15-910.png
>
> After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
> Not only describe, but also list; even the create-topic command behaves abnormally. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
[ https://issues.apache.org/jira/browse/KAFKA-17119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] StarBoy1005 updated KAFKA-17119: Attachment: image-2024-07-11-10-34-15-910.png
> After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
> ----
>
> Key: KAFKA-17119
> URL: https://issues.apache.org/jira/browse/KAFKA-17119
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 2.4.0
> Reporter: StarBoy1005
> Priority: Major
> Attachments: image-2024-07-11-10-33-51-587.png, image-2024-07-11-10-34-15-910.png
>
> After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
> Not only describe, but also list; even the create-topic command behaves abnormally. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
[ https://issues.apache.org/jira/browse/KAFKA-17119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] StarBoy1005 updated KAFKA-17119: Description:
After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe. Not only describe, but also list; even the create-topic command behaves abnormally. Attached are some screenshots of my policy and the command results.
!image-2024-07-11-10-33-51-587.png!
!image-2024-07-11-10-34-15-910.png!
was: After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe. Not only describe, but also list; even the create-topic command behaves abnormally.
> After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
> ----
>
> Key: KAFKA-17119
> URL: https://issues.apache.org/jira/browse/KAFKA-17119
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 2.4.0
> Reporter: StarBoy1005
> Priority: Major
> Attachments: image-2024-07-11-10-33-51-587.png, image-2024-07-11-10-34-15-910.png
>
> After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
> Not only describe, but also list; even the create-topic command behaves abnormally.
> Attached are some screenshots of my policy and the command results.
> !image-2024-07-11-10-33-51-587.png!
> !image-2024-07-11-10-34-15-910.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17119) After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe.
StarBoy1005 created KAFKA-17119: --- Summary: After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe. Key: KAFKA-17119 URL: https://issues.apache.org/jira/browse/KAFKA-17119 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 2.4.0 Reporter: StarBoy1005 After enabling kafka-ranger-plugin and banning the user from using describe in the policy, that user can still use describe. Not only describe, but also list; even the create-topic command behaves abnormally. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16855) KRaft - Wire replaying a TopicRecord
[ https://issues.apache.org/jira/browse/KAFKA-16855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864871#comment-17864871 ] Luke Chen commented on KAFKA-16855: --- [~muralibasani], the [PR|https://github.com/apache/kafka/pull/16502] for KAFKA-16853 has been opened and has had a first round of review. I think it's in good shape, and you can start working on this ticket based on that branch when you have time. FYI.
> KRaft - Wire replaying a TopicRecord
> --
>
> Key: KAFKA-16855
> URL: https://issues.apache.org/jira/browse/KAFKA-16855
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Christo Lolov
> Assignee: Muralidhar Basani
> Priority: Major
>
> *Summary*
> Replaying a TopicRecord containing a new TieredEpoch and TieredState needs to interact with the two thread pools in the RemoteLogManager to add/remove the correct tasks from each -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17115) Closing newly-created consumers during rebalance can cause rebalances to hang
[ https://issues.apache.org/jira/browse/KAFKA-17115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-17115: -- Description: When a dynamic consumer (i.e., one with no group instance ID configured) first tries to join a group, the group coordinator normally responds with the MEMBER_ID_REQUIRED error, under the assumption that the member will retry soon after. During this step, the group coordinator will also generate a new member ID for the consumer, include it in the error response for the initial join group request, and expect that a member with that ID will participate in future rebalances. If a consumer is closed in between the time that it sends the JoinGroup request and the time that it receives the response from the group coordinator, it will not attempt to leave the group, since it doesn't have a member ID to include in that request. This will cause future rebalances to hang, since the group coordinator will still expect a member with the ID for the now-closed consumer to join. Eventually, the group coordinator may remove the closed consumer from the group, but with default configuration settings, this could take as long as five minutes. One possible fix is to send a LeaveGroup request with the member ID if the consumer receives a JoinGroup response containing a member ID after it has been closed. This ticket applies only to the legacy consumer. There is a similar issue with the new consumer that is tracked separately in KAFKA-17116. was: When a dynamic consumer (i.e., one with no group instance ID configured) first tries to join a group, the group coordinator normally responds with the MEMBER_ID_REQUIRED error, under the assumption that the member will retry soon after. During this step, the group coordinator will also generate a new member ID for the consumer, include it in the error response for the initial join group request, and expect that a member with that ID will participate in future rebalances.
If a consumer is closed in between the time that it sends the JoinGroup request and the time that it receives the response from the group coordinator, it will not attempt to leave the group, since it doesn't have a member ID to include in that request. This will cause future rebalances to hang, since the group coordinator will still expect a member with the ID for the now-closed consumer to join. Eventually, the group coordinator may remove the closed consumer from the group, but with default configuration settings, this could take as long as five minutes. One possible fix is to send a LeaveGroup response with the member ID if the consumer receives a JoinGroup response with a member ID after it has been closed. This applies to the legacy consumer; I have not verified yet with the new async consumer. > Closing newly-created consumers during rebalance can cause rebalances to hang > - > > Key: KAFKA-17115 > URL: https://issues.apache.org/jira/browse/KAFKA-17115 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.9.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > When a dynamic consumer (i.e., one with no group instance ID configured) > first tries to join a group, the group coordinator normally responds with the > MEMBER_ID_REQUIRED error, under the assumption that the member will retry > soon after. During this step, the group coordinator will also generate a new > member ID for the consumer, include it in the error response for the initial > join group request, and expect that a member with that ID will participate in > future rebalances. > If a consumer is closed in between the time that it sends the JoinGroup > request and the time that it receives the response from the group > coordinator, it will not attempt to leave the group, since it doesn't have a > member ID to include in that request. 
> This will cause future rebalances to hang, since the group coordinator will still expect a member with the ID for the now-closed consumer to join. Eventually, the group coordinator may remove the closed consumer from the group, but with default configuration settings, this could take as long as five minutes.
> One possible fix is to send a LeaveGroup request with the member ID if the consumer receives a JoinGroup response containing a member ID after it has been closed.
> This ticket applies only to the legacy consumer. There is a similar issue with the new consumer that is tracked separately in KAFKA-17116. -- This message was sent by Atlassian Jira (v8.20.10#820010)
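The race described in this ticket can be sketched with a small model (purely illustrative; the class and method names below are ours, not Kafka's):

```python
class MockCoordinator:
    """Toy model of the group coordinator's member bookkeeping."""

    def __init__(self):
        self.expected_members = set()

    def handle_join(self, member_id=None):
        if member_id is None:
            # First JoinGroup from a dynamic member: assign an ID, answer
            # MEMBER_ID_REQUIRED, and expect a member with that ID to rejoin.
            new_id = f"member-{len(self.expected_members) + 1}"
            self.expected_members.add(new_id)
            return "MEMBER_ID_REQUIRED", new_id
        return "OK", member_id

    def handle_leave(self, member_id):
        self.expected_members.discard(member_id)

    def rebalance_can_complete(self, joined_ids):
        # A rebalance completes only once every expected member has joined.
        return self.expected_members <= set(joined_ids)


coord = MockCoordinator()
_, assigned_id = coord.handle_join()  # consumer's very first JoinGroup
# The consumer is closed before it sees the response: it holds no member ID,
# so it never sends LeaveGroup, and the group keeps waiting for `assigned_id`.
assert not coord.rebalance_can_complete([])
# The proposed fix: leave the group once the ID arrives after close.
coord.handle_leave(assigned_id)
assert coord.rebalance_can_complete([])
```

This is only the bookkeeping view; the real coordinator would eventually expire the phantom member via session/rebalance timeouts, which is the multi-minute delay the ticket describes.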
[jira] [Updated] (KAFKA-17115) Closing newly-created legacy consumers during rebalance can cause rebalances to hang
[ https://issues.apache.org/jira/browse/KAFKA-17115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-17115: -- Summary: Closing newly-created legacy consumers during rebalance can cause rebalances to hang (was: Closing newly-created consumers during rebalance can cause rebalances to hang) > Closing newly-created legacy consumers during rebalance can cause rebalances > to hang > > > Key: KAFKA-17115 > URL: https://issues.apache.org/jira/browse/KAFKA-17115 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.9.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > When a dynamic consumer (i.e., one with no group instance ID configured) > first tries to join a group, the group coordinator normally responds with the > MEMBER_ID_REQUIRED error, under the assumption that the member will retry > soon after. During this step, the group coordinator will also generate a new > member ID for the consumer, include it in the error response for the initial > join group request, and expect that a member with that ID will participate in > future rebalances. > If a consumer is closed in between the time that it sends the JoinGroup > request and the time that it receives the response from the group > coordinator, it will not attempt to leave the group, since it doesn't have a > member ID to include in that request. > This will cause future rebalances to hang, since the group coordinator will > still expect a member with the ID for the now-closed consumer to join. > Eventually, the group coordinator may remove the closed consumer from the > group, but with default configuration settings, this could take as long as > five minutes. > One possible fix is to send a LeaveGroup response with the member ID if the > consumer receives a JoinGroup response with a member ID after it has been > closed. > This ticket applies only to the legacy consumer. 
There is a similar issue > with the new consumer that is tracked separately in KAFKA-17116. -- This message was sent by Atlassian Jira (v8.20.10#820010)
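The race and the proposed fix described in the ticket above can be sketched as a minimal, self-contained model. This is not Kafka's actual client code — the class and method names (`LateJoinResponseDemo`, `onJoinGroupResponse`, `"member-42"`) are illustrative stand-ins — it only shows why a consumer closed before the JoinGroup response arrives should still leave the group once it learns its member ID:

```java
// Minimal model of the proposed fix for KAFKA-17115 (illustrative names,
// not Kafka's real client code): if a JoinGroup response carrying a member
// ID arrives after the consumer was closed, send a LeaveGroup with that ID
// so the coordinator does not wait out the session timeout for a ghost member.
import java.util.ArrayList;
import java.util.List;

public class LateJoinResponseDemo {
    private volatile boolean closed = false;
    private String memberId = null;                 // unknown until the coordinator assigns one
    final List<String> sentRequests = new ArrayList<>();

    void close() {
        closed = true;
        if (memberId != null) {
            sentRequests.add("LeaveGroup(" + memberId + ")");
        }
        // Without a member ID there is nothing useful to send yet --
        // this is exactly the gap the ticket describes.
    }

    // Invoked when the JoinGroup response (MEMBER_ID_REQUIRED + new ID) arrives.
    void onJoinGroupResponse(String assignedMemberId) {
        memberId = assignedMemberId;
        if (closed) {
            // The proposed fix: leave explicitly instead of letting the
            // coordinator wait for a member that will never rejoin.
            sentRequests.add("LeaveGroup(" + assignedMemberId + ")");
        }
    }

    public static List<String> simulate() {
        LateJoinResponseDemo consumer = new LateJoinResponseDemo();
        consumer.close();                          // closed before the response arrives
        consumer.onJoinGroupResponse("member-42"); // response races in afterwards
        return consumer.sentRequests;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // the late response still triggers a LeaveGroup
    }
}
```

Without the `closed` check in `onJoinGroupResponse`, `sentRequests` would stay empty and the modeled coordinator would keep expecting `member-42` to join.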
[jira] [Commented] (KAFKA-17115) Closing newly-created consumers during rebalance can cause rebalances to hang
[ https://issues.apache.org/jira/browse/KAFKA-17115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864870#comment-17864870 ] Chris Egerton commented on KAFKA-17115: --- Damn, that was fast! Thanks for the follow-up [~lianetm], glad this helped with development of the new consumer. > Closing newly-created consumers during rebalance can cause rebalances to hang > - > > Key: KAFKA-17115 > URL: https://issues.apache.org/jira/browse/KAFKA-17115 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.9.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > When a dynamic consumer (i.e., one with no group instance ID configured) > first tries to join a group, the group coordinator normally responds with the > MEMBER_ID_REQUIRED error, under the assumption that the member will retry > soon after. During this step, the group coordinator will also generate a new > member ID for the consumer, include it in the error response for the initial > join group request, and expect that a member with that ID will participate in > future rebalances. > If a consumer is closed in between the time that it sends the JoinGroup > request and the time that it receives the response from the group > coordinator, it will not attempt to leave the group, since it doesn't have a > member ID to include in that request. > This will cause future rebalances to hang, since the group coordinator will > still expect a member with the ID for the now-closed consumer to join. > Eventually, the group coordinator may remove the closed consumer from the > group, but with default configuration settings, this could take as long as > five minutes. > One possible fix is to send a LeaveGroup request with the member ID if the > consumer receives a JoinGroup response with a member ID after it has been > closed. > > This applies to the legacy consumer; I have not verified yet with the new > async consumer. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16412) Uncreated topics are considered as created topics
[ https://issues.apache.org/jira/browse/KAFKA-16412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mutu updated KAFKA-16412: - Affects Version/s: 3.6.0 > Uncreated topics are considered as created topics > > > Key: KAFKA-16412 > URL: https://issues.apache.org/jira/browse/KAFKA-16412 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.2, 3.6.0 >Reporter: mutu >Priority: Major > Attachments: AnotherClientOutput.txt, client1-3.6.png, > client2-3.6.png, kafkaServer1.out, kafkaServer2.out, kafkaServer3.out, > system1-3.6.log, system2-3.6.log, system3-3.6.log > > > A client sends a topic creation request to the broker. > Another client sends the same topic creation request to the broker. > The former request does not finish. However, the second client gets > TopicExistsException. > The root cause may be that the topic is registered in ZooKeeper, but the data is > not persisted and the topic is not transferred to the partition. At this time, > another client sends the same topic creation request, which checks the status of > ZooKeeper. After finding the znode of the topic, the creation fails. > System logs are attached. > Are there any comments to help figure out this issue? I would appreciate them. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16412) Uncreated topics are considered as created topics
[ https://issues.apache.org/jira/browse/KAFKA-16412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864869#comment-17864869 ] mutu commented on KAFKA-16412: -- [~chia7712] Hi, we have added system logs for 3.6.0. Could you take a look? > Uncreated topics are considered as created topics > > > Key: KAFKA-16412 > URL: https://issues.apache.org/jira/browse/KAFKA-16412 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.2 >Reporter: mutu >Priority: Major > Attachments: AnotherClientOutput.txt, client1-3.6.png, > client2-3.6.png, kafkaServer1.out, kafkaServer2.out, kafkaServer3.out, > system1-3.6.log, system2-3.6.log, system3-3.6.log > > > A client sends a topic creation request to the broker. > Another client sends the same topic creation request to the broker. > The former request does not finish. However, the second client gets > TopicExistsException. > The root cause may be that the topic is registered in ZooKeeper, but the data is > not persisted and the topic is not transferred to the partition. At this time, > another client sends the same topic creation request, which checks the status of > ZooKeeper. After finding the znode of the topic, the creation fails. > System logs are attached. > Are there any comments to help figure out this issue? I would appreciate them. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
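The suspected root cause above is a classic check-then-act window: creation registers the topic name first (the znode) and completes partition assignment later, so a concurrent second creator can observe "exists" before the topic is actually usable. The sketch below is a self-contained model of that window — the class, enum, and topic names are illustrative, not Kafka's real broker code:

```java
// Self-contained model of the topic-creation race in KAFKA-16412
// (illustrative names, not Kafka's real classes): the registry entry
// (znode analog) appears before creation finishes, so a second creator
// hits "already exists" even though the first creation is incomplete.
import java.util.concurrent.ConcurrentHashMap;

public class TopicCreateRaceDemo {
    enum State { REGISTERED, READY }
    private final ConcurrentHashMap<String, State> registry = new ConcurrentHashMap<>();

    String create(String topic) {
        State prev = registry.putIfAbsent(topic, State.REGISTERED);
        if (prev != null) {
            // Mirrors TopicExistsException: the name is taken even though
            // the first creation may not have finished yet.
            return "TopicExistsException (state=" + prev + ")";
        }
        return "creation started";
    }

    void finishCreate(String topic) {
        registry.put(topic, State.READY); // partitions assigned, topic now usable
    }

    public static String simulateSecondClient() {
        TopicCreateRaceDemo broker = new TopicCreateRaceDemo();
        broker.create("orders");        // client 1 registers but has not finished
        return broker.create("orders"); // client 2 races in before finishCreate()
    }

    public static void main(String[] args) {
        System.out.println(simulateSecondClient());
    }
}
```

The second `create` call fails with the exists error while the entry is still `REGISTERED`, matching the reported behaviour where the second client gets TopicExistsException before the first request completes.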
[jira] [Updated] (KAFKA-17112) StreamThread shutdown calls completeShutdown only in CREATED state
[ https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ao Li updated KAFKA-17112: -- Component/s: unit tests > StreamThread shutdown calls completeShutdown only in CREATED state > -- > > Key: KAFKA-17112 > URL: https://issues.apache.org/jira/browse/KAFKA-17112 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 3.9.0 >Reporter: Ao Li >Priority: Minor > > While running tests in `StreamThreadTest.java` in kafka/streams, I noticed > the test left many lingering threads. Though the class runs `shutdown` after > each test, the shutdown only executes `completeShutdown` if the StreamThread > is in the CREATED state. See > [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L231] > and > [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1435] > > For example, you may run test > org.apache.kafka.streams.processor.internals.StreamThreadTest#shouldNotCloseTaskProducerWhenSuspending > with commit 0b11971f2c94f7aadc3fab2c51d94642065a72e5. When the test calls > `thread.shutdown()`, the thread is in the `PARTITIONS_REVOKED` state. Thus, > `completeShutdown` is not called. The test creates three lingering threads: 2 > `StateUpdater` and 1 `TaskExecutor`. > > This means that calls to `thread.shutdown` have no effect in > `StreamThreadTest.java`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
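The thread-leak pattern in the ticket above can be reduced to a small model. This is not the real `StreamThread` — the class and state names below are simplified stand-ins — it only contrasts cleanup gated on a single state with cleanup that runs from any non-terminal state:

```java
// Minimal model of KAFKA-17112 (illustrative, not the real StreamThread):
// a shutdown helper that cleans up only in CREATED leaks helper threads
// whenever the thread has already advanced, e.g. to PARTITIONS_REVOKED.
public class StateGatedShutdownDemo {
    enum State { CREATED, PARTITIONS_REVOKED, RUNNING, DEAD }

    static boolean cleanedUp;

    // The buggy pattern: cleanup is reached only from CREATED.
    static void shutdownOnlyFromCreated(State state) {
        cleanedUp = false;
        if (state == State.CREATED) {
            completeShutdown();
        }
    }

    // A safer pattern: any non-DEAD state still owns resources to release.
    static void shutdownFromAnyState(State state) {
        cleanedUp = false;
        if (state != State.DEAD) {
            completeShutdown();
        }
    }

    static void completeShutdown() {
        cleanedUp = true; // stand-in for stopping StateUpdater/TaskExecutor threads
    }

    public static void main(String[] args) {
        shutdownOnlyFromCreated(State.PARTITIONS_REVOKED);
        System.out.println("buggy path cleaned up: " + cleanedUp);
        shutdownFromAnyState(State.PARTITIONS_REVOKED);
        System.out.println("fixed path cleaned up: " + cleanedUp);
    }
}
```

With the buggy gate, a test that reached `PARTITIONS_REVOKED` never releases its helpers — the two `StateUpdater` threads and one `TaskExecutor` thread the ticket observes lingering.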
[jira] [Assigned] (KAFKA-15595) Session window aggregate drops record headers
[ https://issues.apache.org/jira/browse/KAFKA-15595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-15595: --- Assignee: (was: Hao Li) > Session window aggregate drops record headers > -- > > Key: KAFKA-15595 > URL: https://issues.apache.org/jira/browse/KAFKA-15595 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Abdullah alkhawatrah >Priority: Major > > Hey, > While upgrading to 3.5.1 from 3.2.X I noticed a change in SessionWindow > aggregate behaviour: it seems that custom headers added before the > aggregate are now dropped. > I could reproduce the behaviour with the following test topology: > {code:java} > // code placeholder > final StreamsBuilder builder = new StreamsBuilder(); > builder.stream(inputTopic, Consumed.with(EARLIEST)) > .process(() -> new Processor<Object, Object, Object, Object>() { > private ProcessorContext<Object, Object> context; > @Override > public void init(final ProcessorContext<Object, Object> context) { > this.context = context; > } > @Override > public void process(Record<Object, Object> record) { > record.headers().add("key1", > record.value().toString().getBytes()); > context.forward(record); > } > }) > .groupByKey() > > .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofDays(1L), > Duration.ofDays(1L))) > .aggregate(() -> 1, > (key, value, aggregate) -> aggregate, > (aggKey, aggOne, aggTwo) -> aggTwo) > .toStream() > .map((key, value) -> new KeyValue<>(key.key(), value)) > .to(outputTopic); {code} > Checking events in the `outputTopic` shows that the headers are empty. With > 3.2.* the same topology would have propagated the headers. 
> > I can see here: > [https://github.com/apache/kafka/blob/2c6fb6c54472e90ae17439e62540ef3cb0426fe3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L205] > that now a new record is created ignoring the headers, while in 3.2.2, the > same record was forwarded after changing the key and value while keeping the > headers: > [https://github.com/apache/kafka/blob/3.2.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L196] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
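The difference between the two linked code paths boils down to how the output record is built. The sketch below uses a tiny stand-in `Rec` type (a simplified model, not the real `org.apache.kafka.streams.processor.api.Record` API) to show why constructing a brand-new record discards headers while deriving from the input record keeps them:

```java
// Tiny stand-in for the Streams Record type (simplified model, not the real
// API) illustrating KAFKA-15595: a freshly constructed record carries no
// headers, while a record derived from the input carries them forward.
import java.util.Map;

public class HeaderPropagationDemo {
    public record Rec(Object key, Object value, Map<String, byte[]> headers) {
        public Rec withKeyAndValue(Object k, Object v) {
            return new Rec(k, v, headers); // headers carried over from the input
        }
    }

    public static void main(String[] args) {
        Rec input = new Rec("k", 1, Map.of("key1", "v".getBytes()));

        // 3.5.x behaviour at the linked line: a brand-new record, headers lost.
        Rec fresh = new Rec("windowed-k", 2, Map.of());

        // 3.2.x behaviour: forward a derivative of the input record, headers kept.
        Rec derived = input.withKeyAndValue("windowed-k", 2);

        System.out.println("fresh headers:   " + fresh.headers().keySet());
        System.out.println("derived headers: " + derived.headers().keySet());
    }
}
```

In the reporter's topology, the session-window aggregate took the "fresh record" path in 3.5.1, which is why the `key1` header never reached `outputTopic`.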
[jira] [Reopened] (KAFKA-13183) Dropping null key/value records upstream to repartition topic not tracked via metrics
[ https://issues.apache.org/jira/browse/KAFKA-13183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-13183: - > Dropping null key/value records upstream to repartition topic not tracked via > metrics > --- > > Key: KAFKA-13183 > URL: https://issues.apache.org/jira/browse/KAFKA-13183 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > For joins and aggregation, we consider records with null key or value as > invalid, and drop them. Inside the aggregate and join processors, we record > dropped records with a corresponding metric (cf. `droppedRecordsSensor`). > However, we also apply an upstream optimization if we need to repartition > data. As we know that the downstream aggregation / join will drop those > records anyway, we drop them _before_ we write them into the repartition > topic (we still need the drop logic in the processor for the case we don't > have a repartition topic). > We add a `KStreamFilter` (cf. `KStreamImpl#createRepartitionedSource()`) > upstream but this filter does not update the corresponding metric to record > dropped records. -- This message was sent by Atlassian Jira (v8.20.10#820010)
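The missing piece the ticket describes — the upstream filter drops records but never bumps the dropped-records metric — can be sketched in isolation. This is a simplified model (an `AtomicLong` stands in for the metrics sensor, and the class/method names are hypothetical, not Kafka's real `KStreamFilter`):

```java
// Sketch of the fix KAFKA-13183 asks for (simplified, illustrative names):
// the pre-repartition null-key/value filter should increment the same
// dropped-records count that the downstream aggregate/join processors use.
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;

public class RepartitionDropMetricDemo {
    // Stand-in for the droppedRecordsSensor the processors already update.
    static final AtomicLong droppedRecords = new AtomicLong();

    // Predicate applied before writing to the repartition topic.
    static boolean keepForRepartition(Entry<Object, Object> record) {
        if (record.getKey() == null || record.getValue() == null) {
            droppedRecords.incrementAndGet(); // the update the ticket says is missing
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        keepForRepartition(new SimpleEntry<>(null, "v")); // dropped and counted
        keepForRepartition(new SimpleEntry<>("k", "v"));  // kept
        System.out.println("dropped=" + droppedRecords.get());
    }
}
```

With the increment in place, records filtered out before the repartition topic are visible in the same metric as records dropped inside the join/aggregate processors.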
[jira] [Assigned] (KAFKA-13183) Dropping null key/value records upstream to repartition topic not tracked via metrics
[ https://issues.apache.org/jira/browse/KAFKA-13183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-13183: --- Assignee: (was: Alex Sorokoumov) > Dropping null key/value records upstream to repartition topic not tracked via > metrics > --- > > Key: KAFKA-13183 > URL: https://issues.apache.org/jira/browse/KAFKA-13183 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > For joins and aggregation, we consider records with null key or value as > invalid, and drop them. Inside the aggregate and join processors, we record > dropped records with a corresponding metric (cf. `droppedRecordsSensor`). > However, we also apply an upstream optimization if we need to repartition > data. As we know that the downstream aggregation / join will drop those > records anyway, we drop them _before_ we write them into the repartition > topic (we still need the drop logic in the processor for the case we don't > have a repartition topic). > We add a `KStreamFilter` (cf. `KStreamImpl#createRepartitionedSource()`) > upstream but this filter does not update the corresponding metric to record > dropped records. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-13183) Dropping null key/value records upstream to repartition topic not tracked via metrics
[ https://issues.apache.org/jira/browse/KAFKA-13183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-13183. - Resolution: Fixed > Dropping null key/value records upstream to repartition topic not tracked via > metrics > --- > > Key: KAFKA-13183 > URL: https://issues.apache.org/jira/browse/KAFKA-13183 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > For joins and aggregation, we consider records with null key or value as > invalid, and drop them. Inside the aggregate and join processors, we record > dropped records with a corresponding metric (cf. `droppedRecordsSensor`). > However, we also apply an upstream optimization if we need to repartition > data. As we know that the downstream aggregation / join will drop those > records anyway, we drop them _before_ we write them into the repartition > topic (we still need the drop logic in the processor for the case we don't > have a repartition topic). > We add a `KStreamFilter` (cf. `KStreamImpl#createRepartitionedSource()`) > upstream but this filter does not update the corresponding metric to record > dropped records. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16412) Uncreated topics are considered as created topics
[ https://issues.apache.org/jira/browse/KAFKA-16412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mutu updated KAFKA-16412: - Description: A client sends a topic creation request to the broker. Another client sends the same topic creation request to the broker. The former request does not finish. However, the second client gets TopicExistsException. The root cause may be that the topic is registered in ZooKeeper, but the data is not persisted and the topic is not transferred to the partition. At this time, another client sends the same topic creation request, which checks the status of ZooKeeper. After finding the znode of the topic, the creation fails. System logs are attached. Are there any comments to help figure out this issue? I would appreciate them. was: A client sends a topic creation request to the broker. Another client sends the same topic creation request to the broker. The former request does not finish. However, the second client gets TopicExistsException. The root cause may be that the topic is registered in ZooKeeper, but the data is not persisted and the topic is not transferred to the partition. At this time, another client sends the same topic creation request, which checks the status of ZooKeeper. After finding the znode of the topic, the creation fails. > Uncreated topics are considered as created topics > > > Key: KAFKA-16412 > URL: https://issues.apache.org/jira/browse/KAFKA-16412 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.2 >Reporter: mutu >Priority: Major > Attachments: AnotherClientOutput.txt, client1-3.6.png, > client2-3.6.png, kafkaServer1.out, kafkaServer2.out, kafkaServer3.out, > system1-3.6.log, system2-3.6.log, system3-3.6.log > > > A client sends a topic creation request to the broker. > Another client sends the same topic creation request to the broker. > The former request does not finish. However, the second client gets > TopicExistsException. 
> The root cause may be that the topic is registered in ZooKeeper, but the data is > not persisted and the topic is not transferred to the partition. At this time, > another client sends the same topic creation request, which checks the status of > ZooKeeper. After finding the znode of the topic, the creation fails. > System logs are attached. > Are there any comments to help figure out this issue? I would appreciate them. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16412) Uncreated topics are considered as created topics
[ https://issues.apache.org/jira/browse/KAFKA-16412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mutu updated KAFKA-16412: - Description: A client sends a topic creation request to the broker. Another client sends the same topic creation request to the broker. The former request does not finish. However, the second client gets TopicExistsException. The root cause may be that the topic is registered in ZooKeeper, but the data is not persisted and the topic is not transferred to the partition. At this time, another client sends the same topic creation request, which checks the status of ZooKeeper. After finding the znode of the topic, the creation fails. was: A client sends a topic creation request to the broker. Another client sends the same topic creation request to the broker. The former request does not finish. However, the second client gets TopicExistsException. > Uncreated topics are considered as created topics > > > Key: KAFKA-16412 > URL: https://issues.apache.org/jira/browse/KAFKA-16412 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.2 >Reporter: mutu >Priority: Major > Attachments: AnotherClientOutput.txt, client1-3.6.png, > client2-3.6.png, kafkaServer1.out, kafkaServer2.out, kafkaServer3.out, > system1-3.6.log, system2-3.6.log, system3-3.6.log > > > A client sends a topic creation request to the broker. > Another client sends the same topic creation request to the broker. > The former request does not finish. However, the second client gets > TopicExistsException. > The root cause may be that the topic is registered in ZooKeeper, but the data is > not persisted and the topic is not transferred to the partition. At this time, > another client sends the same topic creation request, which checks the status of > ZooKeeper. After finding the znode of the topic, the creation fails. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16412) Uncreated topics are considered as created topics
[ https://issues.apache.org/jira/browse/KAFKA-16412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mutu updated KAFKA-16412: - Attachment: client1-3.6.png client2-3.6.png system1-3.6.log system2-3.6.log system3-3.6.log > Uncreated topics are considered as created topics > > > Key: KAFKA-16412 > URL: https://issues.apache.org/jira/browse/KAFKA-16412 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.2 >Reporter: mutu >Priority: Major > Attachments: AnotherClientOutput.txt, client1-3.6.png, > client2-3.6.png, kafkaServer1.out, kafkaServer2.out, kafkaServer3.out, > system1-3.6.log, system2-3.6.log, system3-3.6.log > > > A client sends a topic creation request to the broker. > Another client sends the same topic creation request to the broker. > The former request does not finish. However, the second client gets > TopicExistsException. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17055) Use random replica ids in KafkaRaftClientTest and KafkaRaftClientSnapshotTest
[ https://issues.apache.org/jira/browse/KAFKA-17055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864849#comment-17864849 ] José Armando García Sancio commented on KAFKA-17055: [~masonyc] interesting. Jira says that the user cannot be assigned issues. Let's leave it unassigned I guess. > Use random replica ids in KafkaRaftClientTest and KafkaRaftClientSnapshotTest > - > > Key: KAFKA-17055 > URL: https://issues.apache.org/jira/browse/KAFKA-17055 > Project: Kafka > Issue Type: Sub-task >Reporter: José Armando García Sancio >Priority: Major > > All of the tests in KafkaRaftClientTest and KafkaRaftClientSnapshotTest use > well-known IDs like 0, 1, etc. Because of this, those tests were not able to > catch a bug in the BeginQuorumEpoch schema where the default value for VoterId > was 0 instead of -1. > Improve those tests by using random positive numbers to lower the probability > that they will match the default value of a schema. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17118) Remove StorageTool#buildMetadataProperties
[ https://issues.apache.org/jira/browse/KAFKA-17118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864847#comment-17864847 ] kangning.li commented on KAFKA-17118: - Hey [~chia7712], I'd like to look into this. Can you assign this to me? > Remove StorageTool#buildMetadataProperties > -- > > Key: KAFKA-17118 > URL: https://issues.apache.org/jira/browse/KAFKA-17118 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > It is useless in production after > https://github.com/apache/kafka/commit/7060c08d6f9b0408e7f40a90499caf2e636fac61 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators [kafka]
jolshan commented on code in PR #16183: URL: https://github.com/apache/kafka/pull/16183#discussion_r1673242703 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -217,8 +217,11 @@ public enum MetadataVersion { // Add ELR related supports (KIP-966). IBP_3_9_IV1(22, "3.9", "IV1", true), +IBP_3_9_IV2(23, "3.9", "IV2", false), Review Comment: Sounds good. I also need to make a few other changes when I rebase, so be on the lookout for more soon :) I planned to make the update where we don't send features as part of the registration/api versions if the version range is 0-0. I need to do this because Colin's fix only changes the min version. It is safe because a version range 0-0 is essentially the same as not supporting the feature. We will have it here until the MV is marked as production ready. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17114) DefaultStateUpdater::handleRuntimeException should update isRunning before calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks`
[ https://issues.apache.org/jira/browse/KAFKA-17114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-17114: Component/s: unit tests > DefaultStateUpdater::handleRuntimeException should update isRunning before > calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks` > - > > Key: KAFKA-17114 > URL: https://issues.apache.org/jira/browse/KAFKA-17114 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 3.9.0 >Reporter: Ao Li >Priority: Minor > > I saw a flaky test in > DefaultStateUpdaterTest::shouldThrowIfAddingStandbyAndActiveTaskWithSameId > recently. > {code} > org.opentest4j.AssertionFailedError: expected: <false> but was: <true> > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertFalse.failNotFalse(AssertFalse.java:63) > at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:36) > at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:31) > at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:231) > at > org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingTasksWithSameId(DefaultStateUpdaterTest.java:294) > at > org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingStandbyAndActiveTaskWithSameId(DefaultStateUpdaterTest.java:285) > at java.base/java.lang.reflect.Method.invoke(Method.java:580) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) > {code} > To make the bug more reproducible, you may add `Thread.sleep(5)` after > `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);` > in DefaultStateUpdater::handleRuntimeException. > The test is flaky because > 
`addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);` > will unblock the `verifyFailedTasks(IllegalStateException.class, task1);` > statement in DefaultStateUpdaterTest::shouldThrowIfAddingTasksWithSameId. > If `assertFalse(stateUpdater.isRunning());` is executed before > `isRunning.set(false);`, the test will fail. -- This message was sent by Atlassian Jira (v8.20.10#820010)
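The ordering bug the ticket title prescribes a fix for can be modeled deterministically. This is not the real `DefaultStateUpdater` — the observer callback below stands in for the test thread that is unblocked by the failed-task update — but it shows why `isRunning` must be flipped *before* anything can wake the waiting assertion:

```java
// Deterministic model of KAFKA-17114 (illustrative, not the real
// DefaultStateUpdater): the test thread is unblocked by the failed-task
// update, so any state it asserts on must be written before that update.
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownOrderingDemo {
    final AtomicBoolean isRunning = new AtomicBoolean(true);
    private Runnable onFailedTasksUpdated; // stands in for waking the waiting test

    void setFailedTaskObserver(Runnable observer) { this.onFailedTasksUpdated = observer; }

    // Buggy order: the observer (the test) may run while isRunning is still true.
    void handleRuntimeExceptionBuggy() {
        onFailedTasksUpdated.run();
        isRunning.set(false);
    }

    // Fixed order: flip the flag before anything can unblock the observer.
    void handleRuntimeExceptionFixed() {
        isRunning.set(false);
        onFailedTasksUpdated.run();
    }

    public static boolean observedRunning(boolean fixed) {
        ShutdownOrderingDemo updater = new ShutdownOrderingDemo();
        AtomicBoolean seen = new AtomicBoolean();
        // What assertFalse(stateUpdater.isRunning()) would observe on wake-up.
        updater.setFailedTaskObserver(() -> seen.set(updater.isRunning.get()));
        if (fixed) updater.handleRuntimeExceptionFixed();
        else updater.handleRuntimeExceptionBuggy();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("buggy order observed running=" + observedRunning(false));
        System.out.println("fixed order observed running=" + observedRunning(true));
    }
}
```

The buggy ordering lets the observer see `isRunning == true`, which is exactly the `expected: <false> but was: <true>` failure in the quoted stack trace; reordering the two writes removes the window.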
[jira] [Updated] (KAFKA-17113) Flaky Test in GlobalStreamThreadTest#shouldThrowStreamsExceptionOnStartupIfExceptionOccurred
[ https://issues.apache.org/jira/browse/KAFKA-17113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-17113: Component/s: unit tests > Flaky Test in > GlobalStreamThreadTest#shouldThrowStreamsExceptionOnStartupIfExceptionOccurred > > > Key: KAFKA-17113 > URL: https://issues.apache.org/jira/browse/KAFKA-17113 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 3.9.0 >Reporter: Ao Li >Priority: Minor > > The `shouldThrowStreamsExceptionOnStartupIfExceptionOccurred` test expects > `globalStreamThread.start` to throw `startupException` when startup fails. This > may not be true on some slow machines. > > {code:java} > class GlobalStreamThread { > Exception startupException; > void initialize() { > try { > ... > } catch (Exception e) { > startupException = e; > } > ... > setState(State.DEAD); > } > void start() { > super.start(); > while (stillInitializing()) { > Utils.sleep(1); > if (startupException != null) { > throw startupException; > } > } > if (inErrorState()) { > throw new IllegalStateException("Initialization for the global stream > thread failed"); > } > } > } > {code} > Consider the following schedule: > {code} > main:start:`startupException != null` > GlobalStreamThread:initialize:`startupException = e;` > GlobalStreamThread:initialize:`setState(State.DEAD);` > main:start:`inErrorState()` > main:start:`throw new IllegalStateException` > {code} > > The function throws `IllegalStateException("Initialization for the global > stream thread failed")` instead of `startupException` -- This message was sent by Atlassian Jira (v8.20.10#820010)
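One way to close the window shown in the quoted schedule is to re-check `startupException` after the polling loop exits, before falling through to the `IllegalStateException`. The sketch below is a minimal single-threaded model of that idea (illustrative names, not the real `GlobalStreamThread`); it replays the ticket's schedule where initialization stores the exception and reaches DEAD before `start` polls again:

```java
// Minimal model of the KAFKA-17113 startup race (illustrative, not the real
// GlobalStreamThread): once init reaches DEAD the while loop exits without
// re-reading startupException, so a re-check after the loop restores the
// intended exception.
public class StartupRaceDemo {
    enum State { CREATED, DEAD }
    volatile State state = State.CREATED;
    volatile Exception startupException;

    // The schedule from the ticket: exception stored, then DEAD,
    // before start() ever looks at the field again.
    void initFails() {
        startupException = new RuntimeException("startup failed");
        state = State.DEAD;
    }

    String startWithRecheck() {
        while (state != State.DEAD) {       // loop body may never observe the exception
            if (startupException != null) return "startupException";
        }
        if (startupException != null) {     // the re-check that removes the flake
            return "startupException";
        }
        return state == State.DEAD ? "IllegalStateException" : "started";
    }

    public static String simulate() {
        StartupRaceDemo thread = new StartupRaceDemo();
        thread.initFails();                 // init completes before start() polls
        return thread.startWithRecheck();
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

Without the re-check, this schedule returns the generic `IllegalStateException` branch — the exact mismatch the test intermittently reports.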
[jira] [Commented] (KAFKA-17116) New consumer may not send effective leave group if member ID received after close
[ https://issues.apache.org/jira/browse/KAFKA-17116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864841#comment-17864841 ] TengYao Chi commented on KAFKA-17116: - Hi [~lianetm] I would like to investigate this issue, may I have this one? Many thanks! > New consumer may not send effective leave group if member ID received after > close > -- > > Key: KAFKA-17116 > URL: https://issues.apache.org/jira/browse/KAFKA-17116 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.8.0 >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.9.0 > > > If the new consumer is closed after sending a HB to join, but before > receiving the response to it, it will send a leave group request but without > member ID (it will simply fail with UNKNOWN_MEMBER_ID). This means that the > broker will have registered a new member for which it will never receive a > leave request. > # consumer.subscribe -> sends HB to join, transitions to JOINING > # consumer.close -> will transition to LEAVING and send HB with epoch -1 > (without waiting for in-flight requests) > # consumer receives response to initial HB, containing the assigned member > ID. It will simply ignore it because it's not in the group anymore > (UNSUBSCRIBED) > Note that the expectation, with the current logic, and main downsides of this > are: > # If the case was that the member received partitions on the first HB, those > partitions won't be re-assigned (broker waiting for the closed consumer to > reconcile them), until the rebalance timeout expires. > # Even if no partitions were assigned to it, the member will remain in the > group from the broker point of view (but not from the client POV). The member > will be eventually kicked out for not sending HBs, but only when its session > timeout expires. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]
TaiJuWu commented on code in PR #16127: URL: https://github.com/apache/kafka/pull/16127#discussion_r1673195870 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -191,7 +191,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) { controllerNode.metadataDirectory()); } props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, -"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); + "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); Review Comment: I will check again and try to remove it. Thanks for your review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]
TaiJuWu commented on code in PR #16127: URL: https://github.com/apache/kafka/pull/16127#discussion_r1673195870 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -191,7 +191,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) { controllerNode.metadataDirectory()); } props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, -"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); + "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); Review Comment: I will check again. Thanks for your review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]
TaiJuWu commented on code in PR #16127: URL: https://github.com/apache/kafka/pull/16127#discussion_r1673194862 ## clients/src/test/java/org/apache/kafka/test/TestUtils.java: ## @@ -706,4 +706,9 @@ public static ApiVersionsResponse createApiVersionsResponse( ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, zkMigrationEnabled); } + +public static String getCurrentFunctionName() { Review Comment: Sorry, I missed the newest comment. Will do ASAP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17055) Use random replica ids in KafkaRaftClientTest and KafkaRaftClientSnapshotTest
[ https://issues.apache.org/jira/browse/KAFKA-17055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864838#comment-17864838 ] Mason C commented on KAFKA-17055: - Hi [~jsancio], could you please assign it to me? Seems like I don't have access to assign it to myself. > Use random replica ids in KafkaRaftClientTest and KafkaRaftClientSnapshotTest > - > > Key: KAFKA-17055 > URL: https://issues.apache.org/jira/browse/KAFKA-17055 > Project: Kafka > Issue Type: Sub-task >Reporter: José Armando García Sancio >Priority: Major > > All of the tests in KafkaRaftClientTest and KafkaRaftClientSnapshotTest use > well-known IDs like 0, 1, etc. Because of this, those tests were not able to > catch a bug in the BeginQuorumEpoch schema where the default value for VoterId > was 0 instead of -1. > Improve those tests by using random positive numbers to lower the probability > that they will match the default value of a schema. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators [kafka]
junrao commented on code in PR #16183: URL: https://github.com/apache/kafka/pull/16183#discussion_r1673162688 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -217,8 +217,11 @@ public enum MetadataVersion { // Add ELR related supports (KIP-966). IBP_3_9_IV1(22, "3.9", "IV1", true), +IBP_3_9_IV2(23, "3.9", "IV2", false), Review Comment: Could we add a comment that this is for bootstrapping TV?
Re: [PR] KAFKA-16260: Deprecate window.size.ms in StreamsConfig.java and index.html [kafka]
mjsax commented on code in PR #16391: URL: https://github.com/apache/kafka/pull/16391#discussion_r1673180484 ## docs/streams/developer-guide/config-streams.html: ## @@ -689,7 +689,7 @@ default.value.serde - default.windowed.key.serde.inner (Deprecated.) + default.windowed.key.serde.inner(Deprecated.) Review Comment: Why remove the space? ## docs/streams/developer-guide/config-streams.html: ## @@ -243,19 +243,19 @@ num.standby.replicas - acceptable.recovery.lag + acceptable.recovery.lag Review Comment: Why do we change this? Seem the top row with "Parameter Name" is `odd` already, so `even` seems to be correct here? ## docs/streams/developer-guide/config-streams.html: ## @@ -1039,7 +1045,7 @@ topology.optimizationdefault.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization. + +windowed.inner.class.serde(Deprecated.) Review Comment: ```suggestion windowed.inner.class.serde (Deprecated.) ``` ## docs/streams/developer-guide/config-streams.html: ## @@ -713,6 +713,12 @@ default.windowed.value.serde.innerThis is discussed in more detail in Data types and serialization. + +windowed.inner.class.serde(Deprecated.) + + The default Serializer/Deserializer class for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface. Note that setting this config in KafkaStreams application would result in an error as it is meant to be used only from Plain consumer client. Review Comment: ```suggestion The default Serializer/Deserializer class for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface. Note that setting this config in KafkaStreams application would result in an error as it is meant to be used only from Plain consumer client. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-17055: Change RaftClientTestContext.java nodes to use random positive number id [kafka]
masonyc commented on PR #16563: URL: https://github.com/apache/kafka/pull/16563#issuecomment-2221709715 > Thanks for the changes @masonyc. > > Can you confirm that some of these tests fail if you remove `"default": "-1",` from these two schemas? > > 1. https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/VoteRequest.json#L27 > 2. https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json#L27 > > While verifying the above, can you also confirm that the test failures provide enough information to reproduce the failure. I am concerned that since the replica ids are random the test failures won't provide enough information to know which replica id value caused the failure. > > Let's make similar changes to `KafkaRaftClientSnapshotTest`. Hi @jsancio , thanks for the review. Confirmed that I can see test failures when removing the -1 default in both schemas. I have added the Leader Id passed from the test case and the Leader Id from the Raft Response into the assertion message block so it can be used to reproduce the tests.
Re: [PR] KAFKA-17055: Change RaftClientTestContext.java nodes to use random positive number id [kafka]
masonyc commented on code in PR #16563: URL: https://github.com/apache/kafka/pull/16563#discussion_r1673181139 ## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ##
@@ -3854,4 +3859,13 @@ private static ReplicaKey replicaKey(int id, boolean withDirectoryId) {
     Uuid directoryId = withDirectoryId ? Uuid.randomUuid() : ReplicaKey.NO_DIRECTORY_ID;
     return ReplicaKey.of(id, directoryId);
 }
+
+private static Integer getRandomPort() {
+    int minPort = 1024;
+    int mockAddressPrefix = 9990;
+    // Number of nodes we can set up if we got a random number that is maximum in the range
+    int reservedNumberOfPorts = 50;
+    int maxPort = 65535 - mockAddressPrefix - reservedNumberOfPorts;
+    return ThreadLocalRandom.current().nextInt((maxPort - minPort) + 1) + minPort;
+}
Review Comment: renamed and removed the minPort so it can generate from 0.
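The port arithmetic in the snippet above can be illustrated in isolation. This is a hypothetical reading of the intent, not the PR's code: the random part of the port must leave headroom for the mock address prefix and for a block of consecutive node ports so the final value never exceeds 65535.

```java
// Hypothetical sketch of the port-range arithmetic discussed in the review:
// if the effective port is built from a random base plus a mock address
// prefix plus a per-node offset, the random base must be capped so even the
// worst case stays inside the valid TCP port range.
public class PortRange {
    // Largest random base that still fits the prefix and `reservedPorts`
    // consecutive node offsets under the 65535 ceiling.
    static int maxRandomPort(int mockAddressPrefix, int reservedPorts) {
        return 65535 - mockAddressPrefix - reservedPorts;
    }

    public static void main(String[] args) {
        int max = maxRandomPort(9990, 50);
        System.out.println(max);                 // 55495
        System.out.println(max + 9990 + 50);     // 65535, the worst case
    }
}
```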
Re: [PR] KAFKA-17055: Change RaftClientTestContext.java nodes to use random positive number id [kafka]
masonyc commented on code in PR #16563: URL: https://github.com/apache/kafka/pull/16563#discussion_r1673180531 ## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ## @@ -1233,7 +1238,7 @@ static void verifyLeaderChangeMessage( LeaderChangeMessage leaderChangeMessage = ControlRecordUtils.deserializeLeaderChangeMessage(recordValue); assertEquals(leaderId, leaderChangeMessage.leaderId()); assertEquals(voters.stream().map(voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toList()), -leaderChangeMessage.voters()); + leaderChangeMessage.voters().stream().sorted(Comparator.comparingInt(Voter::voterId)).collect(Collectors.toList())); Review Comment: thanks for the feedback, we still need to collect to List, otherwise the assertion will fail. Updated format in latest commit
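The pattern in the diff above — sorting both sides by a key before comparing so the assertion is order-insensitive — can be sketched generically. Plain `Integer` ids stand in here for the generated `Voter` objects; this is an illustration, not the test's actual code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Generic sketch of an order-insensitive list comparison: sort both sides
// into Lists before an equals-based assertion, since List.equals is
// order-sensitive.
public class SortedAssert {
    static List<Integer> sortedCopy(List<Integer> ids) {
        // Collecting back to a List keeps the equality contract well defined.
        return ids.stream().sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> expected = Arrays.asList(1, 2, 3);
        List<Integer> actual = Arrays.asList(3, 1, 2); // same voters, different order
        System.out.println(actual.equals(expected));                         // false
        System.out.println(sortedCopy(actual).equals(sortedCopy(expected))); // true
    }
}
```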
Re: [PR] KAFKA-17055: Change RaftClientTestContext.java nodes to use random positive number id [kafka]
masonyc commented on code in PR #16563: URL: https://github.com/apache/kafka/pull/16563#discussion_r1673179696 ## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ## @@ -576,6 +577,10 @@ void assertSentDescribeQuorumResponse( partitionData, nodes ); + +List sortedVoters = response.topics().get(0).partitions().get(0).currentVoters().stream().sorted(Comparator.comparingInt(ReplicaState::replicaId)).collect(Collectors.toList()); + response.topics().get(0).partitions().get(0).setCurrentVoters(sortedVoters); Review Comment: thanks for the feedback, updated in latest commit
Re: [PR] KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators [kafka]
jolshan commented on PR #16183: URL: https://github.com/apache/kafka/pull/16183#issuecomment-2221692444 I think I also need to fix the case where max version is 0 as well. 🤔
Re: [PR] KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators [kafka]
jolshan commented on PR #16183: URL: https://github.com/apache/kafka/pull/16183#issuecomment-2221688829 Yeehaw time to rebase
[jira] [Resolved] (KAFKA-17011) SupportedFeatures.MinVersion incorrectly blocks v0
[ https://issues.apache.org/jira/browse/KAFKA-17011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-17011. Resolution: Fixed > SupportedFeatures.MinVersion incorrectly blocks v0 > -- > > Key: KAFKA-17011 > URL: https://issues.apache.org/jira/browse/KAFKA-17011 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.8.0 >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Critical > Fix For: 3.9.0 > > > SupportedFeatures.MinVersion incorrectly blocks v0
Re: [PR] KAFKA-17011: Fix a bug preventing features from supporting v0 [kafka]
jolshan merged PR #16421: URL: https://github.com/apache/kafka/pull/16421
Re: [PR] KAFKA-16959: ConfigCommand should allow to define both `entity-default` and `entity-name` [kafka]
chia7712 commented on code in PR #16381: URL: https://github.com/apache/kafka/pull/16381#discussion_r1673154305 ## core/src/main/scala/kafka/admin/ConfigCommand.scala: ## @@ -354,128 +354,150 @@ object ConfigCommand extends Logging { } } - @nowarn("cat=deprecation") def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = { val entityTypes = opts.entityTypes val entityNames = opts.entityNames val entityTypeHead = entityTypes.head -val entityNameHead = entityNames.head val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no need for mutability val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new ConfigEntry(k, v)) } val configsToBeDeleted = parseConfigsToBeDeleted(opts) entityTypeHead match { case ConfigType.TOPIC => -val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false) - .map { entry => (entry.name, entry) }.toMap +alterTopicConfig(adminClient, entityTypeHead, entityNames, configsToBeAdded, configsToBeDeleted) + case ConfigType.BROKER => +alterBrokerConfig(adminClient, entityTypeHead, entityNames, configsToBeAdded, configsToBeDeleted) + case BrokerLoggerConfigType => +alterBrokerLoggingConfig(adminClient, entityTypeHead, entityNames, configsToBeAdded, configsToBeDeleted) + case ConfigType.USER | ConfigType.CLIENT => +alterUserOrClientConfig(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeAdded, configsToBeDeleted) + case ConfigType.IP => +alterIpConfig(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeAdded, configsToBeDeleted) + case ConfigType.CLIENT_METRICS => +alterClientMetricsConfig(adminClient, entityTypeHead, entityNames, configsToBeAdded, configsToBeDeleted) + case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead") +} -// fail the command if any of the configs to be deleted does not exist -val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains) 
-if (invalidConfigs.nonEmpty) - throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") +if (entityNames.nonEmpty) { + entityNames.foreach( +entityName => println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityName.") + ) +} else + println(s"Completed updating default config for $entityTypeHead in the cluster.") + } -val configResource = new ConfigResource(ConfigResource.Type.TOPIC, entityNameHead) -val alterOptions = new AlterConfigsOptions().timeoutMs(3).validateOnly(false) -val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET)) - ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) } -).asJavaCollection -adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) + private def alterTopicConfig(adminClient: Admin, entityTypeHead: String, entityNames: List[String], configsToBeAdded: Predef.Map[String, ConfigEntry], configsToBeDeleted: Seq[String]) = { +entityNames.foreach { entityName => + getOldConfig(adminClient, entityTypeHead, configsToBeDeleted, entityName) +} +val alterOptions = new AlterConfigsOptions().timeoutMs(3).validateOnly(false) +val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET)) + ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) } + ).asJavaCollection +adminClient.incrementalAlterConfigs(entityNames.map(new ConfigResource(ConfigResource.Type.TOPIC, _)) +.map(_ -> alterEntries).toMap.asJava, alterOptions) + .all().get(60, TimeUnit.SECONDS) + } - case ConfigType.BROKER => -val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false) - .map { entry => (entry.name, entry) }.toMap + private def getOldConfig(adminClient: Admin, entityTypeHead: String, configsToBeDeleted: 
Seq[String], entityName: String): Map[String, ConfigEntry] = { +val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityName, includeSynonyms = false, describeAll = false) + .map { entry => (entry.name, entry) }.toMap -// fail the command if any of the configs to be deleted does not exist -val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains) -if (invalidConfigs.nonEmpty) - throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") +// fail the command if any of the configs to b
Re: [PR] KAFKA-17011: Fix a bug preventing features from supporting v0 [kafka]
jolshan commented on PR #16421: URL: https://github.com/apache/kafka/pull/16421#issuecomment-2221672097 Test failures are unrelated. I will merge.
Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]
chia7712 commented on code in PR #16127: URL: https://github.com/apache/kafka/pull/16127#discussion_r1673151480 ## core/src/test/java/kafka/testkit/KafkaClusterTestKit.java: ## @@ -191,7 +191,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) { controllerNode.metadataDirectory()); } props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, -"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); + "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); Review Comment: Do we still need those changes? ## clients/src/test/java/org/apache/kafka/test/TestUtils.java: ## @@ -706,4 +706,9 @@ public static ApiVersionsResponse createApiVersionsResponse( ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, zkMigrationEnabled); } + +public static String getCurrentFunctionName() { Review Comment: IIRC, we had a discussion about the topic name. What are the updates?
[jira] [Created] (KAFKA-17118) Remove StorageTool#buildMetadataProperties
Chia-Ping Tsai created KAFKA-17118: -- Summary: Remove StorageTool#buildMetadataProperties Key: KAFKA-17118 URL: https://issues.apache.org/jira/browse/KAFKA-17118 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai It is useless in production after https://github.com/apache/kafka/commit/7060c08d6f9b0408e7f40a90499caf2e636fac61
Re: [PR] KAFKA-17102: FetchRequest#forgottenTopics would return incorrect data [kafka]
chia7712 commented on code in PR #16557: URL: https://github.com/apache/kafka/pull/16557#discussion_r1673143180 ## clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java: ## @@ -243,4 +245,44 @@ public void testPartitionDataEquals() { new FetchRequest.PartitionData(Uuid.randomUuid(), 300, 0L, 300, Optional.of(300))); } +@Test +public void testFetchRequestNoCacheData() { +short version = 13; +Uuid topicId = Uuid.randomUuid(); +int partition = 0; +TopicIdPartition tp = new TopicIdPartition(topicId, partition, "topic"); + +Map partitionData = Collections.singletonMap(tp.topicPartition(), +new FetchRequest.PartitionData(topicId, 0, 0, 0, Optional.empty())); +List toReplace = Collections.singletonList(tp); + +FetchRequest fetchRequest = FetchRequest.Builder +.forReplica(version, 0, 1, 1, 1, partitionData) +.removed(Collections.emptyList()) +.replaced(toReplace) +.metadata(FetchMetadata.newIncremental(123)).build(version); + +HashMap topicNames = new HashMap<>(); +topicNames.put(topicId, tp.topic()); + +List requestsWithTopicsName = fetchRequest.forgottenTopics(topicNames); Review Comment: `Collections.singletonMap(topicId, tp.topic())`
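The reviewer's suggestion above replaces a two-line mutable `HashMap` setup with `Collections.singletonMap`. The two forms compare equal under the `Map` equality contract, so the substitution is safe; this small sketch uses illustrative string keys and values rather than the test's real `Uuid`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of the review suggestion: a one-entry HashMap and
// Collections.singletonMap are equal as maps, but singletonMap is shorter
// and immutable. Keys and values here are illustrative placeholders.
public class SingletonMapDemo {
    public static void main(String[] args) {
        Map<String, String> viaHashMap = new HashMap<>();
        viaHashMap.put("topicId", "topic");

        Map<String, String> viaSingleton = Collections.singletonMap("topicId", "topic");
        System.out.println(viaHashMap.equals(viaSingleton)); // true
    }
}
```

Note that `singletonMap` throws `UnsupportedOperationException` on mutation, which is usually desirable for fixed test fixtures.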
[jira] [Updated] (KAFKA-16863) Consider removing `default.` prefix for exception handler config
[ https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16863: Fix Version/s: 4.0.0 > Consider removing `default.` prefix for exception handler config > > > Key: KAFKA-16863 > URL: https://issues.apache.org/jira/browse/KAFKA-16863 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Muralidhar Basani >Priority: Trivial > Labels: kip > Fix For: 4.0.0 > > > Kafka Streams has a set of configs with `default.` prefix. The intent for the > default-prefix is to make a distinction between, well the default, and > in-place overwrites in the code. Eg, users can specify ts-extractors on a > per-topic basis. > However, for the deserialization- and production-exception handlers, no such > overwrites are possible, and thus, `default.` does not really make sense, > because there is just one handler overall. Via KIP-1033 we added a new > processing-exception handler w/o a default-prefix, too. > Thus, we should consider to deprecate the two existing configs names and add > them back w/o the `default.` prefix. > KIP-1056: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16863) Consider removing `default.` prefix for exception handler config
[ https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16863: Labels: kip (was: need-kip) > Consider removing `default.` prefix for exception handler config > > > Key: KAFKA-16863 > URL: https://issues.apache.org/jira/browse/KAFKA-16863 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Muralidhar Basani >Priority: Trivial > Labels: kip > > Kafka Streams has a set of configs with `default.` prefix. The intent for the > default-prefix is to make a distinction between, well the default, and > in-place overwrites in the code. Eg, users can specify ts-extractors on a > per-topic basis. > However, for the deserialization- and production-exception handlers, no such > overwrites are possible, and thus, `default.` does not really make sense, > because there is just one handler overall. Via KIP-1033 we added a new > processing-exception handler w/o a default-prefix, too. > Thus, we should consider to deprecate the two existing configs names and add > them back w/o the `default.` prefix. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16863) Consider removing `default.` prefix for exception handler config
[ https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16863: Description: Kafka Streams has a set of configs with `default.` prefix. The intent for the default-prefix is to make a distinction between, well the default, and in-place overwrites in the code. Eg, users can specify ts-extractors on a per-topic basis. However, for the deserialization- and production-exception handlers, no such overwrites are possible, and thus, `default.` does not really make sense, because there is just one handler overall. Via KIP-1033 we added a new processing-exception handler w/o a default-prefix, too. Thus, we should consider to deprecate the two existing configs names and add them back w/o the `default.` prefix. KIP-1056: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig] was: Kafka Streams has a set of configs with `default.` prefix. The intent for the default-prefix is to make a distinction between, well the default, and in-place overwrites in the code. Eg, users can specify ts-extractors on a per-topic basis. However, for the deserialization- and production-exception handlers, no such overwrites are possible, and thus, `default.` does not really make sense, because there is just one handler overall. Via KIP-1033 we added a new processing-exception handler w/o a default-prefix, too. Thus, we should consider to deprecate the two existing configs names and add them back w/o the `default.` prefix. > Consider removing `default.` prefix for exception handler config > > > Key: KAFKA-16863 > URL: https://issues.apache.org/jira/browse/KAFKA-16863 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Muralidhar Basani >Priority: Trivial > Labels: kip > > Kafka Streams has a set of configs with `default.` prefix. 
The intent for the > default-prefix is to make a distinction between, well the default, and > in-place overwrites in the code. Eg, users can specify ts-extractors on a > per-topic basis. > However, for the deserialization- and production-exception handlers, no such > overwrites are possible, and thus, `default.` does not really make sense, > because there is just one handler overall. Via KIP-1033 we added a new > processing-exception handler w/o a default-prefix, too. > Thus, we should consider to deprecate the two existing configs names and add > them back w/o the `default.` prefix. > KIP-1056: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16106: revert classic state transitions if deletion fails [kafka]
jeffkbkim commented on code in PR #16511: URL: https://github.com/apache/kafka/pull/16511#discussion_r1673128140 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ## @@ -993,13 +998,50 @@ public void testCleanupGroupMetadata() { verify(groupMetadataManager, times(0)).maybeDeleteGroup(eq("other-group-id"), any()); } +@Test +public void testCleanupGroupMetadataWhenAppendFails() { +GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); +OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); +Time mockTime = new MockTime(); +MockCoordinatorTimer timer = new MockCoordinatorTimer<>(mockTime); +GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class); +GroupCoordinatorShard coordinator = new GroupCoordinatorShard( +new LogContext(), +groupMetadataManager, +offsetMetadataManager, +mockTime, +timer, +mock(GroupCoordinatorConfig.class), +mock(CoordinatorMetrics.class), +metricsShard +); + +when(groupMetadataManager.groupIds()).thenReturn(mkSet("group-id", "other-group-id")); +when(offsetMetadataManager.cleanupExpiredOffsets(eq("group-id"), eq(new ArrayList<>(.thenReturn(true); +when(groupMetadataManager.maybeDeleteGroup(eq("group-id"), eq(new ArrayList<>(.thenReturn(true); Review Comment: the test above, testCleanupGroupMetadata (https://github.com/apache/kafka/pull/16511/files/b73af9c786d4ad29259d0bfeb7c16db3324eff4b#diff-3a0b9cad0253e0f6d4665efd0d6f7efd5bd5dd96d3ba31005cab06fa728aad8fR990) tests that the records we add are reflected. would this be sufficient?
Re: [PR] KAFKA-16106: revert classic state transitions if deletion fails [kafka]
jeffkbkim commented on code in PR #16511: URL: https://github.com/apache/kafka/pull/16511#discussion_r1673126130 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -575,18 +581,32 @@ public CoordinatorResult deleteOffs public CoordinatorResult cleanupGroupMetadata() { long startMs = time.milliseconds(); List records = new ArrayList<>(); +AtomicInteger deletedClassicGroupCount = new AtomicInteger(0); Review Comment: If you mean by using it to conform to the lambda expression, yes
Re: [PR] MINOR: add clean up command to tests README [kafka]
chia7712 commented on code in PR #16560: URL: https://github.com/apache/kafka/pull/16560#discussion_r1673108881 ## tests/README.md: ## @@ -47,6 +47,10 @@ TC_PATHS="tests/kafkatest/tests/streams/streams_upgrade_test.py::StreamsUpgradeT ``` bash tests/docker/ducker-ak up -j 'openjdk:11'; tests/docker/run_tests.sh ``` +* Clean Docker Image +``` +docker rm -f $(docker ps -aq --filter "name=ducker") Review Comment: Have you considered using `./tests/docker/ducker-ak down -f` instead?
[jira] [Comment Edited] (KAFKA-17098) Error Opening RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864822#comment-17864822 ] Eduwer Camacaro edited comment on KAFKA-17098 at 7/10/24 10:02 PM: --- I've recently uploaded a fresh log ([^server.log.txt]) file with all of the log entries from the instance's start until the exception. was (Author: JIRAUSER306126): I've recently uploaded a fresh log ([^server0.log.txt]) file with all of the log entries from the instance's start until the exception. > Error Opening RocksDBStore > -- > > Key: KAFKA-17098 > URL: https://issues.apache.org/jira/browse/KAFKA-17098 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: Eduwer Camacaro >Priority: Minor > Attachments: server.log.txt, server0.log.txt > > > I'm getting this exception on many of my state stores: > > {code:java} > 03:35:33 [1;31mERROR[m [LH] KafkaStreamsServerImpl - Uncaught exception > for Error opening store core-repartition-store at location > /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store 03:35:33 > [1;31mERROR[m [KAFKA] KafkaStreams - stream-client [lh-0-core] Encountered > the following exception during processing and the registered exception > handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down > now. org.apache.kafka.streams.errors.ProcessorStateException: Error opening > store core-repartition-store at location > /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store at > org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329) > ~[server.jar:?] at > org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:291) > ~[server.jar:?] at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254) > ~[server.jar:?] at > org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175) > ~[server.jar:?] 
at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) > ~[server.jar:?] at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56) > ~[server.jar:?] at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) > ~[server.jar:?] at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:158) > ~[server.jar:?] at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) > ~[server.jar:?] at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:158) > ~[server.jar:?] at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) > ~[server.jar:?] at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) > ~[server.jar:?] at > org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258) > ~[server.jar:?] at > org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1008) > ~[server.jar:?] at > org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:995) > ~[server.jar:?] at > org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911) > ~[server.jar:?] at > org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188) > ~[server.jar:?] at > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996) > ~[server.jar:?] at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711) > ~[server.jar:?] 
at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) > [server.jar:?] Caused by: org.rocksdb.RocksDBException: lock hold by current > process, acquire time 1720323060 acquiring thread 139820785784576: > /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store/LOCK: No locks > available at org.rocksdb.RocksDB.open(Native Method) ~[server.jar:?] at > org.rocksdb.RocksDB.open(RocksDB.java:307) ~[server.jar:?] at > org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323) > ~[server.jar:?] ... 19 more {code} > > Some other details: > - State updater is enabled > - I'm using 5 stream threads > - This usually happens during either normal processing or state > store restoration -- This message was sent by Atlassian Jira (v8.20.10#820010)
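The `No locks available` cause above comes from RocksDB's per-directory LOCK file: the lock is already held by the same process, so a second open of the same store directory fails. The symptom can be mimicked with a stdlib-only sketch (illustrative only; this is not Kafka or RocksDB code): within one JVM, a second attempt to lock a file the process already holds is rejected, analogous to two stream threads opening the same state store directory concurrently.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockDemo {
    // Returns true if a second lock attempt on the same file, from the same JVM, fails.
    static boolean secondLockFails() {
        try {
            Path lockFile = Files.createTempFile("rocksdb-style", ".LOCK");
            try (FileChannel first = FileChannel.open(lockFile, StandardOpenOption.WRITE);
                 FileChannel second = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
                FileLock held = first.lock(); // first "open" acquires the exclusive lock
                try {
                    second.tryLock();         // same process, same file: the JVM rejects the overlap
                    return false;             // unexpectedly succeeded
                } catch (OverlappingFileLockException e) {
                    return true;              // mirrors "lock hold by current process"
                } finally {
                    held.release();
                }
            } finally {
                Files.deleteIfExists(lockFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(secondLockFails()); // prints true
    }
}
```

Across processes the conflict would surface at the OS level; within one JVM, java.nio reports the overlap immediately, which is why a lock left behind by another thread of the same process is unrecoverable until that holder releases it.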
[jira] [Updated] (KAFKA-17098) Error Opening RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduwer Camacaro updated KAFKA-17098: Attachment: (was: server0.log.txt) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17098) Error Opening RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduwer Camacaro updated KAFKA-17098: Attachment: server.log.txt -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17098) Error Opening RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864822#comment-17864822 ] Eduwer Camacaro commented on KAFKA-17098: - I've recently uploaded a fresh log ([^server0.log.txt]) file with all of the log entries from the instance's start until the exception. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17098) Error Opening RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduwer Camacaro updated KAFKA-17098: Attachment: (was: server.log) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17098) Error Opening RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eduwer Camacaro updated KAFKA-17098: Attachment: server0.log.txt -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread
[ https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-17066: - Assignee: Kirk True > New consumer updateFetchPositions should perform all operations in background > thread > > > Key: KAFKA-17066 > URL: https://issues.apache.org/jira/browse/KAFKA-17066 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Lianet Magrans >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.9.0 > > > The updateFetchPositions func in the new consumer performs several actions > based on the assigned partitions from the subscriptionState. As currently > implemented, it fetches committed offsets for partitions that require a > position (retrieved from the subscription state in the app thread), and then > resets positions for the partitions still needing one (retrieved from the > subscription state, but in the background thread). > This is problematic, given that the assignment/subscriptionState may change > in the background thread at any time (ex. new partitions reconciled), so we > could end up resetting positions to the partition offsets for a partition for > which we never even attempted to retrieve committed offsets. > Consider this sequence: > * consumer owns tp0 > * app thread -> updateFetchPositions triggers > initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned > partitions requiring a position (taking them from > subscriptions.initializingPartitions()). This will fetch committed offsets > for tp0 only. 
> * background thread -> receives new partition tp1 and completes > reconciliation (adds it to the subscription state as INITIALIZING, requiring a > position) > * app thread -> updateFetchPositions resets positions for all partitions > that still don't have a valid position after initWithCommittedOffsetsIfNeeded > (taking them from subscriptionState.partitionsNeedingReset). This will > mistakenly consider that it should reset tp1 to the partition offsets, when > in reality it never even tried fetching the committed offsets for it, because > it wasn't assigned when initWithCommittedOffsetsIfNeeded happened. > We should consider moving updateFetchPositions to the background thread as a > single event, which would safely use the subscriptionState object and apply all > the actions involved to the same consistent set of partitions assigned at > that moment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
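The interleaving above can be condensed into a toy simulation (plain Java sets standing in for the subscription state; this is not actual consumer code): reading the assignment at two different points in time lets a partition reconciled in between slip into the reset step without its committed offset ever having been fetched.

```java
import java.util.HashSet;
import java.util.Set;

public class FetchPositionsRace {
    // Returns the partitions that would be mistakenly reset to partition offsets.
    static Set<String> mistakenlyReset() {
        Set<String> assigned = new HashSet<>(Set.of("tp0"));

        // App thread: first read -- fetch committed offsets for partitions needing a position *now*.
        Set<String> fetchedCommitted = new HashSet<>(assigned); // {tp0}

        // Background thread: reconciliation adds tp1 between the two reads.
        assigned.add("tp1");

        // App thread: second read -- reset whatever still lacks a position; tp1 is now visible.
        Set<String> needingReset = new HashSet<>(assigned);
        needingReset.removeAll(fetchedCommitted);
        return needingReset; // tp1, whose committed offset was never even requested
    }

    public static void main(String[] args) {
        System.out.println(mistakenlyReset()); // prints [tp1]
    }
}
```

Doing both reads inside one background-thread event would pin them to a single snapshot of the assignment, making the result empty in this scenario.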
[jira] [Updated] (KAFKA-16984) New consumer should not complete leave operation until it gets a response
[ https://issues.apache.org/jira/browse/KAFKA-16984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16984: --- Description: When the new consumer attempts to leave a group, it sends a leave group request in a fire-and-forget mode, so, as soon as the request is generated, it will: 1. transition to UNSUBSCRIBED 2. complete the leaveGroup operation future This task focuses on point 2, which has the undesired side-effect that whatever might have been waiting for the leave in order to do something else will carry on (ex. consumer close), leading to the responses to disconnected clients we've seen when running stress tests. When leaving a group while closing a consumer, the member sends the leave request and moves on to the next operation, which is closing the network thread, so we end up with a disconnected client receiving responses from the server. We should send the leave group heartbeat and transition to UNSUBSCRIBED, but only complete the leave operation when we get a response for it, which is a much more accurate confirmation that the consumer left the group and can move on with other operations. Note that the legacy consumer does wait for a leave response before closing down the coordinator (see [AbstractCoordinator|https://github.com/apache/kafka/blob/25230b538841a5e7256b1b51725361dd59435901/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1135-L1140]), and we are looking to have the same behaviour on the new consumer. Note that with this task we'll only focus on changing the behaviour for the leave operation completion (point 2 above) to tidy up the close flow. We are not changing the transition to UNSUBSCRIBED, as it would require further consideration if ever needed. 
This is also a building block for future improvements around error handling for the leave request, which we don't have at the moment (related Jira linked).
> New consumer should not complete leave operation until it gets a response > - > > Key: KAFKA-16984 > URL: https://issues.apache.org/jira/browse/KAFKA-16984 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848 > Fix For: 3.9.0
[PR] KAFKA-16984: Complete consumer leave on response to leave request [kafka]
lianetm opened a new pull request, #16569: URL: https://github.com/apache/kafka/pull/16569 Improvement to ensure that, even though the leave group request is sent out once, the leave group operation is considered complete only when it receives a response to the HB to leave (successful or failed). The motivation is to avoid undesired interactions with operations triggered after the unsubscribe (ex. consumer close triggers a leave group operation, and shuts down the network thread when it completes, which before this PR could lead to responses to disconnected clients). This is the same behaviour on close that the legacy consumer has (waits for leave responses). Note that this PR does not change the transitions of the state machine on leave group, only the completion of the leave group future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
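The shape of the change in the PR above can be sketched with plain CompletableFutures (names here are hypothetical stand-ins, not the actual AsyncKafkaConsumer internals): before, the leave future was done as soon as the heartbeat was generated; after, it completes only when the heartbeat response callback fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class LeaveCompletionSketch {
    // Before: the future completes as soon as the leave HB request is generated.
    static CompletableFuture<Void> fireAndForgetLeave(Consumer<Runnable> sendHeartbeat) {
        sendHeartbeat.accept(() -> { /* response ignored */ });
        return CompletableFuture.completedFuture(null);
    }

    // After: the future completes only when a response (success or failure) arrives.
    static CompletableFuture<Void> leaveOnResponse(Consumer<Runnable> sendHeartbeat) {
        CompletableFuture<Void> leave = new CompletableFuture<>();
        sendHeartbeat.accept(() -> leave.complete(null)); // completed from the response callback
        return leave;
    }

    public static void main(String[] args) {
        // In-flight response callbacks, standing in for the network thread's pending requests.
        List<Runnable> inFlight = new ArrayList<>();

        CompletableFuture<Void> old = fireAndForgetLeave(inFlight::add);
        CompletableFuture<Void> fixed = leaveOnResponse(inFlight::add);
        System.out.println(old.isDone() + " " + fixed.isDone()); // prints true false

        inFlight.forEach(Runnable::run); // the leave HB response arrives
        System.out.println(fixed.isDone()); // prints true
    }
}
```

A close() that waits on the response-completed future keeps the network thread alive long enough to consume the response, which is the disconnected-client scenario the PR is addressing.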
[jira] [Created] (KAFKA-17117) Avoid instantiating classpath plugins when service loading plugins
Greg Harris created KAFKA-17117: --- Summary: Avoid instantiating classpath plugins when service loading plugins Key: KAFKA-17117 URL: https://issues.apache.org/jira/browse/KAFKA-17117 Project: Kafka Issue Type: Improvement Components: connect Affects Versions: 3.6.0 Reporter: Greg Harris In KAFKA-14789 modifications were made to allow PluginClassLoaders to see all resource files of the parent DelegatingClassLoader and classpath, rather than selectively hiding some resources that were for ServiceLoader manifests. This has the effect that the ServiceLoader finds classpath plugins when searching in plugin locations, and the PluginScanner filters these plugins out by checking for classloader equality. This has some side-effects that are undesirable: * Classpath plugins may be instantiated with the thread context classloader set to a plugin classloader * Classpath plugins are instantiated multiple times, once for each plugin location * Exceptions from classpath plugins show up multiple times in the logs: KAFKA-17111 This change may require us to fork the ServiceLoader implementation, which is itself undesirable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
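The classloader-equality filtering described above hinges on an identity comparison of a discovered class's loader against the plugin location's loader. A minimal sketch (an assumed simplification; `loadedBy` is a hypothetical helper, not the real PluginScanner API):

```java
public class LoaderFilterSketch {
    // PluginScanner-style check: keep only classes actually defined by the expected loader.
    static boolean loadedBy(Class<?> klass, ClassLoader expected) {
        return klass.getClassLoader() == expected; // identity comparison, not equals()
    }

    public static void main(String[] args) {
        ClassLoader app = LoaderFilterSketch.class.getClassLoader();
        System.out.println(loadedBy(LoaderFilterSketch.class, app)); // prints true
        // java.lang.String is defined by the bootstrap loader (reported as null),
        // so it would be filtered out when scanning a plugin location.
        System.out.println(loadedBy(String.class, app)); // prints false
    }
}
```

Because the ServiceLoader still *instantiates* providers before this filter can reject them, classpath plugins get constructed once per plugin location, which is the side-effect the issue proposes to avoid.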
[jira] [Updated] (KAFKA-17116) New consumer may not send effective leave group if member ID received after close
[ https://issues.apache.org/jira/browse/KAFKA-17116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-17116:
---
Summary: New consumer may not send effective leave group if member ID received after close (was: New consumer may not send leave group if member ID received after close)

> New consumer may not send effective leave group if member ID received after close
> Key: KAFKA-17116
> URL: https://issues.apache.org/jira/browse/KAFKA-17116
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.8.0
> Reporter: Lianet Magrans
> Priority: Major
> Labels: kip-848-client-support
> Fix For: 3.9.0
>
> If the new consumer is closed after sending a HB to join, but before receiving the response, it sends a leave group request without a member ID (which simply fails with UNKNOWN_MEMBER_ID). This leaves the broker with a newly registered member for which it will never receive a leave request.
> # consumer.subscribe -> sends HB to join, transitions to JOINING
> # consumer.close -> transitions to LEAVING and sends HB with epoch -1 (without waiting for in-flight requests)
> # consumer receives the response to the initial HB, containing the assigned member ID. It simply ignores it because it's no longer in the group (UNSUBSCRIBED)
> With the current logic, the main downsides are:
> # If the member received partitions on the first HB, those partitions won't be re-assigned (the broker waits for the closed consumer to reconcile them) until the rebalance timeout expires.
> # Even if no partitions were assigned to it, the member remains in the group from the broker's point of view (but not from the client's). The member is eventually kicked out for not sending HBs, but only when its session timeout expires.
[jira] [Created] (KAFKA-17116) New consumer may not send leave group if member ID received after close
Lianet Magrans created KAFKA-17116:
--
Summary: New consumer may not send leave group if member ID received after close
Key: KAFKA-17116
URL: https://issues.apache.org/jira/browse/KAFKA-17116
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 3.8.0
Reporter: Lianet Magrans
Fix For: 3.9.0

If the new consumer is closed after sending a HB to join, but before receiving the response, it sends a leave group request without a member ID (which simply fails with UNKNOWN_MEMBER_ID). This leaves the broker with a newly registered member for which it will never receive a leave request.
# consumer.subscribe -> sends HB to join, transitions to JOINING
# consumer.close -> transitions to LEAVING and sends HB with epoch -1 (without waiting for in-flight requests)
# consumer receives the response to the initial HB, containing the assigned member ID. It simply ignores it because it's no longer in the group (UNSUBSCRIBED)
With the current logic, the main downsides are:
# If the member received partitions on the first HB, those partitions won't be re-assigned (the broker waits for the closed consumer to reconcile them) until the rebalance timeout expires.
# Even if no partitions were assigned to it, the member remains in the group from the broker's point of view (but not from the client's). The member is eventually kicked out for not sending HBs, but only when its session timeout expires.
Re: [PR] MINOR: Don't swallow validateReconfiguration exceptions [kafka]
ahuang98 commented on code in PR #16346: URL: https://github.com/apache/kafka/pull/16346#discussion_r1672981769

## clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java:
@@ -190,9 +191,13 @@ public Set<String> reconfigurableConfigs() { }
 @Override
-public void validateReconfiguration(Map<String, ?> configs) {
+public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
     if (this.securityProtocol == SecurityProtocol.SASL_SSL)
-        sslFactory.validateReconfiguration(configs);
+        try {
+            sslFactory.validateReconfiguration(configs);
+        } catch (IllegalStateException e) {
+            throw new ConfigException("SASL reconfiguration failed due to " + e);
+        }

Review Comment: I'm concatenating the "SASL reconfiguration failed" message with the underlying exception string now, since it looks like `ConfigException` prints errors in a very specific way.
[jira] [Commented] (KAFKA-17115) Closing newly-created consumers during rebalance can cause rebalances to hang
[ https://issues.apache.org/jira/browse/KAFKA-17115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864812#comment-17864812 ] Lianet Magrans commented on KAFKA-17115:

Hey [~ChrisEgerton], even though the new consumer doesn't have this same flow (no member ID is required to join), I believe we have the same gap you pointed out:
# new consumer joins -> sends HB with epoch 0 (no member ID)
# consumer closed -> this still generates the HB to leave (epoch -1), but with no member ID because it does not have one yet, so the broker cannot process it correctly anyway
# consumer receives the response to the initial HB to join (the response containing the member ID)
So yes, we would end up in a similar situation (less bad only because, with the new protocol and no global barrier, we wouldn't have a blocked rebalance, just a member that is registered in the group and may have received partitions that won't be re-assigned until the rebalance timeout expires and the broker gives the partitions to someone else). The member would be kicked out of the group when its session expires. I will file a separate Jira to review and fix this edge case with the new consumer. Thanks!

> Closing newly-created consumers during rebalance can cause rebalances to hang
> Key: KAFKA-17115
> URL: https://issues.apache.org/jira/browse/KAFKA-17115
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.9.0
> Reporter: Chris Egerton
> Assignee: Chris Egerton
> Priority: Major
>
> When a dynamic consumer (i.e., one with no group instance ID configured) first tries to join a group, the group coordinator normally responds with the MEMBER_ID_REQUIRED error, under the assumption that the member will retry soon after.
> During this step, the group coordinator will also generate a new member ID for the consumer, include it in the error response to the initial join group request, and expect that a member with that ID will participate in future rebalances.
> If a consumer is closed between the time that it sends the JoinGroup request and the time that it receives the response from the group coordinator, it will not attempt to leave the group, since it doesn't have a member ID to include in that request.
> This will cause future rebalances to hang, since the group coordinator will still expect a member with the ID of the now-closed consumer to join.
> Eventually, the group coordinator may remove the closed consumer from the group, but with default configuration settings this could take as long as five minutes.
> One possible fix is to send a LeaveGroup request with the member ID if the consumer receives a JoinGroup response with a member ID after it has been closed.
> This applies to the legacy consumer; I have not verified yet with the new async consumer.
[PR] KAFKA-17111: Revert "KAFKA-15996: Improve JsonConverter performance (#14992)" [kafka]
gharris1727 opened a new pull request, #16568: URL: https://github.com/apache/kafka/pull/16568 This reverts commit 314de9f23c7e8d574eb3c03e345f8cf504266831. The original change causes ERROR logs to be printed on worker startup, and should be reverted in advance of the 3.8.0 release. This is an alternative to #16565.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-17111: Explicitly Register Afterburner Module in JsonSerializer and JsonDeserializer [kafka]
gharris1727 commented on PR #16565: URL: https://github.com/apache/kafka/pull/16565#issuecomment-2221423742 And for anyone reviewing this, it's easy to verify the change by checking the behavior of the following commands:
```
./gradlew jar
./bin/connect-plugin-path.sh list --plugin-location ~/.m2/repository/com/fasterxml/jackson
```
Re: [PR] KAFKA-17111: Explicitly Register Afterburner Module in JsonSerializer and JsonDeserializer [kafka]
gharris1727 commented on PR #16565: URL: https://github.com/apache/kafka/pull/16565#issuecomment-2221414024 cc @divijvaidya and @mimaison who reviewed the original performance PR.
Re: [PR] MINOR: Add logs when metadata update is not successful [kafka]
jolshan commented on PR #16496: URL: https://github.com/apache/kafka/pull/16496#issuecomment-2221397981 cc: @C0urante I know we had some discussion on this issue with connect. Seems like this won't fix the issue, but it could provide logs to indicate when it is happening. (We will likely still need a KIP if we want to handle this case differently.)
Re: [PR] MINOR: Await consumer group membership before verifying/modifying sink connector offsets in OffsetsApiIntegrationTest [kafka]
C0urante commented on PR #16519: URL: https://github.com/apache/kafka/pull/16519#issuecomment-2221387274 It seems like there's a real consumer bug that's been causing this flakiness: https://issues.apache.org/jira/browse/KAFKA-17115 Closing this PR in favor of the bug fix.
Re: [PR] MINOR: Await consumer group membership before verifying/modifying sink connector offsets in OffsetsApiIntegrationTest [kafka]
C0urante closed pull request #16519: MINOR: Await consumer group membership before verifying/modifying sink connector offsets in OffsetsApiIntegrationTest URL: https://github.com/apache/kafka/pull/16519
Re: [PR] KAFKA-17111: Explicitly Register Afterburner Module in JsonSerializer and JsonDeserializer [kafka]
gharris1727 commented on PR #16565: URL: https://github.com/apache/kafka/pull/16565#issuecomment-2221387172 Thanks @vbalani002 for the bug report and prompt fix! I agree with this approach; let me know when you want me to review this.
[jira] [Updated] (KAFKA-17111) ServiceConfigurationError in JsonSerializer/Deserializer during Plugin Discovery
[ https://issues.apache.org/jira/browse/KAFKA-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-17111: Priority: Blocker (was: Major) > ServiceConfigurationError in JsonSerializer/Deserializer during Plugin > Discovery > > > Key: KAFKA-17111 > URL: https://issues.apache.org/jira/browse/KAFKA-17111 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 3.8.0 >Reporter: Vikas Balani >Assignee: Vikas Balani >Priority: Blocker > > h3. Problem: > JsonSerializer and JsonDeserializer use > objectMapper.findAndRegisterModules(), which attempts to register all Jackson > modules implementing com.fasterxml.jackson.databind.Module. This can cause a > ServiceConfigurationError when incompatible modules are present in the > classpath. > > {code:java} > java.util.ServiceConfigurationError: > org.apache.kafka.connect.storage.Converter: Provider > org.apache.kafka.connect.json.JsonConverter could not be instantiated > at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:586) > at > java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:813) > at java.base/java.util.ServiceLoader$ProviderImpl.get(ServiceLoader.java:729) > at java.base/java.util.ServiceLoader$3.next(ServiceLoader.java:1403) > at > org.apache.kafka.connect.runtime.isolation.PluginScanner.handleLinkageError(PluginScanner.java:176) > at > org.apache.kafka.connect.runtime.isolation.PluginScanner.getServiceLoaderPluginDesc(PluginScanner.java:136) > at > org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner.scanPlugins(ServiceLoaderScanner.java:61) > at > org.apache.kafka.connect.runtime.isolation.PluginScanner.scanUrlsAndAddPlugins(PluginScanner.java:79) > at > org.apache.kafka.connect.runtime.isolation.PluginScanner.discoverPlugins(PluginScanner.java:67) > at > org.apache.kafka.connect.runtime.isolation.Plugins.initLoaders(Plugins.java:99) > at 
org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:90) > at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:78) > at > org.apache.kafka.connect.cli.AbstractConnectCli.startConnect(AbstractConnectCli.java:128) > at > org.apache.kafka.connect.cli.AbstractConnectCli.run(AbstractConnectCli.java:101) > at > org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:113) > Caused by: java.util.ServiceConfigurationError: > com.fasterxml.jackson.databind.Module: > com.fasterxml.jackson.datatype.jsr310.JavaTimeModule not a subtype > at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:593) > at > java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1244) > at > java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1273) > at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1309) > at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1393) > at > com.fasterxml.jackson.databind.ObjectMapper.findModules(ObjectMapper.java:1158) > at > com.fasterxml.jackson.databind.ObjectMapper.findModules(ObjectMapper.java:1142) > at > com.fasterxml.jackson.databind.ObjectMapper.findAndRegisterModules(ObjectMapper.java:1192) > at > org.apache.kafka.connect.json.JsonSerializer.<init>(JsonSerializer.java:58) > at org.apache.kafka.connect.json.JsonConverter.<init>(JsonConverter.java:250) > at org.apache.kafka.connect.json.JsonConverter.<init>(JsonConverter.java:238) > at > java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62) > at > java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502) > at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486) > at > java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:789) > ... 13 more{code} > > h3. Steps to Reproduce: > 1. 
Start a connect worker with service loading enabled and with certain > connector plugins in the plugin path (e.g. AzureBlobSource & BigQuerySink) > 2. Observe ServiceConfigurationError during plugin discovery > h3. Current Behavior: > ServiceConfigurationError is thrown with the message > "com.fasterxml.jackson.databind.Module: <module> not a subtype" > where <module> can be one of: > * com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule > * com.fasterxml.jackson.datatype.jsr310.JavaTimeModule > * com.fasterxml.jackson.datatype.guava.GuavaModule > * com.fasterxml.jackson.datatype.joda.JodaModule > h3. Proposed Solution: > Explicitly register the Afterburner module instead of using > findAndRegisterModules(). > h3. Potential Impact: > - Resolves compatibility issues with certain Jackson modules > - Maintains the performance improvement
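The proposed solution can be sketched as below. This is an illustrative fragment, not the merged patch; it assumes the `jackson-module-afterburner` dependency is on the classpath:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;

// Register only the Afterburner module explicitly, instead of calling
// objectMapper.findAndRegisterModules(), which ServiceLoader-scans the
// classpath and can trip over Module implementations loaded by other
// (plugin) classloaders.
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new AfterburnerModule());
```

Explicit registration keeps the serialization speedup from Afterburner while removing the classpath-wide module discovery that triggers the "not a subtype" errors.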
[jira] [Assigned] (KAFKA-17111) ServiceConfigurationError in JsonSerializer/Deserializer during Plugin Discovery
[ https://issues.apache.org/jira/browse/KAFKA-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-17111:
---
Assignee: Vikas Balani (was: Greg Harris)
[jira] [Commented] (KAFKA-17111) ServiceConfigurationError in JsonSerializer/Deserializer during Plugin Discovery
[ https://issues.apache.org/jira/browse/KAFKA-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864804#comment-17864804 ] Greg Harris commented on KAFKA-17111:

Hi [~vbalani] Thank you for the bug report! I can reproduce it locally. I believe this should be a cosmetic error, as it is thrown when the classpath JsonConverter is found via each plugin.path entry. These later get excluded to avoid duplicates: [https://github.com/apache/kafka/blob/25d775b742406477a0ff678b9990ed149d2157cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java#L144-L148] so even if the error were not thrown, these exceptions would be ignored. I tried to figure out a way to prevent plugin scanning from finding classpath plugins; I'll have to take another look at that to see if we can prevent this sort of behavior in the future. For now, I agree with your recommended course of action. I'll raise this in the release thread to see if we can include it in 3.8.0.
[jira] [Assigned] (KAFKA-17111) ServiceConfigurationError in JsonSerializer/Deserializer during Plugin Discovery
[ https://issues.apache.org/jira/browse/KAFKA-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-17111:
---
Assignee: Greg Harris (was: Vikas Balani)
[jira] [Updated] (KAFKA-15954) Review minimal effort approach on consumer last heartbeat on unsubscribe
[ https://issues.apache.org/jira/browse/KAFKA-15954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15954: --- Description: Currently the legacy and new consumers follow a minimal effort approach when sending a leave group (legacy) or last heartbeat request (new consumer). The request is sent without waiting for or handling any response. This behaviour applies when the consumer is being closed or when it unsubscribes. For the case when the consumer is being closed (which is a "terminal" state), it probably makes sense to just follow a minimal effort approach for "properly" leaving the group (no retry logic). But for the case of unsubscribe, we could consider whether it is valuable to put a little more effort into making sure that the last heartbeat is sent and received by the broker (e.g. what if the coordinator is not known/available when sending the last HB). Note that unsubscribe could be a temporary state, where the consumer might want to re-join the group at any time. was: Currently the legacy and new consumers follow a minimal effort approach when sending a leave group (legacy) or last heartbeat request (new consumer). The request is sent without waiting for or handling any response. This behaviour applies when the consumer is being closed or when it unsubscribes. For the case when the consumer is being closed (which is a "terminal" state), it makes sense to just follow a minimal effort approach for "properly" leaving the group. But for the case of unsubscribe, it would maybe make sense to put a little more effort into making sure that the last heartbeat is sent and received by the broker. Note that unsubscribe could be a temporary state, where the consumer might want to re-join the group at any time.
> Review minimal effort approach on consumer last heartbeat on unsubscribe > > > Key: KAFKA-15954 > URL: https://issues.apache.org/jira/browse/KAFKA-15954 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, reconciliation > Fix For: 3.9.0 > > > Currently the legacy and new consumers follow a minimal effort approach when > sending a leave group (legacy) or last heartbeat request (new consumer). The > request is sent without waiting for or handling any response. This behaviour applies > when the consumer is being closed or when it unsubscribes. > For the case when the consumer is being closed (which is a "terminal" > state), it probably makes sense to just follow a minimal effort approach for > "properly" leaving the group (no retry logic). But for the case of > unsubscribe, we could consider whether it is valuable to put a little more effort > into making sure that the last heartbeat is sent and received by the broker > (e.g. what if the coordinator is not known/available when sending the last HB). Note > that unsubscribe could be a temporary state, where the consumer might want to > re-join the group at any time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
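The "little more effort" the ticket asks about amounts to bounding retries on the final heartbeat when the coordinator is momentarily unknown. Below is a minimal, self-contained sketch of that idea; all names here are hypothetical illustrations, not the actual Kafka consumer internals:

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical sketch: a bounded-retry "last heartbeat" for unsubscribe, in
// contrast to the current fire-and-forget approach. None of these names exist
// in the Kafka client.
public class LastHeartbeatSketch {

    // Returns true if the heartbeat could be sent within maxAttempts tries.
    // `coordinator` yields an endpoint only when the coordinator is
    // known/available, modeling the "coordinator not known" case from the ticket.
    public static boolean sendLastHeartbeatWithRetry(Supplier<Optional<String>> coordinator,
                                                     int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Optional<String> endpoint = coordinator.get();
            if (endpoint.isPresent()) {
                // In the real client this would send the leave/last-HB request
                // and wait for the response; here we just report success.
                return true;
            }
            // Coordinator unknown: a real implementation would rediscover it
            // (FindCoordinator) before the next attempt.
        }
        return false; // give up: unsubscribe still completes, as today
    }

    public static void main(String[] args) {
        // Coordinator becomes available on the third lookup.
        int[] calls = {0};
        Supplier<Optional<String>> flaky = () ->
            (++calls[0] >= 3) ? Optional.of("broker-0:9092") : Optional.empty();
        System.out.println(sendLastHeartbeatWithRetry(flaky, 5));                  // true
        System.out.println(sendLastHeartbeatWithRetry(Optional::empty, 2));        // false
    }
}
```

Under this split, close() would keep today's fire-and-forget behaviour, while unsubscribe() — being a resumable state — would opt into the bounded retry.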
[jira] [Updated] (KAFKA-17114) DefaultStateUpdater::handleRuntimeException should update isRunning before calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks`
[ https://issues.apache.org/jira/browse/KAFKA-17114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ao Li updated KAFKA-17114: -- Affects Version/s: 3.9.0 > DefaultStateUpdater::handleRuntimeException should update isRunning before > calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks` > - > > Key: KAFKA-17114 > URL: https://issues.apache.org/jira/browse/KAFKA-17114 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.9.0 >Reporter: Ao Li >Priority: Minor > > I saw a flaky test in > DefaultStateUpdaterTest::shouldThrowIfAddingStandbyAndActiveTaskWithSameId > recently. > {code} > org.opentest4j.AssertionFailedError: expected: &lt;false&gt; but was: &lt;true&gt; > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertFalse.failNotFalse(AssertFalse.java:63) > at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:36) > at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:31) > at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:231) > at > org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingTasksWithSameId(DefaultStateUpdaterTest.java:294) > at > org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingStandbyAndActiveTaskWithSameId(DefaultStateUpdaterTest.java:285) > at java.base/java.lang.reflect.Method.invoke(Method.java:580) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) > {code} > To make the bug more reproducible, you may add `Thread.sleep(5)` after > `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);` > in DefaultStateUpdater::handleRuntimeException > The test is flaky because > 
`addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);` > will unblock the `verifyFailedTasks(IllegalStateException.class, task1);` > statement in DefaultStateUpdaterTest::shouldThrowIfAddingTasksWithSameId. > If `assertFalse(stateUpdater.isRunning());` is executed before > `isRunning.set(false);` the test will fail
Re: [PR] KAFKA-17115: Send LeaveGroup request if consumer receives JoinGroup response with member ID after being closed [kafka]
C0urante commented on PR #16567: URL: https://github.com/apache/kafka/pull/16567#issuecomment-2221307929 Waiting to see if CI likes this before marking ready for review. I was able to run the `AbstractCoordinatorTest` suite locally and verify that the clients module builds, but nothing else. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17114) DefaultStateUpdater::handleRuntimeException should update isRunning before calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks`
[ https://issues.apache.org/jira/browse/KAFKA-17114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ao Li updated KAFKA-17114: -- Component/s: streams Description: I saw a flaky test in DefaultStateUpdaterTest::shouldThrowIfAddingStandbyAndActiveTaskWithSameId recently. {code} org.opentest4j.AssertionFailedError: expected: &lt;false&gt; but was: &lt;true&gt; at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertFalse.failNotFalse(AssertFalse.java:63) at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:36) at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:31) at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:231) at org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingTasksWithSameId(DefaultStateUpdaterTest.java:294) at org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingStandbyAndActiveTaskWithSameId(DefaultStateUpdaterTest.java:285) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) {code} To make the bug more reproducible, you may add `Thread.sleep(5)` after `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);` in DefaultStateUpdater::handleRuntimeException The test is flaky because `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);` will unblock the `verifyFailedTasks(IllegalStateException.class, task1);` statement in DefaultStateUpdaterTest::shouldThrowIfAddingTasksWithSameId. 
If `assertFalse(stateUpdater.isRunning());` is executed before `isRunning.set(false);` the test will fail Priority: Minor (was: Major) Summary: DefaultStateUpdater::handleRuntimeException should update isRunning before calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks` (was: handleRuntimeException) > DefaultStateUpdater::handleRuntimeException should update isRunning before > calling `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks` > - > > Key: KAFKA-17114 > URL: https://issues.apache.org/jira/browse/KAFKA-17114 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Ao Li >Priority: Minor > > I saw a flaky test in > DefaultStateUpdaterTest::shouldThrowIfAddingStandbyAndActiveTaskWithSameId > recently. > {code} > org.opentest4j.AssertionFailedError: expected: &lt;false&gt; but was: &lt;true&gt; > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertFalse.failNotFalse(AssertFalse.java:63) > at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:36) > at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:31) > at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:231) > at > org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingTasksWithSameId(DefaultStateUpdaterTest.java:294) > at > org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldThrowIfAddingStandbyAndActiveTaskWithSameId(DefaultStateUpdaterTest.java:285) > at java.base/java.lang.reflect.Method.invoke(Method.java:580) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) > {code} > To make the bug more reproducible, you may add `Thread.sleep(5)` after > `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);` > in 
DefaultStateUpdater::handleRuntimeException > The test is flaky because > `addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);` > will unblock the `verifyFailedTasks(IllegalStateException.class, task1);` > statement in DefaultStateUpdaterTest::shouldThrowIfAddingTasksWithSameId. > If `assertFalse(stateUpdater.isRunning());` is executed before > `isRunning.set(false);` the test will fail
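The fix the summary proposes — clear the running flag *before* publishing the failure — can be shown deterministically in a small sketch. The structure below is hypothetical and only mirrors the ordering, not the real DefaultStateUpdater:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the proposed ordering: flip isRunning to false before publishing
// the failed task, so any observer unblocked by the failed-task list is
// guaranteed (via happens-before through the latch) to see isRunning == false.
public class ShutdownOrderingSketch {
    final AtomicBoolean isRunning = new AtomicBoolean(true);
    final List<String> failedTasks = new CopyOnWriteArrayList<>();
    final CountDownLatch failurePublished = new CountDownLatch(1);

    void handleRuntimeException(RuntimeException e) {
        isRunning.set(false);            // fixed order: update the flag first ...
        failedTasks.add(e.getMessage()); // ... then publish the failure,
        failurePublished.countDown();    //     which unblocks waiting observers
    }

    public static void main(String[] args) throws InterruptedException {
        ShutdownOrderingSketch updater = new ShutdownOrderingSketch();
        Thread t = new Thread(() ->
            updater.handleRuntimeException(new IllegalStateException("task1 added twice")));
        t.start();
        updater.failurePublished.await(); // plays the role of verifyFailedTasks(...)
        // countDown() happens after isRunning.set(false), so this cannot race:
        System.out.println("isRunning=" + updater.isRunning.get()); // isRunning=false
        t.join();
    }
}
```

With the original order (publish first, flag second), the observer can wake up between the two statements and read isRunning == true, which is exactly the flaky assertion in the stack trace above.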
[PR] KAFKA-17115: Send LeaveGroup request if consumer receives JoinGroup response with member ID after being closed [kafka]
C0urante opened a new pull request, #16567: URL: https://github.com/apache/kafka/pull/16567 [Jira](https://issues.apache.org/jira/browse/KAFKA-17115) This change implements a best-effort attempt to notify the group coordinator that the member ID of a closed consumer can be discarded, which can prevent rebalances from getting stuck if a dynamic consumer receives an initial JoinGroup response with the `MEMBER_ID_REQUIRED` error after it has been closed. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-17115) Closing newly-created consumers during rebalance can cause rebalances to hang
Chris Egerton created KAFKA-17115: - Summary: Closing newly-created consumers during rebalance can cause rebalances to hang Key: KAFKA-17115 URL: https://issues.apache.org/jira/browse/KAFKA-17115 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.9.0 Reporter: Chris Egerton Assignee: Chris Egerton When a dynamic consumer (i.e., one with no group instance ID configured) first tries to join a group, the group coordinator normally responds with the MEMBER_ID_REQUIRED error, under the assumption that the member will retry soon after. During this step, the group coordinator will also generate a new member ID for the consumer, include it in the error response for the initial join group request, and expect that a member with that ID will participate in future rebalances. If a consumer is closed in between the time that it sends the JoinGroup request and the time that it receives the response from the group coordinator, it will not attempt to leave the group, since it doesn't have a member ID to include in that request. This will cause future rebalances to hang, since the group coordinator will still expect a member with the ID for the now-closed consumer to join. Eventually, the group coordinator may remove the closed consumer from the group, but with default configuration settings, this could take as long as five minutes. One possible fix is to send a LeaveGroup request with the member ID if the consumer receives a JoinGroup response with a member ID after it has been closed. This applies to the legacy consumer; I have not verified yet with the new async consumer.
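The possible fix described at the end of the ticket can be sketched as follows. The types and method names here are illustrative only, not the actual coordinator code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the proposed fix: if a JoinGroup response carrying a
// newly-assigned member ID arrives after the consumer was closed, fire a
// best-effort LeaveGroup so the coordinator can forget that member immediately
// instead of waiting out the session/rebalance timeout.
public class LeaveAfterCloseSketch {
    private volatile boolean closed = false;
    final List<String> sentLeaveGroupFor = new ArrayList<>();

    public void close() { closed = true; }

    // Called when a JoinGroup response arrives; memberId is present when the
    // coordinator answered MEMBER_ID_REQUIRED with a freshly generated ID.
    public void onJoinGroupResponse(Optional<String> memberId) {
        if (closed && memberId.isPresent()) {
            // Best-effort: send LeaveGroup with the ID we would otherwise discard.
            sentLeaveGroupFor.add(memberId.get());
        }
        // (normal join handling would continue here when not closed)
    }

    public static void main(String[] args) {
        LeaveAfterCloseSketch consumer = new LeaveAfterCloseSketch();
        consumer.close(); // closed between the JoinGroup request and its response
        consumer.onJoinGroupResponse(Optional.of("member-42"));
        System.out.println(consumer.sentLeaveGroupFor); // [member-42]
    }
}
```

The key point is that the leave is best-effort and fire-and-forget, matching the minimal-effort pattern the group-leave path already follows elsewhere.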
[jira] [Created] (KAFKA-17114) handleRuntimeException
Ao Li created KAFKA-17114: - Summary: handleRuntimeException Key: KAFKA-17114 URL: https://issues.apache.org/jira/browse/KAFKA-17114 Project: Kafka Issue Type: Bug Reporter: Ao Li
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on code in PR #16325: URL: https://github.com/apache/kafka/pull/16325#discussion_r1672845320 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -698,6 +663,42 @@ object StorageTool extends Logging { voterSet } + def createStandaloneVoterMap(config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = { +val advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners +val listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap() +advertisedListenerEndpoints.foreach(endpoint => { + val host: String = endpoint.host + listeners.put(endpoint.listenerName, new InetSocketAddress(host, endpoint.port)) +}) +listeners + } + + private def parseControllerQuorumVotersMap(controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress], + metaProperties: MetaProperties, + config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = { Review Comment: From the KIP description: "When the format command is executed with this option it will read the node.id configured in the properties file specified by the --config option and compare it against the &lt;replica-id&gt; specified in --controller-quorum-voters. If there is a match, it will write the &lt;directory-id&gt; specified to the directory.id property in the meta.properties for the metadata.log.dir directory." I tried adding the condition if (metaProperties.nodeId().getAsInt == replicaId) in the method. Maybe I am wrong. Can you please suggest code?
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on code in PR #16325: URL: https://github.com/apache/kafka/pull/16325#discussion_r1672833818 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -112,20 +112,21 @@ object StorageTool extends Logging { setNodeId(config.nodeId). build() val standaloneMode = namespace.getBoolean("standalone") -var advertisedListenerEndpoints: collection.Seq[kafka.cluster.EndPoint] = List() val controllersQuorumVoters = namespace.getString("controller_quorum_voters") if(standaloneMode && controllersQuorumVoters != null) { throw new TerseFailure("Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.") } +var listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap() if (standaloneMode) { - advertisedListenerEndpoints = config.effectiveAdvertisedBrokerListeners + listeners = createStandaloneVoterMap(config) } else if(controllersQuorumVoters != null) { if (!validateControllerQuorumVoters(controllersQuorumVoters)) { throw new TerseFailure("Expected schema for --controller-quorum-voters is &lt;replica-id&gt;[-&lt;directory-id&gt;]@&lt;host&gt;:&lt;port&gt;") } - advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners + val controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress] = parseVoterConnections(Collections.singletonList(controllersQuorumVoters)) + listeners = parseControllerQuorumVotersMap(controllerQuorumVoterMap, metaProperties, config) Review Comment: This means we would have to update https://github.com/apache/kafka/blob/25d775b742406477a0ff678b9990ed149d2157cc/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java#L178 to return Uuid too ?
[jira] [Updated] (KAFKA-17111) ServiceConfigurationError in JsonSerializer/Deserializer during Plugin Discovery
[ https://issues.apache.org/jira/browse/KAFKA-17111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-17111: Description: h3. Problem: JsonSerializer and JsonDeserializer use objectMapper.findAndRegisterModules(), which attempts to register all Jackson modules implementing com.fasterxml.jackson.databind.Module. This can cause a ServiceConfigurationError when incompatible modules are present in the classpath. {code:java} java.util.ServiceConfigurationError: org.apache.kafka.connect.storage.Converter: Provider org.apache.kafka.connect.json.JsonConverter could not be instantiated at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:586) at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:813) at java.base/java.util.ServiceLoader$ProviderImpl.get(ServiceLoader.java:729) at java.base/java.util.ServiceLoader$3.next(ServiceLoader.java:1403) at org.apache.kafka.connect.runtime.isolation.PluginScanner.handleLinkageError(PluginScanner.java:176) at org.apache.kafka.connect.runtime.isolation.PluginScanner.getServiceLoaderPluginDesc(PluginScanner.java:136) at org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner.scanPlugins(ServiceLoaderScanner.java:61) at org.apache.kafka.connect.runtime.isolation.PluginScanner.scanUrlsAndAddPlugins(PluginScanner.java:79) at org.apache.kafka.connect.runtime.isolation.PluginScanner.discoverPlugins(PluginScanner.java:67) at org.apache.kafka.connect.runtime.isolation.Plugins.initLoaders(Plugins.java:99) at org.apache.kafka.connect.runtime.isolation.Plugins.&lt;init&gt;(Plugins.java:90) at org.apache.kafka.connect.runtime.isolation.Plugins.&lt;init&gt;(Plugins.java:78) at org.apache.kafka.connect.cli.AbstractConnectCli.startConnect(AbstractConnectCli.java:128) at org.apache.kafka.connect.cli.AbstractConnectCli.run(AbstractConnectCli.java:101) at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:113) Caused by: java.util.ServiceConfigurationError: 
com.fasterxml.jackson.databind.Module: com.fasterxml.jackson.datatype.jsr310.JavaTimeModule not a subtype at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:593) at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1244) at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1273) at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1309) at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1393) at com.fasterxml.jackson.databind.ObjectMapper.findModules(ObjectMapper.java:1158) at com.fasterxml.jackson.databind.ObjectMapper.findModules(ObjectMapper.java:1142) at com.fasterxml.jackson.databind.ObjectMapper.findAndRegisterModules(ObjectMapper.java:1192) at org.apache.kafka.connect.json.JsonSerializer.&lt;init&gt;(JsonSerializer.java:58) at org.apache.kafka.connect.json.JsonConverter.&lt;init&gt;(JsonConverter.java:250) at org.apache.kafka.connect.json.JsonConverter.&lt;init&gt;(JsonConverter.java:238) at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62) at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502) at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486) at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:789) ... 13 more{code} h3. Steps to Reproduce: 1. Start a connect worker with Service loading enabled and with certain connector plugins in plugin path (e.g. AzureBlobSource & BigQuerySink) 2. Observe ServiceConfigurationError during plugin discovery h3. Current Behavior: ServiceConfigurationError is thrown with message "com.fasterxml.jackson.databind.Module: &lt;module&gt; not a subtype" Where &lt;module&gt; can be one of: * com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule * com.fasterxml.jackson.datatype.jsr310.JavaTimeModule * com.fasterxml.jackson.datatype.guava.GuavaModule * com.fasterxml.jackson.datatype.joda.JodaModule h3. 
Proposed Solution: Explicitly register the Afterburner module instead of using findAndRegisterModules(). h3. Potential Impact: - Resolves compatibility issues with certain Jackson modules - Maintains performance improvements from Afterburner module - May slightly change behavior for users relying on auto-registration of other Jackson modules was: h3. Problem: JsonSerializer and JsonDeserializer use objectMapper.findAndRegisterModules(), which attempts to register all Jackson modules implementing com.fasterxml.jackson.databind.Module. This can cause a ServiceConfigurationError when incompatible modules are present in the classpath. {code:java} java.util.ServiceConfigurationError: org.apache.kafka.connect.storage.Converter: Provider org.apache.kafka.connect.json.JsonConverter could not be instantiated at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:586) at java
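The proposed solution replaces classpath scanning with explicit registration. Jackson itself is not available here, so the following toy sketch (hypothetical Module/registerModule names, not Jackson's API) only illustrates the shape of the change: scanning consults ServiceLoader and inherits whatever the classpath advertises — which is what fails under Connect's plugin isolation — while explicit registration is deterministic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Toy illustration of "register exactly the module you need" vs. "scan the
// classpath for every module". The scanning path iterates ServiceLoader
// providers and can throw ServiceConfigurationError when an isolated plugin
// classloader has loaded the module interface a second time; the explicit
// path never consults the classpath at all.
public class ExplicitRegistrationSketch {
    public interface Module { String name(); }
    public static final class AfterburnerLikeModule implements Module {
        public String name() { return "afterburner"; }
    }

    private final List<Module> registered = new ArrayList<>();

    // Analogue of findAndRegisterModules(): the result depends on what the
    // classpath advertises, which is exactly what made it fragile.
    public int findAndRegisterModules() {
        for (Module m : ServiceLoader.load(Module.class)) registered.add(m);
        return registered.size();
    }

    // Analogue of registerModule(new AfterburnerModule()): deterministic.
    public void registerModule(Module m) { registered.add(m); }

    public List<String> moduleNames() {
        List<String> names = new ArrayList<>();
        for (Module m : registered) names.add(m.name());
        return names;
    }

    public static void main(String[] args) {
        ExplicitRegistrationSketch mapper = new ExplicitRegistrationSketch();
        mapper.registerModule(new AfterburnerLikeModule());
        System.out.println(mapper.moduleNames()); // [afterburner]
    }
}
```

This is also why the "Potential Impact" note warns about users relying on auto-registration: any module that was previously picked up by the scan would have to be registered explicitly instead.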
[jira] [Commented] (KAFKA-17097) Add replace.null.with.default configuration to ValueToKey and ReplaceField (KIP-1040)
[ https://issues.apache.org/jira/browse/KAFKA-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864782#comment-17864782 ] Greg Harris commented on KAFKA-17097: - [~mimaison] Sorry, I was the one to close the previous issue, since the attached PR had a larger scope than the issue description and a smaller scope than the KIP. I wanted to avoid the risk that a single issue straddles a release, in case the follow-up PR is delayed. > Add replace.null.with.default configuration to ValueToKey and ReplaceField > (KIP-1040) > - > > Key: KAFKA-17097 > URL: https://issues.apache.org/jira/browse/KAFKA-17097 > Project: Kafka > Issue Type: Task > Components: connect >Reporter: Greg Harris >Assignee: PoAn Yang >Priority: Major > Labels: newbie > > {color:#172b4d}See > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=303794677] > for motivation and design. > {color}These are the final remaining transformations which still need this > configuration added.
Re: [PR] [ KAFKA-17049 ] fix Incremental rebalances assign too many tasks for the same connector together [kafka]
gharris1727 commented on code in PR #16486: URL: https://github.com/apache/kafka/pull/16486#discussion_r1672817593 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -793,6 +796,46 @@ protected void assignConnectors(List&lt;WorkerLoad&gt; workerAssignment, Collection&lt;String&gt; ... +private static class BalancedIterator&lt;T, K&gt; implements Iterator&lt;T&gt; { + +private final Map&lt;K, Iterator&lt;T&gt;&gt; grouped; +private final List&lt;K&gt; keys; + +private int k; + +public BalancedIterator(Collection&lt;T&gt; collection, Function&lt;T, K&gt; allocationGrouper) { +this.k = 0; +this.grouped = collection.stream().collect(Collectors.groupingBy( +allocationGrouper, +Collectors.collectingAndThen( +Collectors.toList(), +List::iterator +) +)); +this.keys = collection.stream() +.map(allocationGrouper) +.distinct() +.collect(Collectors.toList()); Review Comment: Maybe I don't understand, but I don't think this changed anything. The incoming Collection may still have an over-representation of a single connector first, leading that connector to be preferentially revoked. For example, consider this situation ``` W1: C1 C2 C3 C4 W2: C1 C5 C6 C7 W3: C1 C8 C9 C10 ``` If a new worker joins, C1 could be revoked because it appears the same number of times as all of the other connectors, but that would violate local balance later: ``` W1: C2 C3 C4 W2: C5 C6 C7 W3: C8 C9 C10 W4: C1 C1 C1 ``` The BalancedIterator isn't fairly tie-breaking when two connectors have the same number of jobs assigned to the current worker. Picking a single job to revoke depends on the entire rest of the state, and some degree of predicting how the jobs will be distributed afterwards. This is what I think the "ideal" state should be after that initial state: ``` W1: C1 C2 C3 W2: C1 C5 C6 W3: C1 C8 C9 W4: C4 C7 C10 ``` At most one C1 should be revoked overall, because revoking two to put on W4 would break local balance.
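For concreteness, here is a runnable reconstruction of the BalancedIterator under discussion. The next()/hasNext() bodies are assumed, since the quoted diff only shows the constructor; it round-robins across groups in encounter order so no single connector's jobs cluster at the front of the iteration:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Reconstruction (details assumed) of a group-interleaving iterator: elements
// are grouped by a key, then emitted one group at a time in rotation.
public class BalancedIteratorDemo {
    static class BalancedIterator<T, K> implements Iterator<T> {
        private final Map<K, Iterator<T>> grouped = new LinkedHashMap<>();
        private final List<K> keys = new ArrayList<>();
        private int k = 0; // index of the group to draw from next

        BalancedIterator(Iterable<T> collection, Function<T, K> grouper) {
            Map<K, List<T>> byKey = new LinkedHashMap<>();
            for (T t : collection)
                byKey.computeIfAbsent(grouper.apply(t), x -> new ArrayList<>()).add(t);
            for (Map.Entry<K, List<T>> e : byKey.entrySet()) {
                keys.add(e.getKey());
                grouped.put(e.getKey(), e.getValue().iterator());
            }
        }

        @Override public boolean hasNext() {
            for (Iterator<T> it : grouped.values()) if (it.hasNext()) return true;
            return false;
        }

        @Override public T next() {
            // Cycle through the keys, skipping groups that are exhausted.
            for (int i = 0; i < keys.size(); i++) {
                Iterator<T> it = grouped.get(keys.get(k));
                k = (k + 1) % keys.size();
                if (it.hasNext()) return it.next();
            }
            throw new NoSuchElementException();
        }
    }

    public static void main(String[] args) {
        // Tasks for three connectors; C1 is over-represented at the front.
        List<String> tasks = List.of("C1-0", "C1-1", "C1-2", "C2-0", "C3-0");
        BalancedIterator<String, String> it =
            new BalancedIterator<>(tasks, t -> t.split("-")[0]);
        List<String> order = new ArrayList<>();
        it.forEachRemaining(order::add);
        System.out.println(order); // [C1-0, C2-0, C3-0, C1-1, C1-2]
    }
}
```

This makes the review comment's objection visible: the iterator interleaves groups fairly, but it is a purely local ordering device — it cannot decide, from the whole-cluster state, *which* group should lose a job when several are tied, which is the tie-breaking problem the comment describes.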