[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-06 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1186714886


##
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+public class RemoteLogReader implements Callable {
+private final Logger logger;
+private final RemoteStorageFetchInfo fetchInfo;
+private final RemoteLogManager rlm;
+private final Consumer callback;
+
+public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
+   RemoteLogManager rlm,
+   Consumer callback) {
+this.fetchInfo = fetchInfo;
+this.rlm = rlm;
+this.callback = callback;
+logger = new LogContext() {
+@Override
+public String logPrefix() {
+return "[" + Thread.currentThread().getName() + "]";
+}
+}.logger(RemoteLogReader.class);
+}
+
+@Override
+public Void call() {
+RemoteLogReadResult result;
+try {
+logger.debug("Reading records from remote storage for topic 
partition {}", fetchInfo.topicPartition);
+
+FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+result = new RemoteLogReadResult(Optional.of(fetchDataInfo), 
Optional.empty());
+} catch (OffsetOutOfRangeException e) {
+result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+} catch (Exception e) {

Review Comment:
   As the issue is not related to this set of changes, we can look into it 
later. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14805) KRaft Controller shouldn't allow metadata updates before migration starts

2023-05-06 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-14805.
--
Resolution: Fixed

> KRaft Controller shouldn't allow metadata updates before migration starts
> -
>
> Key: KAFKA-14805
> URL: https://issues.apache.org/jira/browse/KAFKA-14805
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.5.0
>
>
> When starting a ZK to KRaft migration, the new KRaft quorum should not accept 
> external metadata updates from things like CreateTopics or 
> AllocateProducerIds. Having metadata present in the log prior to the 
> migration can lead to undefined state, which is not great.
> This is currently causing test failures on trunk because some producer is 
> allocating a producer ID between the time the KRaft quorum starts and the 
> migration starts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14840) Handle KRaft snapshots in dual-write mode

2023-05-06 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-14840.
--
Fix Version/s: (was: 3.4.1)
   Resolution: Fixed

> Handle KRaft snapshots in dual-write mode
> -
>
> Key: KAFKA-14840
> URL: https://issues.apache.org/jira/browse/KAFKA-14840
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.5.0
>
>
> While the KRaft controller is making writes back to ZK during the migration, 
> we need to handle the case when a snapshot is loaded. This can happen for a 
> number of reasons in KRaft.
> The difficulty here is we will need to compare the loaded snapshot with the 
> entire state in ZK. Most likely, this will be a very expensive operation.
> Without this, dual-write mode cannot safely tolerate a snapshot being loaded, 
> so marking this as a 3.5 blocker.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-06 Thread via GitHub


erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1537107938

   The failing tests are not related to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] oliveigah opened a new pull request, #13680: MINOR: Add "versions" tag to recently added ReplicaState field on Fetch Request

2023-05-06 Thread via GitHub


oliveigah opened a new pull request, #13680:
URL: https://github.com/apache/kafka/pull/13680

   While running a parser for the new version of kafka commons in order to 
generate some assets for a kafka protocol library in other language. I've 
noticed that the recently added `ReplicaState` attribute on Fetch request is 
the only one that does not have a `versions` tag.
   
   Since my parser was relying on the existence of this tag for every field, it 
failed. I wonder if it is the expected state or if it is indeed a mistake.
   
   Anyway I've update the parser in order to use `taggedVersions` as a fallback 
when `versions` are not found. It works for me now.
   
   If this is the proper state of the file, please let me know!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming opened a new pull request, #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse

2023-05-06 Thread via GitHub


dengziming opened a new pull request, #13679:
URL: https://github.com/apache/kafka/pull/13679

   *More detailed description of your change*
   
   The KRaft controller return empty finalized features in 
`ApiVersionResponse`, the brokers are not infected by this, so this problem 
doesn't have any impact currently, but it's worth fixing it to avoid unexpected 
problems.
   And there is a bunch of of confusing methods in `ApiVersionResponse` which 
are only used in test code, I moved them to `TestUtils` to make the code more 
clear, and force everyone to pass in the correct parameters instead of the 
default zero parameters, for example, empty `supportedFeatures` and empty 
`finalizedFeatures`.
   
   
   *Summary of testing strategy (including rationale)*
   Added logic for checking `supportedFeatures` and `finalizedFeatures` in 
`AbstractApiVersionsRequestTest`.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Reopened] (KAFKA-14291) KRaft: ApiVersionsResponse doesn't have finalizedFeatures and finalizedFeatureEpoch in KRaft mode

2023-05-06 Thread Deng Ziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deng Ziming reopened KAFKA-14291:
-
  Assignee: Deng Ziming

> KRaft: ApiVersionsResponse doesn't have finalizedFeatures and 
> finalizedFeatureEpoch in KRaft mode
> -
>
> Key: KAFKA-14291
> URL: https://issues.apache.org/jira/browse/KAFKA-14291
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Akhilesh Chaganti
>Assignee: Deng Ziming
>Priority: Critical
>
> https://github.com/apache/kafka/blob/d834947ae7abc8a9421d741e742200bb36f51fb3/core/src/main/scala/kafka/server/ApiVersionManager.scala#L53
> ```
> class SimpleApiVersionManager(
>   val listenerType: ListenerType,
>   val enabledApis: collection.Set[ApiKeys],
>   brokerFeatures: Features[SupportedVersionRange]
> ) extends ApiVersionManager {
>   def this(listenerType: ListenerType) = {
> this(listenerType, ApiKeys.apisForListener(listenerType).asScala, 
> BrokerFeatures.defaultSupportedFeatures())
>   }
>   private val apiVersions = 
> ApiVersionsResponse.collectApis(enabledApis.asJava)
>   override def apiVersionResponse(requestThrottleMs: Int): 
> ApiVersionsResponse = {
> ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, 
> apiVersions, brokerFeatures)
>   }
> }
> ```
> ApiVersionManager for KRaft doesn't add the finalizedFeatures and 
> finalizedFeatureEpoch to the ApiVersionsResponse.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-06 Thread via GitHub


erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1537071678

   This PR is a copy of https://github.com/apache/kafka/pull/9111 so all 
credits go to @thomaslee.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2023-05-06 Thread Erik van Oosten (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720114#comment-17720114
 ] 

Erik van Oosten commented on KAFKA-10337:
-

[~thomaslee] when we use commitAsync from the rebalance listener (potentially 
with empty offsets), no polling takes place anymore. Shall I amend the PR so 
that it does polling from commitAsync as well? WDYT?

> Wait for pending async commits in commitSync() even if no offsets are 
> specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tom Lee
>Assignee: Kirk True
>Priority: Major
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
>  (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete 
> async commits will not be invoked because of an early return in 
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a 
> barrier for in-flight async commits prior to a rebalance, this could be an 
> important (though somewhat implementation-dependent) detail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2023-05-06 Thread Erik van Oosten (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720113#comment-17720113
 ] 

Erik van Oosten commented on KAFKA-10337:
-

Opened [~thomaslee] 's PR again: https://github.com/apache/kafka/pull/13678

> Wait for pending async commits in commitSync() even if no offsets are 
> specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tom Lee
>Assignee: Kirk True
>Priority: Major
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
>  (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete 
> async commits will not be invoked because of an early return in 
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a 
> barrier for in-flight async commits prior to a rebalance, this could be an 
> important (though somewhat implementation-dependent) detail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] erikvanoosten opened a new pull request, #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-06 Thread via GitHub


erikvanoosten opened a new pull request, #13678:
URL: https://github.com/apache/kafka/pull/13678

   The contract for commitSync() guarantees that the callbacks for all prior 
async commits will be invoked before it (successfully?) returns. Prior to this 
change the contract could be violated if an empty offsets map were passed in to 
commitSync().
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org