[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1186714886

##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+public class RemoteLogReader implements Callable<Void> {
+    private final Logger logger;
+    private final RemoteStorageFetchInfo fetchInfo;
+    private final RemoteLogManager rlm;
+    private final Consumer<RemoteLogReadResult> callback;
+
+    public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
+                           RemoteLogManager rlm,
+                           Consumer<RemoteLogReadResult> callback) {
+        this.fetchInfo = fetchInfo;
+        this.rlm = rlm;
+        this.callback = callback;
+        logger = new LogContext() {
+            @Override
+            public String logPrefix() {
+                return "[" + Thread.currentThread().getName() + "]";
+            }
+        }.logger(RemoteLogReader.class);
+    }
+
+    @Override
+    public Void call() {
+        RemoteLogReadResult result;
+        try {
+            logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
+
+            FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+            result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
+        } catch (OffsetOutOfRangeException e) {
+            result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+        } catch (Exception e) {

Review Comment:
As the issue is not related to this set of changes, we can look into it later.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Resolved] (KAFKA-14805) KRaft Controller shouldn't allow metadata updates before migration starts
[ https://issues.apache.org/jira/browse/KAFKA-14805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Arthur resolved KAFKA-14805.
---------------------------------
    Resolution: Fixed

> KRaft Controller shouldn't allow metadata updates before migration starts
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-14805
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14805
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: kraft
>            Reporter: David Arthur
>            Assignee: David Arthur
>            Priority: Critical
>             Fix For: 3.5.0
>
> When starting a ZK to KRaft migration, the new KRaft quorum should not accept
> external metadata updates from things like CreateTopics or
> AllocateProducerIds. Having metadata present in the log prior to the
> migration can lead to undefined state, which is not great.
> This is currently causing test failures on trunk because some producer is
> allocating a producer ID between the time the KRaft quorum starts and the
> migration starts.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
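The rule described in the issue can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyMigrationGate`, `migrationEnabled`, `startMigration` and `acceptExternalUpdate` are hypothetical names invented for illustration, and none of this is the KRaft controller's actual code or API.

```java
// Toy model only: every name here is hypothetical, not the KRaft controller API.
class ToyMigrationGate {
    // True when the quorum was started in order to migrate from ZK (assumed flag).
    private final boolean migrationEnabled;
    // Flipped once the migration has actually begun.
    private boolean migrationStarted = false;

    ToyMigrationGate(boolean migrationEnabled) {
        this.migrationEnabled = migrationEnabled;
    }

    void startMigration() {
        migrationStarted = true;
    }

    // External requests (things like CreateTopics or AllocateProducerIds) are
    // refused while a migration is expected but has not started yet, so no
    // metadata lands in the log ahead of the migrated state.
    boolean acceptExternalUpdate() {
        return !migrationEnabled || migrationStarted;
    }
}
```

In this model a producer ID allocation that arrives between quorum start and migration start is rejected rather than written, which is the window the issue says was causing test failures.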
[jira] [Resolved] (KAFKA-14840) Handle KRaft snapshots in dual-write mode
[ https://issues.apache.org/jira/browse/KAFKA-14840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Arthur resolved KAFKA-14840.
---------------------------------
    Fix Version/s:     (was: 3.4.1)
       Resolution: Fixed

> Handle KRaft snapshots in dual-write mode
> -----------------------------------------
>
>                 Key: KAFKA-14840
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14840
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: kraft
>    Affects Versions: 3.4.0
>            Reporter: David Arthur
>            Assignee: David Arthur
>            Priority: Blocker
>             Fix For: 3.5.0
>
> While the KRaft controller is making writes back to ZK during the migration,
> we need to handle the case when a snapshot is loaded. This can happen for a
> number of reasons in KRaft.
> The difficulty here is we will need to compare the loaded snapshot with the
> entire state in ZK. Most likely, this will be a very expensive operation.
> Without this, dual-write mode cannot safely tolerate a snapshot being loaded,
> so marking this as a 3.5 blocker.
[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1537107938

The failing tests are not related to this PR.
[GitHub] [kafka] oliveigah opened a new pull request, #13680: MINOR: Add "versions" tag to recently added ReplicaState field on Fetch Request
oliveigah opened a new pull request, #13680:
URL: https://github.com/apache/kafka/pull/13680

While running a parser against the new version of kafka commons to generate some assets for a Kafka protocol library in another language, I noticed that the recently added `ReplicaState` attribute on the Fetch request is the only field that does not have a `versions` tag.

Since my parser relied on this tag being present for every field, it failed. I wonder whether this is the expected state or whether it is indeed a mistake.

In any case, I've updated the parser to use `taggedVersions` as a fallback when `versions` is not found. It works for me now.

If this is the proper state of the file, please let me know!
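The fallback described above could look roughly like the following. This is a minimal sketch, not the author's parser: it assumes a field definition has already been parsed into a `Map<String, String>`, and `FieldVersions` and `effectiveVersions` are hypothetical names. The version strings in the usage note are placeholders.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: resolves which version range applies to a parsed field.
class FieldVersions {
    // Prefers the "versions" tag and falls back to "taggedVersions" when
    // "versions" is absent. Returns empty if neither tag is present.
    static Optional<String> effectiveVersions(Map<String, String> field) {
        String versions = field.get("versions");
        if (versions != null) {
            return Optional.of(versions);
        }
        return Optional.ofNullable(field.get("taggedVersions"));
    }
}
```

For a field map holding only `taggedVersions` (say `"15+"` as a placeholder), `effectiveVersions` returns that value; when both tags are present, `versions` wins.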
[GitHub] [kafka] dengziming opened a new pull request, #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse
dengziming opened a new pull request, #13679:
URL: https://github.com/apache/kafka/pull/13679

*More detailed description of your change*

The KRaft controller returns empty finalized features in `ApiVersionsResponse`. The brokers are not affected by this, so the problem has no impact currently, but it is worth fixing to avoid unexpected problems.

There are also a number of confusing methods in `ApiVersionsResponse` which are only used in test code. I moved them to `TestUtils` to make the code clearer and to force everyone to pass in the correct parameters instead of the default zero parameters, for example, empty `supportedFeatures` and empty `finalizedFeatures`.

*Summary of testing strategy (including rationale)*

Added logic for checking `supportedFeatures` and `finalizedFeatures` in `AbstractApiVersionsRequestTest`.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Reopened] (KAFKA-14291) KRaft: ApiVersionsResponse doesn't have finalizedFeatures and finalizedFeatureEpoch in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-14291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Deng Ziming reopened KAFKA-14291:
---------------------------------
    Assignee: Deng Ziming

> KRaft: ApiVersionsResponse doesn't have finalizedFeatures and finalizedFeatureEpoch in KRaft mode
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14291
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14291
>             Project: Kafka
>          Issue Type: Bug
>          Components: kraft
>            Reporter: Akhilesh Chaganti
>            Assignee: Deng Ziming
>            Priority: Critical
>
> https://github.com/apache/kafka/blob/d834947ae7abc8a9421d741e742200bb36f51fb3/core/src/main/scala/kafka/server/ApiVersionManager.scala#L53
> ```
> class SimpleApiVersionManager(
>   val listenerType: ListenerType,
>   val enabledApis: collection.Set[ApiKeys],
>   brokerFeatures: Features[SupportedVersionRange]
> ) extends ApiVersionManager {
>   def this(listenerType: ListenerType) = {
>     this(listenerType, ApiKeys.apisForListener(listenerType).asScala, BrokerFeatures.defaultSupportedFeatures())
>   }
>   private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava)
>   override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
>     ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
>   }
> }
> ```
> ApiVersionManager for KRaft doesn't add the finalizedFeatures and
> finalizedFeatureEpoch to the ApiVersionsResponse.
[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1537071678

This PR is a copy of https://github.com/apache/kafka/pull/9111 so all credits go to @thomaslee.
[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified
[ https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720114#comment-17720114 ]

Erik van Oosten commented on KAFKA-10337:
-----------------------------------------

[~thomaslee] when we use commitAsync from the rebalance listener (potentially with empty offsets), no polling takes place anymore. Shall I amend the PR so that it does polling from commitAsync as well? WDYT?

> Wait for pending async commits in commitSync() even if no offsets are specified
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-10337
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10337
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Tom Lee
>            Assignee: Kirk True
>            Priority: Major
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
> (or similar) are guaranteed to have their callbacks invoked prior to
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete
> async commits will not be invoked because of an early return in
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a
> barrier for in-flight async commits prior to a rebalance, this could be an
> important (though somewhat implementation-dependent) detail.
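The early return described in the issue can be reproduced in a toy model. This is a sketch under stated assumptions, not the real `ConsumerCoordinator`: `ToyCoordinator` and its methods are hypothetical, async commits are modelled as callbacks queued for later invocation, and no broker communication is simulated.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;

// Toy model only: hypothetical names, not the Kafka client's internals.
class ToyCoordinator {
    // Callbacks of async commits that have not been invoked yet.
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

    void commitAsync(Runnable callback) {
        pendingCallbacks.add(callback);
    }

    // Mirrors the reported behaviour: an empty map returns early,
    // so the pending async callbacks are never invoked.
    boolean commitSyncWithEarlyReturn(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            return true;
        }
        invokePendingCallbacks();
        return true;
    }

    // Behaviour the JavaDoc promises: pending callbacks run first,
    // whether or not there are offsets to commit.
    boolean commitSyncAwaitingAsync(Map<String, Long> offsets) {
        invokePendingCallbacks();
        // A real client would send the offsets here when the map is non-empty.
        return true;
    }

    private void invokePendingCallbacks() {
        Runnable callback;
        while ((callback = pendingCallbacks.poll()) != null) {
            callback.run();
        }
    }
}
```

Calling `commitAsync` and then `commitSyncWithEarlyReturn(Collections.emptyMap())` leaves the callback uninvoked, while `commitSyncAwaitingAsync(Collections.emptyMap())` runs it, which is the barrier behaviour users relying on commitSync before a rebalance would expect.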
[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified
[ https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720113#comment-17720113 ]

Erik van Oosten commented on KAFKA-10337:
-----------------------------------------

Opened [~thomaslee]'s PR again: https://github.com/apache/kafka/pull/13678
[GitHub] [kafka] erikvanoosten opened a new pull request, #13678: KAFKA-10337: await async commits in commitSync even if no offsets given
erikvanoosten opened a new pull request, #13678:
URL: https://github.com/apache/kafka/pull/13678

The contract for commitSync() guarantees that the callbacks for all prior async commits will be invoked before it (successfully?) returns. Prior to this change the contract could be violated if an empty offsets map were passed in to commitSync().

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)