[jira] [Resolved] (KAFKA-14495) Improve the RemoteIndexCacheTest
[ https://issues.apache.org/jira/browse/KAFKA-14495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-14495. --- Fix Version/s: 3.5.0 Resolution: Fixed > Improve the RemoteIndexCacheTest > > > Key: KAFKA-14495 > URL: https://issues.apache.org/jira/browse/KAFKA-14495 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 3.5.0 > > > https://github.com/apache/kafka/pull/11390/files#r1049392445 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14128) Kafka Streams terminates on topic check
[ https://issues.apache.org/jira/browse/KAFKA-14128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-14128. - Fix Version/s: 3.5.0 3.4.1 Resolution: Fixed > Kafka Streams terminates on topic check > --- > > Key: KAFKA-14128 > URL: https://issues.apache.org/jira/browse/KAFKA-14128 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0 > Environment: Any >Reporter: Patrik Kleindl >Assignee: Lucia Cerchie >Priority: Major > Fix For: 3.5.0, 3.4.1 > > > Our streams application shut down unexpectedly after some network issues that > should have been easily recoverable. > Logs: > > {code:java} > 2022-07-29 13:39:37.854 INFO 25843 --- [348aefeff-admin] > org.apache.kafka.clients.NetworkClient : [AdminClient > clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting > from node 3 due to request timeout. > 2022-07-29 13:39:37.854 INFO 25843 --- [348aefeff-admin] > org.apache.kafka.clients.NetworkClient : [AdminClient > clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled > in-flight METADATA request with correlation id 985 due to node 3 being > disconnected (elapsed time since creation: 60023ms, elapsed time since send: > 60023ms, request timeout: 3ms) > 2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] > o.a.k.s.p.i.InternalTopicManager : stream-thread [main] Unexpected > error during topic description for > L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-03-changelog. > Error message was: org.apache.kafka.common.errors.TimeoutException: > Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, > nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s) > 2022-07-29 13:39:37.869 INFO 25843 --- [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State > transition from RUNNING to PENDING_SHUTDOWN > {code} > I think the relevant code is in > [https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L524|https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L523-L550] > {code:java} > topicFuture.getValue().get();{code} > without a timeout value cannot throw a TimeoutException, so the > TimeoutException of the AdminClient will be an ExecutionException and hit the > last else branch where the StreamsException is thrown. > Possible fix: > Use the KafkaFuture method with timeout: > {code:java} > public abstract T get(long timeout, TimeUnit unit) throws > InterruptedException, ExecutionException, > TimeoutException;{code} > instead of > {code:java} > public abstract T get() throws InterruptedException, ExecutionException;{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mjsax commented on pull request #13161: Kafka 14128
mjsax commented on PR #13161: URL: https://github.com/apache/kafka/pull/13161#issuecomment-1441323418 Thanks for the PR. Merged to `trunk` and cherry-picked to `3.4` branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #13279: KAFKA-14295 FetchMessageConversionsPerSec meter not recorded
showuon commented on code in PR #13279: URL: https://github.com/apache/kafka/pull/13279#discussion_r1115304004 ## core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala: ## @@ -221,6 +222,8 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest { } else { assertEquals(Errors.UNSUPPORTED_VERSION, error(partitionWithDownConversionDisabled)) } +TestUtils.waitUntilTrue(() => TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > initialFetchMessageConversionsPerSec, +s"init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 3000) Review Comment: nit: The error message could be more clear, ex: ``` The `FetchMessageConversionsPerSec` metric count is not incremented after 3 seconds. init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)} ``` Also, the 3 seconds might be able to increase to 5 sec to avoid flaky failure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #13161: Kafka 14128
mjsax merged PR #13161: URL: https://github.com/apache/kafka/pull/13161 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #13290: MINOR: stabilize LeaderElectionTest#testLeaderElectionAndEpoch
chia7712 commented on PR #13290: URL: https://github.com/apache/kafka/pull/13290#issuecomment-1441274910 > But why does this PR have no CI test running? maybe jenkins is on vacation. merge trunk to trigger QA again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
dejan2609 commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1441268058 @ijuma changes are integrated (please remember that https://github.com/apache/kafka/pull/13263 needs to be merged prior to this one). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
dajac commented on PR #13231: URL: https://github.com/apache/kafka/pull/13231#issuecomment-1441262764 > Took a quick look at the unstable api change. Looks like some integration tests built specifically for v4 fail with `org.apache.kafka.common.errors.InvalidRequestException: Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled` > > I will need to look into this. @jolshan I suppose that you have to enable unstable apis in your new integration tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
vamossagar12 commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-1441214746 Thanks Michael. I added a happy path testcase. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14739) Kafka consumer reading messages out of order after a rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692452#comment-17692452 ] Luke Chen commented on KAFKA-14739: --- [~colinshaw] , Have you tried to run with the latest version of kafka client? We did some improvement to it before. Please check if this issue still existed. Thanks. > Kafka consumer reading messages out of order after a rebalance > -- > > Key: KAFKA-14739 > URL: https://issues.apache.org/jira/browse/KAFKA-14739 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.0 >Reporter: Colin Shaw >Priority: Major > Attachments: image-2023-02-21-16-45-10-464.png > > > We are seeing often when our consumers are running and a rebalance is > triggered, some partitions do not resume at the correct offset causing a > large number of events to be skipped. > Our understanding is that within a partition events should also be read in > chronological order. > > These consumers run once a day and we see that when they resume again the > next day it does return to the missed events. > > An example we investigated: > {quote}We see for partition 88 starting processing around Offset 14001. > Events were processed until Offset 14059 but we could see Offset 14060 was > not processed from out logs. > Upon investigation we could see a rebalance had occured after offset 14059 > and afterwards the next event processed was offset 14190 causing about 130 > events to be skipped on this partition. > The subsequent day we it did resume at Offset 14060 > {quote} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee closed pull request #13289: [TESTING] (DNR)testing failing test from another PR
philipnee closed pull request #13289: [TESTING] (DNR)testing failing test from another PR URL: https://github.com/apache/kafka/pull/13289 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #13281: [MINOR] Adjust logging with ZK log format
showuon merged PR #13281: URL: https://github.com/apache/kafka/pull/13281 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #13261: MINOR: after reading BYTES type it's possible to access data beyond its size
showuon merged PR #13261: URL: https://github.com/apache/kafka/pull/13261 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #13261: MINOR: after reading BYTES type it's possible to access data beyond its size
showuon commented on PR #13261: URL: https://github.com/apache/kafka/pull/13261#issuecomment-1441133511 Failed tests are unrelated ``` Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() Build / JDK 8 and Scala 2.12 / kafka.admin.ReassignPartitionsIntegrationTest.testReassignment(String).quorum=kraft Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster() Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin() Build / JDK 17 and Scala 2.13 / kafka.server.KafkaServerKRaftRegistrationTest.[1] Type=ZK, Name=testRegisterZkBrokerInKraft, MetadataVersion=3.4-IV0, Security=PLAINTEXT ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bachmanity1 commented on pull request #13261: MINOR: after reading BYTES type it's possible to access data beyond its size
bachmanity1 commented on PR #13261: URL: https://github.com/apache/kafka/pull/13261#issuecomment-1441126265 The CI test failures look unrelated to this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()
[ https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-5863: -- Assignee: Greg Harris > Potential null dereference in DistributedHerder#reconfigureConnector() > -- > > Key: KAFKA-5863 > URL: https://issues.apache.org/jira/browse/KAFKA-5863 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ted Yu >Assignee: Greg Harris >Priority: Minor > > Here is the call chain: > {code} > RestServer.httpRequest(reconfigUrl, "POST", > taskProps, null); > {code} > In httpRequest(): > {code} > } else if (responseCode >= 200 && responseCode < 300) { > InputStream is = connection.getInputStream(); > T result = JSON_SERDE.readValue(is, responseFormat); > {code} > For readValue(): > {code} > public T readValue(InputStream src, TypeReference valueTypeRef) > throws IOException, JsonParseException, JsonMappingException > { > return (T) _readMapAndClose(_jsonFactory.createParser(src), > _typeFactory.constructType(valueTypeRef)); > {code} > Then there would be NPE in constructType(): > {code} > public JavaType constructType(TypeReference typeRef) > { > // 19-Oct-2015, tatu: Simpler variant like so should work > return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS); > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 opened a new pull request, #13294: KAFKA-5863: Avoid NPE when calls expecting no-content receive content.
gharris1727 opened a new pull request, #13294: URL: https://github.com/apache/kafka/pull/13294 The RestClient accepts a TypeReference argument defining what kind of response to expect from the HTTP request. If these request is expected to result in a 204 no-content, they would previously not provide a TypeReference argument (null). If a request which normally resulted in a 204 returned some other 200 code, then the RestClient would experience an NPE when it tries to deserialize the nonempty response. Instead, we should enforce that the TypeReference is always provided to the RestClient. If we provide a TypeReference for the calls which do not expect to return content, then if they do receive content, that content will be silently dropped instead of causing an NPE. Adds new tests for enforcing non-null arguments, and confirms the behavior of a TypeReference call-site receiving a non-empty response. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread
RivenSun2 commented on PR #13270: URL: https://github.com/apache/kafka/pull/13270#issuecomment-1441096550 Hi @guozhangwang could you give any suggestions? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on PR #13231: URL: https://github.com/apache/kafka/pull/13231#issuecomment-1441095454 Took a quick look at the unstable api change. Looks like some integration tests built specifically for v4 fail with `org.apache.kafka.common.errors.InvalidRequestException: Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled` I will need to look into this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue opened a new pull request, #13293: KAFKA-14365: Extract common logic from Fetcher into FetcherUtils
kirktrue opened a new pull request, #13293: URL: https://github.com/apache/kafka/pull/13293 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
ijuma commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1441082213 @dejan2609 Good catch. I fixed it in the same branch. And tested with Scala 2.12 and 2.13. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on PR #13231: URL: https://github.com/apache/kafka/pull/13231#issuecomment-1441071519 I've added the changes to the API spec -- verify only is now a transaction level config -- top level error is added to the response I've added builders to the request and tried to simplify some of the methods. I've also addressed the verifyOnly case where some partitions are in the txn and others are not. I will update the KIP to reflect some of these changes (especially with respect to the API spec) I still need to address the unstable API change, but that will require a pull from master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-5827) Allow configuring Kafka sink connectors to start processing records from the end of topics
[ https://issues.apache.org/jira/browse/KAFKA-5827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-5827. Resolution: Duplicate > Allow configuring Kafka sink connectors to start processing records from the > end of topics > -- > > Key: KAFKA-5827 > URL: https://issues.apache.org/jira/browse/KAFKA-5827 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Behrang Saeedzadeh >Priority: Major > > As far as I can see, Kafka connectors start exporting data of a topic from > the beginning of its partitions. We have a topic that contains a few million > old records that we don't need but we would like to start exporting new > records that are added to the topic. > Basically: > * When the connector is started for the first time and it does not have a > current offset stored, it should start consuming data from the end of topic > partitions > * When the connector is restarted and has a current offset for partitions > stored somewhere, it should start from those offsets -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-5827) Allow configuring Kafka sink connectors to start processing records from the end of topics
[ https://issues.apache.org/jira/browse/KAFKA-5827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692413#comment-17692413 ] Greg Harris commented on KAFKA-5827: This is controllable via the Client Override feature KIP-458 [https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy] . You can configure the `consumer.override.auto.offset.reset` configuration property in a connector configuration to have the consumer begin reading from the latest record in a partition. After the connector commits offsets, further restarts will pick up where the previous commit finished, avoiding data loss while not re-reading previously committed messages. > Allow configuring Kafka sink connectors to start processing records from the > end of topics > -- > > Key: KAFKA-5827 > URL: https://issues.apache.org/jira/browse/KAFKA-5827 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Behrang Saeedzadeh >Priority: Major > > As far as I can see, Kafka connectors start exporting data of a topic from > the beginning of its partitions. We have a topic that contains a few million > old records that we don't need but we would like to start exporting new > records that are added to the topic. > Basically: > * When the connector is started for the first time and it does not have a > current offset stored, it should start consuming data from the end of topic > partitions > * When the connector is restarted and has a current offset for partitions > stored somewhere, it should start from those offsets -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vcrfxia opened a new pull request, #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores
vcrfxia opened a new pull request, #13292: URL: https://github.com/apache/kafka/pull/13292 (This PR is stacked on https://github.com/apache/kafka/pull/13274. Only the last commit needs to be reviewed separately.) This PR sets the correct topic configs for changelog topics for versioned stores introduced in [KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores). Changelog topics for versioned stores differ from those for non-versioned stores only in that `min.compaction.lag.ms` needs to be set in order to prevent version history from being compacted prematurely. The value for `min.compaction.lag.ms` is equal to the store's history retention plus some buffer to account for the broker's use of wall-clock time in performing compactions. This buffer is analogous to the `windowstore.changelog.additional.retention.ms` value for window store changelog topic retention time, and uses the same default of 24 hours. In the future, we can propose a KIP to expose a config such as `versionedstore.changelog.additional.compaction.lag.ms` to allow users to tune this value. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on pull request #13268: MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values
kowshik commented on PR #13268: URL: https://github.com/apache/kafka/pull/13268#issuecomment-1440982486 @junrao Thanks for the review. I have rebased the PR and brought in the latest commits from AK trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs
philipnee commented on code in PR #13265: URL: https://github.com/apache/kafka/pull/13265#discussion_r1115068319 ## clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java: ## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent; +import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent; +import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent; +import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or + * sketch, showing the interaction between the foreground and background threads. Each of the main API methods + * will need to answer the following questions: + * + * + * Does this method block? + * Does this method interact with the background thread? + * If yes, what data is passed as input to the background thread? + * If yes, what data is returned as output from the background thread? + * + * + * @param Key + * @param Value + * @see ApplicationEventProcessor for the logic of the background event handler + */ +public class StubbedAsyncKafkaConsumer implements Consumer { + +/** + * These instance variables are intentionally left unassigned, to avoid clutter... + */ +private Time time; + +private EventHandler eventHandler; + +private SubscriptionState subscriptions; + +private Deserializer keyDeserializer; + +private Deserializer valueDeserializer; + +private long defaultApiTimeoutMs; + +private List assignors; + +private Optional groupId; + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set assignment() { +return Collections.unmodifiableSet(subscriptions.assignedPartitions()); +} + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set subscription() { +
[jira] [Resolved] (KAFKA-4006) Kafka connect fails sometime with InvalidTopicException in distributed mode
[ https://issues.apache.org/jira/browse/KAFKA-4006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-4006. Fix Version/s: 0.11.0.0 Resolution: Fixed > Kafka connect fails sometime with InvalidTopicException in distributed mode > --- > > Key: KAFKA-4006 > URL: https://issues.apache.org/jira/browse/KAFKA-4006 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Sumit Arrawatia >Priority: Major > Fix For: 0.11.0.0 > > > I get trying to spin up a 3 node distributed connect cluster. > Sometimes one of the worker fails to boot with the following error when auto > topic creation is enabled : > org.apache.kafka.common.errors.InvalidTopicException: Topic 'default.config' > is invalid > default.config is the topic name for Connect config. > Also, starting the worker again fixes the issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs
philipnee commented on code in PR #13265: URL: https://github.com/apache/kafka/pull/13265#discussion_r1115056524 ## clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java: ## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent; +import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent; +import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent; +import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or + * sketch, showing the interaction between the foreground and background threads. Each of the main API methods + * will need to answer the following questions: + * + * + * Does this method block? + * Does this method interact with the background thread? + * If yes, what data is passed as input to the background thread? + * If yes, what data is returned as output from the background thread? + * + * + * @param Key + * @param Value + * @see ApplicationEventProcessor for the logic of the background event handler + */ +public class StubbedAsyncKafkaConsumer implements Consumer { + +/** + * These instance variables are intentionally left unassigned, to avoid clutter... + */ +private Time time; + +private EventHandler eventHandler; + +private SubscriptionState subscriptions; + +private Deserializer keyDeserializer; + +private Deserializer valueDeserializer; + +private long defaultApiTimeoutMs; + +private List assignors; + +private Optional groupId; + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set assignment() { Review Comment: Oh my mistake, entirely forgot about there's no rebalancing after the assignment(). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To
[GitHub] [kafka] guozhangwang commented on pull request #13265: Prototype consumer stubs
guozhangwang commented on PR #13265: URL: https://github.com/apache/kafka/pull/13265#issuecomment-1440933102 Thanks @philipnee , I left a comment to just give my 2c regarding your question 1) above. For your question 2), I think we should prefer to be consistent with the current guarantees if it's already stated in the public APIs unless there's good rationale to change it (in which case we'd need very vocally communicate it as a breaking change in the release). For example: 1) We documented that rebalance listener callbacks are only related to `subscribe` scenarios, and would only be triggered within `poll()`, and only at which time the assignment could be changed. 2) We also documented that the async commit listener callbacks are only triggered within `poll()`. 3) In our javadocs, the CommitFailedException would only be thrown from the `commitSync` functions. Putting all those together, I think it means `poll()` call should drain the queues since if there's any events requiring any callbacks to be triggered, they should be triggered in that call; For `commitSync`, technically it would need to wait for the corresponding commit response event, but since there's only a single queue, it means we would still need to keep polling that queue until the event is received. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13265: Prototype consumer stubs
guozhangwang commented on code in PR #13265: URL: https://github.com/apache/kafka/pull/13265#discussion_r1115047665 ## clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java: ## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent; +import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent; +import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent; +import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or + * sketch, showing the interaction between the foreground and background threads. Each of the main API methods + * will need to answer the following questions: + * + * + * Does this method block? + * Does this method interact with the background thread? + * If yes, what data is passed as input to the background thread? + * If yes, what data is returned as output from the background thread? + * + * + * @param Key + * @param Value + * @see ApplicationEventProcessor for the logic of the background event handler + */ +public class StubbedAsyncKafkaConsumer implements Consumer { + +/** + * These instance variables are intentionally left unassigned, to avoid clutter... + */ +private Time time; + +private EventHandler eventHandler; + +private SubscriptionState subscriptions; + +private Deserializer keyDeserializer; + +private Deserializer valueDeserializer; + +private long defaultApiTimeoutMs; + +private List assignors; + +private Optional groupId; + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set assignment() { Review Comment: In the `assign()` -> `assignment()` example above, there is no "rebalance" triggered and hence there's no rebalance listener triggered either. The rebalance listener is only relevant with the `subscribe()` scenarios. So just to summarize all the
[jira] [Comment Edited] (KAFKA-14722) Make BooleanSerde public
[ https://issues.apache.org/jira/browse/KAFKA-14722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692398#comment-17692398 ] Spacrocket edited comment on KAFKA-14722 at 2/22/23 10:32 PM: -- Thanks Matthias for an advice; I've added the proposal yesterday and it's in [DISCUSS] stage. I think I will wait till Friday and then start the VOTING stage. [https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface] was (Author: JIRAUSER289157): Thanks Matthias J. Sax for an advice; I've added the proposal yesterday and it's in [DISCUSS] stage. I think I will wait till Friday and then start the VOTING stage. [https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface] > Make BooleanSerde public > > > Key: KAFKA-14722 > URL: https://issues.apache.org/jira/browse/KAFKA-14722 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Spacrocket >Priority: Minor > Labels: beginner, need-kip, newbie > > We introduce a "BooleanSerde" via > [https://github.com/apache/kafka/pull/13249] as internal class. We could make > it public. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14722) Make BooleanSerde public
[ https://issues.apache.org/jira/browse/KAFKA-14722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692398#comment-17692398 ] Spacrocket commented on KAFKA-14722: Thanks Matthias J. Sax for an advice; I've added the proposal yesterday and it's in [DISCUSS] stage. I think I will wait till Friday and then start the VOTING stage. [https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface] > Make BooleanSerde public > > > Key: KAFKA-14722 > URL: https://issues.apache.org/jira/browse/KAFKA-14722 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Spacrocket >Priority: Minor > Labels: beginner, need-kip, newbie > > We introduce a "BooleanSerde" via > [https://github.com/apache/kafka/pull/13249] as internal class. We could make > it public. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13265: Prototype consumer stubs
guozhangwang commented on code in PR #13265: URL: https://github.com/apache/kafka/pull/13265#discussion_r1115037535 ## clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java: ## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent; +import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent; +import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent; +import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or + * sketch, showing the interaction between the foreground and background threads. Each of the main API methods + * will need to answer the following questions: + * + * + * Does this method block? + * Does this method interact with the background thread? + * If yes, what data is passed as input to the background thread? + * If yes, what data is returned as output from the background thread? + * + * + * @param Key + * @param Value + * @see ApplicationEventProcessor for the logic of the background event handler + */ +public class StubbedAsyncKafkaConsumer implements Consumer { + +/** + * These instance variables are intentionally left unassigned, to avoid clutter... + */ +private Time time; + +private EventHandler eventHandler; + +private SubscriptionState subscriptions; + +private Deserializer keyDeserializer; + +private Deserializer valueDeserializer; + +private long defaultApiTimeoutMs; + +private List assignors; + +private Optional groupId; + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set assignment() { +return Collections.unmodifiableSet(subscriptions.assignedPartitions()); +} + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set subscription() {
[jira] [Assigned] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks
[ https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-9228: -- Assignee: Greg Harris > Reconfigured converters and clients may not be propagated to connector tasks > > > Key: KAFKA-9228 > URL: https://issues.apache.org/jira/browse/KAFKA-9228 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2 >Reporter: Chris Egerton >Assignee: Greg Harris >Priority: Major > > If an existing connector is reconfigured but the only changes are to its > converters and/or Kafka clients (enabled as of > [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]), > the changes will not propagate to its tasks unless the connector also > generates task configs that differ from the existing task configs. Even after > this point, if the connector tasks are reconfigured, they will still not pick > up on the new converter and/or Kafka client configs. > This is because the {{DistributedHerder}} only writes new task configurations > to the connect config topic [if the connector-provided task configs differ > from the task configs already in the config > topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332], > and neither of those contain converter or Kafka client configs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
dejan2609 commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1440891478 > @dejan2609 I added a few cleanups here: [ijuma@9c6ae57](https://github.com/ijuma/kafka/commit/9c6ae575276e673f39d188d42a332f9f0b07d2d0) > > If you agree with them, please integrate into your PR. @ijuma Scala 2.13 builds are fine, but something is still missing for Scala 2.12 (all **_./gradlew -PscalaVersion=2.12 clean jar_** builds are failling): - JDK 17 ``` > Task :core:compileScala '-release' does not accept multiple arguments bad option: '-release:8' > Task :core:compileScala FAILED > Task :streams:streams-scala:compileScala FAILED 'strict-unsealed-patmat' is not a valid choice for '-Xlint' '-release' does not accept multiple arguments bad option: '-release:8' FAILURE: Build completed with 2 failures. ``` - JDK 11 ``` > Task :core:compileScala FAILED > Task :streams:streams-scala:compileScala 'strict-unsealed-patmat' is not a valid choice for '-Xlint' '-release' does not accept multiple arguments bad option: '-release:8' > Task :streams:streams-scala:compileScala FAILED FAILURE: Build completed with 2 failures. ``` - JDK 8 ``` > Task :core:compileScala FAILED '-release' does not accept multiple arguments bad option: '-release:8' FAILURE: Build failed with an exception. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores
vcrfxia commented on code in PR #13252: URL: https://github.com/apache/kafka/pull/13252#discussion_r1114772380 ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +import java.util.Objects; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedBytesStore; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation + * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide + * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class + * is a {@link KeyValueStore} of type Bytes,byte[], so we use {@link Serde}s + * to convert from K,ValueAndTimestampV to Bytes,byte[]. In particular, + * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value + * store requires putting a null value associated with a timestamp. + * + * @param The key type + * @param The (raw) value type + */ +public class MeteredVersionedKeyValueStore +extends WrappedStateStore +implements VersionedKeyValueStore { + +private final MeteredVersionedKeyValueStoreInternal internal; + +MeteredVersionedKeyValueStore(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner); +internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde); +} + +/** + * Private helper class which represents the functionality of a {@link VersionedKeyValueStore} + * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be + * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of + * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this + * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the + * {@link VersionedKeyValueStore} interface itself. + */ +private class MeteredVersionedKeyValueStoreInternal +extends MeteredKeyValueStore> +implements TimestampedKeyValueStore { + +private final VersionedBytesStore inner; + +MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner, metricScope, time, keySerde, valueSerde); +this.inner = inner; +} + +@Override +public void put(final K key, final ValueAndTimestamp value) { +super.put( Review Comment: Let me try to repeat back your suggestion to make
[GitHub] [kafka] junrao merged pull request #13272: MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes
junrao merged PR #13272: URL: https://github.com/apache/kafka/pull/13272 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
dejan2609 commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1440804226 Got it @ijuma I will also add jacoco minor version bump: 0.8.7 -->> 0.8.8 in order to use same version as in Gradle 8.0.1: https://docs.gradle.org/8.0.1/userguide/jacoco_plugin.html#sec:configuring_the_jacoco_plugin Will squash/rebase on trunk and force-push. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs
philipnee commented on code in PR #13265: URL: https://github.com/apache/kafka/pull/13265#discussion_r1114928370 ## clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java: ## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent; +import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent; +import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent; +import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or + * sketch, showing the interaction between the foreground and background threads. Each of the main API methods + * will need to answer the following questions: + * + * + * Does this method block? + * Does this method interact with the background thread? + * If yes, what data is passed as input to the background thread? + * If yes, what data is returned as output from the background thread? + * + * + * @param Key + * @param Value + * @see ApplicationEventProcessor for the logic of the background event handler + */ +public class StubbedAsyncKafkaConsumer implements Consumer { + +/** + * These instance variables are intentionally left unassigned, to avoid clutter... + */ +private Time time; + +private EventHandler eventHandler; + +private SubscriptionState subscriptions; + +private Deserializer keyDeserializer; + +private Deserializer valueDeserializer; + +private long defaultApiTimeoutMs; + +private List assignors; + +private Optional groupId; + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set assignment() { Review Comment: I feel we need an explicit synchronization barrier to ensure state changes happen in a deterministic sequence. I mentioned this idea in our previous meeting, but I just want to bring it up again for discussion: (feel free to strike down this idea) Is it
[GitHub] [kafka] philipnee commented on pull request #13265: Prototype consumer stubs
philipnee commented on PR #13265: URL: https://github.com/apache/kafka/pull/13265#issuecomment-1440786040 I feel the real challenge here is to determine the timing to synchronize the background and client state. I feel we should do that deterministically otherwise, it is hard to provide a contract to the user. Anyways here are a few of my notes: 1. What should the user expect: I think these are a few scenarios - `subscribe()` -> `subscriptions()` - `assign()` -> `assignments()` - `seek()` -> `position()` - `seek()` -> poll() (something returned) -> `position()` 2. Do we all agree to only drain the queue in `poll()` ? There are a few things to consider - When and where to throw the CommitFailureException() (I think now it can happen everywhere when user invoke APIs) - When and where to invoke the callbacks (commit and rebalance)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs
philipnee commented on code in PR #13265: URL: https://github.com/apache/kafka/pull/13265#discussion_r1114930390 ## clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java: ## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent; +import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent; +import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent; +import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or + * sketch, showing the interaction between the foreground and background threads. Each of the main API methods + * will need to answer the following questions: + * + * + * Does this method block? + * Does this method interact with the background thread? + * If yes, what data is passed as input to the background thread? + * If yes, what data is returned as output from the background thread? + * + * + * @param Key + * @param Value + * @see ApplicationEventProcessor for the logic of the background event handler + */ +public class StubbedAsyncKafkaConsumer implements Consumer { + +/** + * These instance variables are intentionally left unassigned, to avoid clutter... + */ +private Time time; + +private EventHandler eventHandler; + +private SubscriptionState subscriptions; + +private Deserializer keyDeserializer; + +private Deserializer valueDeserializer; + +private long defaultApiTimeoutMs; + +private List assignors; + +private Optional groupId; + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set assignment() { Review Comment: Afterall, I think it is a worse user experience if the assignment call cannot yield deterministic results. Even though we will need a KIP for this behavior change, I think it kind of provides the guarantee to the user. -- This is an automated message from the
[GitHub] [kafka] ijuma commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
ijuma commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1440767080 @dejan2609 I added a few cleanups here: https://github.com/ijuma/kafka/commit/9c6ae575276e673f39d188d42a332f9f0b07d2d0 If you agree with them, please integrate into your PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs
philipnee commented on code in PR #13265: URL: https://github.com/apache/kafka/pull/13265#discussion_r1114928370 ## clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java: ## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent; +import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent; +import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent; +import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or + * sketch, showing the interaction between the foreground and background threads. Each of the main API methods + * will need to answer the following questions: + * + * + * Does this method block? + * Does this method interact with the background thread? + * If yes, what data is passed as input to the background thread? + * If yes, what data is returned as output from the background thread? + * + * + * @param Key + * @param Value + * @see ApplicationEventProcessor for the logic of the background event handler + */ +public class StubbedAsyncKafkaConsumer implements Consumer { + +/** + * These instance variables are intentionally left unassigned, to avoid clutter... + */ +private Time time; + +private EventHandler eventHandler; + +private SubscriptionState subscriptions; + +private Deserializer keyDeserializer; + +private Deserializer valueDeserializer; + +private long defaultApiTimeoutMs; + +private List assignors; + +private Optional groupId; + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set assignment() { Review Comment: I feel we need an explicit synchronization barrier to ensure state changes happen in a deterministic sequence. I mentioned this idea in our previous meeting, but I just want to bring it up again for discussion: (feel free to strike down this idea) Is it
[jira] [Updated] (KAFKA-14742) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary OOMs
[ https://issues.apache.org/jira/browse/KAFKA-14742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14742: Labels: flaky-test (was: ) > Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary OOMs > - > > Key: KAFKA-14742 > URL: https://issues.apache.org/jira/browse/KAFKA-14742 > Project: Kafka > Issue Type: Improvement >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > Labels: flaky-test > > The ExactlyOnceSourceIntegrationTest appears to occasionally throw the > following exception in my local test runs: > {noformat} > java.lang.OutOfMemoryError: GC overhead limit exceeded > at java.base/java.util.HashMap.newNode(HashMap.java:1901) > at java.base/java.util.HashMap.putVal(HashMap.java:629) > at java.base/java.util.HashMap.put(HashMap.java:610) > at java.base/java.util.HashSet.add(HashSet.java:221) > at > java.base/java.util.stream.Collectors$$Lambda$6/0x80011.accept(Unknown > Source) > at > java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at > java.base/java.util.stream.LongPipeline$1$1.accept(LongPipeline.java:177) > at > java.base/java.util.stream.Streams$RangeLongSpliterator.forEachRemaining(Streams.java:228) > at > java.base/java.util.Spliterator$OfLong.forEachRemaining(Spliterator.java:775) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.lambda$assertSeqnos$9(ExactlyOnceSourceIntegrationTest.java:964) > at > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest$$Lambda$2500/0x0008015a1908.accept(Unknown > Source) > at java.base/java.util.HashMap.forEach(HashMap.java:1421) > at > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.assertSeqnos(ExactlyOnceSourceIntegrationTest.java:961) > at > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.assertExactlyOnceSeqnos(ExactlyOnceSourceIntegrationTest.java:939) > at > org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testIntervalBoundary(ExactlyOnceSourceIntegrationTest.java:358) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100){noformat} > It appears that the data produced by the connectors under test is too large > to be asserted on with the current assertions' memory overhead. We should try > to optimize the assertions' overhead and or reduce the number of records > being asserted on. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 opened a new pull request, #13291: KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs
gharris1727 opened a new pull request, #13291: URL: https://github.com/apache/kafka/pull/13291 On my local machine, testIntervalBoundary is asserting on nearly 2.5 million records, when it appears that the test is written to need only 100-1000 records to perform assertions. This causes OOMEs in the test assertions which iterate over the set of records and perform memory allocations. I looked into reducing the assertion's memory overhead, but it didn't seem practical as even the smallest allocations appeared to exceed the memory limit. Instead, I configured the pre-existing throttle mechanism inside the MonitorableSourceConnector, so that tests now seem to produce ~90k records on my machine, leaving adequate spare memory for the existing assertions to pass without issue. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14742) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary OOMs
Greg Harris created KAFKA-14742: --- Summary: Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary OOMs Key: KAFKA-14742 URL: https://issues.apache.org/jira/browse/KAFKA-14742 Project: Kafka Issue Type: Improvement Reporter: Greg Harris Assignee: Greg Harris The ExactlyOnceSourceIntegrationTest appears to occasionally throw the following exception in my local test runs: {noformat} java.lang.OutOfMemoryError: GC overhead limit exceeded at java.base/java.util.HashMap.newNode(HashMap.java:1901) at java.base/java.util.HashMap.putVal(HashMap.java:629) at java.base/java.util.HashMap.put(HashMap.java:610) at java.base/java.util.HashSet.add(HashSet.java:221) at java.base/java.util.stream.Collectors$$Lambda$6/0x80011.accept(Unknown Source) at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at java.base/java.util.stream.LongPipeline$1$1.accept(LongPipeline.java:177) at java.base/java.util.stream.Streams$RangeLongSpliterator.forEachRemaining(Streams.java:228) at java.base/java.util.Spliterator$OfLong.forEachRemaining(Spliterator.java:775) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) at org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.lambda$assertSeqnos$9(ExactlyOnceSourceIntegrationTest.java:964) at org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest$$Lambda$2500/0x0008015a1908.accept(Unknown Source) at java.base/java.util.HashMap.forEach(HashMap.java:1421) at org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.assertSeqnos(ExactlyOnceSourceIntegrationTest.java:961) at org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.assertExactlyOnceSeqnos(ExactlyOnceSourceIntegrationTest.java:939) at org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testIntervalBoundary(ExactlyOnceSourceIntegrationTest.java:358) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100){noformat} It appears that the data produced by the connectors under test is too large to be asserted on with the current assertions' memory overhead. We should try to optimize the assertions' overhead and or reduce the number of records being asserted on. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] chia7712 opened a new pull request, #13290: MINOR: stabilize LeaderElectionTest#testLeaderElectionAndEpoch
chia7712 opened a new pull request, #13290: URL: https://github.com/apache/kafka/pull/13290 the leader is changed from -1 to 1. The test get failed if it observes the -1 first. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs
philipnee commented on code in PR #13265: URL: https://github.com/apache/kafka/pull/13265#discussion_r1114851962 ## clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java: ## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent; +import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent; +import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent; +import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or + * sketch, showing the interaction between the foreground and background threads. Each of the main API methods + * will need to answer the following questions: + * + * + * Does this method block? + * Does this method interact with the background thread? + * If yes, what data is passed as input to the background thread? + * If yes, what data is returned as output from the background thread? + * + * + * @param Key + * @param Value + * @see ApplicationEventProcessor for the logic of the background event handler + */ +public class StubbedAsyncKafkaConsumer implements Consumer { + +/** + * These instance variables are intentionally left unassigned, to avoid clutter... + */ +private Time time; + +private EventHandler eventHandler; + +private SubscriptionState subscriptions; + +private Deserializer keyDeserializer; + +private Deserializer valueDeserializer; + +private long defaultApiTimeoutMs; + +private List assignors; + +private Optional groupId; + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set assignment() { +return Collections.unmodifiableSet(subscriptions.assignedPartitions()); +} + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set subscription() { +
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores
vcrfxia commented on code in PR #13252: URL: https://github.com/apache/kafka/pull/13252#discussion_r1114835088 ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +import java.util.Objects; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedBytesStore; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation + * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide + * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class + * is a {@link KeyValueStore} of type Bytes,byte[], so we use {@link Serde}s + * to convert from K,ValueAndTimestampV to Bytes,byte[]. In particular, + * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value + * store requires putting a null value associated with a timestamp. + * + * @param The key type + * @param The (raw) value type + */ +public class MeteredVersionedKeyValueStore +extends WrappedStateStore +implements VersionedKeyValueStore { + +private final MeteredVersionedKeyValueStoreInternal internal; + +MeteredVersionedKeyValueStore(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner); +internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde); +} + +/** + * Private helper class which represents the functionality of a {@link VersionedKeyValueStore} + * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be + * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of + * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this + * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the + * {@link VersionedKeyValueStore} interface itself. Review Comment: Thanks for the suggestions! Incorporated this into the latest commit. I used `KeyValueStore` in the comments instead of `TimestampedKeyValueStore` (pending our other discussion about whether `MeteredVersionedKeyValueStore` is conceptually `MeteredTimestampedKeyValueStore` or `MeteredKeyValueStore`) and also modified the last line since I wasn't sure which `get()` override you were referring to. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan merged pull request #12957: MINOR: Fix flaky testClientDisconnectionUpdatesRequestMetrics() (#11987)
jolshan merged PR #12957: URL: https://github.com/apache/kafka/pull/12957 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores
vcrfxia commented on code in PR #13252: URL: https://github.com/apache/kafka/pull/13252#discussion_r1114799670 ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +import java.util.Objects; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedBytesStore; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation + * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide + * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class + * is a {@link KeyValueStore} of type Bytes,byte[], so we use {@link Serde}s + * to convert from K,ValueAndTimestampV to Bytes,byte[]. In particular, + * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value + * store requires putting a null value associated with a timestamp. + * + * @param The key type + * @param The (raw) value type + */ +public class MeteredVersionedKeyValueStore +extends WrappedStateStore +implements VersionedKeyValueStore { + +private final MeteredVersionedKeyValueStoreInternal internal; + +MeteredVersionedKeyValueStore(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner); +internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde); +} + +/** + * Private helper class which represents the functionality of a {@link VersionedKeyValueStore} + * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be + * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of + * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this + * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the + * {@link VersionedKeyValueStore} interface itself. + */ +private class MeteredVersionedKeyValueStoreInternal +extends MeteredKeyValueStore> +implements TimestampedKeyValueStore { + +private final VersionedBytesStore inner; + +MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner, metricScope, time, keySerde, valueSerde); +this.inner = inner; +} + +@Override +public void put(final K key, final ValueAndTimestamp value) { +super.put( +key, +// versioned stores require a
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13265: Prototype consumer stubs
guozhangwang commented on code in PR #13265: URL: https://github.com/apache/kafka/pull/13265#discussion_r1114721314 ## clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java: ## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent; +import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent; +import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent; +import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent; +import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or + * sketch, showing the interaction between the foreground and background threads. Each of the main API methods + * will need to answer the following questions: + * + * + * Does this method block? + * Does this method interact with the background thread? + * If yes, what data is passed as input to the background thread? + * If yes, what data is returned as output from the background thread? + * + * + * @param Key + * @param Value + * @see ApplicationEventProcessor for the logic of the background event handler + */ +public class StubbedAsyncKafkaConsumer implements Consumer { + +/** + * These instance variables are intentionally left unassigned, to avoid clutter... + */ +private Time time; + +private EventHandler eventHandler; + +private SubscriptionState subscriptions; + +private Deserializer keyDeserializer; + +private Deserializer valueDeserializer; + +private long defaultApiTimeoutMs; + +private List assignors; + +private Optional groupId; + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set assignment() { +return Collections.unmodifiableSet(subscriptions.assignedPartitions()); +} + +/** + * Answers to the above questions: + * + * + * No + * No + * n/a + * n/a + * + */ +@Override +public Set subscription() {
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores
vcrfxia commented on code in PR #13252: URL: https://github.com/apache/kafka/pull/13252#discussion_r1114772380 ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +import java.util.Objects; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedBytesStore; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation + * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide + * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class + * is a {@link KeyValueStore} of type Bytes,byte[], so we use {@link Serde}s + * to convert from K,ValueAndTimestampV to Bytes,byte[]. In particular, + * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value + * store requires putting a null value associated with a timestamp. + * + * @param The key type + * @param The (raw) value type + */ +public class MeteredVersionedKeyValueStore +extends WrappedStateStore +implements VersionedKeyValueStore { + +private final MeteredVersionedKeyValueStoreInternal internal; + +MeteredVersionedKeyValueStore(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner); +internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde); +} + +/** + * Private helper class which represents the functionality of a {@link VersionedKeyValueStore} + * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be + * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of + * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this + * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the + * {@link VersionedKeyValueStore} interface itself. + */ +private class MeteredVersionedKeyValueStoreInternal +extends MeteredKeyValueStore> +implements TimestampedKeyValueStore { + +private final VersionedBytesStore inner; + +MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner, metricScope, time, keySerde, valueSerde); +this.inner = inner; +} + +@Override +public void put(final K key, final ValueAndTimestamp value) { +super.put( Review Comment: Let me try to repeat back your suggestion to make
[GitHub] [kafka] philipnee opened a new pull request, #13289: [TESTING] testing failing test from another PR
philipnee opened a new pull request, #13289: URL: https://github.com/apache/kafka/pull/13289 Don't review this PR! I'm just testing why a bunch of irrelevant tests are failing on the other branch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores
vcrfxia commented on code in PR #13252: URL: https://github.com/apache/kafka/pull/13252#discussion_r1114727896 ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +import java.util.Objects; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedBytesStore; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation + * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide + * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class + * is a {@link KeyValueStore} of type Bytes,byte[], so we use {@link Serde}s + * to convert from K,ValueAndTimestampV to Bytes,byte[]. In particular, + * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value + * store requires putting a null value associated with a timestamp. + * + * @param The key type + * @param The (raw) value type + */ +public class MeteredVersionedKeyValueStore +extends WrappedStateStore +implements VersionedKeyValueStore { + +private final MeteredVersionedKeyValueStoreInternal internal; + +MeteredVersionedKeyValueStore(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner); +internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde); +} + +/** + * Private helper class which represents the functionality of a {@link VersionedKeyValueStore} + * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be + * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of + * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this + * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the + * {@link VersionedKeyValueStore} interface itself. + */ +private class MeteredVersionedKeyValueStoreInternal +extends MeteredKeyValueStore> +implements TimestampedKeyValueStore { Review Comment: `MeteredVersionedKeyValueStoreInternal` and `MeteredTimestampKeyValueStore` use different serdes -- `MeteredTimestampKeyValueStore` uses `ValueAndTimestampSerde` while `MeteredVersionedKeyValueStoreInternal` uses `NullableValueAndTimestampSerde`. If you look at the code for `MeteredTimestampKeyValueStore`, the only method it overrides from `MeteredKeyValueStore` is `prepareValueSerdeForStore()`. We could have `MeteredVersionedKeyValueStoreInternal` extend `MeteredTimestampKeyValueStore` instead of `MeteredKeyValueStore` if we want but `MeteredVersionedKeyValueStoreInternal` needs to override `prepareValueSerdeForStore()` with its own serde anyway, so
[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
philipnee commented on PR #13190: URL: https://github.com/apache/kafka/pull/13190#issuecomment-1440514286 Hmm, strangely, this branch seems to trigger a bunch of initializing error failures. And I can't seem to reproduce them locally... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14714) Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
[ https://issues.apache.org/jira/browse/KAFKA-14714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692294#comment-17692294 ] Satish Duggana commented on KAFKA-14714: https://github.com/apache/kafka/pull/13255 > Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module. > - > > Key: KAFKA-14714 > URL: https://issues.apache.org/jira/browse/KAFKA-14714 > Project: Kafka > Issue Type: Sub-task >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] satishd merged pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
satishd merged PR #13255: URL: https://github.com/apache/kafka/pull/13255 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
guozhangwang commented on PR #13190: URL: https://github.com/apache/kafka/pull/13190#issuecomment-1440489302 Yeah I think it's okay to make the rule consistent, i.e. to honor the timeout even under those four exceptions: if the timer has elapsed, then we should well return from the loop in ``` client.poll(future, timer); if (!future.isDone()) { // we ran out of time return false; } ``` even if the response yet to be returned would contain any of these four exceptions. So I think we should still obey this rule, i.e. even if a response has been returned and we know it's going to be one of these four exceptions, if the timer has elapsed, we still exit the loop. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
rondagostino commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1114692109 ## metadata/src/main/java/org/apache/kafka/image/TopicsImage.java: ## @@ -38,15 +40,21 @@ */ public final class TopicsImage { public static final TopicsImage EMPTY = -new TopicsImage(Collections.emptyMap(), Collections.emptyMap()); +new TopicsImage(map(), map()); + +final ImMap topicsById; +final ImMap topicsByName; -private final Map topicsById; -private final Map topicsByName; +public TopicsImage(ImMap topicsById, + ImMap topicsByName) { +this.topicsById = topicsById; +this.topicsByName = topicsByName; +} -public TopicsImage(Map topicsById, - Map topicsByName) { -this.topicsById = Collections.unmodifiableMap(topicsById); -this.topicsByName = Collections.unmodifiableMap(topicsByName); +public TopicsImage including(TopicImage topic) { Review Comment: It is used from a test class in the `core` module so it needs to be public :-( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
rondagostino commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1114679640 ## metadata/src/main/java/org/apache/kafka/image/TopicsImage.java: ## @@ -76,8 +84,8 @@ public TopicImage getTopic(String name) { } public void write(ImageWriter writer, ImageWriterOptions options) { -for (TopicImage topicImage : topicsById.values()) { -topicImage.write(writer, options); +for (Map.Entry entry : topicsById.entrySet()) { +entry.getValue().write(writer, options); Review Comment: We could, but it is marked deprecated in the library because there is no way to provide a reasonable `.equals()` method. I actually checked, and indeed it is true: ``` @Test public void testMapValuesEquality() { Map m = new HashMap<>(); m.put("a", "a"); m.put("b", "b"); assertEquals(m.keySet(), new HashSet<>(Arrays.asList("a", "b"))); assertEquals(m.keySet(), new HashSet<>(Arrays.asList("b", "a"))); // note that these all assert inequality assertNotEquals(m.values(), new HashSet<>(Arrays.asList("a", "b"))); assertNotEquals(m.values(), Arrays.asList("a", "b")); assertNotEquals(m.values(), new HashSet<>(Arrays.asList("b", "a"))); assertNotEquals(m.values(), Arrays.asList("b", "a")); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
rondagostino commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1114673066 ## metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java: ## @@ -126,29 +127,27 @@ public void handleMetadataVersionChange(MetadataVersion newVersion) { } public TopicsImage apply() { -Map newTopicsById = new HashMap<>(image.topicsById().size()); -Map newTopicsByName = new HashMap<>(image.topicsByName().size()); -for (Entry entry : image.topicsById().entrySet()) { -Uuid id = entry.getKey(); -TopicImage prevTopicImage = entry.getValue(); -TopicDelta delta = changedTopics.get(id); -if (delta == null) { -if (!deletedTopicIds.contains(id)) { -newTopicsById.put(id, prevTopicImage); -newTopicsByName.put(prevTopicImage.name(), prevTopicImage); -} +ImMap newTopicsById = image.topicsById; +ImMap newTopicsByName = image.topicsByName; +// apply all the deletes +for (Uuid topicId: deletedTopicIds) { +// it was deleted, so we have to remove it from the maps +TopicImage originalTopicToBeDeleted = image.topicsById.get(topicId); +if (originalTopicToBeDeleted == null) { +throw new IllegalStateException("Missing topic id " + topicId); } else { -TopicImage newTopicImage = delta.apply(); -newTopicsById.put(id, newTopicImage); -newTopicsByName.put(delta.name(), newTopicImage); +newTopicsById = newTopicsById.without(topicId); +newTopicsByName = newTopicsByName.without(originalTopicToBeDeleted.name()); } } -for (Entry entry : changedTopics.entrySet()) { -if (!newTopicsById.containsKey(entry.getKey())) { -TopicImage newTopicImage = entry.getValue().apply(); -newTopicsById.put(newTopicImage.id(), newTopicImage); -newTopicsByName.put(newTopicImage.name(), newTopicImage); -} +// apply all the updates/additions +for (Map.Entry entry: changedTopics.entrySet()) { +Uuid topicId = entry.getKey(); +TopicImage newTopicToBeAddedOrUpdated = entry.getValue().apply(); +// put new information into the maps +String topicName = newTopicToBeAddedOrUpdated.name(); +newTopicsById = newTopicsById.assoc(topicId, newTopicToBeAddedOrUpdated); +newTopicsByName = newTopicsByName.assoc(topicName, newTopicToBeAddedOrUpdated); Review Comment: Persistent data structures are immutable, so they always create and return a new data structure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13228: KAFKA-10199: Add task updater metrics, part 1
guozhangwang commented on code in PR #13228: URL: https://github.com/apache/kafka/pull/13228#discussion_r1114660400 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ## @@ -151,9 +202,18 @@ private void resumeTasks() { } } -private void restoreTasks() { +private void pauseTasks() { +for (final Task task : updatingTasks.values()) { Review Comment: I think the perf impact should be small since pause/resume are not commonly used, and if the named topology are not paused, then checking the status is just a few cpu cycles. Another motivation is that if we remove named topologies, than pausing / resuming would always be impact on all tasks, in which case we could have a simpler check (e.g. just check a single flag) which would be even cheaper. So it's probably better to maintain the code layout in this way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #13212: MINOR: Remove accidental unnecessary code; fix comment references
clolov commented on PR #13212: URL: https://github.com/apache/kafka/pull/13212#issuecomment-1440432643 This has been rebased on the latest trunk, test have been ran locally and I would be very grateful for a review when you get the time @mimaison -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 opened a new pull request, #13288: MINOR: fix rerun-tests for unit test
chia7712 opened a new pull request, #13288: URL: https://github.com/apache/kafka/pull/13288 related to #11926. we don't process `rerun-tests` for unit test. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13228: KAFKA-10199: Add task updater metrics, part 1
guozhangwang commented on code in PR #13228: URL: https://github.com/apache/kafka/pull/13228#discussion_r1114660189 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ## @@ -598,6 +685,12 @@ public Set getUpdatingStandbyTasks() { : Collections.emptySet(); } +public Set getUpdatingActiveTasks() { +return stateUpdaterThread != null Review Comment: That's a good point, will update. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ## @@ -151,9 +202,18 @@ private void resumeTasks() { } } -private void restoreTasks() { +private void pauseTasks() { +for (final Task task : updatingTasks.values()) { Review Comment: I think the perf impact should be small since pause/resume are not commonly used, and if the named topology are not paused, then checking the status is just a few cpu cycles. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a diff in pull request #11926: KAFKA-13714: Fix cache flush position
chia7712 commented on code in PR #11926: URL: https://github.com/apache/kafka/pull/11926#discussion_r1114653978 ## build.gradle: ## @@ -435,6 +435,12 @@ subprojects { maxRetries = userMaxTestRetries maxFailures = userMaxTestRetryFailures } + +// Allows devs to run tests in a loop to debug flaky tests +// Eg: I=0; while ./gradlew :streams:test -Prerun-tests --tests org.apache.kafka.streams.integration.IQv2StoreIntegrationTest --fail-fast; do (( I=$I+1 )); echo +if (project.hasProperty("rerun-tests")) { + outputs.upToDateWhen { false } +} Review Comment: > which causes us to recompile the tests on every iteration. just curious. on my local, `cleanTest` does not invoke recompile. The following console output is produced by command `./gradlew cleanTest clients:test --tests RequestResponseTest --info` ``` > Task :clients:compileTestJava UP-TO-DATE Caching disabled for task ':clients:compileTestJava' because: Build cache is disabled Skipping task ':clients:compileTestJava' as it is up-to-date. Resolve mutations for :clients:testClasses (Thread[Execution worker Thread 6,5,main]) started. :clients:testClasses (Thread[Execution worker Thread 6,5,main]) started. > Task :clients:testClasses UP-TO-DATE Skipping task ':clients:testClasses' as it has no actions. Resolve mutations for :clients:checkstyleTest (Thread[Execution worker Thread 6,5,main]) started. :clients:checkstyleTest (Thread[Execution worker Thread 6,5,main]) started. ``` Could you share the command which causes the recompile to me? thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
satishd commented on PR #13255: URL: https://github.com/apache/kafka/pull/13255#issuecomment-1440419578 A couple of test failures are not related to this change, will merge the changes to trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #13259: MINOR: Simplify JUnit assertions in src; remove accidental unnecessary code in src
mimaison commented on PR #13259: URL: https://github.com/apache/kafka/pull/13259#issuecomment-1440413003 I checked the clients and connect changes and they look good. I think some of the streams changes could be debatable (not sure if they want to keep some of the comments) so I'll let someone working on streams review them. @ableegoldman @guozhangwang can you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14740) Missing source tag on MirrorSource metrics
[ https://issues.apache.org/jira/browse/KAFKA-14740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reassigned KAFKA-14740: -- Assignee: Mickael Maison > Missing source tag on MirrorSource metrics > -- > > Key: KAFKA-14740 > URL: https://issues.apache.org/jira/browse/KAFKA-14740 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > > The metrics defined in MirrorSourceMetrics have the following tags "target", > "topic", "partition". It would be good to also have a "source" tag with the > source cluster alias. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14741) Add description field to connector configs
Mickael Maison created KAFKA-14741: -- Summary: Add description field to connector configs Key: KAFKA-14741 URL: https://issues.apache.org/jira/browse/KAFKA-14741 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Mickael Maison Assignee: Mickael Maison Connectors are identified by their name. In many cases it would be useful to attach a description/comment to connectors to provide some context. This would be especially useful on Connect clusters running several connectors and/or shared by multiple teams. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] Schm1tz1 commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
Schm1tz1 commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1114586256 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class EnvVarConfigProvider implements ConfigProvider { +private final Map envVarMap; + +public EnvVarConfigProvider() { +envVarMap = getEnvVars(); +} + +public EnvVarConfigProvider(Map envVarsAsArgument) { +envVarMap = envVarsAsArgument; +} + +private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class); + +@Override +public void configure(Map configs) { +} Review Comment: @viktorsomogyi sure! The feature is already implemented and unit tests added, just not committed yet as I am not 100% sure about the global config integration (static Strings for the properties, where to reference, additional interfaces to use...). The other providers example do not have any config methods implemented. Happy to learn how to include that if there are any references / best practices you can share. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14740) Missing source tag on MirrorSource metrics
[ https://issues.apache.org/jira/browse/KAFKA-14740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692262#comment-17692262 ] Mickael Maison commented on KAFKA-14740: Thanks for the quick reply. I think it would make dealing metrics a little bit easier when you have a bunch of mirroring routes between multiple clusters. I'll draft a small KIP. > Missing source tag on MirrorSource metrics > -- > > Key: KAFKA-14740 > URL: https://issues.apache.org/jira/browse/KAFKA-14740 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Mickael Maison >Priority: Major > > The metrics defined in MirrorSourceMetrics have the following tags "target", > "topic", "partition". It would be good to also have a "source" tag with the > source cluster alias. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] clolov commented on pull request #13259: MINOR: Simplify JUnit assertions in src; remove accidental unnecessary code in src
clolov commented on PR #13259: URL: https://github.com/apache/kafka/pull/13259#issuecomment-1440295776 Hello @mimaison, would you have the time to review this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14740) Missing source tag on MirrorSource metrics
[ https://issues.apache.org/jira/browse/KAFKA-14740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692252#comment-17692252 ] Ryanne Dolan commented on KAFKA-14740: -- [~mimaison] the topic name usually includes the source cluster already, so I figured it was redundant. With identity replication you don't get that, but you presumably know what the source is in such cases. I don't have any objections to adding it tho. > Missing source tag on MirrorSource metrics > -- > > Key: KAFKA-14740 > URL: https://issues.apache.org/jira/browse/KAFKA-14740 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Mickael Maison >Priority: Major > > The metrics defined in MirrorSourceMetrics have the following tags "target", > "topic", "partition". It would be good to also have a "source" tag with the > source cluster alias. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14740) Missing source tag on MirrorSource metrics
[ https://issues.apache.org/jira/browse/KAFKA-14740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692248#comment-17692248 ] Mickael Maison commented on KAFKA-14740: [~ryannedolan] Do you remember if there was a reason not to include the source tag on these metrics? MirrorCheckpointMetrics have the source tag. > Missing source tag on MirrorSource metrics > -- > > Key: KAFKA-14740 > URL: https://issues.apache.org/jira/browse/KAFKA-14740 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Mickael Maison >Priority: Major > > The metrics defined in MirrorSourceMetrics have the following tags "target", > "topic", "partition". It would be good to also have a "source" tag with the > source cluster alias. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14740) Missing source tag on MirrorSource metrics
Mickael Maison created KAFKA-14740: -- Summary: Missing source tag on MirrorSource metrics Key: KAFKA-14740 URL: https://issues.apache.org/jira/browse/KAFKA-14740 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Mickael Maison The metrics defined in MirrorSourceMetrics have the following tags "target", "topic", "partition". It would be good to also have a "source" tag with the source cluster alias. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
rondagostino commented on PR #13280: URL: https://github.com/apache/kafka/pull/13280#issuecomment-1440262838 @showuon Thanks for the review. Yes, I noticed the lack of Java 8 support as well. I tried compiling the latest version of Paguro with Java 8 and it did not work (e.g. `[ERROR] reason: '<>' with anonymous inner classes is not supported in -source 8`). I will change the PRs to use the Java-8 compatible Paguro 3.1.2 version instead of Paguro 3.10.3 for the moment while doing research for a replacement library that supports Java 8 and that extends the standard `java.util` interfaces. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14738) Topic disappears from kafka_topic.sh --list after modifying it with kafka_acl.sh
[ https://issues.apache.org/jira/browse/KAFKA-14738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692235#comment-17692235 ] Gabriel Lukacs commented on KAFKA-14738: ok, thanks for clarification, my fault, i was not familiar with acl/jaas, but now it is clear. sorry for inconveniences, pls close this bug. > Topic disappears from kafka_topic.sh --list after modifying it with > kafka_acl.sh > > > Key: KAFKA-14738 > URL: https://issues.apache.org/jira/browse/KAFKA-14738 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.2.3 >Reporter: Gabriel Lukacs >Priority: Major > > Topic is not listed via kafka-topics.sh --list after modifying it with > kafka-acls.sh (-add --allow-principal User:CN=test --operation Read): > $ /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka:9092 > --topic test2 --replication-factor 1 --partitions 50 > Created topic test2. > $ /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka:9092 --topic > test2 > test2 > $ /opt/kafka/bin/kafka-acls.sh --bootstrap-server kafka:9092 --topic test2 > --add --allow-principal User:CN=test --operation Read > Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test2, > patternType=LITERAL)`: > (principal=User:CN=test, host=*, operation=READ, permissionType=ALLOW) > Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test2, > patternType=LITERAL)`: > (principal=User:CN=test, host=*, operation=READ, permissionType=ALLOW) > $ /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka:9092 --topic > test2 > $ /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka:9092 > --topic test2 > Error while executing topic command : Topic 'test2' already exists. > [2023-02-21 16:37:39,185] ERROR > org.apache.kafka.common.errors.TopicExistsException: Topic 'test2' already > exists. > (kafka.admin.TopicCommand$) > $ /opt/kafka/bin/kafka-topics.sh --delete --bootstrap-server kafka:9092 > --topic test2 > Error while executing topic command : Topic 'test2' does not exist as expected > [2023-02-21 16:37:49,485] ERROR java.lang.IllegalArgumentException: Topic > 'test2' does not exist as expected > at > kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:401) > at > kafka.admin.TopicCommand$TopicService.deleteTopic(TopicCommand.scala:361) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:63) > at kafka.admin.TopicCommand.main(TopicCommand.scala) > (kafka.admin.TopicCommand$) > $ /opt/kafka/bin/kafka-topics.sh --version > 3.2.3 (Commit:50029d3ed8ba576f) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
viktorsomogyi commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1114431334 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class EnvVarConfigProvider implements ConfigProvider { +private final Map envVarMap; + +public EnvVarConfigProvider() { +envVarMap = getEnvVars(); +} + +public EnvVarConfigProvider(Map envVarsAsArgument) { +envVarMap = envVarsAsArgument; +} + +private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class); + +@Override +public void configure(Map configs) { +} Review Comment: @Schm1tz1 did you manage to look at @OneCricketeer 's comment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #13228: KAFKA-10199: Add task updater metrics, part 1
lucasbru commented on code in PR #13228: URL: https://github.com/apache/kafka/pull/13228#discussion_r1114354933 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ## @@ -151,9 +202,18 @@ private void resumeTasks() { } } -private void restoreTasks() { +private void pauseTasks() { +for (final Task task : updatingTasks.values()) { Review Comment: Not sure, but is there any performance concern around running this loop in every single iteration of `runOnce` ? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ## @@ -598,6 +685,12 @@ public Set getUpdatingStandbyTasks() { : Collections.emptySet(); } +public Set getUpdatingActiveTasks() { +return stateUpdaterThread != null Review Comment: As I understand it, this function will be called quite frequently to export metrics. We only need the size of the collection. It could make sense to avoid the allocations here and just implement a `getNumberOfUpdaingActiveTasks` as a non-essential but free optimization. Similar for `getPausedStandbyTasks` etc. pp. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ## @@ -399,31 +459,56 @@ private void addToRestoredTasks(final StreamTask task) { } } -private void checkAllUpdatingTaskStates(final long now) { +private void maybeCheckpointTasks(final long now) { final long elapsedMsSinceLastCommit = now - lastCommitMs; if (elapsedMsSinceLastCommit > commitIntervalMs) { if (log.isDebugEnabled()) { log.debug("Checking all restoring task states since {}ms has elapsed (commit interval is {}ms)", Review Comment: Update the log message as well. This function isn't really checking task states anymore -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tinaselenge closed pull request #5545: MINOR: Fixed couple of warnings
tinaselenge closed pull request #5545: MINOR: Fixed couple of warnings URL: https://github.com/apache/kafka/pull/5545 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tinaselenge commented on pull request #5545: MINOR: Fixed couple of warnings
tinaselenge commented on PR #5545: URL: https://github.com/apache/kafka/pull/5545#issuecomment-1439953834 This tool is being migrated as part of https://issues.apache.org/jira/browse/KAFKA-14525. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration
cadonna commented on code in PR #13269: URL: https://github.com/apache/kafka/pull/13269#discussion_r1114238733 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java: ## @@ -255,6 +255,7 @@ public void completeRestoration(final java.util.function.Consumerhttps://github.com/apache/kafka/blob/98c2f88e1c605195ccfac19c49a83216e26146a1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L492 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison merged pull request #13216: Remove unused ZooKeeper log level configuration from `connect-log4j.properties`
mimaison merged PR #13216: URL: https://github.com/apache/kafka/pull/13216 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14304) ZooKeeper to KRaft Migration
[ https://issues.apache.org/jira/browse/KAFKA-14304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692106#comment-17692106 ] Luke Chen commented on KAFKA-14304: --- [~mumrah] , also, the fixed version of this epic is set to v3.4.1, is that correct? Shouldn't it be v3.5.0? cc 3.5 release manager [~mimaison] > ZooKeeper to KRaft Migration > > > Key: KAFKA-14304 > URL: https://issues.apache.org/jira/browse/KAFKA-14304 > Project: Kafka > Issue Type: New Feature >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.4.1 > > > Top-level JIRA for > [KIP-866|https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] showuon commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
showuon commented on PR #13280: URL: https://github.com/apache/kafka/pull/13280#issuecomment-1439780222 @rondagostino , also, it looks like this library doesn't support JDK 8? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13771) Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up
[ https://issues.apache.org/jira/browse/KAFKA-13771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] RivenSun reassigned KAFKA-13771: Assignee: RivenSun > Support to explicitly delete delegationTokens that have expired but have not > been automatically cleaned up > -- > > Key: KAFKA-13771 > URL: https://issues.apache.org/jira/browse/KAFKA-13771 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: RivenSun >Assignee: RivenSun >Priority: Major > > Quoting the official documentation > {quote} > Tokens can also be cancelled explicitly. If a token is not renewed by the > token’s expiration time or if token is beyond the max life time, it will be > deleted from all broker caches as well as from zookeeper. > {quote} > 1. The first point above means that after the `AdminClient` initiates the > EXPIRE_DELEGATION_TOKEN request, in the DelegationTokenManager.expireToken() > method on the KafkaServer side, if the user passes in expireLifeTimeMs less > than 0, KafaServer will delete the corresponding delegationToken directly. > 2. There is a thread named "delete-expired-tokens" on the KafkaServer side, > which is responsible for regularly cleaning up expired tokens. The execution > interval is `delegation.token.expiry.check.interval.ms`, and the default > value is one hour. > But carefully analyze the code logic in DelegationTokenManager.expireToken(), > *now Kafka does not support users to delete an expired delegationToken that > he no longer uses/renew. If the user wants to do this, they will receive a > DelegationTokenExpiredException.* > In the worst case, an expired delegationToken may still can be used normally > within {*}an hour{*}, even if this configuration > (delegation.token.expiry.check.interval.ms) broker can shorten the > configuration as much as possible. > The solution is very simple, simply adjust the `if` order of > DelegationTokenManager.expireToken(). > {code:java} > if (!allowedToRenew(principal, tokenInfo)) { > expireResponseCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, -1) > } else if (expireLifeTimeMs < 0) { //expire immediately > removeToken(tokenInfo.tokenId) > info(s"Token expired for token: ${tokenInfo.tokenId} for owner: > ${tokenInfo.owner}") > expireResponseCallback(Errors.NONE, now) > } else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) { > expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1) > } else { > //set expiry time stamp > .. > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] bachmanity1 commented on a diff in pull request #13261: MINOR: after reading BYTES type it's possible to access data beyond its size
bachmanity1 commented on code in PR #13261: URL: https://github.com/apache/kafka/pull/13261#discussion_r1114028635 ## clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java: ## @@ -688,8 +688,10 @@ public Object read(ByteBuffer buffer) { if (size > buffer.remaining()) throw new SchemaException("Error reading bytes of size " + size + ", only " + buffer.remaining() + " bytes available"); +int limit = buffer.limit(); +buffer.limit(buffer.position() + size); Review Comment: I introduced a new variable `newPosition` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org