[GitHub] [kafka] dengziming commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface
dengziming commented on code in PR #13826: URL: https://github.com/apache/kafka/pull/13826#discussion_r1223802381

## core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala: ##
@@ -584,4 +584,12 @@ class ZkMetadataCache(
   def getFeatureOption: Option[FinalizedFeaturesAndEpoch] = {
     featuresAndEpoch
   }
+
+  override def versionContext(): MetadataVersionContext = {

Review Comment: The name is confusing: the context includes both the features and the metadata version. It's better to call it FeaturesContext, since the metadata version is also a feature.

## core/src/main/scala/kafka/server/ControllerApis.scala: ##
@@ -447,16 +447,9 @@ class ControllerApis(val requestChannel: RequestChannel,
         requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
-      val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
-      controller.finalizedFeatures(context).handle { (result, exception) =>
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-          if (exception != null) {
-            apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
-          } else {
-            apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
-          }
-        })
-      }
+      requestHelper.sendResponseMaybeThrottle(request,

Review Comment: Since we are changing back to the original code, we can make it the same as KafkaApis, like this:
```
def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
  val apiVersionRequest = request.body[ApiVersionsRequest]
  if (apiVersionRequest.hasUnsupportedRequestVersion) {
    apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
  } else if (!apiVersionRequest.isValid) {
    apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
  } else {
    apiVersionManager.apiVersionResponse(requestThrottleMs)
  }
}
requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
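To make dengziming's naming argument concrete, here is a hypothetical Java value class (not the class introduced in PR #13826). The metadata version is not stored as a separate field: it is read out of the finalized-feature map, under the assumption that it is keyed by the feature name `metadata.version`. If the metadata version is "just another feature", a name like `FeaturesContext` describes the whole object, while `MetadataVersionContext` describes only one entry in it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical value class illustrating the naming argument: the metadata
// version is one entry among the finalized features, not a separate concept.
final class FeaturesContext {
    // Assumed feature name under which the metadata version level is finalized.
    static final String METADATA_VERSION_FEATURE = "metadata.version";

    private final Map<String, Short> finalizedFeatures;
    private final long epoch;

    FeaturesContext(Map<String, Short> finalizedFeatures, long epoch) {
        // Defensive, read-only copy so callers cannot mutate the context.
        this.finalizedFeatures = Collections.unmodifiableMap(new HashMap<>(finalizedFeatures));
        this.epoch = epoch;
    }

    Map<String, Short> finalizedFeatures() {
        return finalizedFeatures;
    }

    long epoch() {
        return epoch;
    }

    // Derived from the feature map rather than stored in its own field.
    Optional<Short> metadataVersionLevel() {
        return Optional.ofNullable(finalizedFeatures.get(METADATA_VERSION_FEATURE));
    }
}
```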
[GitHub] [kafka] dajac merged pull request #13823: MINOR: Move MockTime to server-common
dajac merged PR #13823: URL: https://github.com/apache/kafka/pull/13823
[GitHub] [kafka] dajac commented on pull request #13823: MINOR: Move MockTime to server-common
dajac commented on PR #13823: URL: https://github.com/apache/kafka/pull/13823#issuecomment-1584066362 I will merge it as it is based on @mumrah's +1. I will follow up based on @divijvaidya's replies if necessary.
[GitHub] [kafka] dajac commented on a diff in pull request #13823: MINOR: Move MockTime to server-common
dajac commented on code in PR #13823: URL: https://github.com/apache/kafka/pull/13823#discussion_r1223913820 ## server-common/src/test/java/org/apache/kafka/server/util/MockTime.java: ## @@ -27,15 +23,21 @@ * 1. This has an associated scheduler instance for managing background tasks in a deterministic way. * 2. This doesn't support the `auto-tick` functionality as it interacts badly with the current implementation of `MockScheduler`. */ -class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends JMockTime(0, currentTimeMs, currentHiResTimeNs) { - - def this() = this(System.currentTimeMillis(), System.nanoTime()) +public class MockTime extends org.apache.kafka.common.utils.MockTime { Review Comment: Hum... Implementing AutoCloseable alone does not help. We would need to change all the usages to call `close` to make it work, right? My goal with this patch is to move it from core to server-common without changing it. I think that we could consider doing this later, if it brings anything.
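dajac's point can be shown in a few lines: implementing `AutoCloseable` does nothing by itself. `close()` only runs when a caller uses try-with-resources or calls it explicitly, so every existing call site would have to change. The sketch below uses a hypothetical `ClosableClock` class as a stand-in for `MockTime`; it is not Kafka code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a MockTime that owns background resources.
class ClosableClock implements AutoCloseable {
    // Counts how many times close() has actually been invoked.
    static final AtomicInteger CLOSE_CALLS = new AtomicInteger();

    @Override
    public void close() {
        CLOSE_CALLS.incrementAndGet();
    }
}

class AutoCloseableDemo {
    // Existing call sites look like this: the object is simply dropped,
    // so close() never runs even though the class implements AutoCloseable.
    static void legacyUsage() {
        ClosableClock clock = new ClosableClock();
        // ... use clock ...
    }

    // Only call sites rewritten to use try-with-resources get close() called.
    static void migratedUsage() {
        try (ClosableClock clock = new ClosableClock()) {
            // ... use clock ...
        }
    }
}
```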
[GitHub] [kafka] dajac commented on a diff in pull request #13823: MINOR: Move MockTime to server-common
dajac commented on code in PR #13823: URL: https://github.com/apache/kafka/pull/13823#discussion_r1223914389 ## server-common/src/test/java/org/apache/kafka/server/util/MockTime.java: ## @@ -27,15 +23,21 @@ * 1. This has an associated scheduler instance for managing background tasks in a deterministic way. * 2. This doesn't support the `auto-tick` functionality as it interacts badly with the current implementation of `MockScheduler`. */ -class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends JMockTime(0, currentTimeMs, currentHiResTimeNs) { - - def this() = this(System.currentTimeMillis(), System.nanoTime()) +public class MockTime extends org.apache.kafka.common.utils.MockTime { Review Comment: I agree that the name is bad. As said earlier, my intent is to just move it for now. We could consider renaming it separately.
[GitHub] [kafka] dajac commented on a diff in pull request #13823: MINOR: Move MockTime to server-common
dajac commented on code in PR #13823: URL: https://github.com/apache/kafka/pull/13823#discussion_r1223909537 ## server-common/src/test/java/org/apache/kafka/server/util/MockTime.java: ## @@ -27,15 +23,21 @@ * 1. This has an associated scheduler instance for managing background tasks in a deterministic way. * 2. This doesn't support the `auto-tick` functionality as it interacts badly with the current implementation of `MockScheduler`. */ -class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends JMockTime(0, currentTimeMs, currentHiResTimeNs) { - - def this() = this(System.currentTimeMillis(), System.nanoTime()) +public class MockTime extends org.apache.kafka.common.utils.MockTime { +public final MockScheduler scheduler; - val scheduler = new MockScheduler(this) +public MockTime() { +this(System.currentTimeMillis(), System.nanoTime()); Review Comment: I don't see any benefits of using it here. Am I missing something?
[GitHub] [kafka] dajac commented on a diff in pull request #13823: MINOR: Move MockTime to server-common
dajac commented on code in PR #13823: URL: https://github.com/apache/kafka/pull/13823#discussion_r1223906254 ## server-common/src/test/java/org/apache/kafka/server/util/MockTime.java: ## @@ -27,15 +23,21 @@ * 1. This has an associated scheduler instance for managing background tasks in a deterministic way. * 2. This doesn't support the `auto-tick` functionality as it interacts badly with the current implementation of `MockScheduler`. */ -class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends JMockTime(0, currentTimeMs, currentHiResTimeNs) { - - def this() = this(System.currentTimeMillis(), System.nanoTime()) +public class MockTime extends org.apache.kafka.common.utils.MockTime { +public final MockScheduler scheduler; Review Comment: It would break a bunch of Java usages. This is why I kept it like this.
[GitHub] [kafka] danielgospodinow commented on a diff in pull request #13827: KAFKA-15073: Add a Github action to mark PRs as stale
danielgospodinow commented on code in PR #13827: URL: https://github.com/apache/kafka/pull/13827#discussion_r1223881556 ## .github/workflows/stale.yml: ## @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: 'Close stale issues and PRs' +on: + #schedule: Review Comment: Maybe we can add a comment here explaining that there's a plan to use this cron schedule to run the workflow nightly.
[GitHub] [kafka] KarboniteKream commented on pull request #13762: MINOR: Do not print an empty line when no topics exist
KarboniteKream commented on PR #13762: URL: https://github.com/apache/kafka/pull/13762#issuecomment-1583888214 Sure, I understand the concerns. > you might want to align your PR with that PR? Do you mean waiting for that PR to get merged and then rebasing my changes? Or adding my change to that PR?
[GitHub] [kafka] showuon merged pull request #13807: KAFKA-15040: trigger onLeadershipChange under KRaft mode
showuon merged PR #13807: URL: https://github.com/apache/kafka/pull/13807
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13830: KAFKA-14936: Change Time Ordered Buffer to not require Change<> 0/N
vcrfxia commented on code in PR #13830: URL: https://github.com/apache/kafka/pull/13830#discussion_r1223656443 ## streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java: ## @@ -27,14 +26,14 @@ import java.util.function.Consumer; import java.util.function.Supplier; -public interface TimeOrderedKeyValueBuffer extends StateStore { +public interface TimeOrderedKeyValueBuffer extends StateStore { Review Comment: Can we add a quick javadoc to clarify that `T` here is the buffer type? It should already be clear to anyone who reads the code, but with a javadoc, readers won't need to read the code if all they want to know is what the type is. ## streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java: ## @@ -65,7 +65,7 @@ import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV3; import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2; -public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer { +public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer> { Review Comment: It could be good to rename this to `InMemoryTimeOrderedKeyValueChangeBuffer` to contrast with `RocksDBTimeOrderedKeyValueBuffer` (not a change buffer) in your next PR, but I don't feel too strongly about it.
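As a sketch of what vcrfxia's suggested javadoc might look like, here is a heavily simplified, hypothetical buffer interface. It is not the Kafka Streams `TimeOrderedKeyValueBuffer` API. `T` is documented as the stored type, and an in-memory implementation fixes `T` to a hypothetical `Change<V>`. That mirrors the split the review discusses between a change buffer and a plain buffer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified time-ordered buffer (not the Kafka Streams interface).
 *
 * @param <K> the record key type
 * @param <T> the type of the values held in the buffer; a change buffer stores
 *            {@code Change<V>}, while a plain buffer could store {@code V} directly
 */
interface SimpleTimeOrderedBuffer<K, T> {
    /** Buffers the value under the key with the given record time. */
    void put(long time, K key, T value);

    /** Removes every value whose time is at most maxTime, passing each to the callback, oldest first. */
    void evictUpTo(long maxTime, Consumer<T> callback);
}

/** Hypothetical old/new value pair, standing in for Kafka Streams' Change. */
final class Change<V> {
    final V oldValue;
    final V newValue;

    Change(V oldValue, V newValue) {
        this.oldValue = oldValue;
        this.newValue = newValue;
    }
}

/** In-memory buffer whose stored type T is fixed to Change of V. */
final class InMemoryChangeBuffer<K, V> implements SimpleTimeOrderedBuffer<K, Change<V>> {
    private final class Entry {
        final long time;
        final K key;
        final Change<V> value;

        Entry(long time, K key, Change<V> value) {
            this.time = time;
            this.key = key;
            this.value = value;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    @Override
    public void put(long time, K key, Change<V> value) {
        entries.add(new Entry(time, key, value));
        // List.sort is stable, so entries with equal times keep insertion order.
        entries.sort(Comparator.comparingLong(e -> e.time));
    }

    @Override
    public void evictUpTo(long maxTime, Consumer<Change<V>> callback) {
        while (!entries.isEmpty() && entries.get(0).time <= maxTime) {
            callback.accept(entries.remove(0).value);
        }
    }
}
```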
[jira] [Comment Edited] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0
[ https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730767#comment-17730767 ] Bo Gao edited comment on KAFKA-15053 at 6/8/23 11:45 PM: - Hi [~ChrisEgerton], I just created a pull request for the fix: [https://github.com/apache/kafka/pull/13831]. I also have a quick question about the release process: once this fix is merged and released, do I need to upgrade to the latest Kafka version to consume it, or is there a way to include it in a minor release? Thanks! was (Author: JIRAUSER300429): Hi [~ChrisEgerton] , just created a pull request for the fix [https://github.com/apache/kafka/pull/13831|https://github.com/apache/kafka/pull/13831.] I also have a quick question for the release process: imagine this fix is merged and released, to consume this fix, do I need to upgrade to the latest Kafka version? Or is there a way to include this fix in a minor release? Thanks! > Regression for security.protocol validation starting from 3.3.0 > --- > > Key: KAFKA-15053 > URL: https://issues.apache.org/jira/browse/KAFKA-15053 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.0 >Reporter: Bo Gao >Priority: Major > > [This|https://issues.apache.org/jira/browse/KAFKA-13793] Jira issue > introduced validations on multiple configs. As a consequence, the config > {{security.protocol}} now only allows upper case values such as PLAINTEXT, > SSL, SASL_PLAINTEXT, SASL_SSL. Before this change, lower case values like > sasl_ssl and ssl were also supported; there's even case-insensitive logic > inside > [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73] > to handle the lower case values. > I think we should treat this as a regression bug, since lower case values are no longer > supported as of 3.3.0.
For 3.3.0 and later versions, we > get an error like this when using the lower case value sasl_ssl: > {{Invalid value sasl_ssl for configuration security.protocol: String must be > one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0
[ https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730767#comment-17730767 ] Bo Gao edited comment on KAFKA-15053 at 6/8/23 11:44 PM: - Hi [~ChrisEgerton] , just created a pull request for the fix [https://github.com/apache/kafka/pull/13831|https://github.com/apache/kafka/pull/13831.] I also have a quick question for the release process: imagine this fix is merged and released, to consume this fix, do I need to upgrade to the latest Kafka version? Or is there a way to include this fix in a minor release? Thanks! was (Author: JIRAUSER300429): Hi [~ChrisEgerton] , just created a pull request for the fix [https://github.com/apache/kafka/pull/13831.] I also have a quick question for the release process: imagine this fix is merged and released, to consume this fix, do I need to upgrade to the latest Kafka version? Or is there a way to include this fix in a minor release? Thanks!
[jira] [Commented] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0
[ https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730767#comment-17730767 ] Bo Gao commented on KAFKA-15053: Hi [~ChrisEgerton] , just created a pull request for the fix [https://github.com/apache/kafka/pull/13831.] I also have a quick question for the release process: imagine this fix is merged and released, to consume this fix, do I need to upgrade to the latest Kafka version? Or is there a way to include this fix in a minor release? Thanks!
[GitHub] [kafka] bogao007 opened a new pull request, #13831: KAFKA-15053: Use case insensitive validator for security.protocol config
bogao007 opened a new pull request, #13831: URL: https://github.com/apache/kafka/pull/13831 Fixes the regression described in [KAFKA-15053](https://issues.apache.org/jira/browse/KAFKA-15053), where security.protocol only allows uppercase values like PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. With this fix, both lower case and upper case values are supported (e.g. PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, plaintext, ssl, sasl_plaintext, sasl_ssl). Added a new unit test to cover the case-insensitive case. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
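The core of a case-insensitive check can be sketched as follows. This is a standalone illustration with a hypothetical `CaseInsensitiveChoiceValidator` class, not the validator that PR #13831 actually uses. It upper-cases with `Locale.ROOT`, so the result does not depend on the JVM's default locale.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical validator for illustration; not Kafka's ConfigDef API.
final class CaseInsensitiveChoiceValidator {
    private final List<String> allowedUpperCase;

    CaseInsensitiveChoiceValidator(String... allowed) {
        // Normalize once so each check is a simple membership test.
        this.allowedUpperCase = Arrays.stream(allowed)
                .map(v -> v.toUpperCase(Locale.ROOT))
                .collect(Collectors.toList());
    }

    /** Throws IllegalArgumentException unless value matches an allowed value, ignoring case. */
    void ensureValid(String name, String value) {
        // Locale.ROOT keeps the result independent of the JVM default locale
        // (for example, Turkish rules would map "i" to a dotted capital I).
        if (value == null || !allowedUpperCase.contains(value.toUpperCase(Locale.ROOT))) {
            throw new IllegalArgumentException("Invalid value " + value + " for configuration " + name
                    + ": String must be one of (case insensitive): " + String.join(", ", allowedUpperCase));
        }
    }
}
```

With this approach, `sasl_ssl` and `SASL_SSL` both pass, and unknown values are still rejected.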
[GitHub] [kafka] ijuma commented on a diff in pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse
ijuma commented on code in PR #13679: URL: https://github.com/apache/kafka/pull/13679#discussion_r1223558292 ## core/src/main/scala/kafka/server/ApiVersionManager.scala: ## @@ -112,4 +157,8 @@ class DefaultApiVersionManager( zkMigrationEnabled ) } + + override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = { +throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead") Review Comment: I think @cmccabe submitted a PR to fix this here https://github.com/apache/kafka/pull/13826
[jira] [Resolved] (KAFKA-14966) Extract reusable common logic from OffsetFetcher
[ https://issues.apache.org/jira/browse/KAFKA-14966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-14966. - Fix Version/s: 3.6.0 Resolution: Fixed Merged the PR to trunk. > Extract reusable common logic from OffsetFetcher > > > Key: KAFKA-14966 > URL: https://issues.apache.org/jira/browse/KAFKA-14966 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Fix For: 3.6.0 > > > The OffsetFetcher is internally used by the KafkaConsumer to fetch offsets, > validate and reset positions. > For the new consumer based on a refactored threading model, similar > functionality will be needed by the ListOffsetsRequestManager component. > This task aims at identifying and extracting the OffsetFetcher functionality > that can be reused by the new consumer implementation.
[GitHub] [kafka] junrao merged pull request #13815: KAFKA-14966: Extract OffsetFetcher reusable logic
junrao merged PR #13815: URL: https://github.com/apache/kafka/pull/13815
[GitHub] [kafka] lianetm commented on pull request #13815: KAFKA-14966: Extract OffsetFetcher reusable logic
lianetm commented on PR #13815: URL: https://github.com/apache/kafka/pull/13815#issuecomment-1583285348 Thanks @junrao. I just addressed the comment and checked that the failing tests are unrelated.
[GitHub] [kafka] bmscomp commented on pull request #13662: MINOR: Upgrade Jackson dependencies to version 2.15.0
bmscomp commented on PR #13662: URL: https://github.com/apache/kafka/pull/13662#issuecomment-1583266979 @showuon Yes, I'll do it. Thanks so much @divijvaidya for the reviews. I am back from holidays :) and I'll continue working on this topic. I'll check all the comments one by one and try to give the best answers I can. Thanks again for all the reviews.
[jira] [Assigned] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Booke reassigned KAFKA-14995: Assignee: Steven Booke > Automate asf.yaml collaborators refresh > --- > > Key: KAFKA-14995 > URL: https://issues.apache.org/jira/browse/KAFKA-14995 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Assignee: Steven Booke >Priority: Minor > Labels: newbie > > We have added a policy to use the asf.yaml Github Collaborators: > [https://github.com/apache/kafka-site/pull/510] > The policy states that we set this list to be the top 20 commit authors who > are not Kafka committers. Unfortunately, it's not trivial to compute this > list. > Here is the process I followed to generate the list the first time (note that > I generated this list on 2023-04-28, so the lookback is one year): > 1. List authors by commit volume in the last year: > {code:java} > $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code} > 2. manually filter out the authors who are committers, based on > [https://kafka.apache.org/committers] > 3. truncate the list to 20 authors > 4. for each author > 4a. Find a commit in the `git log` that they were the author on: > {code:java} > commit 440bed2391338dc10fe4d36ab17dc104b61b85e8 > Author: hudeqi <1217150...@qq.com> > Date: Fri May 12 14:03:17 2023 +0800 > ...{code} > 4b. Look up that commit in Github: > [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8] > 4c. Copy their Github username into .asf.yaml under both the PR whitelist and > the Collaborators lists. > 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713] > > This is pretty time consuming and is very scriptable. Two complications: > * To do the filtering, we need to map from Git log "Author" to documented > Kafka "Committer" that we can use to perform the filter.
Suggestion: just > update the structure of the "Committers" page to include their Git "Author" > name and email > ([https://github.com/apache/kafka-site/blob/asf-site/committers.html]) > * To generate the YAML lists, we need to map from Git log "Author" to Github > username. There's presumably some way to do this in the Github REST API (the > mapping is based on the email, IIUC), or we could also just update the > Committers page to also document each committer's Github username. > > Ideally, we would write this script (to be stored in the Apache Kafka repo) > and create a Github Action to run it every three months.
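Steps 1 to 3 of the manual process are the mechanical core the issue wants scripted: rank authors by commit count, drop committers, and keep the top 20. Below is a minimal Java sketch of that part. It assumes the output of `git shortlog --email --numbered --summary --since=...` is passed in as lines. It also assumes the committer list has already been mapped to lower-case Git author emails, as the issue suggests. The class names are hypothetical, and mapping authors to GitHub usernames (step 4) is not covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// One author line from `git shortlog --email --numbered --summary`.
final class AuthorCount {
    final int commits;
    final String name;
    final String email;

    AuthorCount(int commits, String name, String email) {
        this.commits = commits;
        this.name = name;
        this.email = email;
    }
}

// Hypothetical helper covering steps 1-3 of the manual process.
final class CollaboratorCandidates {
    // Matches lines such as "    42\tJane Doe <jane@example.com>".
    private static final Pattern LINE = Pattern.compile("^\\s*(\\d+)\\t(.*?)\\s*<([^>]*)>\\s*$");

    static List<AuthorCount> parse(List<String> shortlogLines) {
        List<AuthorCount> authors = new ArrayList<>();
        for (String line : shortlogLines) {
            Matcher m = LINE.matcher(line);
            if (m.matches()) { // silently skips lines in any other format
                authors.add(new AuthorCount(Integer.parseInt(m.group(1)), m.group(2), m.group(3)));
            }
        }
        return authors;
    }

    // --numbered already sorts by commit count, so keeping input order keeps the ranking.
    // committerEmails is assumed to hold lower-case addresses.
    static List<AuthorCount> topNonCommitters(List<AuthorCount> authors, Set<String> committerEmails, int limit) {
        List<AuthorCount> result = new ArrayList<>();
        for (AuthorCount author : authors) {
            if (result.size() >= limit) {
                break;
            }
            if (!committerEmails.contains(author.email.toLowerCase(Locale.ROOT))) {
                result.add(author);
            }
        }
        return result;
    }
}
```

With `limit` set to 20, the returned list corresponds to the truncated list from step 3.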
[GitHub] [kafka] kirktrue commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()
kirktrue commented on code in PR #13797: URL: https://github.com/apache/kafka/pull/13797#discussion_r1223461888 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -522,7 +525,35 @@ public void subscribe(Collection topics, ConsumerRebalanceListener callb @Override public void assign(Collection partitions) { -throw new KafkaException("method not implemented"); +if (partitions == null) { +throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null"); +} + +if (partitions.isEmpty()) { +this.unsubscribe(); +return; +} + +for (TopicPartition tp : partitions) { +String topic = (tp != null) ? tp.topic() : null; +if (Utils.isBlank(topic)) +throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); +} +// TODO: implement fetcher +// fetcher.clearBufferedDataForUnassignedPartitions(partitions); + +// make sure the offsets of topic partitions the consumer is unsubscribing from +// are committed since there will be no following rebalance +commit(subscriptions.allConsumed()); Review Comment: I'll take a closer look at this. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java: ## @@ -166,7 +208,7 @@ private DefaultBackgroundThread mockBackgroundThread() { applicationEventsQueue, backgroundEventsQueue, this.errorEventHandler, -processor, +applicationEventProcessor, Review Comment: Fixed.
[GitHub] [kafka] kirktrue commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()
kirktrue commented on code in PR #13797: URL: https://github.com/apache/kafka/pull/13797#discussion_r1223461556 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -537,7 +568,9 @@ public void subscribe(Pattern pattern) { @Override public void unsubscribe() { -throw new KafkaException("method not implemented"); +// fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet()); Review Comment: Backed out the unsubscribe logic as it wasn't implemented in any meaningful way.
[GitHub] [kafka] kirktrue commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()
kirktrue commented on code in PR #13797: URL: https://github.com/apache/kafka/pull/13797#discussion_r1223461234 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -522,7 +525,35 @@ public void subscribe(Collection topics, ConsumerRebalanceListener callb @Override public void assign(Collection partitions) { -throw new KafkaException("method not implemented"); +if (partitions == null) { +throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null"); +} + +if (partitions.isEmpty()) { +this.unsubscribe(); +return; +} + +for (TopicPartition tp : partitions) { +String topic = (tp != null) ? tp.topic() : null; +if (Utils.isBlank(topic)) +throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); +} +// TODO: implement fetcher +// fetcher.clearBufferedDataForUnassignedPartitions(partitions); + +// make sure the offsets of topic partitions the consumer is unsubscribing from +// are committed since there will be no following rebalance +commit(subscriptions.allConsumed()); + +log.info("Assigned to partition(s): {}", Utils.join(partitions, ", ")); +if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) + updateMetadata(time.milliseconds()); Review Comment: Does the latest rename and refactor address your concerns?
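The argument checks at the top of `assign()` in this diff can be pulled out and exercised on their own. Below is a minimal sketch that makes three substitutions. A hypothetical `Partition` class stands in for Kafka's `TopicPartition`. A `false` return stands in for the `unsubscribe()` call on an empty collection. An inline null/whitespace check stands in for `Utils.isBlank`. It does not cover the commit or metadata-update steps.

```java
import java.util.Collection;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class Partition {
    final String topic;
    final int partition;

    Partition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

final class AssignValidation {
    /**
     * Mirrors the checks in the diff: throws on a null collection or a
     * null/blank topic, returns false for an empty collection (where the
     * real code unsubscribes), and returns true for a usable assignment.
     */
    static boolean validate(Collection<Partition> partitions) {
        if (partitions == null)
            throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
        if (partitions.isEmpty())
            return false;
        for (Partition tp : partitions) {
            String topic = (tp != null) ? tp.topic : null;
            // Same idea as a blank check: null, empty or whitespace-only topics are rejected.
            if (topic == null || topic.trim().isEmpty())
                throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
        }
        return true;
    }
}
```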
[GitHub] [kafka] kirktrue commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()
kirktrue commented on code in PR #13797: URL: https://github.com/apache/kafka/pull/13797#discussion_r1223460895 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -522,7 +525,35 @@ public void subscribe(Collection topics, ConsumerRebalanceListener callb @Override public void assign(Collection partitions) { -throw new KafkaException("method not implemented"); +if (partitions == null) { +throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null"); +} + +if (partitions.isEmpty()) { +this.unsubscribe(); +return; +} + +for (TopicPartition tp : partitions) { +String topic = (tp != null) ? tp.topic() : null; +if (Utils.isBlank(topic)) +throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); +} +// TODO: implement fetcher Review Comment: Yes. We don't have the Fetcher ready to merge into AK yet.
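The argument checks at the top of the `assign()` hunk can be looked at on their own, apart from the fetcher and commit logic that is still pending. Below is a minimal sketch. `TopicPartitionStub` and `AssignValidation` are hypothetical stand-ins, not Kafka classes. The blank-topic test is assumed to behave like the `Utils.isBlank` call in the hunk.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition (illustration only).
final class TopicPartitionStub {
    private final String topic;
    private final int partition;

    TopicPartitionStub(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    String topic() { return topic; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionStub)) return false;
        TopicPartitionStub other = (TopicPartitionStub) o;
        return partition == other.partition && Objects.equals(topic, other.topic);
    }

    @Override
    public int hashCode() { return Objects.hash(topic, partition); }
}

// Hypothetical helper mirroring the validation in the assign() hunk.
final class AssignValidation {
    private AssignValidation() { }

    // Returns the partitions to assign; an empty result means "treat as unsubscribe".
    static Set<TopicPartitionStub> validate(Collection<TopicPartitionStub> partitions) {
        if (partitions == null) {
            throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
        }
        if (partitions.isEmpty()) {
            return new HashSet<>(); // the hunk calls unsubscribe() and returns at this point
        }
        for (TopicPartitionStub tp : partitions) {
            String topic = (tp != null) ? tp.topic() : null;
            // Assumed equivalent of Utils.isBlank(topic): null or whitespace-only
            if (topic == null || topic.trim().isEmpty()) {
                throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
            }
        }
        return new HashSet<>(partitions); // the set handed to assignFromUser(...) in the hunk
    }
}
```

Copying the input into a `HashSet` (as `new HashSet<>(partitions)` does in the hunk) collapses duplicate partitions before they reach the subscription state.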
[GitHub] [kafka] kirktrue commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()
kirktrue commented on code in PR #13797: URL: https://github.com/apache/kafka/pull/13797#discussion_r1223460709 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -106,4 +114,17 @@ private boolean process(final OffsetFetchApplicationEvent event) { manager.addOffsetFetchRequest(event.partitions); return true; } + +private boolean process(final MetadataUpdateApplicationEvent event) { +metadata.requestUpdateForNewTopics(); +return true; +} + +private boolean process(final UnsubscribeApplicationEvent event) { +/* +this.coordinator.onLeavePrepare(); +this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics"); + */ Review Comment: Removed those lines, as they're for unsubscribe, which isn't baked enough for this PR.
[GitHub] [kafka] kirktrue commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()
kirktrue commented on code in PR #13797: URL: https://github.com/apache/kafka/pull/13797#discussion_r1223454138 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -106,4 +114,17 @@ private boolean process(final OffsetFetchApplicationEvent event) { manager.addOffsetFetchRequest(event.partitions); return true; } + +private boolean process(final MetadataUpdateApplicationEvent event) { +metadata.requestUpdateForNewTopics(); Review Comment: Changed `MetadataUpdateApplicationEvent` to `NewTopicsMetadataUpdateRequestEvent` for clarity. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataUpdateApplicationEvent.java: ## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class MetadataUpdateApplicationEvent extends ApplicationEvent { + +private final long timestamp; Review Comment: Removed the unused variable.
[GitHub] [kafka] jlprat commented on a diff in pull request #13827: KAFKA-15073: Add a Github action to mark PRs as stale
jlprat commented on code in PR #13827: URL: https://github.com/apache/kafka/pull/13827#discussion_r1223425372 ## .github/workflows/stale.yml: ## @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: 'Close stale issues and PRs' +on: + #schedule: + # - cron: '30 3 * * *' + workflow_dispatch: +inputs: + dryRun: +description: 'Dry Run' +required: true +default: true +type: boolean + operationsPerRun: +description: 'Max GitHub API operations' +required: true +default: 30 +type: number + +permissions: + issues: write + pull-requests: write + +jobs: + stale: +runs-on: ubuntu-latest +steps: + - uses: actions/stale@v8 +with: + debug-only: ${{ inputs.dryRun }} + operations-per-run: ${{ inputs.operationsPerRun }} + days-before-stale: 90 Review Comment: I think 90 days is a long time, but looking at our queue, it's a realistic value to start with.
[GitHub] [kafka] jlprat commented on pull request #13827: KAFKA-15073: Add a Github action to mark PRs as stale
jlprat commented on PR #13827: URL: https://github.com/apache/kafka/pull/13827#issuecomment-1583155790

> I also found https://github.com/marketplace/actions/auto-label-merge-conflicts which would add a label to PRs with conflicts. This could be used to create two separate workflows of the "stale" action.

This is a good one. I think we can combine these two actions to accomplish what we need: first mark PRs that have merge conflicts with a label, then run the stale job twice, once excluding the conflict label and once only for PRs that have it.

> Do you agree that we can scope this PR to "marking old PRs as stale" for the sake of clearing our backlog?

Yes, this makes sense. I'll work on the other one.
[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified
[ https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730665#comment-17730665 ]

Erik van Oosten commented on KAFKA-10337:
----

Thanks for your PR [~thomaslee]. It has now been merged with few changes.

> Wait for pending async commits in commitSync() even if no offsets are specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: Tom Lee
> Assignee: Erik van Oosten
> Priority: Major
> Fix For: 3.6.0
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)} (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.{quote}
> But should we happen to call the method with an empty offset map (i.e. commitSync(Collections.emptyMap())), the callbacks for any incomplete async commits will not be invoked because of an early return in ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a barrier for in-flight async commits prior to a rebalance, this could be an important (though somewhat implementation-dependent) detail.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
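The issue describes an ordering bug. `commitSync` is documented as a barrier for earlier `commitAsync` callbacks, but an early return on an empty offset map skipped that barrier. Below is a hypothetical toy model of the intended ordering. `ToyCoordinator` and its members are illustrative only and are not Kafka's `ConsumerCoordinator` API. The pending async callbacks are drained first, and the empty-map short circuit happens only after that.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy model of a consumer coordinator; not Kafka's ConsumerCoordinator.
final class ToyCoordinator {
    // Callbacks of async commits that were sent but have not completed yet.
    private final Deque<Runnable> pendingAsyncCallbacks = new ArrayDeque<>();
    // Counts how many synchronous commit "requests" were actually sent.
    int syncCommitRequests = 0;

    void commitAsync(Map<String, Long> offsets, Consumer<Map<String, Long>> callback) {
        // A real client would send a request; the toy only queues the completion callback.
        pendingAsyncCallbacks.add(() -> callback.accept(offsets));
    }

    void commitSync(Map<String, Long> offsets) {
        // Barrier first: complete earlier async commits even if there is nothing new to commit.
        while (!pendingAsyncCallbacks.isEmpty()) {
            pendingAsyncCallbacks.poll().run();
        }
        if (offsets.isEmpty()) {
            return; // the early return now happens after the barrier, not before it
        }
        syncCommitRequests++; // stand-in for sending an OffsetCommit request
    }
}
```

In this toy, calling `commitSync(Map.of())` after a `commitAsync(...)` runs the async callback and sends no new commit. That is the behaviour the JavaDoc quoted in the issue promises.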
[GitHub] [kafka] ahuang98 commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage
ahuang98 commented on code in PR #13802: URL: https://github.com/apache/kafka/pull/13802#discussion_r1223387331 ## metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java: ## @@ -195,9 +235,658 @@ public void iterateTopics(EnumSet interests, TopicVisitor (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); writer.handleSnapshot(image, consumer); assertEquals(1, opCounts.remove("CreateTopic")); -assertEquals(1, opCounts.remove("UpdatePartition")); +assertEquals(1, opCounts.remove("UpdatePartitions")); assertEquals(1, opCounts.remove("UpdateTopic")); assertEquals(0, opCounts.size()); assertEquals("bar", topicClient.createdTopics.get(0)); } + +@Test +public void testDeleteTopicFromSnapshot() { +CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() { +@Override +public void iterateTopics(EnumSet interests, TopicVisitor visitor) { +visitor.visitTopic("spam", Uuid.randomUuid(), Collections.emptyMap()); +} +}; +CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder() +.setBrokersInZk(0) +.setTopicMigrationClient(topicClient) +.build(); + +KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient); + +Map opCounts = new HashMap<>(); +KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts, +(logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); +writer.handleTopicsSnapshot(TopicsImage.EMPTY, consumer); +assertEquals(1, opCounts.remove("DeleteTopic")); +assertEquals(1, opCounts.remove("DeleteTopicConfig")); +assertEquals(0, opCounts.size()); +assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics); + +opCounts.clear(); +topicClient.reset(); +writer.handleTopicsSnapshot(TopicsImageTest.IMAGE1, consumer); +assertEquals(1, opCounts.remove("DeleteTopic")); +assertEquals(1, opCounts.remove("DeleteTopicConfig")); +assertEquals(2, opCounts.remove("CreateTopic")); 
+assertEquals(0, opCounts.size()); +assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics); +assertEquals(Arrays.asList("foo", "bar"), topicClient.createdTopics); +} + +@FunctionalInterface +interface TopicVerifier { +void verify(Uuid topicId, TopicsImage topicsImage, CapturingTopicMigrationClient topicClient, KRaftMigrationZkWriter writer); +} + +void setupTopicWithTwoPartitions(TopicVerifier verifier) { +// Set up a topic with two partitions in ZK (via iterateTopics) and a KRaft TopicsImage, then run the given verifier +Uuid topicId = Uuid.randomUuid(); +Map partitionMap = new HashMap<>(); +partitionMap.put(0, new PartitionRegistration(new int[]{2, 3, 4}, new int[]{2, 3, 4}, new int[]{}, new int[]{}, 2, LeaderRecoveryState.RECOVERED, 0, -1)); +partitionMap.put(1, new PartitionRegistration(new int[]{3, 4, 5}, new int[]{3, 4, 5}, new int[]{}, new int[]{}, 3, LeaderRecoveryState.RECOVERED, 0, -1)); + +CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() { +@Override +public void iterateTopics(EnumSet interests, TopicVisitor visitor) { +Map> assignments = new HashMap<>(); +assignments.put(0, Arrays.asList(2, 3, 4)); +assignments.put(1, Arrays.asList(3, 4, 5)); +visitor.visitTopic("spam", topicId, assignments); +visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 0)), partitionMap.get(0)); +visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 1)), partitionMap.get(1)); +} +}; + +CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder() +.setBrokersInZk(0) +.setTopicMigrationClient(topicClient) +.build(); +KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient); + +TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY); +delta.replay(new TopicRecord().setTopicId(topicId).setName("spam")); +delta.replay((PartitionRecord) partitionMap.get(0).toRecord(topicId, 0).message()); +delta.replay((PartitionRecord) 
partitionMap.get(1).toRecord(topicId, 1).message()); +TopicsImage image = delta.apply(); + +verifier.verify(topicId, image, topicClient, writer); +} + +@Test +public void testUpdatePartitionsFromSnapshot() { +setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> { +
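These tests use a consumer that tallies each migration operation by name. Because of that, `opCounts.remove("DeleteTopic")` returns how many times the operation ran, and a final `assertEquals(0, opCounts.size())` shows that nothing unexpected happened. Below is a minimal sketch of such a tallying wrapper. It assumes a simplified `(operationType, action)` interface rather than the real `KRaftMigrationOperationConsumer` signature, and `CountingConsumerSketch` is a hypothetical name.

```java
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch of a counting operation consumer; not KRaftMigrationDriver's code.
final class CountingConsumerSketch {
    private CountingConsumerSketch() { }

    // Wraps a delegate so each operation type is counted before the operation is applied.
    static BiConsumer<String, Runnable> counting(Map<String, Integer> counts,
                                                 BiConsumer<String, Runnable> delegate) {
        return (opType, operation) -> {
            counts.merge(opType, 1, Integer::sum); // increment the tally for this type
            delegate.accept(opType, operation);
        };
    }
}
```

A test then removes each expected key and checks that the map is empty. Any leftover key means the writer issued an operation that the test did not expect.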
[GitHub] [kafka] novosibman commented on pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version
novosibman commented on PR #13782: URL: https://github.com/apache/kafka/pull/13782#issuecomment-1583094847 Open/close changes are done. Also fixed the checkstyle issue (in task ':storage:checkstyleMain').
[GitHub] [kafka] novosibman commented on a diff in pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version
novosibman commented on code in PR #13782: URL: https://github.com/apache/kafka/pull/13782#discussion_r1223374923 ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ## @@ -681,7 +687,12 @@ private static void writeSnapshot(File file, Map entri try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { fileChannel.write(buffer); -fileChannel.force(true); +} + +if (scheduler != null) { +scheduler.scheduleOnce("flush-producer-snapshot", () -> Utils.flushFileQuietly(file.toPath(), "producer-snapshot")); +} else { +Utils.flushFileQuietly(file.toPath(), "producer-snapshot"); Review Comment: Open/close changes done
[GitHub] [kafka] novosibman commented on a diff in pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version
novosibman commented on code in PR #13782: URL: https://github.com/apache/kafka/pull/13782#discussion_r1223373702 ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ## @@ -430,11 +428,19 @@ public Optional lastEntry(long producerId) { * Take a snapshot at the current end offset if one does not already exist. */ public void takeSnapshot() throws IOException { +takeSnapshot(null); +} + +/** + * Take a snapshot at the current end offset if one does not already exist. + * Flush the snapshot asynchronously if scheduler != null + */ +public void takeSnapshot(Scheduler scheduler) throws IOException { Review Comment: An IOException will still be thrown on open/write/close operations. The force (flush) operation, run by the scheduler in a separate thread, will only log a warning.
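In the two `ProducerStateManager` comments above, open/write/close stay on the caller's thread, so their `IOException`s still reach the caller. Only the `force` (fsync) moves to a background scheduler, where a failure is logged instead of thrown. Below is a minimal sketch of that split. It assumes a plain `ScheduledExecutorService` in place of Kafka's `Scheduler`. `flushQuietly` is a hypothetical stand-in for `Utils.flushFileQuietly`, and `java.util.logging` replaces Kafka's logger.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

// Hypothetical sketch; not ProducerStateManager's actual code.
final class SnapshotWriter {
    private static final Logger LOG = Logger.getLogger(SnapshotWriter.class.getName());

    private SnapshotWriter() { }

    // Open/write/close run synchronously, so their IOExceptions propagate to the caller.
    static void writeSnapshot(Path file, ByteBuffer buffer, ScheduledExecutorService scheduler) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
        }
        if (scheduler != null) {
            // Flush on a background thread; a failure there is only logged.
            scheduler.schedule(() -> flushQuietly(file), 0, TimeUnit.MILLISECONDS);
        } else {
            flushQuietly(file);
        }
    }

    // Hypothetical stand-in for Utils.flushFileQuietly: fsync the file, warn on failure.
    static void flushQuietly(Path file) {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            channel.force(true);
        } catch (IOException e) {
            LOG.warning("Failed to flush producer snapshot " + file + ": " + e);
        }
    }
}
```

The trade-off is that a crash before the background flush completes can lose the snapshot's durability. In return, the append path no longer blocks on fsync during a segment roll.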
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1223326402 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { Review Comment: The segments are not particularly critical. What I needed was the time ordered part and that had been implemented with the segment store ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private FullChangeSerde valueSerde; +private final String topic; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.M
[GitHub] [kafka] junrao commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
junrao commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1222183119 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -945,4 +1176,27 @@ public void close() { } } +private static class RetentionSizeData { +private final long retentionSize; +private final long remainingBreachedSize; + +public RetentionSizeData(long retentionSize, long remainingBreachedSize) { +this.retentionSize = retentionSize; +this.remainingBreachedSize = remainingBreachedSize; +} + +} + +private static class RetentionTimeData { + +private final long retentionMs; +private final long cleanupUntilMs; + +public RetentionTimeData(long retentionMs, long cleanupUntilMs) { +this.retentionMs = retentionMs; +this.cleanupUntilMs = cleanupUntilMs; +} + Review Comment: extra new line ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -945,4 +1176,27 @@ public void close() { } } +private static class RetentionSizeData { +private final long retentionSize; +private final long remainingBreachedSize; + +public RetentionSizeData(long retentionSize, long remainingBreachedSize) { +this.retentionSize = retentionSize; +this.remainingBreachedSize = remainingBreachedSize; +} + Review Comment: extra new line ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -945,4 +1176,27 @@ public void close() { } } +private static class RetentionSizeData { +private final long retentionSize; +private final long remainingBreachedSize; + +public RetentionSizeData(long retentionSize, long remainingBreachedSize) { +this.retentionSize = retentionSize; +this.remainingBreachedSize = remainingBreachedSize; +} + +} + +private static class RetentionTimeData { + +private final long retentionMs; +private final long cleanupUntilMs; + +public RetentionTimeData(long retentionMs, long cleanupUntilMs) { +this.retentionMs = retentionMs; +this.cleanupUntilMs = cleanupUntilMs; +} + +} + Review Comment: extra new line ## 
core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -618,6 +625,230 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { +logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} +} + +class RemoteLogRetentionHandler { + +private final Optional retentionSizeData; +private final Optional retentionTimeData; + +private long remainingBreachedSize; + +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(Optional retentionSizeData, Optional retentionTimeData) { +this.retentionSizeData = retentionSizeData; +this.retentionTimeData = retentionTimeData; +remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionSizeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size >= 0 +if (retentionSizeData.get().remainingBreachedSize > 0) { +remainingBreachedSize -= x.segmentSizeInBytes(); +return remainingBreachedSize >= 0; +} else return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", +metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); +} +return isSegmentDeleted; +} + +public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) +throws RemoteStorageException, ExecutionException, InterruptedException { +if (!rete
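The size-based branch of `RemoteLogRetentionHandler` subtracts each candidate segment's size from `remainingBreachedSize`. It deletes the segment only if the result is still non-negative, so deletion never removes more than the breached amount. Below is a minimal sketch of that accounting over plain segment sizes, oldest first. It assumes `breachedBytes = totalSize - retentionSize` and that the scan stops at the first segment that would overshoot. `segmentsToDelete` is a hypothetical name, not part of `RemoteLogManager`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of size-based retention accounting; not RemoteLogManager's code.
final class SizeRetentionSketch {
    private SizeRetentionSketch() { }

    // Indexes of the oldest segments whose total size does not exceed breachedBytes.
    static List<Integer> segmentsToDelete(long[] segmentSizesOldestFirst, long breachedBytes) {
        List<Integer> toDelete = new ArrayList<>();
        if (breachedBytes <= 0) {
            return toDelete; // retention size is not breached
        }
        long remaining = breachedBytes;
        for (int i = 0; i < segmentSizesOldestFirst.length; i++) {
            remaining -= segmentSizesOldestFirst[i]; // assumes segment sizes are >= 0
            if (remaining < 0) {
                break; // deleting this segment would shrink the log below the retention size
            }
            toDelete.add(i);
        }
        return toDelete;
    }
}
```

With segments of 60, 30 and 20 bytes and a 100-byte breach, the first two segments (90 bytes) are deleted. Deleting the third would remove 110 bytes, more than the breach.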
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1223321590 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRec; +private Serde keySerde; +private FullChangeSerde valueSerde; Review Comment: @vcrfxia @cadonna I made the changes in https://github.com/apache/kafka/pull/13830. I will rebase this one once that is merged.
[GitHub] [kafka] wcarlson5 opened a new pull request, #13830: KAFKA-14936: Change Time Ordered Buffer to not require Change<> 0/N
wcarlson5 opened a new pull request, #13830: URL: https://github.com/apache/kafka/pull/13830

Make it so the time-ordered buffer doesn't need a `Change` record. The value now has two types: one for the stored type and another for the value type. They can be the same if you don't want to use the `Change` value.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] hudeqi commented on pull request #13696: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread
hudeqi commented on PR #13696: URL: https://github.com/apache/kafka/pull/13696#issuecomment-1582997213 This minor PR is also about the replica fetcher thread; please help review it, thanks! @viktorsomogyi
[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
jolshan commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1223292555 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -980,6 +1006,25 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } + + // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state. + // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on + // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append. + // There are two phases -- the first append to the log and subsequent appends. + // + // 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and + // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction + // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could + // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker, + // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not + // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2. + // + // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. 
This field remains until the + // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still + // ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and + // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1. + if (!hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard)) Review Comment: I think it is safe, since we are just checking the producer state entry (and if it is missing, we return false). But I can place the explicit is-transactional check first if that seems clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
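The ABA argument in the comment above can be illustrated with a small, self-contained Java model. It is a sketch under stated assumptions, not the UnifiedLog code: maps keyed by producer id stand in for the producer state, a plain `Object` serves as the unique guard (so uniqueness comes from object identity), and the class and method names are hypothetical. It also assumes that writing an abort marker clears any pending guard.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two-phase check described in the UnifiedLog comment.
// VerificationGuardSketch and its methods are illustrative names, not Kafka APIs.
public class VerificationGuardSketch {
    // Phase 1 state: the unique guard object registered for a producer's pending verification.
    private final Map<Long, Object> guards = new HashMap<>();
    // Phase 2 state: first offset of the producer's ongoing transaction (stand-in for currentTxnFirstOffset).
    private final Map<Long, Long> txnFirstOffset = new HashMap<>();

    /** Creates a fresh guard before the verification request is sent to the coordinator. */
    public Object startVerification(long producerId) {
        Object guard = new Object(); // uniqueness comes from object identity
        guards.put(producerId, guard);
        return guard;
    }

    /** Assumption of this sketch: writing an abort marker clears both kinds of state. */
    public void abort(long producerId) {
        guards.remove(producerId);
        txnFirstOffset.remove(producerId);
    }

    /** Returns true if a client-origin transactional batch may be appended at the given offset. */
    public boolean tryAppend(long producerId, Object requestGuard, long offset) {
        if (txnFirstOffset.containsKey(producerId)) {
            return true; // phase 2: transaction already ongoing, no guard needed
        }
        Object current = guards.get(producerId);
        if (requestGuard == null || requestGuard != current) {
            return false; // missing or stale guard: reject; a retry starts phase 1 again
        }
        guards.remove(producerId); // the guard is consumed by the first append
        txnFirstOffset.put(producerId, offset);
        return true;
    }
}
```

A stale guard obtained before an abort is rejected even if a new verification has since started for the same producer, because the identity comparison fails; this is the property the comment relies on to rule out the ABA sequence.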
[GitHub] [kafka] hudeqi commented on pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method
hudeqi commented on PR #13719: URL: https://github.com/apache/kafka/pull/13719#issuecomment-1582994469 Hi @viktorsomogyi, this minor PR is also about the replica fetcher thread. Could you please help review it? Thanks! It seems @dajac has no time.
[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage
mumrah commented on code in PR #13802: URL: https://github.com/apache/kafka/pull/13802#discussion_r1223265946 ## metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java: ## @@ -195,9 +235,658 @@ public void iterateTopics(EnumSet interests, TopicVisitor (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); writer.handleSnapshot(image, consumer); assertEquals(1, opCounts.remove("CreateTopic")); -assertEquals(1, opCounts.remove("UpdatePartition")); +assertEquals(1, opCounts.remove("UpdatePartitions")); assertEquals(1, opCounts.remove("UpdateTopic")); assertEquals(0, opCounts.size()); assertEquals("bar", topicClient.createdTopics.get(0)); } + +@Test +public void testDeleteTopicFromSnapshot() { +CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() { +@Override +public void iterateTopics(EnumSet interests, TopicVisitor visitor) { +visitor.visitTopic("spam", Uuid.randomUuid(), Collections.emptyMap()); +} +}; +CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder() +.setBrokersInZk(0) +.setTopicMigrationClient(topicClient) +.build(); + +KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient); + +Map opCounts = new HashMap<>(); +KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts, +(logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY)); +writer.handleTopicsSnapshot(TopicsImage.EMPTY, consumer); +assertEquals(1, opCounts.remove("DeleteTopic")); +assertEquals(1, opCounts.remove("DeleteTopicConfig")); +assertEquals(0, opCounts.size()); +assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics); + +opCounts.clear(); +topicClient.reset(); +writer.handleTopicsSnapshot(TopicsImageTest.IMAGE1, consumer); +assertEquals(1, opCounts.remove("DeleteTopic")); +assertEquals(1, opCounts.remove("DeleteTopicConfig")); +assertEquals(2, opCounts.remove("CreateTopic")); +assertEquals(0, 
opCounts.size()); +assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics); +assertEquals(Arrays.asList("foo", "bar"), topicClient.createdTopics); +} + +@FunctionalInterface +interface TopicVerifier { +void verify(Uuid topicId, TopicsImage topicsImage, CapturingTopicMigrationClient topicClient, KRaftMigrationZkWriter writer); +} + +void setupTopicWithTwoPartitions(TopicVerifier verifier) { +// Set up a topic with two partitions in ZK (via iterateTopics) and a KRaft TopicsImage, then run the given verifier +Uuid topicId = Uuid.randomUuid(); +Map partitionMap = new HashMap<>(); +partitionMap.put(0, new PartitionRegistration(new int[]{2, 3, 4}, new int[]{2, 3, 4}, new int[]{}, new int[]{}, 2, LeaderRecoveryState.RECOVERED, 0, -1)); +partitionMap.put(1, new PartitionRegistration(new int[]{3, 4, 5}, new int[]{3, 4, 5}, new int[]{}, new int[]{}, 3, LeaderRecoveryState.RECOVERED, 0, -1)); + +CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() { +@Override +public void iterateTopics(EnumSet interests, TopicVisitor visitor) { +Map> assignments = new HashMap<>(); +assignments.put(0, Arrays.asList(2, 3, 4)); +assignments.put(1, Arrays.asList(3, 4, 5)); +visitor.visitTopic("spam", topicId, assignments); +visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 0)), partitionMap.get(0)); +visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 1)), partitionMap.get(1)); +} +}; + +CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder() +.setBrokersInZk(0) +.setTopicMigrationClient(topicClient) +.build(); +KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient); + +TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY); +delta.replay(new TopicRecord().setTopicId(topicId).setName("spam")); +delta.replay((PartitionRecord) partitionMap.get(0).toRecord(topicId, 0).message()); +delta.replay((PartitionRecord) partitionMap.get(1).toRecord(topicId, 
1).message()); +TopicsImage image = delta.apply(); + +verifier.verify(topicId, image, topicClient, writer); +} + +@Test +public void testUpdatePartitionsFromSnapshot() { +setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> { +
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService
divijvaidya commented on code in PR #13812: URL: https://github.com/apache/kafka/pull/13812#discussion_r1223233148 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InvalidFetchSizeException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsRequestData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.message.OffsetDeleteRequestData; +import org.apache.kafka.common.message.OffsetDeleteResponseData; +import org.apache.kafka.common.message.OffsetFetchRequestData; +import org.apache.kafka.common.message.OffsetFetchResponseData; +import org.apache.kafka.common.message.SyncGroupRequestData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.TxnOffsetCommitRequestData; 
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.TransactionResult; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier; +import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime; +import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.util.FutureUtils; +import org.slf4j.Logger; + +import java.util.List; +import java.util.OptionalInt; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntSupplier; + +/** + * The group coordinator service. + */ +public class GroupCoordinatorService implements GroupCoordinator { + +public static class Builder { +private final int nodeId; +private final GroupCoordinatorConfig config; +private PartitionWriter writer; +private CoordinatorLoader loader; + +public Builder( +int nodeId, +GroupCoordinatorConfig config +) { +this.nodeId = nodeId; +this.config = config; +} + +public Builder withWriter(PartitionWriter writer) { +this.writer = writer; +return this; +} + +
[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
viktorsomogyi commented on code in PR #13421: URL: https://github.com/apache/kafka/pull/13421#discussion_r1219754042 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -447,11 +447,11 @@ class Partition(val topicPartition: TopicPartition, private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = { def updateHighWatermark(log: UnifiedLog): Unit = { val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse { -info(s"No checkpointed highwatermark is found for partition $topicPartition") +info(s"No checkpointed highwatermark is found for ${if (isFutureReplica) "Future partition" else "partition"} $topicPartition") Review Comment: nit: "future" not "Future"?
[GitHub] [kafka] viktorsomogyi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
viktorsomogyi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1582838059 I have some context on the replica fetcher area (mostly from reading and debugging the code), so I hope I can help. First, since the conversation is a bit long, let me summarize what I understand:
- The problem is that disk A reaches its capacity limit.
- The solution is to move partition X-1 to disk B.
- During the reassignment, log cleaning is disabled on X-1 (which can therefore fill up disk A).
- The reassignment of X-1 fails, it is left in a failed state on B, and X-1 on A keeps growing.
Is this correct? If it is, we may need to separate the deletion and compaction cases. I think resuming deletion is safe; however, resuming compaction might not be, since compaction alters the log. If an operator somehow resumes B and lets replication continue, the history of X-1 on A and B might diverge (I'm still working on a local test case that reproduces this). What do you think?
[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
cadonna commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1223073798 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final long gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRecords; +private Serde keySerde; +private FullChangeSerde valueSerde; +private final String topic; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod.toMillis(); +minTimestamp = Long.MAX_VALUE; +numRecords = 0; +bufferSize = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? 
FullChangeSerde.wrap((Serde) getter.valueSerde()) : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue; + +if (predicate.get()) { +try (final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) { +if (iterator.hasNext()) { +keyValue = iterator.next(); +} else { +if (numRecords() == 0) { Review Comment: Why do you not directly access the field? ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.st
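For readers following the `evictWhile` logic above, a rough in-memory analogue may help: a `TreeMap` ordered by timestamp plays the role of the RocksDB segmented store, and records at or before `observedStreamTime - gracePeriod` are evicted oldest-first while the predicate holds. This is a hypothetical sketch, not the Kafka Streams class: all names are illustrative, values are plain strings instead of serialized changes, and treating the upper bound as inclusive is an assumption about the `fetchAll` range semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical in-memory stand-in for the RocksDB-backed time-ordered buffer.
public class TimeOrderedBufferSketch {
    private final TreeMap<Long, List<String>> byTimestamp = new TreeMap<>();
    private final long gracePeriodMs;
    private long observedStreamTime = Long.MIN_VALUE;
    private int numRecords = 0;

    public TimeOrderedBufferSketch(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    /** Buffers a value and advances the observed stream time. */
    public void put(long timestamp, String value) {
        byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        numRecords++;
    }

    public int numRecords() {
        return numRecords;
    }

    /** Evicts records older than the grace period, oldest first, while the predicate holds. */
    public void evictWhile(Supplier<Boolean> predicate, Consumer<String> callback) {
        long upperBound = observedStreamTime - gracePeriodMs; // only used when the buffer is non-empty
        while (predicate.get() && !byTimestamp.isEmpty() && byTimestamp.firstKey() <= upperBound) {
            Map.Entry<Long, List<String>> oldest = byTimestamp.firstEntry();
            String value = oldest.getValue().remove(0);
            if (oldest.getValue().isEmpty()) {
                byTimestamp.remove(oldest.getKey());
            }
            numRecords--;
            callback.accept(value);
        }
    }
}
```

With a grace period of 10 and records at 5, 20 and 30, the bound is 20, so the first two are evicted and the record at 30 stays buffered.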
[jira] [Commented] (KAFKA-15051) docs: add missing connector plugin endpoint to documentation
[ https://issues.apache.org/jira/browse/KAFKA-15051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730611#comment-17730611 ] ASF GitHub Bot commented on KAFKA-15051: C0urante merged PR #520: URL: https://github.com/apache/kafka-site/pull/520 > docs: add missing connector plugin endpoint to documentation > > > Key: KAFKA-15051 > URL: https://issues.apache.org/jira/browse/KAFKA-15051 > Project: Kafka > Issue Type: Task > Components: docs, documentation >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Minor > > GET /plugin/config endpoint added in > [KIP-769|https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions] > is not included in the connect documentation page: > https://kafka.apache.org/documentation/#connect_rest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante merged pull request #12307: KAFKA-14006: Parameterize WorkerConnectorTest suite
C0urante merged PR #12307: URL: https://github.com/apache/kafka/pull/12307
[GitHub] [kafka] C0urante commented on pull request #12307: KAFKA-14006: Parameterize WorkerConnectorTest suite
C0urante commented on PR #12307: URL: https://github.com/apache/kafka/pull/12307#issuecomment-1582800062 Test failures appear unrelated; merging...
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
divijvaidya commented on code in PR #13795: URL: https://github.com/apache/kafka/pull/13795#discussion_r1223158227 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -0,0 +1,1040 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.deferred.DeferredEvent; +import org.apache.kafka.deferred.DeferredEventQueue; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator + * or the transaction coordinator. 
+ * + * The runtime framework maps each underlying partition (e.g. __consumer_offsets) that the broker is a + * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft + * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in + * timeline data structures backed by a SnapshotRegistry. The runtime supports two types of operations + * on state machines: (1) Writes and (2) Reads. + * + * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from the state + * machine to handle the operation. A write operation typically generates a response and a list of + * records. The records are applied to the state machine and persisted to the partition. The response + * is parked until the records are committed and delivered when they are. + * + * (2) A read operation, aka a request, can only read the committed state from the state machine to handle + * the operation. A read operation typically generates a response that is immediately completed. + * + * The runtime framework exposes an asynchronous, future based, API to the world. All the operations + * are executed by a CoordinatorEventProcessor. The processor guarantees that operations for a + * single partition or state machine are not processed concurrently. + * + * @param The type of the state machine. + * @param The type of the record. + */ +public class CoordinatorRuntime, U> { + +/** + * Builder to create a CoordinatorRuntime. + * + * @param The type of the state machine. + * @param The type of the record.
+ */ +public static class Builder, U> { +private LogContext logContext; +private CoordinatorEventProcessor eventProcessor; +private PartitionWriter partitionWriter; +private CoordinatorLoader loader; +private CoordinatorBuilderSupplier coordinatorBuilderSupplier; + +public Builder withLogContext(LogContext logContext) { +this.logContext = logContext; +return this; +} + +public Builder withEventProcessor(CoordinatorEventProcessor eventProcessor) { +this.eventProcessor = eventProcessor; +return this; +} + +public Builder withPartitionWriter(PartitionWriter partitionWriter) { +this.partitionWriter = partitionWriter; +return this; +} + +public Builder withLoader(CoordinatorLoader loader) { +this.loader = loader; +return this; +} + +public Builder withCoordinatorBuilderSupplier(CoordinatorBuilderSupplier coordinatorBuilderSupplier) { +this.coordinatorBuilderSupplier = coordinatorBuilderSupplier; +return this; +} + +public CoordinatorRuntime build() { +if (logContext == null) +logContext = new LogContext(); +if (eventProcessor == null) +throw new IllegalAr
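The write/read split described in the CoordinatorRuntime javadoc can be illustrated with a small sketch in which write responses are parked until the high watermark passes their last record, while reads only see committed state and answer immediately. This is a hypothetical simplification, not the CoordinatorRuntime implementation: it ignores the state machine itself, tracks only offsets, and every name in it is illustrative.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: parks write responses until their records are committed.
public class ParkedWriteSketch<T> {
    private static final class Parked<R> {
        final long endOffset; // offset just past the last record of this write
        final R response;
        final CompletableFuture<R> future;

        Parked(long endOffset, R response, CompletableFuture<R> future) {
            this.endOffset = endOffset;
            this.response = response;
            this.future = future;
        }
    }

    private final Deque<Parked<T>> pending = new ArrayDeque<>();
    private long logEndOffset = 0L;    // next offset to be assigned
    private long committedOffset = 0L; // offsets below this value are committed

    /** Write: records are appended (still uncommitted) and the response is parked. */
    public CompletableFuture<T> write(int numRecords, T response) {
        logEndOffset += numRecords;
        CompletableFuture<T> future = new CompletableFuture<>();
        pending.addLast(new Parked<>(logEndOffset, response, future));
        return future;
    }

    /** Read: only committed state is visible, so the answer is available immediately. */
    public long committedOffset() {
        return committedOffset;
    }

    /** Completes, in order, every parked response whose records are now below the high watermark. */
    public void onHighWatermark(long highWatermark) {
        committedOffset = Math.max(committedOffset, highWatermark);
        while (!pending.isEmpty() && pending.peekFirst().endOffset <= committedOffset) {
            Parked<T> parked = pending.pollFirst();
            parked.future.complete(parked.response);
        }
    }
}
```

Because writes to a single partition are serialized (the guarantee the javadoc attributes to the event processor), parked responses can be kept in a simple FIFO and completed in offset order.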
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm
divijvaidya commented on code in PR #13828: URL: https://github.com/apache/kafka/pull/13828#discussion_r1223142144 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -197,7 +199,8 @@ class BrokerServer( logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true) - remoteLogManager = createRemoteLogManager(config) + val remoteLogManagerConfig = new RemoteLogManagerConfig(config) Review Comment: Correct me if I am wrong, but we already have a RemoteLogManagerConfig. It is used here: https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/core/src/main/scala/kafka/log/LogManager.scala#L1405 Hence, we can simply call `createRemoteLogManager(config.remoteLogManagerConfig)` here.
[GitHub] [kafka] mumrah commented on pull request #13827: KAFKA-15073: Add a Github action to mark PRs as stale
mumrah commented on PR #13827: URL: https://github.com/apache/kafka/pull/13827#issuecomment-1582676925 Thanks for taking a look, @jlprat! For the more complex workflow, we could use https://github.com/actions/github-script. This basically lets you do anything the GitHub API allows (kind of like using Groovy in a Gradle build 😄). I also found https://github.com/marketplace/actions/auto-label-merge-conflicts, which would add a label to PRs with conflicts. This could be used to create two separate workflows of the "stale" action. I think there's a lot of flexibility in the actions available. Do you agree that we can scope this PR to "marking old PRs as stale" for the sake of clearing our backlog?
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm
divijvaidya commented on code in PR #13828: URL: https://github.com/apache/kafka/pull/13828#discussion_r1223088072 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() { }); } +public void endPoint(Optional endpoint) { +this.endpoint = endpoint; +} + private void configureRLMM() { final Map rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps()); rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId); rlmmProps.put(KafkaConfig.LogDirProp(), logDir); +endpoint.ifPresent(e -> { +rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port()); Review Comment: please use the constant `CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG` ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() { }); } +public void endPoint(Optional endpoint) { +this.endpoint = endpoint; +} + private void configureRLMM() { final Map rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps()); rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId); rlmmProps.put(KafkaConfig.LogDirProp(), logDir); +endpoint.ifPresent(e -> { Review Comment: missing property `cluster.id` as per https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java#L49 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -280,7 +281,8 @@ class KafkaServer( _brokerState = BrokerState.RECOVERY logManager.startup(zkClient.getAllTopicsInCluster()) -remoteLogManager = createRemoteLogManager(config) +val remoteLogManagerConfig = new RemoteLogManagerConfig(config) +remoteLogManager = createRemoteLogManager(remoteLogManagerConfig) Review Comment: s/remoteLogManager/remoteLogManagerOpt ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -504,6 +506,13 @@ class KafkaServer( new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, 
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) +remoteLogManager.foreach(rlm => { + val listenerName = ListenerName.normalised(remoteLogManagerConfig.remoteLogMetadataManagerListenerName()) + val endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName)) +.orElse(Some(brokerInfo.broker.endPoints.head)) Review Comment: This means that the endpoint will never be empty (since we pick the first broker endpoint when the listener is not configured), right? In that case, can we please make it mandatory in RemoteLogManager? ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() { }); } +public void endPoint(Optional endpoint) { +this.endpoint = endpoint; +} + private void configureRLMM() { final Map rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps()); rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId); rlmmProps.put(KafkaConfig.LogDirProp(), logDir); +endpoint.ifPresent(e -> { +rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port()); +rlmmProps.put("security.protocol", e.securityProtocol().name); Review Comment: Please use the constant `CommonClientConfigs.SECURITY_PROTOCOL_CONFIG`. ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -280,7 +281,8 @@ class KafkaServer( _brokerState = BrokerState.RECOVERY logManager.startup(zkClient.getAllTopicsInCluster()) -remoteLogManager = createRemoteLogManager(config) +val remoteLogManagerConfig = new RemoteLogManagerConfig(config) Review Comment: Should we create this only when `remoteLogManagerConfig.enableRemoteStorageSystem()`?
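The review points above (the endpoint fallback makes the value non-optional, and the property keys should come from constants) can be combined in one hedged sketch. It is not the KafkaServer or RemoteLogManager code: `Endpoint` is a local stand-in for the broker endpoint type, the helper names are hypothetical, and the two key constants merely repeat the string values of `CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG` and `CommonClientConfigs.SECURITY_PROTOCOL_CONFIG` so the example runs without the kafka-clients jar. The `cluster.id` key is the one named in the review.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the endpoint fallback discussed in the review; not the broker code.
public class RlmmEndpointSketch {
    // Same values as CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG and SECURITY_PROTOCOL_CONFIG;
    // real broker code should reference those constants directly.
    static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";

    /** Local stand-in for a broker endpoint. */
    public static final class Endpoint {
        public final String listenerName;
        public final String host;
        public final int port;
        public final String securityProtocol;

        public Endpoint(String listenerName, String host, int port, String securityProtocol) {
            this.listenerName = listenerName;
            this.host = host;
            this.port = port;
            this.securityProtocol = securityProtocol;
        }
    }

    /** Falls back to the first endpoint, so the result is never empty (assumes a non-empty list). */
    public static Endpoint selectEndpoint(List<Endpoint> endpoints, String listenerName) {
        return endpoints.stream()
                .filter(e -> e.listenerName.equals(listenerName))
                .findFirst()
                .orElse(endpoints.get(0));
    }

    /** Copies the base RLMM properties and adds client connection settings for the endpoint. */
    public static Map<String, Object> rlmmProps(Map<String, ?> baseProps, Endpoint endpoint, String clusterId) {
        Map<String, Object> props = new HashMap<>(baseProps);
        props.put(BOOTSTRAP_SERVERS_CONFIG, endpoint.host + ":" + endpoint.port);
        props.put(SECURITY_PROTOCOL_CONFIG, endpoint.securityProtocol);
        props.put("cluster.id", clusterId); // key named in the review comment
        return props;
    }
}
```

Since `selectEndpoint` always returns a value, the receiving side could take a plain `Endpoint` rather than an `Optional`, which is the simplification the reviewer asks about.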
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
divijvaidya commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1223073346 ## server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java: ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.util.timer; + +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.Time; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class SystemTimer implements Timer { +// timeout timer +private final ExecutorService taskExecutor; +private final DelayQueue delayQueue; +private final AtomicInteger taskCounter; +private final TimingWheel timingWheel; + +// Locks used to protect data structures while ticking +private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); +private final ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); +private final ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); + +public SystemTimer(String executorName) { +this(executorName, 1, 20, Time.SYSTEM.hiResClockMs()); +} + +public SystemTimer( +String executorName, +long tickMs, +int wheelSize, +long startMs +) { +this.taskExecutor = Executors.newFixedThreadPool(1, +runnable -> KafkaThread.nonDaemon("executor-" + executorName, runnable)); +this.delayQueue = new DelayQueue<>(); +this.taskCounter = new AtomicInteger(0); +this.timingWheel = new TimingWheel( +tickMs, +wheelSize, +startMs, +taskCounter, +delayQueue +); +} + +public void add(TimerTask timerTask) { +readLock.lock(); Review Comment: we are using this pattern of locking very often now (also used in TS related code for leader epoch snapshot). 
Could we perhaps extract this out into a utility which takes an incoming lambda and executes it in a lock, similar to https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/core/src/main/scala/kafka/utils/CoreUtils.scala#L182 ## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util.timer; + +public abstract class TimerTask implements Runnable { +private volatile TimerTaskEntry timerTaskEntry; +// timestamp in millisecond +public final long delayMs; + +public TimerTask(long delayMs) { +this.delayMs = delayMs; +} + +public void cancel() { +synchronized (this) { Review Comment: Can you move `synchronized` to the method signature above? ``` public synchronized void a() { } ``` is equivalent to ``` public void a() { synchronized (this) { } } ``` ## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for addition
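The lock-wrapping utility proposed in the `SystemTimer` review could be sketched in Java roughly as follows, in the spirit of Scala's `CoreUtils.inLock`. `LockUtils.inLock` is a hypothetical name, not an existing Kafka class; the point is only that the lock is always released in a `finally` block, even when the action throws.

```java
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

// Hypothetical helper in the spirit of Scala's CoreUtils.inLock; not an existing Kafka class.
final class LockUtils {

    private LockUtils() {
    }

    // Acquires the lock, runs the action, and always releases the lock,
    // even if the action throws.
    static <T> T inLock(Lock lock, Supplier<T> action) {
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }
}
```

In `SystemTimer.add`, a call could then look like `LockUtils.inLock(readLock, () -> { ...; return null; })`. Whether to add a separate `Runnable` variant for actions without a result is a design choice this sketch leaves open.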
[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API
yashmayya commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1222869378 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1268,39 +1270,55 @@ public void alterConnectorOffsets(String connName, Map connector connector = plugins.newConnector(connectorClassOrAlias); if (ConnectUtils.isSinkConnector(connector)) { log.debug("Altering consumer group offsets for sink connector: {}", connName); -alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); +modifySinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); } else { log.debug("Altering offsets for source connector: {}", connName); -alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); +modifySourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); +} +} +} + +/** + * Reset a connector's offsets. + * + * @param connName the name of the connector whose offsets are to be reset + * @param connectorConfig the connector's configurations + * @param cb callback to invoke upon completion + */ +public void resetConnectorOffsets(String connName, Map connectorConfig, Callback cb) { Review Comment: > If we're worried about accidentally introducing a nasty bug where an empty-bodied alter request causes an unintentional reset, we can add an integration test for that case. Yeah, this was exactly my worry and the reason why I'd kept them separated. Based on your feedback, I've added a new integration test and also moved the second level check to `AbstractHerder::alterConnectorOffsets` (so that we can consolidate the two methods in the `Worker`). While we could in theory do a similar consolidation for the `Herder` methods, I think it's probably a better idea to have cleaner and more well-defined interface methods there.
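One possible shape of the consolidation described in the Connect offset-reset reply, as a hypothetical sketch: a single Worker-level method treats `null` offsets as a reset, while the herder's alter path performs the second-level check so that an empty-bodied alter request is rejected rather than silently resetting offsets. All names here (`OffsetModificationSketch`, `workerModifyOffsets`, `herderAlterOffsets`, `herderResetOffsets`) are illustrative stand-ins, not Connect's actual `Worker` or `AbstractHerder` signatures.

```java
import java.util.Map;

// Hypothetical sketch; not Connect's real Worker/AbstractHerder API.
final class OffsetModificationSketch {

    enum Action { ALTER, RESET }

    // Stand-in for the consolidated Worker method: null offsets mean "reset".
    static Action workerModifyOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets) {
        return offsets == null ? Action.RESET : Action.ALTER;
    }

    // Stand-in for the herder's alter path: the second-level check lives here, so an
    // empty-bodied alter request is rejected instead of silently becoming a reset.
    static Action herderAlterOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets) {
        if (offsets == null || offsets.isEmpty()) {
            throw new IllegalArgumentException(
                    "Offsets to alter for connector " + connName + " must not be empty");
        }
        return workerModifyOffsets(connName, offsets);
    }

    // Stand-in for the herder's separate, well-defined reset method.
    static Action herderResetOffsets(String connName) {
        return workerModifyOffsets(connName, null);
    }
}
```

Keeping alter and reset as distinct herder methods while sharing one worker method matches the trade-off the reply describes: cleaner interface methods on the `Herder`, less duplication in the `Worker`.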
[GitHub] [kafka] lucasbru commented on a diff in pull request #13829: KAFKA-10199: Re-add revived tasks to the state updater after handling
lucasbru commented on code in PR #13829: URL: https://github.com/apache/kafka/pull/13829#discussion_r1222994406 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -294,7 +297,13 @@ private void closeDirtyAndRevive(final Collection taskWithChangelogs, fina task.addPartitionsForOffsetReset(assignedToPauseAndReset); } +if (stateUpdater != null) { +tasks.removeTask(task); Review Comment: Got it. Could make sense to add a little comment, but I won't insist
[GitHub] [kafka] lucasbru commented on a diff in pull request #13829: KAFKA-10199: Re-add revived tasks to the state updater after handling
lucasbru commented on code in PR #13829: URL: https://github.com/apache/kafka/pull/13829#discussion_r1222992405 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1970,6 +1970,29 @@ public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() { assertThat(thrown.getCause().getMessage(), is("KABOOM!")); } +@Test +public void shouldReAddRevivedTasksToStateUpdater() { +final StreamTask corruptedActiveTask = statefulTask(taskId03, taskId03ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId03Partitions).build(); +final StandbyTask corruptedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) +.inState(State.RUNNING) +.withInputPartitions(taskId02Partitions).build(); +final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); +final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); +when(tasks.task(taskId03)).thenReturn( corruptedActiveTask); +when(tasks.task(taskId02)).thenReturn( corruptedStandbyTask); +expect(consumer.assignment()).andReturn(emptySet()); +replay(consumer); + +taskManager.handleCorruption(mkSet(corruptedActiveTask.id(), corruptedStandbyTask.id())); + +Mockito.verify(tasks).removeTask(corruptedActiveTask); Review Comment: Yes, that's what I meant. Thanks
[GitHub] [kafka] cadonna commented on a diff in pull request #13712: KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito
cadonna commented on code in PR #13712: URL: https://github.com/apache/kafka/pull/13712#discussion_r1222986401 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -3694,12 +3694,12 @@ public Map prepareCommit() { @Test public void shouldSendPurgeData() { -resetToStrict(adminClient); -expect(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L -.andReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture(; -expect(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(17L -.andReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture(; -replay(adminClient); +when(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L +.thenReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture(; +when(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(17L +.thenReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture(; + +InOrder inOrder = Mockito.inOrder(adminClient); Review Comment: ```suggestion final InOrder inOrder = Mockito.inOrder(adminClient); ```
[GitHub] [kafka] cadonna commented on a diff in pull request #13829: KAFKA-10199: Re-add revived tasks to the state updater after handling
cadonna commented on code in PR #13829: URL: https://github.com/apache/kafka/pull/13829#discussion_r1222976522 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1970,6 +1970,29 @@ public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() { assertThat(thrown.getCause().getMessage(), is("KABOOM!")); } +@Test +public void shouldReAddRevivedTasksToStateUpdater() { +final StreamTask corruptedActiveTask = statefulTask(taskId03, taskId03ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId03Partitions).build(); +final StandbyTask corruptedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) +.inState(State.RUNNING) +.withInputPartitions(taskId02Partitions).build(); +final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); +final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); +when(tasks.task(taskId03)).thenReturn( corruptedActiveTask); +when(tasks.task(taskId02)).thenReturn( corruptedStandbyTask); +expect(consumer.assignment()).andReturn(emptySet()); +replay(consumer); + +taskManager.handleCorruption(mkSet(corruptedActiveTask.id(), corruptedStandbyTask.id())); + +Mockito.verify(tasks).removeTask(corruptedActiveTask); Review Comment: The task is a mock. The state of the task is specified by the test. However, I added verifications for the method calls that change the state of the task.
[GitHub] [kafka] cadonna commented on a diff in pull request #13829: KAFKA-10199: Re-add revived tasks to the state updater after handling
cadonna commented on code in PR #13829: URL: https://github.com/apache/kafka/pull/13829#discussion_r1222962171 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -294,7 +297,13 @@ private void closeDirtyAndRevive(final Collection taskWithChangelogs, fina task.addPartitionsForOffsetReset(assignedToPauseAndReset); } +if (stateUpdater != null) { +tasks.removeTask(task); Review Comment: Yes! The reason is that `removeTask()` verifies that the state of the task is `CLOSED` and throws an exception when it is not. I did not want to change the check and lose that guard.
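To illustrate why the task is removed before it is revived, here is a heavily simplified, hypothetical model; `ReviveSketch`, `Task`, `Registry` and the list standing in for the state updater are not the real `TasksRegistry`, `Task` or `StateUpdater` types. `removeTask` only accepts `CLOSED` tasks, so it has to run between the dirty close and the revival, after which the revived task is handed back to the state updater, which is the behaviour PR #13829 describes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model; not the real TasksRegistry, Task or StateUpdater.
final class ReviveSketch {

    enum State { CREATED, RESTORING, RUNNING, CLOSED }

    static final class Task {
        final String id;
        State state;

        Task(String id, State state) {
            this.id = id;
            this.state = state;
        }

        void closeDirty() {
            state = State.CLOSED;
        }

        // Reviving moves a closed task back to CREATED so it can be restored again.
        void revive() {
            if (state != State.CLOSED) {
                throw new IllegalStateException("Only closed tasks can be revived");
            }
            state = State.CREATED;
        }
    }

    static final class Registry {
        final Map<String, Task> tasks = new HashMap<>();

        void addTask(Task task) {
            tasks.put(task.id, task);
        }

        // The guard from the review: only CLOSED tasks may be removed.
        void removeTask(Task task) {
            if (task.state != State.CLOSED) {
                throw new IllegalStateException("Task " + task.id + " is " + task.state + ", expected CLOSED");
            }
            tasks.remove(task.id);
        }
    }

    // Remove the task while it is still CLOSED (so the guard holds), then revive it
    // and hand it back to the state updater, modelled here as a plain list.
    static void closeDirtyAndRevive(Task task, Registry registry, List<Task> stateUpdater) {
        task.closeDirty();
        if (stateUpdater != null) {
            registry.removeTask(task);
        }
        task.revive();
        if (stateUpdater != null) {
            stateUpdater.add(task);
        }
    }
}
```

Moving the removal after `revive()` would trip the guard, which is the trade-off behind keeping the check unchanged.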
[GitHub] [kafka] lucasbru commented on a diff in pull request #13829: KAFKA-10199: Re-add revived tasks to the state updater after handling
lucasbru commented on code in PR #13829: URL: https://github.com/apache/kafka/pull/13829#discussion_r1222946279 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -294,7 +297,13 @@ private void closeDirtyAndRevive(final Collection taskWithChangelogs, fina task.addPartitionsForOffsetReset(assignedToPauseAndReset); } +if (stateUpdater != null) { +tasks.removeTask(task); Review Comment: Any particular reason why we remove the task from `tasks` before reviving it? It would seem cleaner to me to basically remove from stream thread / add back to state updater directly in one block, which makes it easier to track ownership. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1970,6 +1970,29 @@ public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() { assertThat(thrown.getCause().getMessage(), is("KABOOM!")); } +@Test +public void shouldReAddRevivedTasksToStateUpdater() { +final StreamTask corruptedActiveTask = statefulTask(taskId03, taskId03ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId03Partitions).build(); +final StandbyTask corruptedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) +.inState(State.RUNNING) +.withInputPartitions(taskId02Partitions).build(); +final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); +final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); +when(tasks.task(taskId03)).thenReturn( corruptedActiveTask); +when(tasks.task(taskId02)).thenReturn( corruptedStandbyTask); +expect(consumer.assignment()).andReturn(emptySet()); +replay(consumer); + +taskManager.handleCorruption(mkSet(corruptedActiveTask.id(), corruptedStandbyTask.id())); + +Mockito.verify(tasks).removeTask(corruptedActiveTask); Review Comment: Do we want to validate the Task state as well? 
## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1970,6 +1970,29 @@ public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() { assertThat(thrown.getCause().getMessage(), is("KABOOM!")); } +@Test +public void shouldReAddRevivedTasksToStateUpdater() { +final StreamTask corruptedActiveTask = statefulTask(taskId03, taskId03ChangelogPartitions) +.inState(State.RESTORING) +.withInputPartitions(taskId03Partitions).build(); +final StandbyTask corruptedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) +.inState(State.RUNNING) +.withInputPartitions(taskId02Partitions).build(); +final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); +final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); +when(tasks.task(taskId03)).thenReturn( corruptedActiveTask); Review Comment: Awkward space after `(`, also in the line below. Maybe use automatic formatting for the changed code.
[GitHub] [kafka] cadonna opened a new pull request, #13829: KAFKA-10199: Re-add revived tasks to the state updater after handling
cadonna opened a new pull request, #13829: URL: https://github.com/apache/kafka/pull/13829 Fixes a bug regarding the state updater where tasks that experience corruption during restoration are passed from the state updater to the stream thread for closing and reviving but then the revived tasks are not re-added to the state updater. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] tinaselenge commented on pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords
tinaselenge commented on PR #13760: URL: https://github.com/apache/kafka/pull/13760#issuecomment-1582411061 @divijvaidya @showuon @mimaison Thank you very much for reviewing the PR! I believe I have addressed the comments now. Please let me know if I have missed anything. Thanks.
[GitHub] [kafka] showuon opened a new pull request, #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm
showuon opened a new pull request, #13828: URL: https://github.com/apache/kafka/pull/13828 Add the "remote.log.metadata.manager.listener.name" config to the RLMM to allow its producer/consumer to connect to the server. Also add tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] showuon commented on pull request #13807: KAFKA-15040: trigger onLeadershipChange under KRaft mode
showuon commented on PR #13807: URL: https://github.com/apache/kafka/pull/13807#issuecomment-1582296785 @satishd , FYI Failed tests are unrelated: ``` Build / JDK 17 and Scala 2.13 / kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(String).quorum=zk Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster() Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster() Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOffsetTranslationBehindReplicationFlow() Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testNoCheckpointsIfNoRecordsAreMirrored() Build / JDK 11 and Scala 2.13 / kafka.api.PlaintextAdminIntegrationTest.testCreateDeleteTopics() Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testAbortTransactionTimeout(String).quorum=kraft Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders() Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testDescribeQuorumRequestToBrokers() Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndCreateAndManyTopics() Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT ```
[GitHub] [kafka] jlprat commented on a diff in pull request #13827: KAFKA-15073: Add a Github action to mark PRs as stale
jlprat commented on code in PR #13827: URL: https://github.com/apache/kafka/pull/13827#discussion_r1222732234 ## .github/workflows/stale.yml: ## @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: 'Close stale issues and PRs' +on: + #schedule: + # - cron: '30 3 * * *' + workflow_dispatch: +inputs: + dryRun: +description: 'Dry Run' +required: true +default: true +type: boolean + operationsPerRun: +description: 'Max GitHub API operations' +required: true +default: 30 +type: number + +permissions: + issues: write + pull-requests: write + +jobs: + stale: +runs-on: ubuntu-latest +steps: + - uses: actions/stale@v8 +with: + debug-only: ${{ inputs.dryRun }} + operations-per-run: ${{ inputs.operationsPerRun }} + days-before-stale: 90 + days-before-close: -1 + stale-issue-label: 'stale' Review Comment: I would probably add the message there: `stale-pr-message: 'This PR is stale because it has been open 90 days with no activity.'`
[GitHub] [kafka] mimaison commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords
mimaison commented on code in PR #13760: URL: https://github.com/apache/kafka/pull/13760#discussion_r1222667563 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.admin.DeletedRecords; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture; +import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.DeleteRecordsRequestData; +import org.apache.kafka.common.message.DeleteRecordsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.DeleteRecordsRequest; +import org.apache.kafka.common.requests.DeleteRecordsResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public final class DeleteRecordsHandler extends Batched { + +private final Map recordsToDelete; +private final Logger log; +private final AdminApiLookupStrategy lookupStrategy; + +public DeleteRecordsHandler( +Map recordsToDelete, +LogContext logContext +) { +this.recordsToDelete = recordsToDelete; +this.log = logContext.logger(DeleteRecordsHandler.class); +this.lookupStrategy = new PartitionLeaderStrategy(logContext); +} + +@Override +public String apiName() { +return "deleteRecords"; +} + +@Override +public AdminApiLookupStrategy lookupStrategy() { +return this.lookupStrategy; +} + +public static SimpleAdminApiFuture newFuture( +Collection topicPartitions +) { +return AdminApiFuture.forKeys(new HashSet<>(topicPartitions)); +} + +@Override +public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, Set keys) { +Map 
deletionsForTopic = new HashMap<>(); +for (Map.Entry entry: recordsToDelete.entrySet()) { +TopicPartition topicPartition = entry.getKey(); +DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = deletionsForTopic.get(topicPartition.topic()); +if (deleteRecords == null) { +deleteRecords = new DeleteRecordsRequestData.DeleteRecordsTopic() +.setName(topicPartition.topic()); +deletionsForTopic.put(topicPartition.topic(), deleteRecords); +} +deleteRecords.partitions().add(new DeleteRecordsRequestData.DeleteRecordsPartition() +.setPartitionIndex(topicPartition.partition()) +.setOffset(entry.getValue().beforeOffset())); + +System.out.println("Partitions: " + deleteRecords.partitions()); Review Comment: Let's remove this debugging statement ## clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Lic
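Separately from removing the debugging `System.out.println` call, the per-topic grouping in `buildBatchedRequest` could use `Map.computeIfAbsent` instead of the explicit get/null-check/put sequence. The sketch below uses hypothetical stand-in types (`Partition`, `PartitionOffset`, `groupByTopic`) rather than Kafka's `TopicPartition` or the generated `DeleteRecordsRequestData` classes. It also iterates over the batch's `keys` rather than the whole `recordsToDelete` map; whether that matches the PR's intent is an assumption.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in types; not Kafka's TopicPartition or generated request classes.
final class DeleteRecordsGroupingSketch {

    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        // equals/hashCode so Partition can be used as a map key
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Partition)) {
                return false;
            }
            Partition other = (Partition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    static final class PartitionOffset {
        final int partitionIndex;
        final long offset;

        PartitionOffset(int partitionIndex, long offset) {
            this.partitionIndex = partitionIndex;
            this.offset = offset;
        }
    }

    // Groups the requested "before" offsets by topic for the partitions in this batch,
    // creating each topic's list on first use with computeIfAbsent.
    static Map<String, List<PartitionOffset>> groupByTopic(Collection<Partition> batchKeys,
                                                           Map<Partition, Long> beforeOffsets) {
        Map<String, List<PartitionOffset>> byTopic = new LinkedHashMap<>();
        for (Partition tp : batchKeys) {
            Long offset = beforeOffsets.get(tp);
            if (offset == null) {
                continue; // no deletion requested for this partition
            }
            byTopic.computeIfAbsent(tp.topic, topic -> new ArrayList<>())
                   .add(new PartitionOffset(tp.partition, offset));
        }
        return byTopic;
    }
}
```

If any diagnostics are still wanted, the handler's existing `log` field (for example `log.debug`) would be the natural place rather than standard output.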
[GitHub] [kafka] ashwinpankaj commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
ashwinpankaj commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1222706232 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,10 +280,33 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.entrySet() +.stream() +.anyMatch(offset -> offset.getValue() == null); + +AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>(); + +// If there are tombstone offsets, then the failure to write to secondary store will +// not be ignored. Also, for tombstone records, we first write to secondary store and +// then to primary stores. +if (secondaryStore != null && containsTombstones) { +secondaryStore.set(values, (secondaryWriteError, ignored) -> { +try (LoggingContext context = loggingContext()) { +if (secondaryWriteError != null) { +log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError); +secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError); +} else { +log.debug("Successfully flushed tombstone offsets to secondary backing store"); +} +} +}); +} + return primaryStore.set(values, (primaryWriteError, ignored) -> { -if (secondaryStore != null) { +// Secondary store writes have already happened for tombstone records Review Comment: +1 we can ensure that the secondary write has already been attempted via a `CompletableFuture` set in the callback of `secondaryStore.set()` ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,10 +280,33 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.entrySet() +.stream() +.anyMatch(offset -> offset.getValue() == null); Review Comment: We should do this only if 
`connectorStore.isPresent()` else we will always end up scanning the map for tombstones.
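The `CompletableFuture`-based ordering discussed for tombstone offsets could look roughly like the following sketch. It assumes a simplified, hypothetical `OffsetStore` interface that returns a `CompletableFuture`; Connect's real `OffsetBackingStore.set` takes a callback and returns a `Future`. Tombstones are only scanned for when a secondary store exists. For batches with tombstones, the secondary write must complete before the primary write starts, and, as an assumption of this sketch, a secondary failure skips the primary write and fails the returned future.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; OffsetStore is a simplified stand-in, not Connect's OffsetBackingStore.
final class TombstoneOrderingSketch {

    interface OffsetStore {
        CompletableFuture<Void> set(Map<String, String> values);
    }

    static CompletableFuture<Void> set(Map<String, String> values, OffsetStore primary, OffsetStore secondary) {
        // Only scan for tombstones (null values) when there is a secondary store.
        boolean containsTombstones = secondary != null
                && values.values().stream().anyMatch(v -> v == null);

        if (!containsTombstones) {
            CompletableFuture<Void> primaryWrite = primary.set(values);
            if (secondary != null) {
                // Best-effort secondary write after the primary completes; its errors are not propagated.
                primaryWrite.whenComplete((ignored, error) -> secondary.set(values));
            }
            return primaryWrite;
        }

        // Tombstones: the secondary write must succeed before the primary write starts.
        // If it fails, the returned future completes exceptionally and the primary is not written.
        return secondary.set(values).thenCompose(ignored -> primary.set(values));
    }
}
```

Chaining with `thenCompose` makes the "secondary already attempted" guarantee structural instead of relying on an `AtomicReference` populated from a callback.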
[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset
[ https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YaYun Wang updated KAFKA-15074: --- Description: I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when the producer published 1,000,000 (100W) messages to partition 4 within several minutes and the consumer consumed the data through @KafkaListener; just one consumer exists. was: I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when the producer published 1,000,000 (100W) messages to partition 4 within several minutes and the consumer consumed the data through @KafkaListener, with one consumer consuming the data from the partition. > offset out of range for partition xxx, resetting offset > --- > > Key: KAFKA-15074 > URL: https://issues.apache.org/jira/browse/KAFKA-15074 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.3.2 >Reporter: YaYun Wang >Priority: Major > > I got "Fetch position FetchPosition{offset=42574305, > offsetEpoch=Optional[2214], > currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: > cn-north-1d)], epoch=2214}} is out of range for partition > vcc.hdmap.tile.delivery-4, resetting offset" when the producer published 1,000,000 (100W) > messages to partition 4 within several minutes and the consumer consumed the data through > @KafkaListener; just one consumer exists. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset
[ https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

YaYun Wang updated KAFKA-15074:
---
Description:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when the producer published 1,000,000 (100W) messages to partition 4 within several minutes and the consumer consumed the data through @KafkaListener, with one consumer consuming the data from the partition.

was:
I got "Fetch position FetchPosition{offset=42574305, offsetEpoch=Optional[2214], currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: cn-north-1d)], epoch=2214}} is out of range for partition vcc.hdmap.tile.delivery-4, resetting offset" when I consumed from Kafka through @KafkaListener while the producer published 1,000,000 (100W) messages to partition 4 within several minutes, and one consumer consumed the data from the partition.

> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.3.2
> Reporter: YaYun Wang
> Priority: Major
>
> I got "Fetch position FetchPosition{offset=42574305,
> offsetEpoch=Optional[2214],
> currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack:
> cn-north-1d)], epoch=2214}} is out of range for partition
> vcc.hdmap.tile.delivery-4, resetting offset" when the producer published 1,000,000 (100W)
> messages to partition 4 within several minutes and the consumer consumed the data through
> @KafkaListener, with one consumer consuming the data from the partition.
>
[GitHub] [kafka] urbandan commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…
urbandan commented on PR #13796: URL: https://github.com/apache/kafka/pull/13796#issuecomment-1582188829

> I guess I just need to clarify what retried batches are here -- is the idea that we wait for inflight batches to return a response or time out? What if the response triggers another retry? Would we prevent that from sending out?

The core idea is that we let each of the in-flight batches complete, even if they need multiple retries. This would allow the producer to:
1. Avoid inconsistency: by letting in-flight batches finish, we do not run the risk of overwriting their sequence numbers while we are still unsure whether they were appended.
2. Operate on a best-effort basis: when an idempotent producer encounters an error, it is costly to verify whether a message was appended to the log (I think the "official" suggestion is to consume the topic to verify). By letting the in-flight batches finish, the idempotent producer will report fewer false-positive errors.

> I'm also wondering the benefit of preserving the previous batches if there is an error. How does the system recover differently if we allow those batches to "complete". I think we could run into cases where the error causes the inflight batches to be unable to be written. Do we prefer to fail them (what we may do with this change) and start clean or try to write them with new sequences? I can see both scenarios causing issues.

I believe that produce errors should be handled separately and should not cascade to other batches. I think most errors do not really predict the result of other produce requests.

> I guess it boils down to availability of writes (rewriting the sequences allows us to continue writing) or idempotency correctness (trying to wait for them to complete with their old sequences). The sticking point I'm running into is why getting those extra inflight requests (potentially) written is better if we've hit a non-retriable error.

My understanding is that here correctness beats availability. Are you suggesting that we should just cancel in-flight batches when encountering an error?

> Maybe I just need an example :)

I will try to write up some examples, and also write more unit tests to demonstrate those scenarios.
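A minimal sketch of the idea described above, assuming a single partition and a hypothetical `PartitionSequenceState` class (this is not the producer's `TransactionManager` API). After a failure, batches that are already in flight keep their sequence numbers and may keep retrying; the reset (modelled here, as an assumption, as an epoch bump plus restarting sequences at 0) is deferred until nothing is in flight, and new batches are held back until then.

```java
import java.util.TreeSet;

// Hypothetical model of one partition's idempotent-producer sequence state.
// It only illustrates "let in-flight batches finish before resetting sequences".
class PartitionSequenceState {
    private int producerEpoch = 0;
    private int nextSequence = 0;
    private boolean resetPending = false;
    // Base sequences of batches that were sent but have not completed yet.
    private final TreeSet<Integer> inFlight = new TreeSet<>();

    /** New batches are held back while a reset waits for in-flight batches. */
    boolean canSendNewBatch() {
        return !resetPending;
    }

    /** Assigns a base sequence to a new batch of recordCount records. */
    int sendNewBatch(int recordCount) {
        if (!canSendNewBatch())
            throw new IllegalStateException("reset pending; waiting for in-flight batches");
        int base = nextSequence;
        nextSequence += recordCount;
        inFlight.add(base);
        return base;
    }

    /** A retry reuses the batch's original sequence; nothing is renumbered. */
    boolean canRetry(int baseSequence) {
        return inFlight.contains(baseSequence);
    }

    /** Called once a batch finally succeeds or fails (after any retries). */
    void complete(int baseSequence, boolean succeeded) {
        inFlight.remove(baseSequence);
        if (!succeeded)
            resetPending = true; // the error does not cascade to the other in-flight batches
        if (resetPending && inFlight.isEmpty()) {
            // Only now is it safe to start a fresh sequence under a new epoch.
            producerEpoch++;
            nextSequence = 0;
            resetPending = false;
        }
    }

    int producerEpoch() { return producerEpoch; }

    int inFlightCount() { return inFlight.size(); }
}
```

The trade-off matches the thread: new writes stall briefly (availability) so that no sequence number is reused while the outcome of an earlier batch is still unknown (correctness).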
[GitHub] [kafka] showuon commented on a diff in pull request #13650: KAFKA-14709: Move content in connect/mirror/README.md to the docs
showuon commented on code in PR #13650: URL: https://github.com/apache/kafka/pull/13650#discussion_r1222665546

## connect/mirror/README.md: ##
@@ -1,297 +0,0 @@
-
-# MirrorMaker 2.0
-
-MM2 leverages the Connect framework to replicate topics between Kafka
-clusters. MM2 includes several new features, including:
-
- - both topics and consumer groups are replicated
- - topic configuration and ACLs are replicated
- - cross-cluster offsets are synchronized
- - partitioning is preserved
-
-## Replication flows
-
-MM2 replicates topics and consumer groups from upstream source clusters
-to downstream target clusters. These directional flows are notated
-`A->B`.
-
-It's possible to create complex replication topologies based on these
-`source->target` flows, including:
-
- - *fan-out*, e.g. `K->A, K->B, K->C`
- - *aggregation*, e.g. `A->K, B->K, C->K`
- - *active/active*, e.g. `A->B, B->A`
-
-Each replication flow can be configured independently, e.g. to replicate
-specific topics or groups:
-
-A->B.topics = topic-1, topic-2
-A->B.groups = group-1, group-2
-
-By default, all topics and consumer groups are replicated (except
-excluded ones), across all enabled replication flows. Each
-replication flow must be explicitly enabled to begin replication:
-
-A->B.enabled = true
-B->A.enabled = true
-
-## Starting an MM2 process
-
-You can run any number of MM2 processes as needed. Any MM2 processes
-which are configured to replicate the same Kafka clusters will find each
-other, share configuration, load balance, etc.
-
-To start an MM2 process, first specify Kafka cluster information in a
-configuration file as follows:
-
-# mm2.properties
-clusters = us-west, us-east
-us-west.bootstrap.servers = host1:9092
-us-east.bootstrap.servers = host2:9092
-
-You can list any number of clusters this way.
-
-Optionally, you can override default MirrorMaker properties:
-
-topics = .*
-groups = group1, group2
-emit.checkpoints.interval.seconds = 10
-
-These will apply to all replication flows. You can also override default
-properties for specific clusters or replication flows:
-
-# configure a specific cluster
-us-west.offset.storage.topic = mm2-offsets
-
-# configure a specific source->target replication flow
-us-west->us-east.emit.heartbeats = false
-
-Next, enable individual replication flows as follows:
-
-us-west->us-east.enabled = true # disabled by default
-
-Finally, launch one or more MirrorMaker processes with the `connect-mirror-maker.sh`
-script:
-
-$ ./bin/connect-mirror-maker.sh mm2.properties
-
-## Multicluster environments
-
-MM2 supports replication between multiple Kafka clusters, whether in the
-same data center or across multiple data centers. A single MM2 cluster
-can span multiple data centers, but it is recommended to keep MM2's producers
-as close as possible to their target clusters. To do so, specify a subset
-of clusters for each MM2 node as follows:
-
-# in west DC:
-$ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
-
-This signals to the node that the given clusters are nearby, and prevents the
-node from sending records or configuration to clusters in other data centers.
-
-### Example
-
-Say there are three data centers (west, east, north) with two Kafka
-clusters in each data center (west-1, west-2 etc). We can configure MM2
-for active/active replication within each data center, as well as cross data
-center replication (XDCR) as follows:
-
-# mm2.properties
-clusters: west-1, west-2, east-1, east-2, north-1, north-2
-
-west-1.bootstrap.servers = ...
----%<---
-
-# active/active in west
-west-1->west-2.enabled = true
-west-2->west-1.enabled = true
-
-# active/active in east
-east-1->east-2.enabled = true
-east-2->east-1.enabled = true
-
-# active/active in north
-north-1->north-2.enabled = true
-north-2->north-1.enabled = true
-
-# XDCR via west-1, east-1, north-1
-west-1->east-1.enabled = true
-west-1->north-1.enabled = true
-east-1->west-1.enabled = true
-east-1->north-1.enabled = true
-north-1->west-1.enabled = true
-north-1->east-1.enabled = true
-
-Then, launch MM2 in each data center as follows:
-
-# in west:
-$ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
-
-# in east:
-$ ./bin/connect-mirror-maker.sh mm2.properties --clusters east-1 east-2
-
-# in north:
-$ ./bin/connect-mirror-maker.sh mm2.properties --clusters north-1 north-2
-
-With this configuration, records produced to any cluster will be replicated
-within the data center, as well as across to other data centers. By providing
-the `--clusters` parameter, we ensure that each node only produces records to
-nearby clusters.
-
-N.B. that the `--clusters` parameter is not technically required here. MM2 will work fine without it; however, throughput may suffer from "producer lag" between
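To make the `source->target.enabled` convention and the effect of `--clusters` in the removed README text concrete, here is a minimal sketch. `Mm2Flows` is a hypothetical helper, not MirrorMaker's own config code: it reads the flags with `java.util.Properties`, treats a flow as disabled unless it is set to `true`, and models the README's statement about `--clusters` by keeping only the flows whose target cluster is in the nearby set. It assumes cluster aliases contain no dots.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper for this sketch; MirrorMaker's own config classes are not used.
class Mm2Flows {
    private static final String ENABLED_SUFFIX = ".enabled";

    /** Returns every "source->target" flow whose enabled flag is explicitly true, sorted. */
    static List<String> enabledFlows(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> flows = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            if (!key.endsWith(ENABLED_SUFFIX)) continue;
            String flow = key.substring(0, key.length() - ENABLED_SUFFIX.length());
            // Skip non-flow keys and nested keys such as "A->B.emit.heartbeats.enabled"
            // (assumes cluster aliases contain no dots).
            if (!flow.contains("->") || flow.contains(".")) continue;
            // Flows stay disabled unless the value is "true".
            if (Boolean.parseBoolean(props.getProperty(key).trim())) flows.add(flow);
        }
        Collections.sort(flows);
        return flows;
    }

    /** Keeps the flows a node started with --clusters would run: those targeting a nearby cluster. */
    static List<String> flowsForNode(List<String> flows, Set<String> nearbyClusters) {
        List<String> result = new ArrayList<>();
        for (String flow : flows) {
            String target = flow.substring(flow.indexOf("->") + 2);
            if (nearbyClusters.contains(target)) result.add(flow);
        }
        return result;
    }
}
```

One caveat for this sketch: `java.util.Properties` only treats `#` as a comment at the start of a line, so a value such as `true # disabled by default` would not parse as `true` here; the sample input therefore avoids trailing comments.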
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13823: MINOR: Move MockTime to server-common
divijvaidya commented on code in PR #13823: URL: https://github.com/apache/kafka/pull/13823#discussion_r1222638839

## server-common/src/test/java/org/apache/kafka/server/util/MockTime.java: ##
@@ -27,15 +23,21 @@
 * 1. This has an associated scheduler instance for managing background tasks in a deterministic way.
 * 2. This doesn't support the `auto-tick` functionality as it interacts badly with the current implementation of `MockScheduler`.
 */
-class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends JMockTime(0, currentTimeMs, currentHiResTimeNs) {
-
-  def this() = this(System.currentTimeMillis(), System.nanoTime())
+public class MockTime extends org.apache.kafka.common.utils.MockTime {
+    public final MockScheduler scheduler;
-  val scheduler = new MockScheduler(this)
+    public MockTime() {
+        this(System.currentTimeMillis(), System.nanoTime());

Review Comment:
Please consider using `Time.SYSTEM`.

## server-common/src/test/java/org/apache/kafka/server/util/MockTime.java: ##
@@ -27,15 +23,21 @@
 * 1. This has an associated scheduler instance for managing background tasks in a deterministic way.
 * 2. This doesn't support the `auto-tick` functionality as it interacts badly with the current implementation of `MockScheduler`.
 */
-class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends JMockTime(0, currentTimeMs, currentHiResTimeNs) {
-
-  def this() = this(System.currentTimeMillis(), System.nanoTime())
+public class MockTime extends org.apache.kafka.common.utils.MockTime {

Review Comment:
My preference would be to rename this class. Having two classes named MockTime is super confusing.

## server-common/src/test/java/org/apache/kafka/server/util/MockTime.java: ##
@@ -27,15 +23,21 @@
 * 1. This has an associated scheduler instance for managing background tasks in a deterministic way.
 * 2. This doesn't support the `auto-tick` functionality as it interacts badly with the current implementation of `MockScheduler`.
 */
-class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends JMockTime(0, currentTimeMs, currentHiResTimeNs) {
-
-  def this() = this(System.currentTimeMillis(), System.nanoTime())
+public class MockTime extends org.apache.kafka.common.utils.MockTime {

Review Comment:
Should this implement AutoCloseable so that we can shut down the scheduler correctly?
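The last review question can be illustrated with a minimal sketch. `ScheduledMockTime` below is hypothetical and self-contained (it is not Kafka's `MockTime` or `MockScheduler`): a mock clock that owns a deterministic scheduler, runs due tasks in due-time order when `sleep` advances the clock, and implements `AutoCloseable` so a test can shut the scheduler down with try-with-resources.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical sketch, not Kafka's MockTime or MockScheduler.
class ScheduledMockTime implements AutoCloseable {
    private static final class Task {
        final long dueMs;
        final long seq;
        final Runnable action;

        Task(long dueMs, long seq, Runnable action) {
            this.dueMs = dueMs;
            this.seq = seq;
            this.action = action;
        }
    }

    // Earliest due time first; ties run in submission order.
    private final PriorityQueue<Task> tasks = new PriorityQueue<>(
        Comparator.comparingLong((Task t) -> t.dueMs).thenComparingLong(t -> t.seq));
    private long nowMs;
    private long nextSeq = 0;
    private boolean closed = false;

    ScheduledMockTime(long startMs) {
        this.nowMs = startMs;
    }

    long milliseconds() {
        return nowMs;
    }

    void schedule(long delayMs, Runnable action) {
        if (closed)
            throw new IllegalStateException("scheduler has been shut down");
        tasks.add(new Task(nowMs + delayMs, nextSeq++, action));
    }

    /** Advances the clock by ms, running each task that becomes due, in due-time order. */
    void sleep(long ms) {
        long target = nowMs + ms;
        while (!tasks.isEmpty() && tasks.peek().dueMs <= target) {
            Task task = tasks.poll();
            nowMs = Math.max(nowMs, task.dueMs); // the task sees the clock at its due time
            task.action.run();
        }
        nowMs = target;
    }

    int pendingTasks() {
        return tasks.size();
    }

    /** Shuts the scheduler down: pending tasks are dropped and new ones are rejected. */
    @Override
    public void close() {
        closed = true;
        tasks.clear();
    }
}
```

Because `close()` declares no checked exception, a test can write `try (ScheduledMockTime time = new ScheduledMockTime(0L)) { ... }` and be sure no scheduled task leaks into the next test.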
[GitHub] [kafka] showuon commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords
showuon commented on code in PR #13760: URL: https://github.com/apache/kafka/pull/13760#discussion_r1222617381

## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ##
@@ -2291,6 +2289,8 @@ public void testDeleteRecords() throws Exception {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));

Review Comment:
Before looking into the implementation: I don't think this test is testing what we expected. The original comment refers to these lines: https://github.com/apache/kafka/pull/7296/files#diff-5422d10d9a7f4776c6538ae3aea27f24e94cf4ecf5e752040125aca6edc795d3R3671-R3682
It says that when the metadata response (mr) contains an error, we just fail the future without retrying. And here, the existing metadataResponse handling also fails without retrying: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L2969-L2972
The point is that we want to retry `metadataResponse`, not `deleteRecordResponse`. What I expected is a test like this: https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java#L5431C11-L5433
Please update the test and make sure it passes. Thanks.
[GitHub] [kafka] showuon commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords
showuon commented on code in PR #13760: URL: https://github.com/apache/kafka/pull/13760#discussion_r1222617381

## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ##
@@ -2291,6 +2289,8 @@ public void testDeleteRecords() throws Exception {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));

Review Comment:
Before looking into the implementation: I don't think this test is testing what we expected. The original comment refers to these lines: https://github.com/apache/kafka/pull/7296/files#diff-5422d10d9a7f4776c6538ae3aea27f24e94cf4ecf5e752040125aca6edc795d3R3671-R3682
It says that when the metadata response (mr) contains an error, we just fail the future without retrying.
The point is that we want to retry `metadataResponse`, not `deleteRecordResponse`. What I expected is a test like this: https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java#L5431C11-L5433
Please update the test and make sure it passes. Thanks.
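The behaviour the reviewer wants the test to cover (a metadata response carrying an error is retried rather than immediately failing the future) can be modelled in a small sketch. Everything below is hypothetical: `MetadataError` and `DeleteRecordsMetadataStep` are illustrative names, not `KafkaAdminClient` code, and `scriptedResponses` stands in for responses prepared on a mock client. The sketch only assumes, as in Kafka, that a leader-not-available error is retriable while an authorization failure is not.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical error codes for the sketch; only the "retriable" flag matters.
enum MetadataError {
    NONE(false),
    LEADER_NOT_AVAILABLE(true),
    TOPIC_AUTHORIZATION_FAILED(false);

    final boolean retriable;

    MetadataError(boolean retriable) {
        this.retriable = retriable;
    }
}

// Hypothetical sketch of the metadata step of deleteRecords, not KafkaAdminClient code.
class DeleteRecordsMetadataStep {
    int attempts = 0;

    /** scriptedResponses plays the role of metadata responses prepared on a mock client. */
    CompletableFuture<String> fetchLeaders(List<MetadataError> scriptedResponses, int maxAttempts) {
        CompletableFuture<String> future = new CompletableFuture<>();
        Iterator<MetadataError> responses = scriptedResponses.iterator();
        while (attempts < maxAttempts && responses.hasNext()) {
            attempts++;
            MetadataError error = responses.next();
            if (error == MetadataError.NONE) {
                future.complete("leaders resolved after " + attempts + " attempt(s)");
                return future;
            }
            if (!error.retriable) {
                // Non-retriable: fail the future right away.
                future.completeExceptionally(new IllegalStateException(error.name()));
                return future;
            }
            // Retriable: loop and request metadata again instead of failing.
        }
        future.completeExceptionally(new IllegalStateException("metadata lookup did not succeed"));
        return future;
    }
}
```

A test in this spirit prepares an erroring metadata response followed by a successful one and asserts that the operation still completes, which is the shape of test the review links to.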
[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730436#comment-17730436 ]

Steven Booke commented on KAFKA-14995:
--

[~vvcephei] Hello John, this will be my first time contributing and I would like to assign myself to this ticket, but I am unable to do so. Could you assign it to me, please?

> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
> Issue Type: Improvement
> Reporter: John Roesler
> Priority: Minor
> Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators:
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who
> are not Kafka committers. Unfortunately, it's not trivial to compute this
> list.
> Here is the process I followed to generate the list the first time (note that
> I generated this list on 2023-04-28, so the lookback is one year):
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. Manually filter out the authors who are committers, based on
> [https://kafka.apache.org/committers]
> 3. Truncate the list to 20 authors.
> 4. For each author:
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date: Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github:
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>
> This is pretty time-consuming and is very scriptable. Two complications:
> * To do the filtering, we need to map from Git log "Author" to documented
> Kafka "Committer" that we can use to perform the filter. Suggestion: just
> update the structure of the "Committers" page to include their Git "Author"
> name and email
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html])
> * To generate the YAML lists, we need to map from Git log "Author" to Github
> username. There's presumably some way to do this in the Github REST API (the
> mapping is based on the email, IIUC), or we could also just update the
> Committers page to also document each committer's Github username.
>
> Ideally, we would write this script (to be stored in the Apache Kafka repo)
> and create a Github Action to run it every three months.
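Steps 1 to 3 of the process above are the easily scriptable part. A minimal sketch, assuming the committer list has already been turned into a set of lower-case author emails (which, as the ticket notes, would need the Committers page to document them); `CollaboratorCandidates` is a hypothetical name. It parses the `git shortlog --email --numbered --summary` output, whose lines are a right-aligned commit count, a tab, and `Name <email>`, already sorted by count. Step 4 (mapping each author to a Github username) is not covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for the refresh script described in the ticket.
class CollaboratorCandidates {
    /** One line of `git shortlog --email --numbered --summary` output. */
    static final class Author {
        final int commits;
        final String name;
        final String email;

        Author(int commits, String name, String email) {
            this.commits = commits;
            this.name = name;
            this.email = email;
        }
    }

    // Matches lines such as "   120\tJane Doe <jane@example.com>".
    private static final Pattern LINE = Pattern.compile("\\s*(\\d+)\\t(.+?)\\s+<([^>]*)>\\s*");

    /** Step 1: parse the shortlog summary (already sorted by commit count via --numbered). */
    static List<Author> parse(String shortlogOutput) {
        List<Author> authors = new ArrayList<>();
        for (String line : shortlogOutput.split("\n")) {
            Matcher m = LINE.matcher(line);
            if (m.matches())
                authors.add(new Author(Integer.parseInt(m.group(1)), m.group(2), m.group(3)));
        }
        return authors;
    }

    /**
     * Steps 2-3: drop committers and keep the first `limit` authors. Committers are
     * matched by lower-case email, an assumption of this sketch.
     */
    static List<Author> topNonCommitters(List<Author> authors, Set<String> committerEmails, int limit) {
        List<Author> result = new ArrayList<>();
        for (Author author : authors) {
            if (result.size() >= limit) break;
            if (committerEmails.contains(author.email.toLowerCase())) continue;
            result.add(author);
        }
        return result;
    }
}
```

For the real list, the call would be `topNonCommitters(parse(output), committerEmails, 20)`; keeping the filter keyed on email follows the ticket's observation that the Author-to-username mapping is email-based.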
[GitHub] [kafka] urbandan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
urbandan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1222556934

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -609,14 +686,15 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
     }
 
     public synchronized void transitionToUninitialized(RuntimeException exception) {
-        transitionTo(State.UNINITIALIZED);
+        transitionTo(State.UNINITIALIZED, exception, InvalidStateDetectionStrategy.BACKGROUND);

Review Comment:
I think not; it will be ignored because of the UNINITIALIZED state. Snippet from transitionTo:
```
else if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
    if (error == null)
        throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
    lastError = error;
}
```
If it's ignored, do we need to log it somewhere?
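The question about the ignored exception can be shown with a small model. `TxnStateSketch` below is hypothetical, not `TransactionManager`: it reproduces only the branch quoted in the snippet above (an error is stored in `lastError` only for `FATAL_ERROR` and `ABORTABLE_ERROR`) and adds what the reviewer suggests, logging an exception that is passed for any other target instead of dropping it silently. A `List<String>` stands in for a real logger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not TransactionManager: where an exception passed to transitionTo ends up.
class TxnStateSketch {
    enum State { UNINITIALIZED, READY, ABORTABLE_ERROR, FATAL_ERROR }

    State currentState = State.READY;
    RuntimeException lastError = null;
    final List<String> log = new ArrayList<>(); // stands in for a real logger

    void transitionTo(State target, RuntimeException error) {
        if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
            if (error == null)
                throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
            lastError = error;
        } else if (error != null) {
            // For other targets (e.g. UNINITIALIZED) the error is not stored,
            // so record it instead of silently dropping it.
            log.add("Ignoring " + error + " while transitioning to " + target);
        }
        currentState = target;
    }
}
```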