Re: [PR] KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies [kafka]
atu-sharm commented on PR #14473: URL: https://github.com/apache/kafka/pull/14473#issuecomment-1744276832 Hey @mimaison can you please review this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies [kafka]
atu-sharm commented on PR #14473: URL: https://github.com/apache/kafka/pull/14473#issuecomment-1744276403 As per the Gradle docs (https://docs.gradle.org/current/userguide/plugins.html#sec:plugin_version_management:~:text=will%20be%20checked.-,Plugin%20Version%20Management,-A%20plugins%20%7B%7D) and this Gradle issue comment (https://github.com/gradle/gradle/issues/1697#issuecomment-506910915): "We cannot call methods or use sophisticated build logic to retrieve dependency versions. But anything declared in a gradle.properties file can be inserted into the version string." I made the changes accordingly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
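For reference, the mechanism described above can be sketched as follows. The property name `swaggerVersion` and the version number are illustrative assumptions; the exact names used in the PR may differ:

```groovy
// gradle.properties (assumed property name and version, shown here as a comment):
//   swaggerVersion=2.2.8

// build.gradle — the plugins {} block cannot execute arbitrary build logic,
// but it can interpolate a property declared in gradle.properties:
plugins {
    id 'io.swagger.core.v3.swagger-gradle-plugin' version "${swaggerVersion}"
}

// The same property can then drive the library versions (e.g. in
// gradle/dependencies.gradle), keeping the plugin and the dependencies aligned:
dependencies {
    implementation "io.swagger.core.v3:swagger-annotations:${swaggerVersion}"
    implementation "io.swagger.core.v3:swagger-jaxrs2:${swaggerVersion}"
}
```

With this layout, bumping `swaggerVersion` in gradle.properties updates the plugin and both dependencies together, which is the single-place versioning the JIRA asks for.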
Re: [PR] KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies [kafka]
atu-sharm closed pull request #14472: KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies URL: https://github.com/apache/kafka/pull/14472 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies [kafka]
atu-sharm opened a new pull request, #14473: URL: https://github.com/apache/kafka/pull/14473 Use a single version of Swagger to avoid breaking the build. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies [kafka]
atu-sharm opened a new pull request, #14472: URL: https://github.com/apache/kafka/pull/14472 Use a single version of Swagger to avoid breaking the build. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
yangy commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1343485049 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel, } } + case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node) + + private def getCurrentLeader(tp: TopicPartition): LeaderNode = { +val partitionInfoOrError = replicaManager.getPartitionOrError(tp) +var leaderId = -1 +var leaderEpoch = -1 +partitionInfoOrError match { + case Right(x) => + leaderId = x.leaderReplicaIdOpt.getOrElse(-1) + leaderEpoch = x.getLeaderEpoch + case Left(x) => +debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache") +val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition) Review Comment: I see -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15521) Refactor build.gradle to align gradle swagger plugin with swagger dependencies
[ https://issues.apache.org/jira/browse/KAFKA-15521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771334#comment-17771334 ] Atul Sharma commented on KAFKA-15521: - Hi, [~mimaison], can I take this up? > Refactor build.gradle to align gradle swagger plugin with swagger dependencies > -- > > Key: KAFKA-15521 > URL: https://issues.apache.org/jira/browse/KAFKA-15521 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Mickael Maison >Priority: Major > > We use both the Swagger Gradle plugin > "io.swagger.core.v3.swagger-gradle-plugin" and 2 Swagger dependencies > swaggerAnnotations and swaggerJaxrs2. The version for the Gradle plugin is in > build.gradle while the version for the dependencies is in > gradle/dependencies.gradle. > When we upgrade the version of one or the other it sometimes causes build > breakages, for example https://github.com/apache/kafka/pull/13387 and > https://github.com/apache/kafka/pull/14464 > We should try to have the version defined in a single place to avoid breaking > the build again. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
showuon commented on code in PR #14428: URL: https://github.com/apache/kafka/pull/14428#discussion_r1343475329 ## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ## @@ -485,6 +485,49 @@ public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exceptio context.listener.currentLeaderAndEpoch()); } +@Test +public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters() throws Exception { Review Comment: Added a test in `KafkaRaftClientSnapshotTest`. > Can we also test the opposite: that the leader doesn't resign if the majority of the replicas (including the leader) have fetched in the last fetchTimeoutMs? I didn't follow you. I've verified: ``` - 1/2 fetch time leadership not get reassigned - fetch from one voter - 1/2 fetch time leadership not get reassigned - fetch from another voter - 1/2 fetch time leadership not get reassigned - fetch from the observer - 1/2 fetch time leadership should get reassigned ``` I think I've verified what you want. Let me know if I need to add other things. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
showuon commented on code in PR #14428: URL: https://github.com/apache/kafka/pull/14428#discussion_r1343472387 ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -76,9 +85,37 @@ protected LeaderState( boolean hasAcknowledgedLeader = voterId == localId; this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader)); } +this.majority = voters.size() / 2; this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +this.fetchTimeoutMs = fetchTimeoutMs; +this.fetchTimer = time.timer(fetchTimeoutMs); +} + +public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) { Review Comment: Fair enough. Added comments on the methods. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] enable test, check if continues failing in CI [kafka]
github-actions[bot] commented on PR #13953: URL: https://github.com/apache/kafka/pull/13953#issuecomment-1744109897 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
splett2 commented on code in PR #14470: URL: https://github.com/apache/kafka/pull/14470#discussion_r1343331209 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -892,16 +889,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, maybeDuplicate match { case Some(duplicate) => - appendInfo.setFirstOffset(Optional.of(new LogOffsetMetadata(duplicate.firstOffset))) + appendInfo.setFirstOffset(duplicate.firstOffset) appendInfo.setLastOffset(duplicate.lastOffset) appendInfo.setLogAppendTime(duplicate.timestamp) appendInfo.setLogStartOffset(logStartOffset) case None => - // Before appending update the first offset metadata to include segment information - appendInfo.setFirstOffset(appendInfo.firstOffset.map { offsetMetadata => Review Comment: We removed this because none of the readers use anything besides the `messageOffset` (which is the `appendInfo.firstOffset` now). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
splett2 commented on code in PR #14470: URL: https://github.com/apache/kafka/pull/14470#discussion_r1343327345 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -787,19 +770,22 @@ class ReplicaManager(val config: KafkaConfig, ) } -val errorResults = errorsPerPartition.map { - case (topicPartition, error) => -topicPartition -> LogAppendResult( - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, - Some(error.exception()) +val allResults = localProduceResults ++ errorResults +val produceStatus = allResults.map { case (topicPartition, result) => + topicPartition -> ProducePartitionStatus( +result.info.lastOffset + 1, // required offset +new PartitionResponse( + result.error, + result.info.firstOffset, + result.info.lastOffset, + result.info.logAppendTime, + result.info.logStartOffset, + result.info.recordErrors, + result.exception.map(_.getMessage).orNull Review Comment: Hmm, good catch. somehow I didn't think about that. I refactored some of the LogAppendResult stuff to try to make it more explicit when we use a custom error message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14504: Implement DescribeGroups API [kafka]
dongnuo123 commented on code in PR #14462: URL: https://github.com/apache/kafka/pull/14462#discussion_r1343272684 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -444,6 +445,66 @@ public List listGroups(List statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } +/** + * Handles a DescribeGroup request. + * + * @param groupIds The IDs of the groups to describe. + * @param committedOffset A specified committed offset corresponding to this shard. + * + * @return A list containing the DescribeGroupsResponseData.DescribedGroup. + */ +public List describeGroups( +List groupIds, +long committedOffset +) { +final List describedGroups = new ArrayList<>(); +groupIds.forEach(groupId -> { +try { +Group group = group(groupId, committedOffset); +if (group.type() != GENERIC) { +// We don't support upgrading/downgrading between protocols at the moment, so +// we throw an exception if a group exists with the wrong type. +throw new GroupIdNotFoundException(String.format("Group %s is not a generic group.", +groupId)); +} +GenericGroup genericGroup = (GenericGroup) group; Review Comment: Yeah, you're right. We should return a GroupCoordinator.DeadGroup -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
junrao commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1334821243 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ## @@ -0,0 +1,3565 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import 
org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.commo
Re: [PR] KAFKA-14506: Implement DeleteGroups API and OffsetDelete API [kafka]
jeffkbkim commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1343258721 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3071,6 +3071,33 @@ private void removeCurrentMemberFromGenericGroup( group.remove(member.memberId()); } +/** + * Handles a DeleteGroups request. + * Populates the record list passed in with record to update the state machine. + * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} by + * calling {@link GroupMetadataManager#validateDeleteGroup(String)}. + * + * @param groupId The ID of the group to be deleted. It has been checked in {@link GroupMetadataManager#validateDeleteGroup}. + * @param records The record list to populate. + */ +public void deleteGroup( +String groupId, +List records +) { +// In this method, we only populate records with tombstone records, so we don't expect an exception to be thrown here. Review Comment: "At this point, we have already validated the group id so we know that the group exists and that no exception will be thrown." how's this? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -341,6 +384,22 @@ public CoordinatorResult genericGroupLeave( return groupMetadataManager.genericGroupLeave(context, request); } +/** + * Handles a OffsetDelete request. Review Comment: nit: an ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -333,6 +349,94 @@ public CoordinatorResult commitOffset( return new CoordinatorResult<>(records, response); } +/** + * Handles an OffsetDelete request. + * + * @param request The OffsetDelete request. + * + * @return A Result containing the OffsetDeleteResponseData response and + * a list of records to update the state machine. + */ +public CoordinatorResult deleteOffsets( +OffsetDeleteRequestData request +) throws ApiException { +final Group group = validateOffsetDelete(request); +final List records = new ArrayList<>(); +final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = Review Comment: what's the benefit of using final variables here? ## clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.DeleteGroupsRequestData; +import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class DeleteGroupsRequestTest { + +protected static String groupId1 = "group-id-1"; +protected static String groupId2 = "group-id-2"; Review Comment: we can move these into the test as well ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3071,6 +3071,33 @@ private void removeCurrentMemberFromGenericGroup( group.remove(member.memberId()); } +/** + * Handles a DeleteGroups request. + * Populates the record list passed in with record to update the state machine. + * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} by + * calling {@link GroupMetadataManager#validateDeleteGroup(String)}. + * + * @param groupId The ID of the group to be deleted. It has been checked in {@link GroupMetadataManager#validateDeleteGroup}. Review Comment: nit: can we change all usages of "ID" to "id"? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@
Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]
xinyuliu-cb commented on PR #13905: URL: https://github.com/apache/kafka/pull/13905#issuecomment-1743917984 Same here. Catching up from the earliest offsets is too expensive and resource-demanding for MM2 and the Kafka brokers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15028) AddPartitionsToTxnManager metrics
[ https://issues.apache.org/jira/browse/KAFKA-15028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771294#comment-17771294 ] Justine Olshan commented on KAFKA-15028: [~divijvaidya] I saw your comment on the RC thread about missing documentation for this. Where do we typically put metrics documentation? > AddPartitionsToTxnManager metrics > - > > Key: KAFKA-15028 > URL: https://issues.apache.org/jira/browse/KAFKA-15028 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Fix For: 3.6.0 > > Attachments: latency-cpu.html > > > KIP-890 added metrics for the AddPartitionsToTxnManager > VerificationTimeMs – number of milliseconds from adding partition info to the > manager to the time the response is sent. This will include the round trip to > the transaction coordinator if it is called. This will also account for > verifications that fail before the coordinator is called. > VerificationFailureRate – rate of verifications that returned in failure > either from the AddPartitionsToTxn response or through errors in the manager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [WIP] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]
nizhikov opened a new pull request, #14471: URL: https://github.com/apache/kafka/pull/14471 WIP PR. This PR contains changes to rewrite `ConsumerGroupCommand` in java and transfer it to `tools` module ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]
nizhikov commented on PR #13247: URL: https://github.com/apache/kafka/pull/13247#issuecomment-1743782703 @mimaison @ijuma It seems we have reduced the changes as much as possible, so it's time for the final review of the command's move from Scala to Java. Can you please take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
ahuang98 commented on code in PR #14428: URL: https://github.com/apache/kafka/pull/14428#discussion_r1343169152 ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -76,9 +85,37 @@ protected LeaderState( boolean hasAcknowledgedLeader = voterId == localId; this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader)); } +this.majority = voters.size() / 2; this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +this.fetchTimeoutMs = fetchTimeoutMs; +this.fetchTimer = time.timer(fetchTimeoutMs); +} + +public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) { Review Comment: I was more suggesting that this method might benefit from a comment which describes behavior. I guess the info log explains it well enough -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
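For readers following this thread, the behavior under discussion is that the leader should resign if it stops receiving fetch requests from a majority of the voters within fetchTimeoutMs. Below is a rough, hypothetical illustration of that check; the class, field, and method names are assumptions for clarity, not the PR's actual implementation:

```java
import java.util.Map;

// Hypothetical sketch of the majority-fetch check discussed above; not the PR's code.
final class LeaderFetchLiveness {
    private final int voterCount;      // all voters, including the leader itself
    private final long fetchTimeoutMs;

    LeaderFetchLiveness(int voterCount, long fetchTimeoutMs) {
        this.voterCount = voterCount;
        this.fetchTimeoutMs = fetchTimeoutMs;
    }

    // Returns true when fewer than a majority of the voters (counting the leader)
    // have fetched within fetchTimeoutMs, i.e. the condition for resigning leadership.
    boolean shouldResign(long currentTimeMs, Map<Integer, Long> lastFollowerFetchTimeMs) {
        long active = 1; // the leader always counts itself
        for (long lastFetchMs : lastFollowerFetchTimeMs.values()) {
            if (currentTimeMs - lastFetchMs <= fetchTimeoutMs) {
                active++;
            }
        }
        // a majority of N voters is strictly more than N / 2
        return active <= voterCount / 2;
    }
}
```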
Re: [PR] KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in java [kafka]
nizhikov commented on PR #14456: URL: https://github.com/apache/kafka/pull/14456#issuecomment-1743719944 @jolshan Thank you very much for the review and merge. Appreciate it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15528) KIP-986: Cross-Cluster Replication
Greg Harris created KAFKA-15528: --- Summary: KIP-986: Cross-Cluster Replication Key: KAFKA-15528 URL: https://issues.apache.org/jira/browse/KAFKA-15528 Project: Kafka Issue Type: New Feature Reporter: Greg Harris https://cwiki.apache.org/confluence/display/KAFKA/KIP-986%3A+Cross-Cluster+Replication -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in java [kafka]
jolshan merged PR #14456: URL: https://github.com/apache/kafka/pull/14456 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in java [kafka]
jolshan commented on PR #14456: URL: https://github.com/apache/kafka/pull/14456#issuecomment-1743685795 Hey sorry. I was off for the weekend. I can take another look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
chb2ab commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1343033045 ## clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java: ## @@ -210,6 +238,12 @@ public String toString() { b.append(logStartOffset); b.append(", recordErrors: "); b.append(recordErrors); +b.append(", currentLeader: "); +if (currentLeader != null) { Review Comment: Looking at the Java docs, I think you're right: this could be replaced by `b.append(currentLeader)`. I'm not sure why `errorMessage` was written this way; it looks like it was changed explicitly in [this commit](https://github.com/apache/kafka/commit/f41a5c2c8632bfd0dc50321c1c69418db04f42f6#diff-82aef2b279f7d0093b5e7bbd34cbf9abfa6bb5ed454c72419c03dbe2e58e0eab), but I don't see a reason for it, so I could probably change this as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
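The suggestion to use `b.append(currentLeader)` relies on standard `StringBuilder` behavior: `append(Object)` converts a null argument to the literal string "null" rather than throwing, so the explicit null check is redundant. A minimal standalone demonstration (the variable name is just a stand-in for the field in ProduceResponse):

```java
public class AppendNullDemo {
    public static void main(String[] args) {
        StringBuilder b = new StringBuilder("currentLeader: ");
        Object currentLeader = null;   // stand-in for a null currentLeader field
        b.append(currentLeader);       // appends the string "null", no NullPointerException
        System.out.println(b);         // prints: currentLeader: null
    }
}
```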
[jira] [Updated] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2
[ https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15527: Labels: kip (was: ) > Add reverseRange and reverseAll query over kv-store in IQv2 > --- > > Key: KAFKA-15527 > URL: https://issues.apache.org/jira/browse/KAFKA-15527 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hanyu Zheng >Assignee: Hanyu Zheng >Priority: Major > Labels: kip > > Add reverseRange and reverseAll query over kv-store in IQv2 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2
[ https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15527: Component/s: streams > Add reverseRange and reverseAll query over kv-store in IQv2 > --- > > Key: KAFKA-15527 > URL: https://issues.apache.org/jira/browse/KAFKA-15527 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hanyu Zheng >Assignee: Hanyu Zheng >Priority: Major > > Add reverseRange and reverseAll query over kv-store in IQv2 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13005) Support JBOD in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-13005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771237#comment-17771237 ] Proven Provenzano commented on KAFKA-13005: --- Can this story be closed as a duplicate, since a new set of tasks for KRaft JBOD support is outlined in [KAFKA-14127|https://issues.apache.org/jira/browse/KAFKA-14127]? > Support JBOD in kraft mode > -- > > Key: KAFKA-13005 > URL: https://issues.apache.org/jira/browse/KAFKA-13005 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Deng Ziming >Priority: Major > Labels: kip-500 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15449) Verify transactional offset commits (KIP-890 part 1)
[ https://issues.apache.org/jira/browse/KAFKA-15449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-15449. Resolution: Fixed > Verify transactional offset commits (KIP-890 part 1) > > > Key: KAFKA-15449 > URL: https://issues.apache.org/jira/browse/KAFKA-15449 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Critical > > We verify on produce requests but not offset commits. We should fix this to > avoid hanging transactions on consumer offset partitions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15449: Verify transactional offset commits (KIP-890 part 1) [kafka]
jolshan merged PR #14370: URL: https://github.com/apache/kafka/pull/14370 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
jolshan commented on code in PR #14470: URL: https://github.com/apache/kafka/pull/14470#discussion_r1342935092 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1141,13 +1134,13 @@ class UnifiedLog(@volatile var logStartOffset: Long, // Also indicate whether we have the accurate first offset or not if (!readFirstMessage) { if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) - firstOffset = Optional.of(new LogOffsetMetadata(batch.baseOffset)) + firstOffset = batch.baseOffset lastOffsetOfFirstBatch = batch.lastOffset readFirstMessage = true } // check that offsets are monotonically increasing - if (lastOffset >= batch.lastOffset) + if (requireOffsetsMonotonic && lastOffset >= batch.lastOffset) Review Comment: We technically don't need this since the check also has `if (requireOffsetsMonotonic && !monotonic)` ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -787,19 +770,22 @@ class ReplicaManager(val config: KafkaConfig, ) } -val errorResults = errorsPerPartition.map { - case (topicPartition, error) => -topicPartition -> LogAppendResult( - LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, - Some(error.exception()) +val allResults = localProduceResults ++ errorResults +val produceStatus = allResults.map { case (topicPartition, result) => + topicPartition -> ProducePartitionStatus( +result.info.lastOffset + 1, // required offset +new PartitionResponse( + result.error, + result.info.firstOffset, + result.info.lastOffset, + result.info.logAppendTime, + result.info.logStartOffset, + result.info.recordErrors, + result.exception.map(_.getMessage).orNull Review Comment: Are we going to start returning the error message for other exceptions now too? I think we will add the default exception message when previously we were not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
jolshan commented on code in PR #14470: URL: https://github.com/apache/kafka/pull/14470#discussion_r1342933163 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -892,16 +889,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, maybeDuplicate match { case Some(duplicate) => - appendInfo.setFirstOffset(Optional.of(new LogOffsetMetadata(duplicate.firstOffset))) + appendInfo.setFirstOffset(duplicate.firstOffset) appendInfo.setLastOffset(duplicate.lastOffset) appendInfo.setLogAppendTime(duplicate.timestamp) appendInfo.setLogStartOffset(logStartOffset) case None => - // Before appending update the first offset metadata to include segment information - appendInfo.setFirstOffset(appendInfo.firstOffset.map { offsetMetadata => Review Comment: We removed this since the default is zero, and in that case we take the segment base offset anyway? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15415 On producer-batch retry, skip-backoff on a new leader [kafka]
msn-tldr commented on PR #14384: URL: https://github.com/apache/kafka/pull/14384#issuecomment-1743383185 @kirktrue > @jsancio - can you weigh in about changes to org.apache.kafka.common needing a KIP? I have removed the changes to public classes, so this shouldn't require a KIP, take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15415 On producer-batch retry, skip-backoff on a new leader [kafka]
msn-tldr commented on code in PR #14384: URL: https://github.com/apache/kafka/pull/14384#discussion_r1340370371 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ## @@ -156,6 +157,8 @@ public class SenderTest { private SenderMetricsRegistry senderMetricsRegistry = null; private final LogContext logContext = new LogContext(); +private final Logger log = logContext.logger(SenderTest.class); Review Comment: Inclined to keep it since helps improve readability of test-logs when test fails. By default logging is off, see similar comment below for details. ## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ## @@ -96,6 +98,8 @@ public class RecordAccumulatorTest { private final long maxBlockTimeMs = 1000; private final LogContext logContext = new LogContext(); +private final Logger log = logContext.logger(RecordAccumulatorTest.class); Review Comment: I am inclined to keep it, as it helps improve readability of test logs if/when test fails. By default, the logging is turned off in `clients/src/resources/log4.properties` ## clients/src/main/java/org/apache/kafka/common/PartitionInfo.java: ## @@ -20,6 +20,7 @@ * This is used to describe per-partition state in the MetadataResponse. */ public class PartitionInfo { +public static final int UNKNOWN_LEADER_EPOCH = -1; Review Comment: @dajac this is removed now, thanks! ## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ## @@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon this.isSplitBatch = isSplitBatch; float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType()); +this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH; +this.leaderChangedAttempts = -1; recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation); } +/* + * Returns whether the leader epoch has changed since the last attempt. + * @param latestLeaderEpoch The latest leader epoch. + * @return true if the leader has changed, otherwise false. + */ +boolean hasLeaderChanged(int latestLeaderEpoch) { +boolean leaderChanged = false; +// Checking for leader change makes sense only from 1st retry onwards(attempt >=1). +log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}", +this, currentLeaderEpoch, leaderChangedAttempts, latestLeaderEpoch, attempts()); +if (attempts() >= 1) { +// If the leader's epoch has changed, this counts as a leader change +if (currentLeaderEpoch != latestLeaderEpoch) { +leaderChangedAttempts = attempts(); +leaderChanged = true; +} else { +// Otherwise, it's only a leader change until the first attempt is made with this leader Review Comment: > least one attempt implies a retry. > the above epoch comparison is false So leader-epoch is still the same. Consider leader was changed at attempt=5, to epoch=100. maybeUpdateLeaderEpoch() should detect leader change even when called again at attempt=5, with the same epoch=100. As this is the same attempt in which the leader change was detected. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2
Hanyu Zheng created KAFKA-15527: --- Summary: Add reverseRange and reverseAll query over kv-store in IQv2 Key: KAFKA-15527 URL: https://issues.apache.org/jira/browse/KAFKA-15527 Project: Kafka Issue Type: Improvement Reporter: Hanyu Zheng Add reverseRange and reverseAll query over kv-store in IQv2 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2
[ https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hanyu Zheng reassigned KAFKA-15527: --- Assignee: Hanyu Zheng > Add reverseRange and reverseAll query over kv-store in IQv2 > --- > > Key: KAFKA-15527 > URL: https://issues.apache.org/jira/browse/KAFKA-15527 > Project: Kafka > Issue Type: Improvement >Reporter: Hanyu Zheng >Assignee: Hanyu Zheng >Priority: Major > > Add reverseRange and reverseAll query over kv-store in IQv2 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1342920121 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager will try to send a heartbeat when the member is in {@link MemberState#STABLE}, + * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which mean the member is either in a stable + * group, is trying to join a group, or is in the process of reconciling the assignment changes. + * + * If the member got kick out of a group, it will try to give up the current assignment by invoking {@code + * OnPartitionsLost} because reattempting to join again with a zero epoch. + * + * If the member does not have groupId configured or encountering fatal exceptions, a heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * When the member completes the assignment reconciliation, the {@link HeartbeatRequestState} will be reset so + * that a heartbeat will be sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Log
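The Javadoc quoted above notes that heartbeats failing with retriable errors (for example TimeoutException) are retried with an exponentially growing backoff bounded by retryBackoffMs and retryBackoffMaxMs. The following is a generic sketch of that pattern under assumed names; it is not Kafka's actual HeartbeatRequestState implementation:

```java
// Generic exponential-backoff sketch; names and structure are illustrative only.
final class ExponentialBackoff {
    private final long retryBackoffMs;     // initial backoff
    private final long retryBackoffMaxMs;  // upper bound on the backoff
    private int failures = 0;

    ExponentialBackoff(long retryBackoffMs, long retryBackoffMaxMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
    }

    void onFailure() { failures++; }

    void onSuccess() { failures = 0; }

    // Backoff grows as retryBackoffMs * 2^(failures - 1), capped at retryBackoffMaxMs.
    long nextBackoffMs() {
        if (failures == 0) {
            return 0L;
        }
        double backoff = retryBackoffMs * Math.pow(2, failures - 1);
        return (long) Math.min(backoff, retryBackoffMaxMs);
    }
}
```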
Re: [PR] MINOR: suppress dependencycheck warning for CVE-2023-35116 [kafka]
jlprat merged PR #14460: URL: https://github.com/apache/kafka/pull/14460 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
philipnee commented on PR #14364: URL: https://github.com/apache/kafka/pull/14364#issuecomment-1743351780 Hello @dajac - Thanks for the review. I hope I've addressed most of your concerns in the recent reviews. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: suppress dependencycheck warning for CVE-2023-35116 [kafka]
jlprat commented on PR #14460: URL: https://github.com/apache/kafka/pull/14460#issuecomment-1743351726 Test failures are all unrelated to the change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1342915079 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager will try to send a heartbeat when the member is in {@link MemberState#STABLE}, + * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which mean the member is either in a stable + * group, is trying to join a group, or is in the process of reconciling the assignment changes. + * + * If the member got kick out of a group, it will try to give up the current assignment by invoking {@code + * OnPartitionsLost} because reattempting to join again with a zero epoch. + * + * If the member does not have groupId configured or encountering fatal exceptions, a heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * When the member completes the assignment reconciliation, the {@link HeartbeatRequestState} will be reset so + * that a heartbeat will be sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Log
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1342910020 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join or rejoin a group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be + * sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. + */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; Review Comment: Added some comments directly above the var - Is this enough or you actually want this to be presented in the "javaDoc" of the class section? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
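The two documentation placements discussed above would look roughly like this (an illustrative stand-in class, not the PR code; the comment wording is an assumption, and the only fact taken from the quoted diff is that the field is populated from max.poll.interval.ms):

    public final class RebalanceTimeoutDocSketch {

        /**
         * Option 1: field-level Javadoc. The rebalance timeout sent with each
         * heartbeat request; populated from max.poll.interval.ms in the constructor.
         */
        private final int rebalanceTimeoutMs;

        // Option 2: omit the field-level comment and describe the timeout in the
        // class-level Javadoc of the manager instead.

        RebalanceTimeoutDocSketch(int maxPollIntervalMs) {
            this.rebalanceTimeoutMs = maxPollIntervalMs;
        }

        int rebalanceTimeoutMs() {
            return rebalanceTimeoutMs;
        }
    }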
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
splett2 commented on code in PR #14470: URL: https://github.com/apache/kafka/pull/14470#discussion_r1342909611 ## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ## @@ -96,10 +96,10 @@ final class KafkaMetadataLog private ( } private def handleAndConvertLogAppendInfo(appendInfo: internals.log.LogAppendInfo): LogAppendInfo = { -if (appendInfo.firstOffset.isPresent()) - new LogAppendInfo(appendInfo.firstOffset.get().messageOffset, appendInfo.lastOffset) +if (appendInfo.firstOffset != UnifiedLog.UnknownOffset) + new LogAppendInfo(appendInfo.firstOffset, appendInfo.lastOffset) else - throw new KafkaException(s"Append failed unexpectedly: ${appendInfo.errorMessage}") Review Comment: `errorMessage` would have never been populated in the calling context of `handleAndConvertLogAppendInfo`, so it is just dropped from the log in the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15526) Simplify LogAppendInfo parameters
David Mao created KAFKA-15526: - Summary: Simplify LogAppendInfo parameters Key: KAFKA-15526 URL: https://issues.apache.org/jira/browse/KAFKA-15526 Project: Kafka Issue Type: Improvement Reporter: David Mao Assignee: David Mao Currently LogAppendInfo is quite overloaded, carrying a lot of redundant information. This makes some of the code in the log layer unnecessarily complex, since the log layer is unsure which fields it needs to populate for higher layers, and higher layers are unsure which fields need to bubble back to clients. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1342899010 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager will try to send a heartbeat when the member is in {@link MemberState#STABLE}, + * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which mean the member is either in a stable + * group, is trying to join a group, or is in the process of reconciling the assignment changes. + * + * If the member got kick out of a group, it will try to give up the current assignment by invoking {@code + * OnPartitionsLost} because reattempting to join again with a zero epoch. + * + * If the member does not have groupId configured or encountering fatal exceptions, a heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * When the member completes the assignment reconciliation, the {@link HeartbeatRequestState} will be reset so + * that a heartbeat will be sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Log
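The Javadoc quoted above says that heartbeats failing with retriable errors are retried with exponentially growing backoff, driven by retry.backoff.ms and retry.backoff.max.ms. A minimal, self-contained sketch of that behaviour follows; the class name, the doubling factor, and the reset-on-success rule are assumptions, not the actual HeartbeatRequestState code.

    /**
     * Sketch of exponential heartbeat backoff: each retriable failure doubles the
     * wait (capped at a maximum), and a successful heartbeat resets it.
     */
    public final class HeartbeatBackoffSketch {
        private final long retryBackoffMs;      // stands in for retry.backoff.ms
        private final long retryBackoffMaxMs;   // stands in for retry.backoff.max.ms
        private long currentBackoffMs;

        public HeartbeatBackoffSketch(long retryBackoffMs, long retryBackoffMaxMs) {
            this.retryBackoffMs = retryBackoffMs;
            this.retryBackoffMaxMs = retryBackoffMaxMs;
            this.currentBackoffMs = retryBackoffMs;
        }

        /** Returns the wait before the next attempt and grows the backoff for the one after. */
        public long onRetriableFailure() {
            long wait = currentBackoffMs;
            currentBackoffMs = Math.min(currentBackoffMs * 2, retryBackoffMaxMs);
            return wait;
        }

        /** A successful heartbeat goes back to the base interval. */
        public void onSuccess() {
            currentBackoffMs = retryBackoffMs;
        }

        public static void main(String[] args) {
            HeartbeatBackoffSketch backoff = new HeartbeatBackoffSketch(100, 1000);
            for (int i = 0; i < 5; i++) {
                System.out.println("attempt " + i + " waits " + backoff.onRetriableFailure() + " ms");
            }
            // prints 100, 200, 400, 800, 1000
        }
    }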
[jira] [Commented] (KAFKA-15525) Segment uploads stop working following a broker failure
[ https://issues.apache.org/jira/browse/KAFKA-15525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771171#comment-17771171 ] Francois Visconte commented on KAFKA-15525: --- I was planning to do that change anyways but the strange thing is that it doesn't happen consistently when I'm shutting down brokers in my cluster. > Segment uploads stop working following a broker failure > --- > > Key: KAFKA-15525 > URL: https://issues.apache.org/jira/browse/KAFKA-15525 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Affects Versions: 3.6.0 >Reporter: Francois Visconte >Priority: Major > > I have a tiered-storage enabled cluster and topic where I continuously > produce and consume to/from a TS-enabled topic on that cluster. > Here are the topic settings I’m using: > {code:java} > local.retention.ms=12 > remote.storage.enable=true > retention.ms: 1080 > segment.bytes: 51200 > {code} > Here are my broker settings: > {code:java} > remote.log.storage.system.enable=true > remote.log.storage.manager.class.path=/opt/kafka/tiered-storage-libs/* > remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager > remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager > remote.log.metadata.manager.listener.name=INTERNAL_PLAINTEXT > remote.log.manager.task.interval.ms=5000 > remote.log.manager.thread.pool.size=10 > remote.log.reader.threads=10 > remote.log.reader.max.pending.tasks=100 > rlmm.config.remote.log.metadata.topic.replication.factor=1 > rlmm.config.remote.log.metadata.topic.num.partitions=50 > rlmm.config.remote.log.metadata.topic.retention.ms=-1 > rsm.config.chunk.cache.class=io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache > rsm.config.chunk.cache.path=/data/tiered-storage-cache > rsm.config.chunk.cache.size=1073741824 > rsm.config.metrics.recording.level=DEBUG > rsm.config.storage.aws.credentials.provider.class=software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider > rsm.config.storage.backend.class.name=io.aiven.kafka.tieredstorage.storage.s3.S3Storage > rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage > rsm.config.storage.s3.region=us-east-1 > rsm.config.chunk.size=102400 > rsm.config.storage.s3.multipart.upload.part.size=16777216 {code} > When a broker in the cluster get rotated (replaced or restarted) some brokers > start throwing this error repeatedly: > {code:java} > [RemoteLogManager=1 partition=yTypIvtBRY2l3sD4-8M7fA:loadgen-3] Error > occurred while copying log segments of partition: > yTypIvtBRY2l3sD4-8M7fA:loadgen-3 > java.util.concurrent.ExecutionException: > org.apache.kafka.common.KafkaException: > java.util.concurrent.TimeoutException: Timed out in catching up with the > expected offset by consumer. 
> at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) > at > kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegment(RemoteLogManager.java:728) > at > kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegmentsToRemote(RemoteLogManager.java:687) > at > kafka.log.remote.RemoteLogManager$RLMTask.run(RemoteLogManager.java:790) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) > at > java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) > at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) > at java.base/java.lang.Thread.run(Thread.java:833) > Caused by: org.apache.kafka.common.KafkaException: > java.util.concurrent.TimeoutException: Timed out in catching up with the > expected offset by consumer. > at > org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$storeRemoteLogMetadata$0(TopicBasedRemoteLogMetadataManager.java:188) > at > java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) > at > java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483) > at > java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) > at > java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) > at > java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) > at > java.b
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
chb2ab commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1342844776 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel, } } + case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node) + + private def getCurrentLeader(tp: TopicPartition): LeaderNode = { +val partitionInfoOrError = replicaManager.getPartitionOrError(tp) +var leaderId = -1 +var leaderEpoch = -1 +partitionInfoOrError match { + case Right(x) => + leaderId = x.leaderReplicaIdOpt.getOrElse(-1) + leaderEpoch = x.getLeaderEpoch + case Left(x) => +debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache") +val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition) Review Comment: I don't think so; getPartitionInfo returns an Option, so the equivalent of null would be an empty Option. We don't seem to null-check this value elsewhere either. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14506: Implement DeleteGroups API and OffsetDelete API [kafka]
yangy commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1342851207 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -333,6 +348,87 @@ public CoordinatorResult commitOffset( return new CoordinatorResult<>(records, response); } +/** + * Handles an OffsetDelete request. + * + * @param request The OffsetDelete request. + * + * @return A Result containing the OffsetDeleteResponseData response and + * a list of records to update the state machine. + */ +public CoordinatorResult deleteOffsets( +OffsetDeleteRequestData request +) throws ApiException { +final Group group = validateOffsetDelete(request); +final List records = new ArrayList<>(); +final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = +new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); +final OffsetDeleteResponseData response = new OffsetDeleteResponseData(); +final TimelineHashMap> offsetsByTopic = +offsetsByGroup.get(request.groupId()); + +request.topics().forEach(topic -> { +final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = +new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); +final TimelineHashMap offsetsByPartition = offsetsByTopic == null ? +null : offsetsByTopic.get(topic.name()); + +if (group.isSubscribedToTopic(topic.name())) { +topic.partitions().forEach(partition -> +responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() +.setPartitionIndex(partition.partitionIndex()) +.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) +) +); +} else { +topic.partitions().forEach(partition -> { +if (offsetsByPartition != null && offsetsByPartition.containsKey(partition.partitionIndex())) { +responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() +.setPartitionIndex(partition.partitionIndex()) +); + records.add(RecordHelpers.newOffsetCommitTombstoneRecord( +request.groupId(), +topic.name(), +partition.partitionIndex() +)); +} +}); +} + +final OffsetDeleteResponseData.OffsetDeleteResponseTopic responseTopic = +new OffsetDeleteResponseData.OffsetDeleteResponseTopic() +.setName(topic.name()) +.setPartitions(responsePartitionCollection); +responseTopicCollection.add(responseTopic); +}); +response.setTopics(responseTopicCollection); + +return new CoordinatorResult<>(records, response); +} + +/** + * Deletes offsets as part of a DeleteGroups request. + * Populates the record list passed in with records to update the state machine. + * Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} + * + * @param groupId The ID of the given group. + * @param records The record list to populate. + */ +public void deleteAllOffsets( +String groupId, +List records +) { +TimelineHashMap> offsetsByTopic = offsetsByGroup.get(groupId); + +if (offsetsByTopic != null) { Review Comment: Any chance deleteAllOffsets will get invoked before the group is completely removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
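The deleteOffsets code quoted above appends one tombstone record per deleted topic-partition via RecordHelpers.newOffsetCommitTombstoneRecord; deleteAllOffsets presumably does the same for every offset the group still holds. Below is a self-contained sketch of that pattern only, with plain collections standing in for the timeline maps and strings standing in for tombstone records; all names and types here are stand-ins rather than the actual coordinator code.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public final class DeleteAllOffsetsSketch {
        // groupId -> topic -> partition -> committed offset
        private final Map<String, Map<String, Map<Integer, Long>>> offsetsByGroup = new HashMap<>();

        /** Appends one "tombstone" entry per committed topic-partition of the group. */
        public void deleteAllOffsets(String groupId, List<String> records) {
            Map<String, Map<Integer, Long>> offsetsByTopic = offsetsByGroup.get(groupId);
            if (offsetsByTopic != null) {
                offsetsByTopic.forEach((topic, partitions) ->
                    partitions.keySet().forEach(partition ->
                        records.add("tombstone(" + groupId + ", " + topic + ", " + partition + ")")));
            }
        }

        public static void main(String[] args) {
            DeleteAllOffsetsSketch sketch = new DeleteAllOffsetsSketch();
            sketch.offsetsByGroup
                  .computeIfAbsent("group-id", g -> new HashMap<>())
                  .computeIfAbsent("topic-a", t -> new HashMap<>())
                  .put(0, 42L);
            List<String> records = new ArrayList<>();
            sketch.deleteAllOffsets("group-id", records);
            System.out.println(records); // [tombstone(group-id, topic-a, 0)]
        }
    }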
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
chb2ab commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1342850556 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel, } } + case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node) + + private def getCurrentLeader(tp: TopicPartition): LeaderNode = { +val partitionInfoOrError = replicaManager.getPartitionOrError(tp) +var leaderId = -1 +var leaderEpoch = -1 +partitionInfoOrError match { + case Right(x) => + leaderId = x.leaderReplicaIdOpt.getOrElse(-1) + leaderEpoch = x.getLeaderEpoch + case Left(x) => +debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache") +val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition) +partitionInfo.foreach { info => Review Comment: Looking at other uses of partitionInfo, I think this is a style choice. There can only be one partitionInfo in the Option returned by getPartitionInfo, so the foreach should only ever access one entry; I think this is just a more succinct way of accessing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14504: Implement DescribeGroups API [kafka]
dajac commented on code in PR #14462: URL: https://github.com/apache/kafka/pull/14462#discussion_r1342814124 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -444,6 +445,66 @@ public List listGroups(List statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } +/** + * Handles a DescribeGroup request. + * + * @param groupIds The IDs of the groups to describe. + * @param committedOffset A specified committed offset corresponding to this shard. + * + * @return A list containing the DescribeGroupsResponseData.DescribedGroup. + */ +public List describeGroups( +List groupIds, +long committedOffset +) { +final List describedGroups = new ArrayList<>(); +groupIds.forEach(groupId -> { +try { +Group group = group(groupId, committedOffset); +if (group.type() != GENERIC) { +// We don't support upgrading/downgrading between protocols at the moment, so +// we throw an exception if a group exists with the wrong type. +throw new GroupIdNotFoundException(String.format("Group %s is not a generic group.", +groupId)); +} +GenericGroup genericGroup = (GenericGroup) group; Review Comment: When the group does not exist, in the current code, we return `GroupCoordinator.DeadGroup` instead of returning a `GroupIdNotFoundException` exception. Do you confirm? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -751,6 +752,112 @@ public void testListGroupsFailedImmediately() assertEquals(Collections.emptyList(), listGroupsResponseData.groups()); } +@Test +public void testDescribeGroups() throws Exception { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +int partitionCount = 2; +service.startup(() -> partitionCount); + +DescribeGroupsResponseData.DescribedGroup describedGroup1 = new DescribeGroupsResponseData.DescribedGroup() +.setGroupId("group-id-1"); +DescribeGroupsResponseData.DescribedGroup describedGroup2 = new DescribeGroupsResponseData.DescribedGroup() +.setGroupId("group-id-2"); +List expectedDescribedGroups = Arrays.asList( +describedGroup1, +describedGroup2 +); + +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("describe-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), +ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1))); + +CompletableFuture describedGroupFuture = new CompletableFuture<>(); +when(runtime.scheduleReadOperation( +ArgumentMatchers.eq("describe-groups"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)), +ArgumentMatchers.any() +)).thenReturn(describedGroupFuture); + +CompletableFuture> future = +service.describeGroups(requestContext(ApiKeys.DESCRIBE_GROUPS), Arrays.asList("group-id-1", "group-id-2")); + +assertFalse(future.isDone()); + describedGroupFuture.complete(Collections.singletonList(describedGroup2)); + +assertTrue(future.get().containsAll(expectedDescribedGroups)); +assertTrue(expectedDescribedGroups.containsAll(future.get())); Review Comment: nit: Could we use assertEquals here? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -444,6 +445,66 @@ public List listGroups(List statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } +/** + * Handles a DescribeGroup request. 
+ * + * @param groupIds The IDs of the groups to describe. + * @param committedOffset A specified committed offset corresponding to this shard. + * + * @return A list containing the DescribeGroupsResponseData.DescribedGroup. + */ +public List describeGroups( +List groupIds, +long committedOffset +) { +final List describedGroups = new ArrayList<>(); +groupIds.forEach(groupId -> { +try { +Group group = group(groupId, committedOffset); +if (group.type() != GENERIC) { +// We don't support upgrading/downgrading between protocols at the moment, so +// we throw an exception if a group exists
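The first review comment above is about how a missing group is surfaced: the existing coordinator reports it as a "Dead" described group rather than failing the whole DescribeGroups call with GroupIdNotFoundException. A self-contained sketch of that per-group fallback follows, with plain types standing in for the Kafka response classes; the names are stand-ins.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public final class DescribeGroupsSketch {

        static final class DescribedGroup {
            final String groupId;
            final String state;

            DescribedGroup(String groupId, String state) {
                this.groupId = groupId;
                this.state = state;
            }

            @Override
            public String toString() {
                return groupId + "=" + state;
            }
        }

        private final Map<String, String> groupStates = new HashMap<>();

        /** Unknown groups are reported as Dead instead of failing the whole request. */
        List<DescribedGroup> describeGroups(List<String> groupIds) {
            List<DescribedGroup> described = new ArrayList<>();
            for (String groupId : groupIds) {
                String state = groupStates.get(groupId);
                described.add(new DescribedGroup(groupId, state != null ? state : "Dead"));
            }
            return described;
        }

        public static void main(String[] args) {
            DescribeGroupsSketch sketch = new DescribeGroupsSketch();
            sketch.groupStates.put("group-id-1", "Stable");
            // prints [group-id-1=Stable, unknown-group=Dead]
            System.out.println(sketch.describeGroups(Arrays.asList("group-id-1", "unknown-group")));
        }
    }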
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1342785385 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager will try to send a heartbeat when the member is in {@link MemberState#STABLE}, + * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which mean the member is either in a stable + * group, is trying to join a group, or is in the process of reconciling the assignment changes. + * + * If the member got kick out of a group, it will try to give up the current assignment by invoking {@code + * OnPartitionsLost} because reattempting to join again with a zero epoch. + * + * If the member does not have groupId configured or encountering fatal exceptions, a heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * When the member completes the assignment reconciliation, the {@link HeartbeatRequestState} will be reset so + * that a heartbeat will be sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Log
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1342783380 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager will try to send a heartbeat when the member is in {@link MemberState#STABLE}, + * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which mean the member is either in a stable + * group, is trying to join a group, or is in the process of reconciling the assignment changes. + * + * If the member got kick out of a group, it will try to give up the current assignment by invoking {@code + * OnPartitionsLost} because reattempting to join again with a zero epoch. + * + * If the member does not have groupId configured or encountering fatal exceptions, a heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * When the member completes the assignment reconciliation, the {@link HeartbeatRequestState} will be reset so + * that a heartbeat will be sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Log
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1342781938 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager will try to send a heartbeat when the member is in {@link MemberState#STABLE}, + * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which mean the member is either in a stable + * group, is trying to join a group, or is in the process of reconciling the assignment changes. + * + * If the member got kick out of a group, it will try to give up the current assignment by invoking {@code + * OnPartitionsLost} because reattempting to join again with a zero epoch. + * + * If the member does not have groupId configured or encountering fatal exceptions, a heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * When the member completes the assignment reconciliation, the {@link HeartbeatRequestState} will be reset so + * that a heartbeat will be sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Log
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1342778930 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager will try to send a heartbeat when the member is in {@link MemberState#STABLE}, + * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which mean the member is either in a stable + * group, is trying to join a group, or is in the process of reconciling the assignment changes. + * + * If the member got kick out of a group, it will try to give up the current assignment by invoking {@code + * OnPartitionsLost} because reattempting to join again with a zero epoch. + * + * If the member does not have groupId configured or encountering fatal exceptions, a heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * When the member completes the assignment reconciliation, the {@link HeartbeatRequestState} will be reset so + * that a heartbeat will be sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Log
Re: [PR] KAFKA-15326: [9/N] Start and stop executors and cornercases [kafka]
lucasbru merged PR #14281: URL: https://github.com/apache/kafka/pull/14281 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15326: [9/N] Start and stop executors and cornercases [kafka]
lucasbru commented on PR #14281: URL: https://github.com/apache/kafka/pull/14281#issuecomment-174303 Build failures unrelated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
dajac commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1342694336 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager will try to send a heartbeat when the member is in {@link MemberState#STABLE}, + * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which mean the member is either in a stable + * group, is trying to join a group, or is in the process of reconciling the assignment changes. + * + * If the member got kick out of a group, it will try to give up the current assignment by invoking {@code + * OnPartitionsLost} because reattempting to join again with a zero epoch. + * + * If the member does not have groupId configured or encountering fatal exceptions, a heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * When the member completes the assignment reconciliation, the {@link HeartbeatRequestState} will be reset so + * that a heartbeat will be sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogCont
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
dajac commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1342667683 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a + * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to + * the network queue to be sent out. Once the response is received, the module will update the state in the + * {@link MembershipManager} and handle any errors. + * + * The manager will try to send a heartbeat when the member is in {@link MemberState#STABLE}, + * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which mean the member is either in a stable + * group, is trying to join a group, or is in the process of reconciling the assignment changes. + * + * If the member got kick out of a group, it will try to give up the current assignment by invoking {@code + * OnPartitionsLost} because reattempting to join again with a zero epoch. + * + * If the member does not have groupId configured or encountering fatal exceptions, a heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and try to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be + * backoff exponentially. + * + * When the member completes the assignment reconciliation, the {@link HeartbeatRequestState} will be reset so + * that a heartbeat will be sent in the next event loop. + * + * See {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogCont
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
showuon commented on code in PR #14428: URL: https://github.com/apache/kafka/pull/14428#discussion_r1342676499 ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -76,9 +85,37 @@ protected LeaderState( boolean hasAcknowledgedLeader = voterId == localId; this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader)); } +this.majority = voters.size() / 2; this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +this.fetchTimeoutMs = fetchTimeoutMs; +this.fetchTimer = time.timer(fetchTimeoutMs); +} + +public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) { +fetchTimer.update(currentTimeMs); +boolean isExpired = fetchTimer.isExpired(); +if (isExpired) { +log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.", +fetchTimeoutMs, fetchedVoters); +} +return isExpired; +} + +public void maybeResetMajorityFollowerFetchTimeout(int id, long currentTimeMs) { +updateFetchedVoters(id); +if (fetchedVoters.size() >= majority) { +fetchedVoters.clear(); +fetchTimer.update(currentTimeMs); +fetchTimer.reset(fetchTimeoutMs); +} +} + +private void updateFetchedVoters(int id) { +if (isVoter(id)) { +fetchedVoters.add(id); +} Review Comment: > Note that ReplicaState already contains the lastFetchTimestamp. I'm trying to re-use the `lastFetchTimestamp` in ReplicaState today, but found it won't work as expected since its default value is `-1`, which means that when a node becomes a leader, all the `lastFetchTimestamp` values of the follower nodes are `-1`. Using the current `timer` approach is more readable IMO. > The part that is not clear to me is when or how to wake up the leader for a poll. We need to update KafkaRaftClient::pollLeader so that the replicas' last fetch time is taken into account when blocking on the messageQueue.poll Good question. My thought is that we add some buffer to tolerate the operation time. For example, when [checking shrinkISR](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L373-L375), we give 1.5x of the timeout to make things easier, instead of calculating the accurate timestamp. So I'm thinking we use `fetchTimeout * 1.5`. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
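The 1.5x buffer suggested above mirrors the ISR-shrink check: rather than computing the exact instant at which the majority-fetch check could first fire, the leader simply bounds its poll interval by a padded fetch timeout. A tiny self-contained sketch of that calculation follows; the method and class names are assumptions, not KafkaRaftClient code.

    public final class LeaderPollBoundSketch {

        /**
         * Bound the leader's poll so the majority-fetch check runs at least every
         * 1.5 * fetchTimeoutMs, even if no other event wakes the thread up earlier.
         */
        static long pollTimeoutMs(long otherWorkTimeoutMs, long fetchTimeoutMs) {
            long majorityFetchCheckMs = (long) (fetchTimeoutMs * 1.5);
            return Math.min(otherWorkTimeoutMs, majorityFetchCheckMs);
        }

        public static void main(String[] args) {
            // With a 2s fetch timeout the leader wakes up at least every 3s for the check.
            System.out.println(pollTimeoutMs(Long.MAX_VALUE, 2_000)); // 3000
        }
    }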
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
showuon commented on code in PR #14428: URL: https://github.com/apache/kafka/pull/14428#discussion_r1342668812 ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -76,9 +85,37 @@ protected LeaderState( boolean hasAcknowledgedLeader = voterId == localId; this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader)); } +this.majority = voters.size() / 2; this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +this.fetchTimeoutMs = fetchTimeoutMs; +this.fetchTimer = time.timer(fetchTimeoutMs); +} + +public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) { Review Comment: Changed to `hasMajorityFollowerFetchExpired`. Let me know if you have any better suggestion. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
showuon commented on code in PR #14428: URL: https://github.com/apache/kafka/pull/14428#discussion_r1342669657 ## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ## @@ -485,6 +485,49 @@ public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exceptio context.listener.currentLeaderAndEpoch()); } +@Test +public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters() throws Exception { Review Comment: Will add tests later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
showuon commented on code in PR #14428: URL: https://github.com/apache/kafka/pull/14428#discussion_r1342665261 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -965,6 +965,10 @@ private CompletableFuture<FetchResponseData> handleFetchRequest( } int replicaId = FetchRequest.replicaId(request); + +Optional<LeaderState<T>> state = quorum.maybeLeaderState(); +state.ifPresent(s -> s.maybeResetMajorityFollowerFetchTimeout(replicaId, currentTimeMs)); Review Comment: Agree! Now, we update the timer when we update the lastFetchTimestamp in replicaState. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14506: Implement DeleteGroups API and OffsetDelete API [kafka]
dajac commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1342599402 ## clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.DeleteGroupsRequestData; +import org.apache.kafka.common.message.DeleteGroupsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class DeleteGroupsRequestTest { + +protected static String groupId1 = "group-id-1"; +protected static String groupId2 = "group-id-2"; + +private static DeleteGroupsRequestData data; + +@BeforeEach +public void setUp() { +data = new DeleteGroupsRequestData() +.setGroupsNames(Arrays.asList(groupId1, groupId2)); +} Review Comment: nit: Given that there is only one test. I would rather move everything into that test. ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -307,6 +344,57 @@ private void replay( lastWrittenOffset++; } + +public void testOffsetDeleteWith( +OffsetMetadataManagerTestContext context, +String groupId, +String topic, +int partition, +Errors error Review Comment: nit: expectedError? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -333,6 +348,87 @@ public CoordinatorResult commitOffset( return new CoordinatorResult<>(records, response); } +/** + * Handles an OffsetDelete request. + * + * @param request The OffsetDelete request. + * + * @return A Result containing the OffsetDeleteResponseData response and + * a list of records to update the state machine. + */ +public CoordinatorResult deleteOffsets( +OffsetDeleteRequestData request +) throws ApiException { +final Group group = validateOffsetDelete(request); +final List records = new ArrayList<>(); +final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = +new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); +final OffsetDeleteResponseData response = new OffsetDeleteResponseData(); +final TimelineHashMap> offsetsByTopic = +offsetsByGroup.get(request.groupId()); + +request.topics().forEach(topic -> { +final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = +new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); +final TimelineHashMap offsetsByPartition = offsetsByTopic == null ? 
+null : offsetsByTopic.get(topic.name()); + +if (group.isSubscribedToTopic(topic.name())) { +topic.partitions().forEach(partition -> +responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() +.setPartitionIndex(partition.partitionIndex()) +.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code()) +) +); +} else { +topic.partitions().forEach(partition -> { +if (offsetsByPartition != null && offsetsByPartition.containsKey(partition.partitionIndex())) { +responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition() +.setPartitionIndex(partition.partitionIndex()) +); + records.add(RecordHelpers.newOffsetCommitTombstoneRecord( +request.groupId(), +topic
[jira] [Commented] (KAFKA-15525) Segment uploads stop working following a broker failure
[ https://issues.apache.org/jira/browse/KAFKA-15525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771065#comment-17771065 ] Kamal Chandraprakash commented on KAFKA-15525: -- While uploading the segment, the RemoteLogManager sends an event to the internal topic, if it's unavailable then it cannot upload the segment. {{rlmm.config.remote.log.metadata.topic.replication.factor}} is set to 1, can you try increasing the replication-factor to 3 (or) 4? > Segment uploads stop working following a broker failure > --- > > Key: KAFKA-15525 > URL: https://issues.apache.org/jira/browse/KAFKA-15525 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Affects Versions: 3.6.0 >Reporter: Francois Visconte >Priority: Major > > I have a tiered-storage enabled cluster and topic where I continuously > produce and consume to/from a TS-enabled topic on that cluster. > Here are the topic settings I’m using: > {code:java} > local.retention.ms=12 > remote.storage.enable=true > retention.ms: 1080 > segment.bytes: 51200 > {code} > Here are my broker settings: > {code:java} > remote.log.storage.system.enable=true > remote.log.storage.manager.class.path=/opt/kafka/tiered-storage-libs/* > remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager > remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager > remote.log.metadata.manager.listener.name=INTERNAL_PLAINTEXT > remote.log.manager.task.interval.ms=5000 > remote.log.manager.thread.pool.size=10 > remote.log.reader.threads=10 > remote.log.reader.max.pending.tasks=100 > rlmm.config.remote.log.metadata.topic.replication.factor=1 > rlmm.config.remote.log.metadata.topic.num.partitions=50 > rlmm.config.remote.log.metadata.topic.retention.ms=-1 > rsm.config.chunk.cache.class=io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache > rsm.config.chunk.cache.path=/data/tiered-storage-cache > rsm.config.chunk.cache.size=1073741824 > rsm.config.metrics.recording.level=DEBUG > rsm.config.storage.aws.credentials.provider.class=software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider > rsm.config.storage.backend.class.name=io.aiven.kafka.tieredstorage.storage.s3.S3Storage > rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage > rsm.config.storage.s3.region=us-east-1 > rsm.config.chunk.size=102400 > rsm.config.storage.s3.multipart.upload.part.size=16777216 {code} > When a broker in the cluster get rotated (replaced or restarted) some brokers > start throwing this error repeatedly: > {code:java} > [RemoteLogManager=1 partition=yTypIvtBRY2l3sD4-8M7fA:loadgen-3] Error > occurred while copying log segments of partition: > yTypIvtBRY2l3sD4-8M7fA:loadgen-3 > java.util.concurrent.ExecutionException: > org.apache.kafka.common.KafkaException: > java.util.concurrent.TimeoutException: Timed out in catching up with the > expected offset by consumer. 
> at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) > at > kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegment(RemoteLogManager.java:728) > at > kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegmentsToRemote(RemoteLogManager.java:687) > at > kafka.log.remote.RemoteLogManager$RLMTask.run(RemoteLogManager.java:790) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) > at > java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) > at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) > at java.base/java.lang.Thread.run(Thread.java:833) > Caused by: org.apache.kafka.common.KafkaException: > java.util.concurrent.TimeoutException: Timed out in catching up with the > expected offset by consumer. > at > org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$storeRemoteLogMetadata$0(TopicBasedRemoteLogMetadataManager.java:188) > at > java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) > at > java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483) > at > java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) > at > java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLe
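For reference, the suggestion above (replicating the remote log metadata topic across several brokers instead of a single one) would look roughly like the following broker configuration change. The values are illustrative only; the replication factor cannot exceed the number of available brokers:
{code:java}
# before: a single replica, so one broker failure makes the __remote_log_metadata topic unavailable
# rlmm.config.remote.log.metadata.topic.replication.factor=1

# suggested: replicate the metadata topic so segment uploads survive a broker failure
rlmm.config.remote.log.metadata.topic.replication.factor=3
{code}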
Re: [PR] MINOR: Correcting Javadocs for throwIfMemberEpochIsInvalid [kafka]
dajac merged PR #14468: URL: https://github.com/apache/kafka/pull/14468 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Remove the client side assignor from the ConsumerGroupHeartbeat API [kafka]
dajac merged PR #14469: URL: https://github.com/apache/kafka/pull/14469 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14581: Moving GetOffsetShell to tools [kafka]
fvaleri commented on PR #13562: URL: https://github.com/apache/kafka/pull/13562#issuecomment-1742881176 > @fvaleri I think it's time to take a note about these changes in [KAFKA-14705](https://issues.apache.org/jira/browse/KAFKA-14705), isn't it? This should fall under the phrase: "4. We should also get rid of many deprecated options across all tools, including not migrated tools." If you want to provide a list of all deprecated options across all tools, that would be great, and I will be happy to review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15498) Upgrade Snappy-Java to 1.1.10.4
[ https://issues.apache.org/jira/browse/KAFKA-15498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771051#comment-17771051 ] Luke Chen commented on KAFKA-15498: --- Note: In v3.7.0, Snappy-Java is upgraded to 1.1.10.5 in this PR: https://github.com/apache/kafka/pull/14458 > Upgrade Snappy-Java to 1.1.10.4 > --- > > Key: KAFKA-15498 > URL: https://issues.apache.org/jira/browse/KAFKA-15498 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.1, 3.5.1 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Blocker > Fix For: 3.6.0 > > > Snappy-java published a new vulnerability > <[https://github.com/xerial/snappy-java/security/advisories/GHSA-55g7-9cwv-5qfv]> > that can cause an OOM error in the server. > Kafka is also impacted by this vulnerability since it is similar to CVE-2023-34455 > <[https://nvd.nist.gov/vuln/detail/CVE-2023-34455]>. > We'd better bump the snappy-java version to avoid this vulnerability. > PR <[https://github.com/apache/kafka/pull/14434]> has been created to run the CI > build. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15498: bump snappy-java version to 1.1.10.5 [kafka]
showuon merged PR #14458: URL: https://github.com/apache/kafka/pull/14458 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15498: bump snappy-java version to 1.1.10.5 [kafka]
showuon commented on PR #14458: URL: https://github.com/apache/kafka/pull/14458#issuecomment-1742858256 Agree it's not a blocker for v3.6.0. Failed tests are unrelated: ``` Build / JDK 21 and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserPasses(String).quorum=kraft Build / JDK 21 and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserFails(String).quorum=kraft Build / JDK 21 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated() Build / JDK 11 and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testCreateTokenForOtherUserFails(String).quorum=kraft Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector Build / JDK 17 and Scala 2.13 / kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15525) Segment uploads stop working following a broker failure
Francois Visconte created KAFKA-15525: - Summary: Segment uploads stop working following a broker failure Key: KAFKA-15525 URL: https://issues.apache.org/jira/browse/KAFKA-15525 Project: Kafka Issue Type: Bug Components: Tiered-Storage Affects Versions: 3.6.0 Reporter: Francois Visconte I have a tiered-storage enabled cluster and topic where I continuously produce and consume to/from a TS-enabled topic on that cluster. Here are the topic settings I’m using: {code:java} local.retention.ms=12 remote.storage.enable=true retention.ms: 1080 segment.bytes: 51200 {code} Here are my broker settings: {code:java} remote.log.storage.system.enable=true remote.log.storage.manager.class.path=/opt/kafka/tiered-storage-libs/* remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager remote.log.metadata.manager.listener.name=INTERNAL_PLAINTEXT remote.log.manager.task.interval.ms=5000 remote.log.manager.thread.pool.size=10 remote.log.reader.threads=10 remote.log.reader.max.pending.tasks=100 rlmm.config.remote.log.metadata.topic.replication.factor=1 rlmm.config.remote.log.metadata.topic.num.partitions=50 rlmm.config.remote.log.metadata.topic.retention.ms=-1 rsm.config.chunk.cache.class=io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache rsm.config.chunk.cache.path=/data/tiered-storage-cache rsm.config.chunk.cache.size=1073741824 rsm.config.metrics.recording.level=DEBUG rsm.config.storage.aws.credentials.provider.class=software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider rsm.config.storage.backend.class.name=io.aiven.kafka.tieredstorage.storage.s3.S3Storage rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage rsm.config.storage.s3.region=us-east-1 rsm.config.chunk.size=102400 rsm.config.storage.s3.multipart.upload.part.size=16777216 {code} When a broker in the cluster get rotated (replaced or restarted) some brokers start throwing this error repeatedly: {code:java} [RemoteLogManager=1 partition=yTypIvtBRY2l3sD4-8M7fA:loadgen-3] Error occurred while copying log segments of partition: yTypIvtBRY2l3sD4-8M7fA:loadgen-3 java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: java.util.concurrent.TimeoutException: Timed out in catching up with the expected offset by consumer. at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegment(RemoteLogManager.java:728) at kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegmentsToRemote(RemoteLogManager.java:687) at kafka.log.remote.RemoteLogManager$RLMTask.run(RemoteLogManager.java:790) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: org.apache.kafka.common.KafkaException: java.util.concurrent.TimeoutException: Timed out in catching up with the expected offset by consumer. 
at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$storeRemoteLogMetadata$0(TopicBasedRemoteLogMetadataManager.java:188) at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) Caused by: java.util.concurrent.TimeoutException: Timed out in catching up with the expected offset by consumer. at org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.waitTillConsumptionCatchesUp(ConsumerManager.java:121) at org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.waitTillConsumptionCatchesUp(ConsumerManager.java:89) at org.apache.kafka.server.
Re: [PR] KAFKA-14581: Moving GetOffsetShell to tools [kafka]
ruslankrivoshein commented on PR #13562: URL: https://github.com/apache/kafka/pull/13562#issuecomment-1742817052 @fvaleri I think it's time to take a note about these changes in [KAFKA-14705](https://issues.apache.org/jira/browse/KAFKA-14705), isn't it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Logging fix in StreamsPartitionAssignor [kafka]
lucasbru commented on PR #14435: URL: https://github.com/apache/kafka/pull/14435#issuecomment-1742776359 > @lucasbru you set up your apache account already, right? I'm leaving it to you to merge but just let me know if you need me to do so Yes, done and backported -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in java [kafka]
nizhikov commented on PR #14456: URL: https://github.com/apache/kafka/pull/14456#issuecomment-1742776092 CI is OK after code review changes. @jolshan do you have other comments? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Logging fix in StreamsPartitionAssignor [kafka]
lucasbru merged PR #14435: URL: https://github.com/apache/kafka/pull/14435 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14034 Idempotent producer should wait for preceding in-flight b… [kafka]
urbandan commented on PR #13796: URL: https://github.com/apache/kafka/pull/13796#issuecomment-1742583164 @viktorsomogyi I would like to, yes, but my understanding is (based on the comments of @jolshan) that another loosely related bug blocks this fix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15511: Catch CorruptIndexException instead of CorruptRecordException [kafka]
showuon commented on PR #14459: URL: https://github.com/apache/kafka/pull/14459#issuecomment-1742572611 Thanks for the fix @iit2009060 ! Nice find! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #14468: MINOR: Correcting Javadocs for throwIfMemberEpochIsInvalid
vamossagar12 commented on PR #14468: URL: https://github.com/apache/kafka/pull/14468#issuecomment-1742509255 @dajac, sorry about that. It was a checkstyle failure. I have corrected it and ran checkstyle locally; it passes now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org