[GitHub] [kafka] kamalcph commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
kamalcph commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1295402786

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ##
```
@@ -1003,6 +1015,134 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
             }
         }
 
+    private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map<Integer, Long> segmentEpochs) {
+        return new RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(),
+                        new TopicPartition("topic", 0)), Uuid.randomUuid()),
+                startOffset, endOffset,
+                10L,
+                1,
+                10L,
+                1000,
+                Optional.empty(),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs);
+    }
+
+    @Test
+    public void testRemoteSegmentWithinLeaderEpochs() {
+        // Test whether a remote segment is within the leader epochs
+        final long logEndOffset = 90L;
+
+        TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<Integer, Long>() {{
+            put(0, 0L);
+            put(1, 10L);
+            put(2, 20L);
+            put(3, 30L);
+            put(4, 40L);
+            put(5, 50L);
+            put(7, 70L);
+        }};
```
Review Comment:
   For clean code: double-brace initialization creates an extra anonymous class at every usage, so we should try to avoid this pattern. https://www.baeldung.com/java-initialize-hashmap

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
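To make the review comment concrete, here is a minimal sketch (the class name `LeaderEpochMaps` is hypothetical) contrasting double-brace initialization with two plain alternatives for building the same leader-epoch-to-start-offset map. The `Map.of` variant assumes Java 9 or later; the explicit-`put` variant also works on Java 8.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper contrasting ways to build the leader-epoch -> start-offset map.
class LeaderEpochMaps {

    // Anti-pattern: the outer braces declare an anonymous subclass of TreeMap,
    // and the inner braces are that subclass's instance initializer.
    static TreeMap<Integer, Long> doubleBrace() {
        return new TreeMap<Integer, Long>() {{
            put(0, 0L);
            put(1, 10L);
        }};
    }

    // Alternative 1 (Java 9+): copy an immutable Map.of(...) into a plain TreeMap.
    static TreeMap<Integer, Long> fromMapOf() {
        return new TreeMap<>(Map.of(
                0, 0L, 1, 10L, 2, 20L, 3, 30L,
                4, 40L, 5, 50L, 7, 70L));
    }

    // Alternative 2 (Java 8 compatible): explicit put calls on a plain TreeMap.
    static TreeMap<Integer, Long> withPuts() {
        TreeMap<Integer, Long> map = new TreeMap<>();
        map.put(0, 0L);
        map.put(1, 10L);
        map.put(2, 20L);
        map.put(3, 30L);
        map.put(4, 40L);
        map.put(5, 50L);
        map.put(7, 70L);
        return map;
    }
}
```

Both alternatives return an instance of `TreeMap` itself, so no extra class is generated per call site, and `floorKey`/`floorEntry` lookups behave exactly as with the double-brace version.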
[GitHub] [kafka] kamalcph commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
kamalcph commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1295394486

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
```
@@ -618,6 +625,230 @@ public void run() {
             }
         }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        if (isLeader()) {
```
Review Comment:
   Filed KAFKA-15351 and KAFKA-15352 to track the cases.
[jira] [Updated] (KAFKA-15352) Ensure consistency while deleting the remote log segments
[ https://issues.apache.org/jira/browse/KAFKA-15352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-15352:
-----------------------------------------
    Parent: KAFKA-7739
Issue Type: Sub-task  (was: Task)

> Ensure consistency while deleting the remote log segments
> ---------------------------------------------------------
>
>                 Key: KAFKA-15352
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15352
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Kamal Chandraprakash
>            Priority: Major
>
> In KAFKA-14888, the remote log segments that breach the retention time/size
> are deleted before the log-start-offset is updated. If a consumer starts
> reading from the beginning of the topic in the middle of this deletion, it
> fails to read the messages and UNKNOWN_SERVER_ERROR is returned to the
> consumer.
> To ensure consistency, similar to local log segments, where the actual
> segments are deleted only after {{segment.delete.delay.ms}}, we should update
> the log-start-offset first and delete the remote log segment afterwards.
> See the [PR#13561|https://github.com/apache/kafka/pull/13561] and
> [comment|https://github.com/apache/kafka/pull/13561#discussion_r1293086543]
> for more details.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
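The ordering proposed in the ticket (advance the log-start-offset first, delete remote data second) can be sketched with a toy single-partition model. Everything here (`OrderedRemoteDeletion`, `Segment`, `segmentFor`, `deleteBelow`) is hypothetical and is not `RemoteLogManager`'s API; it assumes a single deleting thread and concurrent readers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical model of one partition's remote segments (not Kafka's classes).
class OrderedRemoteDeletion {
    static final class Segment {
        final long startOffset;
        final long endOffset;

        Segment(long startOffset, long endOffset) {
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    private final ConcurrentSkipListMap<Long, Segment> segments = new ConcurrentSkipListMap<>();
    private volatile long logStartOffset = 0L;

    void add(Segment segment) {
        segments.put(segment.startOffset, segment);
    }

    long logStartOffset() {
        return logStartOffset;
    }

    // Reader path: offsets below the log-start-offset are rejected up front
    // (null here; e.g. an offset-out-of-range error in a broker) instead of
    // failing later on a segment that has already been deleted.
    Segment segmentFor(long offset) {
        if (offset < logStartOffset) {
            return null;
        }
        Map.Entry<Long, Segment> entry = segments.floorEntry(offset);
        if (entry == null || entry.getValue().endOffset < offset) {
            throw new IllegalStateException("no segment holds offset " + offset);
        }
        return entry.getValue();
    }

    // Deletion path (single writer assumed): advance the log-start-offset FIRST,
    // then delete the segments that lie entirely below it. New reads are rejected
    // before any data disappears; a reader that passed the check just before the
    // update can still race with the deletion, which is why a delete delay helps.
    List<Segment> deleteBelow(long newLogStartOffset) {
        logStartOffset = Math.max(logStartOffset, newLogStartOffset);
        List<Segment> deleted = new ArrayList<>();
        for (Segment segment : segments.values()) {
            if (segment.endOffset >= logStartOffset) {
                break;
            }
            segments.remove(segment.startOffset);
            deleted.add(segment);
        }
        return deleted;
    }
}
```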
[jira] [Created] (KAFKA-15352) Ensure consistency while deleting the remote log segments
Kamal Chandraprakash created KAFKA-15352:
--------------------------------------------

             Summary: Ensure consistency while deleting the remote log segments
                 Key: KAFKA-15352
                 URL: https://issues.apache.org/jira/browse/KAFKA-15352
             Project: Kafka
          Issue Type: Task
            Reporter: Kamal Chandraprakash


In KAFKA-14888, the remote log segments that breach the retention time/size are deleted before the log-start-offset is updated. If a consumer starts reading from the beginning of the topic in the middle of this deletion, it fails to read the messages and UNKNOWN_SERVER_ERROR is returned to the consumer.

To ensure consistency, similar to local log segments, where the actual segments are deleted only after {{segment.delete.delay.ms}}, we should update the log-start-offset first and delete the remote log segment afterwards.

See the [PR#13561|https://github.com/apache/kafka/pull/13561] and [comment|https://github.com/apache/kafka/pull/13561#discussion_r1293086543] for more details.
[jira] [Updated] (KAFKA-15351) Update log-start-offset after leader election for topics enabled with remote storage
[ https://issues.apache.org/jira/browse/KAFKA-15351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-15351:
-----------------------------------------
    Description: 
Case-1:
In the FETCH response, the leader-log-start-offset is piggy-backed. But there can be a scenario where:
# The leader deletes the remote log segment and updates its log-start-offset.
# Before replica-2 updates its log-start-offset via a FETCH request, the leadership changes to replica-2.
# There are no more eligible segments to delete from remote storage.
# The log-start-offset is stale (it refers to the old log-start-offset, but the data has already been removed from remote storage).
# If a consumer starts reading from the beginning of the topic, it fails to read.

Case-2:
The old leader (now a follower) can delete the remote log segment in the middle of leader election. We need to update the log-start-offset metadata for this case.

See this comment [https://github.com/apache/kafka/pull/13561#discussion_r1226538752] for more details.

  was:
In the FETCH response, the leader-log-start-offset is piggy-backed. But there can be a scenario where:
# The leader deletes the remote log segment and updates its log-start-offset.
# Before replica-2 updates its log-start-offset via a FETCH request, the leadership changes to replica-2.
# There are no more eligible segments to delete from remote storage.
# The log-start-offset is stale (it refers to the old log-start-offset, but the data has already been removed from remote storage).
# If a consumer starts reading from the beginning of the topic, it fails to read.

> Update log-start-offset after leader election for topics enabled with remote storage
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15351
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15351
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Kamal Chandraprakash
>            Priority: Major
>
> Case-1:
> In the FETCH response, the leader-log-start-offset is piggy-backed. But
> there can be a scenario where:
> # The leader deletes the remote log segment and updates its log-start-offset.
> # Before replica-2 updates its log-start-offset via a FETCH request, the
> leadership changes to replica-2.
> # There are no more eligible segments to delete from remote storage.
> # The log-start-offset is stale (it refers to the old log-start-offset, but
> the data has already been removed from remote storage).
> # If a consumer starts reading from the beginning of the topic, it fails to
> read.
>
> Case-2:
> The old leader (now a follower) can delete the remote log segment in the
> middle of leader election. We need to update the log-start-offset metadata
> for this case.
> See this comment
> [https://github.com/apache/kafka/pull/13561#discussion_r1226538752] for more
> details.
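One possible shape of the fix for Case-1 is for a newly elected leader to recompute its log-start-offset from the remote segment metadata that actually remains, instead of trusting a possibly stale value. This is a hypothetical sketch (`LeaderLogStartOffsetSketch` and its parameters are illustrative), assuming remote segments always precede the local ones.

```java
import java.util.Collection;
import java.util.OptionalLong;

// Hypothetical helper, not Kafka's API: refresh the log-start-offset when a
// replica becomes leader for a partition whose older data lives in remote storage.
class LeaderLogStartOffsetSketch {

    static long refreshedLogStartOffset(long currentLogStartOffset,
                                        Collection<Long> remoteSegmentStartOffsets,
                                        long localLogStartOffset) {
        // Earliest offset still backed by a remote segment, if any remain.
        OptionalLong earliestRemote = remoteSegmentStartOffsets.stream()
                .mapToLong(Long::longValue)
                .min();
        // Assumes remote segments precede local ones, so with no remote
        // segments left the earliest readable data is the local log start.
        long earliestAvailable = earliestRemote.orElse(localLogStartOffset);
        // Never move the log-start-offset backwards.
        return Math.max(currentLogStartOffset, earliestAvailable);
    }
}
```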
[jira] [Created] (KAFKA-15351) Update log-start-offset after leader election for topics enabled with remote storage
Kamal Chandraprakash created KAFKA-15351:
--------------------------------------------

             Summary: Update log-start-offset after leader election for topics enabled with remote storage
                 Key: KAFKA-15351
                 URL: https://issues.apache.org/jira/browse/KAFKA-15351
             Project: Kafka
          Issue Type: Task
            Reporter: Kamal Chandraprakash


In the FETCH response, the leader-log-start-offset is piggy-backed. But there can be a scenario where:
# The leader deletes the remote log segment and updates its log-start-offset.
# Before replica-2 updates its log-start-offset via a FETCH request, the leadership changes to replica-2.
# There are no more eligible segments to delete from remote storage.
# The log-start-offset is stale (it refers to the old log-start-offset, but the data has already been removed from remote storage).
# If a consumer starts reading from the beginning of the topic, it fails to read.
[jira] [Updated] (KAFKA-15351) Update log-start-offset after leader election for topics enabled with remote storage
[ https://issues.apache.org/jira/browse/KAFKA-15351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-15351:
-----------------------------------------
    Parent: KAFKA-7739
Issue Type: Sub-task  (was: Task)

> Update log-start-offset after leader election for topics enabled with remote storage
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15351
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15351
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Kamal Chandraprakash
>            Priority: Major
>
> In the FETCH response, the leader-log-start-offset is piggy-backed. But,
> there can be a scenario where:
> # The leader deletes the remote log segment and updates its log-start-offset.
> # Before replica-2 updates its log-start-offset via a FETCH request, the
> leadership changes to replica-2.
> # There are no more eligible segments to delete from remote storage.
> # The log-start-offset is stale (it refers to the old log-start-offset, but
> the data has already been removed from remote storage).
> # If a consumer starts reading from the beginning of the topic, it fails to
> read.
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1295387916

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
```
@@ -1033,6 +1360,35 @@ public void close() {
             }
         }
 
+    private static class RetentionSizeData {
+        private final long retentionSize;
+        private final long remainingBreachedSize;
+
+        public RetentionSizeData(long retentionSize, long remainingBreachedSize) {
+            if (retentionSize < remainingBreachedSize) {
+                throw new IllegalArgumentException("retentionSize must be greater than remainingBreachedSize");
+            }
```
Review Comment:
   Good catch! It was changed while refactoring; I added UTs to cover that in the latest commits.
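To illustrate what "retention based on time and size" means for remote segments, here is a hypothetical selector (`RemoteRetentionSelector` is not part of Kafka) that picks the oldest segments breaching each limit. The size rule is an assumption of this sketch: it deletes the oldest segment only while the total size stays at or above the limit afterwards. The time rule assumes each segment's max timestamp grows with segment age.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers (not RemoteLogManager's API) that return the indices of
// the oldest remote segments breaching a size or time retention limit.
class RemoteRetentionSelector {

    // Size-based: delete the oldest segments while the remaining total size
    // stays at or above retentionSizeBytes after each deletion.
    static List<Integer> breachedBySize(long[] sizesOldestFirst, long retentionSizeBytes) {
        if (retentionSizeBytes < 0) {
            throw new IllegalArgumentException("retentionSizeBytes must be non-negative: " + retentionSizeBytes);
        }
        long totalSize = 0;
        for (long size : sizesOldestFirst) {
            totalSize += size;
        }
        // Bytes by which the log currently exceeds the limit (may be <= 0).
        long remainingBreachedSize = totalSize - retentionSizeBytes;
        List<Integer> toDelete = new ArrayList<>();
        for (int i = 0; i < sizesOldestFirst.length; i++) {
            if (remainingBreachedSize - sizesOldestFirst[i] < 0) {
                break; // deleting this segment would drop the log below the limit
            }
            remainingBreachedSize -= sizesOldestFirst[i];
            toDelete.add(i);
        }
        return toDelete;
    }

    // Time-based: delete the oldest segments whose newest record is older than retentionMs.
    static List<Integer> breachedByTime(long[] maxTimestampsOldestFirst, long nowMs, long retentionMs) {
        List<Integer> toDelete = new ArrayList<>();
        for (int i = 0; i < maxTimestampsOldestFirst.length; i++) {
            if (nowMs - maxTimestampsOldestFirst[i] <= retentionMs) {
                break; // this and all newer segments are still within retention
            }
            toDelete.add(i);
        }
        return toDelete;
    }
}
```

For example, four 100-byte segments with a 250-byte limit exceed it by 150 bytes, so only the oldest segment is selected: deleting a second one would leave 200 bytes, below the limit.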
[jira] [Updated] (KAFKA-15346) Single-Key_single-timestamp IQs with versioned state stores
[ https://issues.apache.org/jira/browse/KAFKA-15346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-15346:
------------------------------------
    Component/s: streams

> Single-Key_single-timestamp IQs with versioned state stores
> -----------------------------------------------------------
>
>                 Key: KAFKA-15346
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15346
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Alieh Saeedi
>            Assignee: Alieh Saeedi
>            Priority: Major
>
> [KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just two query types:
> *Key Queries with single timestamp:*
> # single-key latest-value lookup
> # single-key lookup with timestamp (upper) bound
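The two query types above can be illustrated with a toy in-memory model of a versioned store. This is not the Kafka Streams or IQv2 API; `ToyVersionedStore`, `latest`, and `asOf` are hypothetical names, and the model assumes each version is valid from its timestamp until the next version's timestamp.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Toy in-memory model of a versioned key-value store; NOT the Kafka Streams API.
// Each put records a new version of the key, valid from its timestamp until the
// next version's timestamp. Deletes (null values) are not modelled.
class ToyVersionedStore<K, V> {
    private final Map<K, TreeMap<Long, V>> versionsByKey = new HashMap<>();

    void put(K key, V value, long timestamp) {
        versionsByKey.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Query type 1: single-key latest-value lookup.
    Optional<V> latest(K key) {
        TreeMap<Long, V> versions = versionsByKey.get(key);
        if (versions == null || versions.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(versions.lastEntry().getValue());
    }

    // Query type 2: single-key lookup with a timestamp upper bound, i.e. the
    // version with the largest timestamp <= asOfTimestamp.
    Optional<V> asOf(K key, long asOfTimestamp) {
        TreeMap<Long, V> versions = versionsByKey.get(key);
        if (versions == null) {
            return Optional.empty();
        }
        Map.Entry<Long, V> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }
}
```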
[jira] [Updated] (KAFKA-15347) Single-Key_multi-timestamp IQs with versioned state stores
[ https://issues.apache.org/jira/browse/KAFKA-15347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-15347:
------------------------------------
    Component/s: streams

> Single-Key_multi-timestamp IQs with versioned state stores
> ----------------------------------------------------------
>
>                 Key: KAFKA-15347
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15347
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Alieh Saeedi
>            Assignee: Alieh Saeedi
>            Priority: Major
>
> [KIP-968|https://cwiki.apache.org/confluence/display/KAFKA/KIP-968%3A+Support+single-key_multi-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just four query types:
> *Key Queries with multiple timestamps:*
> # single-key query with upper bound timestamp
> # single-key query with lower bound timestamp
> # single-key query with timestamp range
> # single-key all versions query
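A toy model can show how the four multi-timestamp query types relate to one key's version history. This is not the Kafka Streams API: `ToyVersionHistory` is hypothetical, and the range semantics (return every version whose validity interval overlaps `[from, to]`) are an assumption of this sketch. Upper-bound-only and lower-bound-only queries are expressed as `inRange(Long.MIN_VALUE, to)` and `inRange(from, Long.MAX_VALUE)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (not the Kafka Streams API) of the version history of a single key.
// A version is valid from its timestamp until the next version's timestamp.
class ToyVersionHistory<V> {
    private final TreeMap<Long, V> versions = new TreeMap<>();

    void put(V value, long timestamp) {
        versions.put(timestamp, value);
    }

    // All-versions query, oldest first.
    List<V> allVersions() {
        return new ArrayList<>(versions.values());
    }

    // Versions whose validity interval overlaps [from, to], oldest first: the
    // version already valid at `from`, plus every version starting in (from, to].
    List<V> inRange(long from, long to) {
        if (from > to) {
            throw new IllegalArgumentException("from must be <= to");
        }
        List<V> result = new ArrayList<>();
        Map.Entry<Long, V> validAtFrom = versions.floorEntry(from);
        if (validAtFrom != null) {
            result.add(validAtFrom.getValue());
        }
        result.addAll(versions.subMap(from, false, to, true).values());
        return result;
    }
}
```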
[jira] [Updated] (KAFKA-15348) Range IQs with versioned state stores
[ https://issues.apache.org/jira/browse/KAFKA-15348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-15348:
------------------------------------
    Component/s: streams

> Range IQs with versioned state stores
> -------------------------------------
>
>                 Key: KAFKA-15348
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15348
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Alieh Saeedi
>            Assignee: Alieh Saeedi
>            Priority: Major
>
> [KIP-969|https://cwiki.apache.org/confluence/display/KAFKA/KIP-969%3A+Support+range+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers all types of range queries:
> *Range Queries*
> # key-range latest-value query
> # key-range with lower bound latest-value query
> # key-range with upper bound latest-value query
> # all-keys (no bound) latest-value query
> # key-range query with timestamp (upper) bound
> # key-range with lower bound with timestamp (upper) bound
> # key-range with upper bound with timestamp (upper) bound
> # all-keys (no bound) with timestamp (upper) bound
> # key-range query with timestamp range
> # key-range query with lower bound with timestamp range
> # key-range query with upper bound with timestamp range
> # all-keys (no bound) with timestamp range
> # key-range query all-versions
> # key-range query with lower bound all-versions
> # key-range query with upper bound all-versions
> # all-keys query (no bound) all-versions (entire store)
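Several of the range query types differ only in which key bounds are set and which timestamp bound applies. The toy model below (not the Kafka Streams API; `ToyRangeVersionedStore` and `rangeAsOf` are hypothetical) covers the latest-value and timestamp-upper-bound variants: a `null` key bound means "unbounded" on that side, and `Long.MAX_VALUE` as the timestamp gives the latest value.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model (not the Kafka Streams API): keys are kept sorted so a key range is
// a sub-map view, and each key holds its own timestamp-ordered version history.
class ToyRangeVersionedStore {
    private final TreeMap<String, TreeMap<Long, String>> store = new TreeMap<>();

    void put(String key, String value, long timestamp) {
        store.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Key-range query with a timestamp upper bound. A null lowerKey/upperKey
    // means unbounded on that side, so (null, null) is the all-keys query.
    // Pass Long.MAX_VALUE as asOf for the latest-value variants.
    Map<String, String> rangeAsOf(String lowerKey, String upperKey, long asOf) {
        NavigableMap<String, TreeMap<Long, String>> range = store;
        if (lowerKey != null) {
            range = range.tailMap(lowerKey, true);
        }
        if (upperKey != null) {
            range = range.headMap(upperKey, true);
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, TreeMap<Long, String>> entry : range.entrySet()) {
            // Version valid at `asOf`; keys with no version yet are skipped.
            Map.Entry<Long, String> version = entry.getValue().floorEntry(asOf);
            if (version != null) {
                result.put(entry.getKey(), version.getValue());
            }
        }
        return result;
    }
}
```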
[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens
pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295339433

## core/src/main/scala/kafka/server/ControllerApis.scala: ##
```
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
+    val alterRequest = request.body[CreateDelegationTokenRequest]
+
+    val requester = request.context.principal
+    val ownerPrincipalName = alterRequest.data.ownerPrincipalName
+    val ownerPrincipalType = alterRequest.data.ownerPrincipalType
+    val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+      request.context.principal
+    } else {
+      new KafkaPrincipal(ownerPrincipalType, ownerPrincipalName)
+    }
+
+    // Requester is always allowed to create token for self
+    if (!owner.equals(requester) &&
```
Review Comment:
   I added the code; please look it over. One change is that I had to allow PLAINTEXT because all our broker-to-controller forwarding tests use PLAINTEXT.
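The owner-resolution logic in the quoted Scala can be restated as a small Java sketch. `TokenOwnerSketch`, its `Principal` stand-in for `KafkaPrincipal`, and the `canCreateTokenFor` predicate are hypothetical; the quoted diff is cut off before the actual permission check for creating a token on behalf of another principal, so that check is modelled here as an injected predicate.

```java
import java.util.Objects;
import java.util.function.BiPredicate;

// Hypothetical restatement of the owner-resolution logic; not Kafka's classes.
class TokenOwnerSketch {

    // Minimal stand-in for KafkaPrincipal: a (type, name) pair with value equality.
    static final class Principal {
        final String type;
        final String name;

        Principal(String type, String name) {
            this.type = type;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Principal)) {
                return false;
            }
            Principal other = (Principal) o;
            return type.equals(other.type) && name.equals(other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, name);
        }
    }

    // A missing or empty owner name means the requester wants a token for itself.
    static Principal resolveOwner(Principal requester, String ownerType, String ownerName) {
        if (ownerName == null || ownerName.isEmpty()) {
            return requester;
        }
        return new Principal(ownerType, ownerName);
    }

    // Creating a token for self is always allowed; creating one for someone
    // else needs a permission check (an injected predicate in this sketch).
    static boolean mayCreate(Principal requester, Principal owner,
                             BiPredicate<Principal, Principal> canCreateTokenFor) {
        return owner.equals(requester) || canCreateTokenFor.test(requester, owner);
    }
}
```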
[jira] [Resolved] (KAFKA-15329) Make default `remote.log.metadata.manager.class.name` as topic based RLMM
[ https://issues.apache.org/jira/browse/KAFKA-15329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-15329.
-------------------------------
    Resolution: Fixed

> Make default `remote.log.metadata.manager.class.name` as topic based RLMM
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-15329
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15329
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Luke Chen
>            Priority: Blocker
>             Fix For: 3.6.0
>
> As described in
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs.1],
> we should make the topic-based RLMM the default value of
> "remote.log.metadata.manager.class.name".
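For illustration, a broker-properties sketch that sets the RLMM class explicitly; after this change, setting it should be optional because the topic-based implementation becomes the default. The fully qualified class name is taken from the package of `TopicBasedRemoteLogMetadataManagerTest` in the PR #14202 test list; the storage-manager class `com.example.MyRemoteStorageManager` is a hypothetical placeholder for a real plugin.

```java
import java.util.Properties;

// Hypothetical broker configuration sketch for tiered storage.
class TieredStorageConfigSketch {

    static Properties brokerProps() {
        Properties props = new Properties();
        // Enables the tiered-storage subsystem on the broker (KIP-405).
        props.setProperty("remote.log.storage.system.enable", "true");
        // Placeholder RemoteStorageManager plugin class (hypothetical name).
        props.setProperty("remote.log.storage.manager.class.name",
                "com.example.MyRemoteStorageManager");
        // Set explicitly here for clarity; with KAFKA-15329 this value is the default.
        props.setProperty("remote.log.metadata.manager.class.name",
                "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager");
        return props;
    }
}
```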
[GitHub] [kafka] showuon merged pull request #14202: KAFKA-15329: Make default remote.log.metadata.manager.class.name as topic based RLMM
showuon merged PR #14202:
URL: https://github.com/apache/kafka/pull/14202
[GitHub] [kafka] showuon commented on pull request #14202: KAFKA-15329: Make default remote.log.metadata.manager.class.name as topic based RLMM
showuon commented on PR #14202:
URL: https://github.com/apache/kafka/pull/14202#issuecomment-1679845654

   Failed tests are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 17 and Scala 2.13 / kafka.api.SslAdminIntegrationTest.testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads()
   Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest.testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 20 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 20 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 20 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault()
   Build / JDK 8 and Scala 2.12 / kafka.api.PlaintextAdminIntegrationTest.testElectPreferredLeaders(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   ```
[GitHub] [kafka] pprovenzano commented on pull request #14083: KAFKA-15219: KRaft support for DelegationTokens
pprovenzano commented on PR #14083:
URL: https://github.com/apache/kafka/pull/14083#issuecomment-1679832173

   > I think this starts to look good. There are some parts I haven't reviewed yet, I'll try to get them in the next few days.
   
   Thank you for the review comments. I think I have addressed all the ones you have given.
[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens
pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295289182

## core/src/main/scala/kafka/server/DelegationTokenManager.scala: ##
```
@@ -186,57 +134,28 @@ class DelegationTokenManager(val config: KafkaConfig,
   val tokenMaxLifetime: Long = config.delegationTokenMaxLifeMs
   val defaultTokenRenewTime: Long = config.delegationTokenExpiryTimeMs
   val tokenRemoverScanInterval: Long = config.delegationTokenExpiryCheckIntervalMs
-  private val lock = new Object()
-  private var tokenChangeListener: ZkNodeChangeNotificationListener = _
 
   def startup(): Unit = {
     if (config.tokenAuthEnabled) {
-      zkClient.createDelegationTokenPaths()
       loadCache()
-      tokenChangeListener = new ZkNodeChangeNotificationListener(zkClient, DelegationTokenChangeNotificationZNode.path, DelegationTokenChangeNotificationSequenceZNode.SequenceNumberPrefix, TokenChangedNotificationHandler)
-      tokenChangeListener.init()
     }
   }
 
   def shutdown(): Unit = {
     if (config.tokenAuthEnabled) {
```
Review Comment:
   Done
[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens
pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295288501

## core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala: ##
```
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.server.DelegationTokenManager
+import kafka.server.KafkaConfig
+import kafka.utils.Logging
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.fault.FaultHandler
+
+
+class DelegationTokenPublisher(
+  conf: KafkaConfig,
+  faultHandler: FaultHandler,
+  nodeType: String,
+  tokenManager: DelegationTokenManager,
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+  logIdent = s"[${name()}] "
+
+  var _firstPublish = true
+
+  override def name(): String = s"DelegationTokenPublisher ${nodeType} id=${conf.nodeId}"
+
+  override def onMetadataUpdate(
+    delta: MetadataDelta,
+    newImage: MetadataImage,
+    manifest: LoaderManifest
+  ): Unit = {
+    onMetadataUpdate(delta, newImage)
+  }
+
+  def onMetadataUpdate(
+    delta: MetadataDelta,
+    newImage: MetadataImage,
+  ): Unit = {
+    val deltaName = if (_firstPublish) {
+      s"initial MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
+    } else {
+      s"update MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
+    }
+    try {
+      if (_firstPublish) {
+        // Initialize the tokenCache with the Image
+        Option(newImage.delegationTokens()).foreach { delegationTokenImage =>
+          delegationTokenImage.tokens().forEach { (tokenId, delegationTokenData) =>
+            tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.tokenInformation()))
+          }
+        }
+        _firstPublish = false
+      }
+      // Apply changes to DelegationTokens.
+      Option(delta.delegationTokenDelta()).foreach { delegationTokenDelta =>
+        delegationTokenDelta.changes().forEach {
+          case (tokenId, delegationTokenData) =>
```
Review Comment:
   > nit: `tokenId` isn't used, you can just replace it with `_`
   
   It's used in the else clause when we remove the token.
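The reply explains why `tokenId` is needed: on removal, there is no token data, only the id. The publisher's flow (load the full image on the first publish, then apply deltas) can be sketched with a toy cache. `ToyTokenPublisher` is hypothetical and uses plain strings for token info; it assumes, as the reply suggests, that a delta entry with no data means "token removed".

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Kafka's classes) of a publisher keeping a token cache in sync
// with metadata: the first publish loads the full image, later publishes apply
// only the delta. A delta entry whose value is null means "token removed", so
// the token id is the only thing available in that branch.
class ToyTokenPublisher {
    private final Map<String, String> tokenCache = new HashMap<>(); // tokenId -> token info
    private boolean firstPublish = true;

    void onMetadataUpdate(Map<String, String> delta, Map<String, String> image) {
        if (firstPublish) {
            // Initialize the cache from the full image.
            tokenCache.putAll(image);
            firstPublish = false;
        }
        // Apply changes; re-applying changes already in the image is harmless.
        delta.forEach((tokenId, info) -> {
            if (info != null) {
                tokenCache.put(tokenId, info);
            } else {
                tokenCache.remove(tokenId);
            }
        });
    }

    Map<String, String> cache() {
        return tokenCache;
    }
}
```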
[GitHub] [kafka] pprovenzano commented on pull request #14083: KAFKA-15219: KRaft support for DelegationTokens
pprovenzano commented on PR #14083:
URL: https://github.com/apache/kafka/pull/14083#issuecomment-1679827453

   > @pprovenzano Thanks for the PR. left few comments
   > 
   > can we also update delegation token docs if required (like any configs for controller nodes etc..) https://github.com/apache/kafka/blob/trunk/docs/security.html#L1178
   
   I'm going to do that with a separate PR.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14220: KAFKA-15102: Add release notes about the replication.policy.internal.topic.separator.enabled property for MirrorMaker 2
gharris1727 commented on code in PR #14220:
URL: https://github.com/apache/kafka/pull/14220#discussion_r1295265105

## docs/upgrade.html: ##
```
@@ -43,6 +43,10 @@ Notable changes in 3
         See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams">KIP-925</a>
         and Kafka Streams Developer Guide for more details.
+        To account for a break in compatibility introduced in version 3.1.0, MirrorMaker 2 has added a new
+        replication.policy.internal.topic.separator.enabled
+        property. If upgrading from 3.0.x or earlier, it may be necessary to set this property to false; see the property's
+        documentation for more details.
```
Review Comment:
   It does look like the other items in this section include KIP links, but in general there aren't too many KIP links in the upgrade section. I think linking the KIP is unnecessary.

## docs/upgrade.html: ##
```
@@ -43,6 +43,10 @@ Notable changes in 3
         See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams">KIP-925</a>
         and Kafka Streams Developer Guide for more details.
+        To account for a break in compatibility introduced in version 3.1.0, MirrorMaker 2 has added a new
+        replication.policy.internal.topic.separator.enabled
+        property. If upgrading from 3.0.x or earlier, it may be necessary to set this property to false; see the property's
+        documentation for more details.
```
Review Comment:
   This second link feels redundant.
[GitHub] [kafka] C0urante commented on a diff in pull request #14220: KAFKA-15102: Add release notes about the replication.policy.internal.topic.separator.enabled property for MirrorMaker 2
C0urante commented on code in PR #14220:
URL: https://github.com/apache/kafka/pull/14220#discussion_r1295254068

## docs/upgrade.html: ##
```
@@ -43,6 +43,10 @@ Notable changes in 3
         See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams">KIP-925</a>
         and Kafka Streams Developer Guide for more details.
+        To account for a break in compatibility introduced in version 3.1.0, MirrorMaker 2 has added a new
+        replication.policy.internal.topic.separator.enabled
+        property. If upgrading from 3.0.x or earlier, it may be necessary to set this property to false; see the property's
+        documentation for more details.
```
Review Comment:
   We could also link to [KIP-949](https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy) here, but I'm hoping that the information provided here and in the property's docstring will be sufficient to save people the time and effort of reading through an entire KIP.
[GitHub] [kafka] C0urante commented on pull request #14220: KAFKA-15102: Add release notes about the replication.policy.internal.topic.separator.enabled property for MirrorMaker 2
C0urante commented on PR #14220:
URL: https://github.com/apache/kafka/pull/14220#issuecomment-1679781965

   @gharris1727 @mimaison as the other two committers who voted for KIP-949, would you mind taking a look? Thanks!
[GitHub] [kafka] C0urante opened a new pull request, #14220: KAFKA-15102: Add release notes about the replication.policy.internal.topic.separator.enabled property for MirrorMaker 2
C0urante opened a new pull request, #14220:
URL: https://github.com/apache/kafka/pull/14220

   [Jira](https://issues.apache.org/jira/browse/KAFKA-15102)
   
   This is a docs-only follow-up for https://github.com/apache/kafka/pull/14082 that calls out the necessity for this property in our release notes so that users upgrading MM2 from older versions can do so safely.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] C0urante merged pull request #14082: KAFKA-15102: Mirror Maker 2 - KIP690 backward compatibility
C0urante merged PR #14082:
URL: https://github.com/apache/kafka/pull/14082
[GitHub] [kafka] mumrah commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
mumrah commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295237042

## metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java: ##
```
@@ -66,6 +119,10 @@ public LogDeltaManifest(
         this.numBytes = numBytes;
     }
 
+    public static Builder newBuilder() {
```
Review Comment:
   Well, it also has a public no-arg constructor; I just like the static factory. I like the fluent style for builders, and a static method to start it off makes sense to me.
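The design point in this comment (a fluent builder whose chain starts from a static factory, while a public no-arg constructor also exists) can be sketched as follows. `ManifestSketch` and its three fields are hypothetical and simplified; they are not the actual `LogDeltaManifest` API.

```java
// Hypothetical, simplified manifest with a fluent builder started by a static factory.
final class ManifestSketch {
    private final long numBatches;
    private final long elapsedNs;
    private final long numBytes;

    private ManifestSketch(long numBatches, long elapsedNs, long numBytes) {
        this.numBatches = numBatches;
        this.elapsedNs = elapsedNs;
        this.numBytes = numBytes;
    }

    // Static factory that starts the fluent chain.
    static Builder newBuilder() {
        return new Builder();
    }

    long numBatches() { return numBatches; }
    long elapsedNs() { return elapsedNs; }
    long numBytes() { return numBytes; }

    static final class Builder {
        // -1 marks "not set yet" so build() can reject incomplete manifests.
        private long numBatches = -1;
        private long elapsedNs = -1;
        private long numBytes = -1;

        // A public no-arg constructor works too; newBuilder() just reads better at call sites.
        public Builder() { }

        Builder setNumBatches(long numBatches) { this.numBatches = numBatches; return this; }
        Builder setElapsedNs(long elapsedNs) { this.elapsedNs = elapsedNs; return this; }
        Builder setNumBytes(long numBytes) { this.numBytes = numBytes; return this; }

        ManifestSketch build() {
            if (numBatches < 0 || elapsedNs < 0 || numBytes < 0) {
                throw new IllegalStateException("numBatches, elapsedNs and numBytes must all be set");
            }
            return new ManifestSketch(numBatches, elapsedNs, numBytes);
        }
    }
}
```

Usage reads as a single expression, for example `ManifestSketch.newBuilder().setNumBatches(3).setElapsedNs(1000).setNumBytes(4096).build()`, and a missing field fails fast in `build()` rather than producing a half-initialized object.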
[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens
pprovenzano commented on code in PR #14083: URL: https://github.com/apache/kafka/pull/14083#discussion_r1295230735 ## metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java: ## Review Comment: Yes and Done
[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens
pprovenzano commented on code in PR #14083: URL: https://github.com/apache/kafka/pull/14083#discussion_r1295230635 ## metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java: ## Review Comment: Fixed
[jira] [Resolved] (KAFKA-15350) MetadataLoaderMetrics has ClassNotFoundException in system tests
[ https://issues.apache.org/jira/browse/KAFKA-15350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-15350. - Resolution: Invalid I believe this was an environmental problem, due to some stale artifacts. A clean build fixed the issues I was seeing. > MetadataLoaderMetrics has ClassNotFoundException in system tests > > > Key: KAFKA-15350 > URL: https://issues.apache.org/jira/browse/KAFKA-15350 > Project: Kafka > Issue Type: Bug > Components: kraft, system tests >Affects Versions: 3.6.0 >Reporter: Greg Harris >Priority: Blocker > > The system tests appear to be failing on trunk with: > {noformat} > [2023-08-15 22:11:34,235] ERROR Exiting Kafka due to fatal exception > (kafka.Kafka$) > java.lang.NoClassDefFoundError: > org/apache/kafka/image/loader/MetadataLoaderMetrics >at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:68) >at kafka.Kafka$.buildServer(Kafka.scala:83) >at kafka.Kafka$.main(Kafka.scala:91) >at kafka.Kafka.main(Kafka.scala) > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.image.loader.MetadataLoaderMetrics >at java.net.URLClassLoader.findClass(URLClassLoader.java:387) >at java.lang.ClassLoader.loadClass(ClassLoader.java:418) >at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) >at java.lang.ClassLoader.loadClass(ClassLoader.java:351) >... 4 more{noformat} > This happens with the `tests/kafkatest/tests/connect/`, > `tests/kafkatest/tests/core/kraft_upgrade_test.py` and possibly others. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15350) MetadataLoaderMetrics has ClassNotFoundException in system tests
[ https://issues.apache.org/jira/browse/KAFKA-15350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15350: Issue Type: Bug (was: Improvement) > MetadataLoaderMetrics has ClassNotFoundException in system tests > > > Key: KAFKA-15350 > URL: https://issues.apache.org/jira/browse/KAFKA-15350 > Project: Kafka > Issue Type: Bug > Components: kraft, system tests >Affects Versions: 3.6.0 >Reporter: Greg Harris >Priority: Blocker > > The system tests appear to be failing on trunk with: > {noformat} > [2023-08-15 22:11:34,235] ERROR Exiting Kafka due to fatal exception > (kafka.Kafka$) > java.lang.NoClassDefFoundError: > org/apache/kafka/image/loader/MetadataLoaderMetrics >at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:68) >at kafka.Kafka$.buildServer(Kafka.scala:83) >at kafka.Kafka$.main(Kafka.scala:91) >at kafka.Kafka.main(Kafka.scala) > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.image.loader.MetadataLoaderMetrics >at java.net.URLClassLoader.findClass(URLClassLoader.java:387) >at java.lang.ClassLoader.loadClass(ClassLoader.java:418) >at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) >at java.lang.ClassLoader.loadClass(ClassLoader.java:351) >... 4 more{noformat} > This happens with the `tests/kafkatest/tests/connect/`, > `tests/kafkatest/tests/core/kraft_upgrade_test.py` and possibly others.
[jira] [Updated] (KAFKA-15336) Connect plugin Javadocs should mention serviceloader manifests
[ https://issues.apache.org/jira/browse/KAFKA-15336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15336: Issue Type: Improvement (was: Bug) > Connect plugin Javadocs should mention serviceloader manifests > -- > > Key: KAFKA-15336 > URL: https://issues.apache.org/jira/browse/KAFKA-15336 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > Fix For: 3.6.0 > > > Similar to the ConfigProvider, the Javadocs for the Connect plugin classes > should mention that plugin implementations should have ServiceLoader > manifests.
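For readers unfamiliar with ServiceLoader manifests: `java.util.ServiceLoader` discovers implementations through a provider-configuration file under `META-INF/services/`, named after the fully qualified service type and listing one implementation class per line. The sketch below only illustrates that lookup convention. The `manifestPath` and `countProviders` helpers are hypothetical, and `java.lang.Runnable` stands in for the service type rather than any real Connect plugin interface.

```java
import java.util.ServiceLoader;

public class ServiceManifestSketch {

    // Hypothetical helper: the resource path ServiceLoader scans for a given service type.
    // A plugin JAR would ship this file, listing one implementation class name per line.
    static String manifestPath(Class<?> serviceType) {
        return "META-INF/services/" + serviceType.getName();
    }

    // Counts the providers ServiceLoader can discover (and instantiate) for the
    // service type, searching the given class loader.
    static <S> int countProviders(Class<S> serviceType, ClassLoader loader) {
        int count = 0;
        for (S ignored : ServiceLoader.load(serviceType, loader)) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(manifestPath(Runnable.class));
        System.out.println(countProviders(Runnable.class, ServiceManifestSketch.class.getClassLoader()));
    }
}
```

A plugin without such a manifest can still be found by other scanning mechanisms, but `ServiceLoader` alone will not see it.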
[jira] [Updated] (KAFKA-15336) Connect plugin Javadocs should mention serviceloader manifests
[ https://issues.apache.org/jira/browse/KAFKA-15336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15336: Issue Type: Bug (was: Improvement) > Connect plugin Javadocs should mention serviceloader manifests > -- > > Key: KAFKA-15336 > URL: https://issues.apache.org/jira/browse/KAFKA-15336 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > Fix For: 3.6.0 > > > Similar to the ConfigProvider, the Javadocs for the Connect plugin classes > should mention that plugin implementations should have ServiceLoader > manifests.
[jira] [Created] (KAFKA-15350) MetadataLoaderMetrics has ClassNotFoundException in system tests
Greg Harris created KAFKA-15350: --- Summary: MetadataLoaderMetrics has ClassNotFoundException in system tests Key: KAFKA-15350 URL: https://issues.apache.org/jira/browse/KAFKA-15350 Project: Kafka Issue Type: Improvement Components: kraft, system tests Affects Versions: 3.6.0 Reporter: Greg Harris The system tests appear to be failing on trunk with: {noformat} [2023-08-15 22:11:34,235] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$) java.lang.NoClassDefFoundError: org/apache/kafka/image/loader/MetadataLoaderMetrics at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:68) at kafka.Kafka$.buildServer(Kafka.scala:83) at kafka.Kafka$.main(Kafka.scala:91) at kafka.Kafka.main(Kafka.scala) Caused by: java.lang.ClassNotFoundException: org.apache.kafka.image.loader.MetadataLoaderMetrics at java.net.URLClassLoader.findClass(URLClassLoader.java:387) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 4 more{noformat} This happens with the `tests/kafkatest/tests/connect/`, `tests/kafkatest/tests/core/kraft_upgrade_test.py` and possibly others.
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1295198962 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * The order of priority of properties during the assignment will be: balance > rack matching (when applicable) > stickiness. 
+ * + * Here's the step-by-step breakdown of the assignment process: + * + * + * Compute the quotas of partitions for each member based on the total partitions and member count. + * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. + * If a partition's rack mismatches with its member, track it with its prior owner. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments. + * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list. + * Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list. + * If no rack-compatible member is found, revert to the tracked current owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment. + * + */ +public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +// List of topics subscribed to by all members. +private final List subscriptionList; +private final RackInfo rackInfo; +// Count of members to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int remainingMembersToGetExtraPartition; +// Map of members to the remaining number of partitions needed to meet the minimum quota, +// including members eligible for an extra partition. 
+private final Map potentiallyUnfilledMembers; +// Members mapped to the remaining number of partitions needed to meet the full quota. +// Full quota = minQuota + one extra partition (if applicable). +private Map unfilledMembers; +private List unassignedPartitions; +private final Map newAssignment; +// Tracks the current owner of each partition when using rack-aware strategy. +// Current refers to the existing assignment. +private final Map currentPartitionOwners; +// Indicates if a rack aware assignment can be done. +// True if racks are defined for both members and partitions. +boolean useRackAwareStrategy; + +OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.assignmentSpec = assignmentSpec; +this.subscribedTopicDescriber = subscribedTopicDescriber; +subscriptionList = new ArrayList<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); + +RackInfo rackInfo = new RackInfo(assignmentSpec,
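The first step in the Javadoc above, computing partition quotas from the total partition count and the member count, can be read as integer division plus a remainder: every member gets at least `minQuota` partitions, and `totalPartitions % numMembers` members get one extra, which matches the role described for `remainingMembersToGetExtraPartition`. The sketch below illustrates that reading only; it is not the PR's code, and `QuotaSketch` and its method names are hypothetical.

```java
public class QuotaSketch {

    // Minimum number of partitions every member receives.
    static int minQuota(int totalPartitions, int numMembers) {
        requirePositive(numMembers);
        return totalPartitions / numMembers;
    }

    // Number of members that receive one partition beyond minQuota.
    static int membersWithExtraPartition(int totalPartitions, int numMembers) {
        requirePositive(numMembers);
        return totalPartitions % numMembers;
    }

    private static void requirePositive(int numMembers) {
        if (numMembers <= 0) {
            throw new IllegalArgumentException("numMembers must be positive");
        }
    }

    public static void main(String[] args) {
        int total = 10;
        int members = 3;
        System.out.println("minQuota=" + minQuota(total, members)
            + ", extra=" + membersWithExtraPartition(total, members));
    }
}
```

With 10 partitions and 3 members this prints `minQuota=3, extra=1`: one member ends up with 4 partitions and two with 3, which sums back to 10.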
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1295197016 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * The order of priority of properties during the assignment will be: balance > rack matching (when applicable) > stickiness. 
+ * + * Here's the step-by-step breakdown of the assignment process: + * + * + * Compute the quotas of partitions for each member based on the total partitions and member count. + * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. + * If a partition's rack mismatches with its member, track it with its prior owner. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments. + * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list. + * Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list. + * If no rack-compatible member is found, revert to the tracked current owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment. + * + */ +public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +// List of topics subscribed to by all members. +private final List subscriptionList; +private final RackInfo rackInfo; +// Count of members to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int remainingMembersToGetExtraPartition; +// Map of members to the remaining number of partitions needed to meet the minimum quota, +// including members eligible for an extra partition. 
+private final Map potentiallyUnfilledMembers; +// Members mapped to the remaining number of partitions needed to meet the full quota. +// Full quota = minQuota + one extra partition (if applicable). +private Map unfilledMembers; +private List unassignedPartitions; +private final Map newAssignment; +// Tracks the current owner of each partition when using rack-aware strategy. +// Current refers to the existing assignment. +private final Map currentPartitionOwners; +// Indicates if a rack aware assignment can be done. +// True if racks are defined for both members and partitions. +boolean useRackAwareStrategy; + +OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.assignmentSpec = assignmentSpec; +this.subscribedTopicDescriber = subscribedTopicDescriber; +subscriptionList = new ArrayList<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); + +RackInfo rackInfo = new RackInfo(assignmentSpec,
[GitHub] [kafka] lucasbru commented on pull request #14216: KAFKA-15319: Upgrade rocksdb to fix CVE-2022-37434
lucasbru commented on PR #14216: URL: https://github.com/apache/kafka/pull/14216#issuecomment-1679690414 https://jenkins.confluent.io/job/confluentinc/job/kafka-streams-benchmarks/job/master/653/parameters/ https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5796/parameters/
[GitHub] [kafka] junrao merged pull request #14179: MINOR: CommitRequestManager should only poll when the coordinator node is known
junrao merged PR #14179: URL: https://github.com/apache/kafka/pull/14179
[jira] [Created] (KAFKA-15349) ducker-ak should fail fast when gradlew systemTestLibs fails
Greg Harris created KAFKA-15349: --- Summary: ducker-ak should fail fast when gradlew systemTestLibs fails Key: KAFKA-15349 URL: https://issues.apache.org/jira/browse/KAFKA-15349 Project: Kafka Issue Type: Improvement Components: system tests Reporter: Greg Harris If you introduce a flaw into the gradle build which causes the systemTestLibs to fail, such as a circular dependency, then the ducker_test function continues to run tests which are invalid. Rather than proceeding to run the tests, the script should fail fast and make the user address the error before continuing.
[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens
pprovenzano commented on code in PR #14083: URL: https://github.com/apache/kafka/pull/14083#discussion_r1295140057 ## metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java: ## Review Comment: It is because the comparison uses toString(), which redacts sensitive data, and I didn't remove any of the delegation tokens between IMAGE1 and IMAGE2, so all the keys match. Once I add a RemoveDelegationTokenRecord in the test, that should cause it to fail.
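The pitfall described in this comment, a test comparing objects through `toString()` while `toString()` redacts secrets, can be reproduced in a few lines. `TokenSketch` below is a hypothetical class, not Kafka's delegation-token type: two instances that differ only in their secret produce identical strings, so only an `equals`-based comparison tells them apart.

```java
import java.util.Arrays;

public class RedactionSketch {

    // Hypothetical token type: equals() covers the secret, toString() redacts it.
    static final class TokenSketch {
        final String tokenId;
        final byte[] hmac;

        TokenSketch(String tokenId, byte[] hmac) {
            this.tokenId = tokenId;
            this.hmac = hmac.clone();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof TokenSketch)) return false;
            TokenSketch other = (TokenSketch) o;
            return tokenId.equals(other.tokenId) && Arrays.equals(hmac, other.hmac);
        }

        @Override
        public int hashCode() {
            return 31 * tokenId.hashCode() + Arrays.hashCode(hmac);
        }

        @Override
        public String toString() {
            // The secret is deliberately omitted so it never reaches logs or test output.
            return "TokenSketch(tokenId=" + tokenId + ", hmac=[redacted])";
        }
    }

    public static void main(String[] args) {
        TokenSketch a = new TokenSketch("token1", new byte[] {1, 2, 3});
        TokenSketch b = new TokenSketch("token1", new byte[] {9, 9, 9});
        System.out.println(a.toString().equals(b.toString())); // true: the strings match
        System.out.println(a.equals(b));                       // false: the secrets differ
    }
}
```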
[GitHub] [kafka] gharris1727 merged pull request #14177: MINOR: Fix SynchronizationTest Classloaders sometimes not being parallel capable
gharris1727 merged PR #14177: URL: https://github.com/apache/kafka/pull/14177
[GitHub] [kafka] cmccabe commented on pull request #14213: KAFKA-15345; KRaft leader notifies when listener reaches epoch start
cmccabe commented on PR #14213: URL: https://github.com/apache/kafka/pull/14213#issuecomment-1679620704 > The RecordsBatchReader implementation is also changed to include control records. This makes it possible for the state machine to learn about committed control records. This additional information can be used to compute the committed offset or for counting those bytes when determining when to snapshot the partition. I think we should do this change separately, since it may require downstream changes. I think a few implementations assume they don't get "empty" batches (that don't have records), which would no longer be true if control record batches were sent.
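The concern about "empty" batches can be made concrete. If control-record batches were delivered, a listener could no longer assume that every batch carries at least one data record, for example by calling `records.get(0)`. The sketch below uses a hypothetical `BatchSketch` type (not the Raft `Batch` class) and a hypothetical `Listener` that takes its progress from batch metadata rather than from the records themselves, so a batch without records is handled safely.

```java
import java.util.List;

public class EmptyBatchSketch {

    // Hypothetical stand-in for a committed batch: the offset of its last entry
    // plus any data records it carries.
    static final class BatchSketch {
        final long lastOffset;
        final List<String> records; // empty for a control-record-only batch

        BatchSketch(long lastOffset, List<String> records) {
            this.lastOffset = lastOffset;
            this.records = records;
        }
    }

    static final class Listener {
        long committedOffset = -1L;
        int recordsApplied = 0;

        void handleCommit(BatchSketch batch) {
            // Fragile alternative: batch.records.get(0) throws on an empty batch.
            // Applying each record is omitted here; we only count them.
            recordsApplied += batch.records.size();
            // Progress comes from batch metadata, so a batch without records still advances it.
            committedOffset = batch.lastOffset;
        }
    }

    public static void main(String[] args) {
        Listener listener = new Listener();
        listener.handleCommit(new BatchSketch(1L, List.of("a", "b")));
        listener.handleCommit(new BatchSketch(2L, List.of()));
        System.out.println(listener.committedOffset + " " + listener.recordsApplied);
    }
}
```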
[GitHub] [kafka] gharris1727 commented on pull request #14177: MINOR: Fix SynchronizationTest Classloaders sometimes not being parallel capable
gharris1727 commented on PR #14177: URL: https://github.com/apache/kafka/pull/14177#issuecomment-1679635472 CI failures appear unrelated, and the tests pass locally. The test being fixed shows no failures.
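For context on what "parallel capable" means: a `ClassLoader` subclass opts in by calling the protected static `ClassLoader.registerAsParallelCapable()` from its own static initializer. Registration is tracked per class, so a subclass that never registers is not parallel capable even when its superclass is. The sketch below shows this with `URLClassLoader` (which does register itself). It is a general illustration and makes no claim about which cause PR #14177 actually fixed. `RegisteredLoader` and `UnregisteredLoader` are hypothetical, and `isRegisteredAsParallelCapable()` requires Java 9 or later.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ParallelCapableSketch {

    // Registers itself in a static initializer, as the ClassLoader Javadoc prescribes.
    static class RegisteredLoader extends URLClassLoader {
        static {
            ClassLoader.registerAsParallelCapable();
        }

        RegisteredLoader(URL[] urls, ClassLoader parent) {
            super(urls, parent);
        }
    }

    // Same superclass, but never registers, so its instances are not parallel capable.
    static class UnregisteredLoader extends URLClassLoader {
        UnregisteredLoader(URL[] urls, ClassLoader parent) {
            super(urls, parent);
        }
    }

    public static void main(String[] args) throws Exception {
        try (URLClassLoader registered = new RegisteredLoader(new URL[0], null);
             URLClassLoader unregistered = new UnregisteredLoader(new URL[0], null)) {
            System.out.println(registered.isRegisteredAsParallelCapable());   // true
            System.out.println(unregistered.isRegisteredAsParallelCapable()); // false
        }
    }
}
```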
[jira] [Commented] (KAFKA-15343) Fix MirrorConnectIntegrationTests causing ci build failures.
[ https://issues.apache.org/jira/browse/KAFKA-15343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17754771#comment-17754771 ] Greg Harris commented on KAFKA-15343: - Hi [~prasanth] and thank you for reporting this issue! It is certainly not good that one test can cause the whole build to fail, preventing other tests from running. Can you speak to how often you've seen this failure? Naively, I would expect that with >1 ephemeral ports available, such a failure would be quite rare. If this is true, I don't think it is appropriate to disable these tests. They are extremely important test coverage for the MirrorMaker2 feature, and disabling them may lead to undetected regressions. As far as resolving this issue, I think we should: 1. Find where we are leaking Kafka clients in the MM2 integration test suites, either within the framework or within the Mirror connectors. 2. Close Kafka clients in a timely fashion (some relevant work in https://issues.apache.org/jira/browse/KAFKA-14725 and https://issues.apache.org/jira/browse/KAFKA-15090 ). 3. Try to reproduce the Gradle daemon crash in a more controlled environment. 4. Report the daemon crash to the Gradle upstream. Since random port selection and port reuse are standard procedures (not specific to Kafka), there could be downstream projects using Gradle that are affected. If there is something specific about the Kafka clients' connections that affects Gradle, then we should investigate further to help the Gradle project resolve the issue. > Fix MirrorConnectIntegrationTests causing ci build failures. > > > Key: KAFKA-15343 > URL: https://issues.apache.org/jira/browse/KAFKA-15343 > Project: Kafka > Issue Type: Bug > Components: build >Affects Versions: 3.6.0 >Reporter: Prasanth Kumar >Priority: Major > > There are several instances of tests interacting badly with gradle daemon(s) > running on ports that the kafka broker previously used.
After going through > the debug logs we observed a few retrying kafka clients trying to connect to > broker which got shutdown and the gradle worker chose the same port on which > broker was running. Later in the build, the gradle daemon attempted to > connect to the worker and could not, triggering a failure. Ideally gradle > would not exit when connected to from an invalid client - in testing with > netcat, it would often handle these without dying. However there appear to be > some cases where the daemon dies completely. Both the broker code and the > gradle workers bind to port 0, resulting in the OS assigning it an unused > port. This does avoid conflicts, but does not ensure that long lived clients > do not attempt to connect to these ports afterwards. It's possible that > closing the client in between may be enough to work around this issue. Till > then we will disable the test to avoid the ci blocker from testing the code > changes. > *MirrorConnectorsIntegrationBaseTest and extending Tests* > {code:java} > [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] > [TestEventLogger] > MirrorConnectorsWithCustomForwardingAdminIntegrationTest > > testReplicateSourceDefault() STANDARD_OUT > [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] > [TestEventLogger] [2023-07-04 11:47:46,799] > INFO primary REST service: http://localhost:43809/connectors > (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:224) > [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] > [TestEventLogger] [2023-07-04 11:47:46,799] > INFO backup REST service: http://localhost:43323/connectors > (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:225) > [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] > [TestEventLogger] [2023-07-04 11:47:46,799] > INFO primary brokers: localhost:37557 > (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:226) > [2023-07-04T11:59:12.968Z] 
2023-07-04T11:59:12.900+ [DEBUG] > [org.gradle.internal.remote.internal.inet.TcpIncomingConnector] > Accepted connection from /127.0.0.1:47660 to /127.0.0.1:37557. > [2023-07-04T11:59:13.233Z] > org.gradle.internal.remote.internal.MessageIOException: Could not read > message from '/127.0.0.1:47660'. > [2023-07-04T11:59:12.970Z] 2023-07-04T11:59:12.579+ [DEBUG] > [org.gradle.internal.remote.internal.inet.TcpIncomingConnector] Listening on > [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, > addresses:[localhost/127.0.0.1]]. > [2023-07-04T11:59:46.519Z] 2023-07-04T11:59:13.014+ [ERROR] > [system.err] org.gradle.internal.remote.internal.ConnectException: Could not > connect to server [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, >
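The mechanism described in this issue, binding to port 0 so that the OS picks an unused ephemeral port, can be seen with plain `java.net.ServerSocket`. This is only an illustration: once the socket is closed, its port number returns to the pool, so a long-lived client that keeps retrying that address may later reach whatever process binds it next (in this report, a Gradle worker). Closing clients promptly, for example with try-with-resources, narrows that window. The class and helper names are hypothetical.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

public class EphemeralPortSketch {

    // Binds to port 0 on loopback, reports the port the OS chose, then releases it.
    static int bindEphemeralPort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            return socket.getLocalPort();
        } // The port is free again here and may be handed to an unrelated process.
    }

    public static void main(String[] args) throws IOException {
        int port = bindEphemeralPort();
        System.out.println("OS-assigned port: " + port);
    }
}
```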
[GitHub] [kafka] jeqo opened a new pull request, #14219: KIP-405: 2023-08-15
jeqo opened a new pull request, #14219: URL: https://github.com/apache/kafka/pull/14219 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295077023 ## metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java: ## @@ -80,6 +137,7 @@ public LeaderAndEpoch leaderAndEpoch() { return leaderAndEpoch; } + Review Comment: whitespace not needed?
[GitHub] [kafka] cmccabe commented on pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on PR #14208: URL: https://github.com/apache/kafka/pull/14208#issuecomment-1679586281 Thanks for the PR, @mumrah. I think at some point we'll need to add begin/end transactions to the set of things we "fuzz". Basically, have a test that just makes sure that the MetadataLoader does the right thing no matter how many transaction markers there are or where they appear. That doesn't need to be in this PR, though. Thanks also for the fix to QC.
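The suggestion to fuzz transaction markers could take the shape of a randomized test: generate event streams with BEGIN/END/ABORT markers placed at random, feed them to the loader, and compare the result against a simple model. The sketch below is entirely hypothetical. `ToyLoader`, the string records, and the marker names are invented for illustration and are not `MetadataLoader` or the KIP-866 record types. Its semantics are simplified assumptions: records inside a committed transaction apply at END, records inside an aborted or unfinished transaction are dropped, and records outside any transaction apply immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class TransactionFuzzSketch {

    // Toy loader: records inside a transaction are buffered until END and dropped on ABORT.
    static final class ToyLoader {
        final List<String> applied = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();
        private boolean inTransaction = false;

        void handle(String event) {
            switch (event) {
                case "BEGIN":
                    inTransaction = true;
                    pending.clear();
                    break;
                case "END":
                    applied.addAll(pending);
                    pending.clear();
                    inTransaction = false;
                    break;
                case "ABORT":
                    pending.clear();
                    inTransaction = false;
                    break;
                default:
                    if (inTransaction) {
                        pending.add(event);
                    } else {
                        applied.add(event);
                    }
            }
        }
    }

    // Builds a well-formed random stream and fills `expected` with the records
    // the model says should survive.
    static List<String> randomStream(Random random, int length, List<String> expected) {
        List<String> events = new ArrayList<>();
        List<String> txnRecords = new ArrayList<>();
        boolean inTransaction = false;
        for (int i = 0; i < length; i++) {
            boolean marker = random.nextInt(4) == 0;
            if (marker && !inTransaction) {
                events.add("BEGIN");
                inTransaction = true;
            } else if (marker) {
                boolean commit = random.nextBoolean();
                events.add(commit ? "END" : "ABORT");
                if (commit) {
                    expected.addAll(txnRecords);
                }
                txnRecords.clear();
                inTransaction = false;
            } else {
                String record = "r" + i;
                events.add(record);
                if (inTransaction) {
                    txnRecords.add(record);
                } else {
                    expected.add(record);
                }
            }
        }
        // Records of a transaction still open at the end are never expected.
        return events;
    }

    // Returns true if the loader matched the model on every random stream.
    static boolean fuzz(long seed, int iterations) {
        Random random = new Random(seed);
        for (int i = 0; i < iterations; i++) {
            List<String> expected = new ArrayList<>();
            List<String> events = randomStream(random, 50, expected);
            ToyLoader loader = new ToyLoader();
            events.forEach(loader::handle);
            if (!loader.applied.equals(expected)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(fuzz(42L, 1000));
    }
}
```

Fixing the seed keeps any failure reproducible; a real test would likely also log the failing stream.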
[GitHub] [kafka] jeqo closed pull request #14219: KIP-405: 2023-08-15
jeqo closed pull request #14219: KIP-405: 2023-08-15 URL: https://github.com/apache/kafka/pull/14219
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295076740 ## metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java: ## @@ -27,6 +27,59 @@ * Contains information about a set of changes that were loaded from the metadata log. */ public class LogDeltaManifest implements LoaderManifest { + +public static class Builder { +private MetadataProvenance provenance; +private LeaderAndEpoch leaderAndEpoch; +private Integer numBatches; Review Comment: seems a bit weird to use boxed primitives. It's quite inefficient in Java. If you want them to start with invalid values, just make them negative (that works for numBatches, elapsedNs, numBytes ... negative values for those don't make sense.)
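A sketch of the suggestion above: keep the builder's numeric fields as primitives initialized to a negative "unset" sentinel and validate them in `build()`, instead of using boxed `Integer`/`Long` fields where `null` means unset. `ManifestSketch` and its three fields are hypothetical simplifications of `LogDeltaManifest`. The static `newBuilder()` factory follows the fluent style discussed elsewhere in this PR.

```java
public class ManifestSketch {
    private final int numBatches;
    private final long elapsedNs;
    private final long numBytes;

    private ManifestSketch(int numBatches, long elapsedNs, long numBytes) {
        this.numBatches = numBatches;
        this.elapsedNs = elapsedNs;
        this.numBytes = numBytes;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    public int numBatches() { return numBatches; }
    public long elapsedNs() { return elapsedNs; }
    public long numBytes() { return numBytes; }

    public static class Builder {
        // Negative values mean "not set"; none of these can legitimately be negative.
        private int numBatches = -1;
        private long elapsedNs = -1L;
        private long numBytes = -1L;

        public Builder setNumBatches(int numBatches) {
            this.numBatches = numBatches;
            return this;
        }

        public Builder setElapsedNs(long elapsedNs) {
            this.elapsedNs = elapsedNs;
            return this;
        }

        public Builder setNumBytes(long numBytes) {
            this.numBytes = numBytes;
            return this;
        }

        public ManifestSketch build() {
            if (numBatches < 0) throw new IllegalStateException("numBatches must be set");
            if (elapsedNs < 0) throw new IllegalStateException("elapsedNs must be set");
            if (numBytes < 0) throw new IllegalStateException("numBytes must be set");
            return new ManifestSketch(numBatches, elapsedNs, numBytes);
        }
    }
}
```

Usage would look like `ManifestSketch.newBuilder().setNumBatches(3).setElapsedNs(1000L).setNumBytes(4096L).build()`. Forgetting a setter fails at `build()` time instead of producing an unboxing `NullPointerException` later.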
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295091800 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.loader; + +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.fault.FaultHandler; +import org.slf4j.Logger; + +import java.util.function.Supplier; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * Loads batches of metadata updates from Raft commits into MetadataDelta-s. Multiple batches from a commit + * are buffered into a MetadataDelta to achieve batching of records and reduce the number of times + * MetadataPublishers must be updated. 
This class also supports metadata transactions (KIP-866). + * + * + */ +public class MetadataBatchLoader { + +enum TransactionState { +NO_TRANSACTION, +STARTED_TRANSACTION, +CONTINUED_TRANSACTION, +ENDED_TRANSACTION, +ABORTED_TRANSACTION; +} + +@FunctionalInterface +public interface MetadataUpdater { +void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest manifest); +} + +private final Logger log; +private final Time time; +private final FaultHandler faultHandler; +private final Supplier leaderAndEpochSupplier; +private final MetadataUpdater callback; + +private MetadataImage image; +private MetadataDelta delta; +private long lastOffset; +private int lastEpoch; +private long lastContainedLogTimeMs; +private long numBytes; +private int numBatches; +private long totalBatchElapsedNs; +private TransactionState transactionState; + +public MetadataBatchLoader( +LogContext logContext, +Time time, +FaultHandler faultHandler, +Supplier leaderAndEpochSupplier, +MetadataUpdater callback +) { +this.log = logContext.logger(MetadataBatchLoader.class); +this.time = time; +this.faultHandler = faultHandler; +this.leaderAndEpochSupplier = leaderAndEpochSupplier; +this.callback = callback; +} + +/** + * Reset the state of this batch loader to the given image. Any un-flushed state will be + * discarded. + * + * @param image Metadata image to reset this batch loader's state to. + */ +public void resetToImage(MetadataImage image) { +this.image = image; +this.delta = new MetadataDelta.Builder().setImage(image).build(); +this.transactionState = TransactionState.NO_TRANSACTION; +this.lastOffset = image.provenance().lastContainedOffset(); +this.lastEpoch = image.provenance().lastContainedEpoch(); +this.lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs(); +this.numBytes = 0; +this.numBatches = 0; +this.totalBatchElapsedNs = 0; +} + +/** + * Load a batch of records from the log. 
We have to do some bookkeeping here to + * translate between batch offsets and record offsets, and track the number of bytes we + * have read. Additionally, there is the chance that one of the records is a metadata + * version change which needs to be handled differently. + * + * If this batch starts a transaction, any records preceding the transaction in this + * batch will be implicitly added to the transaction. + * + * @param batch The reader which yields the batches. + * @return The time in nanoseconds that elapsed while loading this batch + */ + +public long loadBatch(Batch<ApiMessageAndVersion> batch) { +long startNs = time.nanoseconds(); +int indexWithinBatch = 0; + +lastContainedLogTimeMs =
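The javadoc above says `loadBatch` buffers records into a delta, and that records preceding a transaction start in the same batch are folded into that transaction. The following is a simplified, hypothetical model of how the `TransactionState` values could drive that buffering. `Kind`, `Rec`, and `TxnBuffer` are illustrative names rather than the PR's code, and plain strings stand in for `ApiMessageAndVersion` records. The model also assumes that an abort discards everything buffered since the last flush, including those folded-in records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical record kinds; the real loader inspects MetadataRecordType values.
enum Kind { DATA, BEGIN_TXN, END_TXN, ABORT_TXN }

// Hypothetical record: transaction markers carry no payload.
final class Rec {
    final Kind kind;
    final String payload;

    Rec(Kind kind, String payload) {
        this.kind = kind;
        this.payload = payload;
    }

    static Rec data(String payload) { return new Rec(Kind.DATA, payload); }
    static Rec marker(Kind kind) { return new Rec(kind, null); }
}

// Simplified model of batching with metadata transactions (illustrative only).
final class TxnBuffer {
    enum TransactionState {
        NO_TRANSACTION, STARTED_TRANSACTION, CONTINUED_TRANSACTION, ENDED_TRANSACTION, ABORTED_TRANSACTION
    }

    private TransactionState state = TransactionState.NO_TRANSACTION;
    private final List<String> pending = new ArrayList<>();   // buffered delta, not yet visible
    private final List<String> published = new ArrayList<>(); // visible to publishers

    void loadBatch(List<Rec> batch) {
        for (Rec rec : batch) {
            switch (rec.kind) {
                case BEGIN_TXN:
                    if (state == TransactionState.STARTED_TRANSACTION
                            || state == TransactionState.CONTINUED_TRANSACTION) {
                        throw new IllegalStateException("Nested transactions are not allowed");
                    }
                    // Records already in `pending` from this batch now belong to the transaction.
                    state = TransactionState.STARTED_TRANSACTION;
                    break;
                case END_TXN:
                    // Commit: everything buffered becomes visible at once.
                    published.addAll(pending);
                    pending.clear();
                    state = TransactionState.ENDED_TRANSACTION;
                    break;
                case ABORT_TXN:
                    // Assumption: discard everything buffered since the last flush.
                    pending.clear();
                    state = TransactionState.ABORTED_TRANSACTION;
                    break;
                default:
                    pending.add(rec.payload);
            }
        }
        endOfBatch();
    }

    private void endOfBatch() {
        if (state == TransactionState.STARTED_TRANSACTION
                || state == TransactionState.CONTINUED_TRANSACTION) {
            // Still inside a transaction: keep buffering across batches.
            state = TransactionState.CONTINUED_TRANSACTION;
        } else {
            // No open transaction: flush whatever is left and reset.
            published.addAll(pending);
            pending.clear();
            state = TransactionState.NO_TRANSACTION;
        }
    }

    List<String> published() { return Collections.unmodifiableList(published); }
    TransactionState state() { return state; }
}
```

In this model, a batch `[b, BEGIN, c]` leaves both `b` and `c` buffered until the matching end marker arrives, which mirrors the "implicitly added to the transaction" wording.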
[GitHub] [kafka] lianetm commented on a diff in pull request #14218: KAFKA-14937; [2/N]: Refactoring for client code to reduce boilerplate
lianetm commented on code in PR #14218: URL: https://github.com/apache/kafka/pull/14218#discussion_r1295078074 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java: ## @@ -134,44 +132,20 @@ public static FetchMetricsManager createFetchMetricsManager(Metrics metrics) { } public static <K, V> FetchConfig<K, V> createFetchConfig(ConsumerConfig config, - Deserializer<K> keyDeserializer, - Deserializer<V> valueDeserializer) { -IsolationLevel isolationLevel = createIsolationLevel(config); -return new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel); + Deserializers<K, V> deserializers) { +IsolationLevel isolationLevel = getConfiguredIsolationLevel(config); +return new FetchConfig<>(config, deserializers, isolationLevel); } -@SuppressWarnings("unchecked") -public static <K, V> List<ConsumerInterceptor<K, V>> createConsumerInterceptors(ConsumerConfig config) { -return ClientUtils.createConfiguredInterceptors(config, -ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, -ConsumerInterceptor.class); +public static FetchConfig<String, String> createFetchConfig(ConsumerConfig config) { +Deserializers<String, String> deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer()); Review Comment: Sure, done. This was initially added as part of these changes, though (not as part of the FetchRequestManager PR), so we'll have to take care of that when cherry-picking the FetchRequestManager commit, but that's fine.
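The hunk above replaces separate key and value `Deserializer` parameters with a single `Deserializers` holder. Below is a minimal, self-contained sketch of that pattern. It uses a hypothetical `Deser` interface and hypothetical `DeserializerPair` and `SimpleFetchConfig` classes instead of Kafka's real `Deserializer`, `Deserializers`, and `FetchConfig` types, so it compiles without Kafka on the classpath. The isolation level is modeled as a plain string for the same reason.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical stand-in for Kafka's Deserializer<T>, so the sketch compiles on its own.
interface Deser<T> {
    T deserialize(byte[] data);
}

// Groups the key and value deserializers so factory methods take one argument.
final class DeserializerPair<K, V> {
    final Deser<K> keyDeserializer;
    final Deser<V> valueDeserializer;

    DeserializerPair(Deser<K> keyDeserializer, Deser<V> valueDeserializer) {
        this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "keyDeserializer");
        this.valueDeserializer = Objects.requireNonNull(valueDeserializer, "valueDeserializer");
    }

    // Convenience pair for String keys and values, in the spirit of the
    // String-only createFetchConfig(config) overload discussed in this thread.
    static DeserializerPair<String, String> utf8Strings() {
        Deser<String> utf8 = data -> data == null ? null : new String(data, StandardCharsets.UTF_8);
        return new DeserializerPair<>(utf8, utf8);
    }
}

// Hypothetical fetch config: one generic holder replaces two parameters.
final class SimpleFetchConfig<K, V> {
    final DeserializerPair<K, V> deserializers;
    final String isolationLevel;

    private SimpleFetchConfig(DeserializerPair<K, V> deserializers, String isolationLevel) {
        this.deserializers = deserializers;
        this.isolationLevel = isolationLevel;
    }

    static <K, V> SimpleFetchConfig<K, V> create(DeserializerPair<K, V> deserializers, String isolationLevel) {
        return new SimpleFetchConfig<>(deserializers, isolationLevel);
    }
}
```

Because the pair keeps `K` and `V` together, a caller cannot accidentally pass a key deserializer of one type and a value deserializer of a mismatched type to two independent parameters.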
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295084331 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.loader; + +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.fault.FaultHandler; +import org.slf4j.Logger; + +import java.util.function.Supplier; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * Loads batches of metadata updates from Raft commits into MetadataDelta-s. Multiple batches from a commit + * are buffered into a MetadataDelta to achieve batching of records and reduce the number of times + * MetadataPublishers must be updated. 
This class also supports metadata transactions (KIP-866). + * + * + */ +public class MetadataBatchLoader { + +enum TransactionState { +NO_TRANSACTION, +STARTED_TRANSACTION, +CONTINUED_TRANSACTION, +ENDED_TRANSACTION, +ABORTED_TRANSACTION; +} + +@FunctionalInterface +public interface MetadataUpdater { +void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest manifest); +} + +private final Logger log; +private final Time time; +private final FaultHandler faultHandler; +private final Supplier leaderAndEpochSupplier; +private final MetadataUpdater callback; + +private MetadataImage image; +private MetadataDelta delta; +private long lastOffset; +private int lastEpoch; +private long lastContainedLogTimeMs; +private long numBytes; +private int numBatches; +private long totalBatchElapsedNs; +private TransactionState transactionState; + +public MetadataBatchLoader( +LogContext logContext, +Time time, +FaultHandler faultHandler, +Supplier leaderAndEpochSupplier, +MetadataUpdater callback +) { +this.log = logContext.logger(MetadataBatchLoader.class); +this.time = time; +this.faultHandler = faultHandler; +this.leaderAndEpochSupplier = leaderAndEpochSupplier; +this.callback = callback; +} + +/** + * Reset the state of this batch loader to the given image. Any un-flushed state will be + * discarded. + * + * @param image Metadata image to reset this batch loader's state to. + */ +public void resetToImage(MetadataImage image) { +this.image = image; +this.delta = new MetadataDelta.Builder().setImage(image).build(); +this.transactionState = TransactionState.NO_TRANSACTION; +this.lastOffset = image.provenance().lastContainedOffset(); +this.lastEpoch = image.provenance().lastContainedEpoch(); +this.lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs(); +this.numBytes = 0; +this.numBatches = 0; +this.totalBatchElapsedNs = 0; +} + +/** + * Load a batch of records from the log. 
We have to do some bookkeeping here to + * translate between batch offsets and record offsets, and track the number of bytes we + * have read. Additionally, there is the chance that one of the records is a metadata + * version change which needs to be handled differently. + * + * If this batch starts a transaction, any records preceding the transaction in this + * batch will be implicitly added to the transaction. + * + * @param batch The reader which yields the batches. + * @return The time in nanoseconds that elapsed while loading this batch + */ + +public long loadBatch(Batch<ApiMessageAndVersion> batch) { +long startNs = time.nanoseconds(); +int indexWithinBatch = 0; + +lastContainedLogTimeMs =
[GitHub] [kafka] gharris1727 commented on pull request #14194: KAFKA-15336: Add ServiceLoader Javadocs for Connect plugins
gharris1727 commented on PR #14194: URL: https://github.com/apache/kafka/pull/14194#issuecomment-1679551865 This is a documentation-only change, and the test failures appear unrelated. A local `javadoc` build reports no errors for these classes.
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295090413 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java: ## @@ -183,6 +183,8 @@ public MetadataLoader build() { */ private MetadataImage image; +private MetadataBatchLoader batchLoader; Review Comment: It seems like in the current implementation this is never reassigned, so it could be `final`?
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295084521 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.loader; + +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.fault.FaultHandler; +import org.slf4j.Logger; + +import java.util.function.Supplier; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * Loads batches of metadata updates from Raft commits into MetadataDelta-s. Multiple batches from a commit + * are buffered into a MetadataDelta to achieve batching of records and reduce the number of times + * MetadataPublishers must be updated. 
This class also supports metadata transactions (KIP-866). + * + * + */ +public class MetadataBatchLoader { + +enum TransactionState { +NO_TRANSACTION, +STARTED_TRANSACTION, +CONTINUED_TRANSACTION, +ENDED_TRANSACTION, +ABORTED_TRANSACTION; +} + +@FunctionalInterface +public interface MetadataUpdater { +void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest manifest); +} + +private final Logger log; +private final Time time; +private final FaultHandler faultHandler; +private final Supplier leaderAndEpochSupplier; +private final MetadataUpdater callback; + +private MetadataImage image; +private MetadataDelta delta; +private long lastOffset; +private int lastEpoch; +private long lastContainedLogTimeMs; +private long numBytes; +private int numBatches; +private long totalBatchElapsedNs; +private TransactionState transactionState; + +public MetadataBatchLoader( +LogContext logContext, +Time time, +FaultHandler faultHandler, +Supplier leaderAndEpochSupplier, +MetadataUpdater callback +) { +this.log = logContext.logger(MetadataBatchLoader.class); +this.time = time; +this.faultHandler = faultHandler; +this.leaderAndEpochSupplier = leaderAndEpochSupplier; +this.callback = callback; +} + +/** + * Reset the state of this batch loader to the given image. Any un-flushed state will be + * discarded. + * + * @param image Metadata image to reset this batch loader's state to. + */ +public void resetToImage(MetadataImage image) { +this.image = image; +this.delta = new MetadataDelta.Builder().setImage(image).build(); +this.transactionState = TransactionState.NO_TRANSACTION; +this.lastOffset = image.provenance().lastContainedOffset(); +this.lastEpoch = image.provenance().lastContainedEpoch(); +this.lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs(); +this.numBytes = 0; +this.numBatches = 0; +this.totalBatchElapsedNs = 0; +} + +/** + * Load a batch of records from the log. 
We have to do some bookkeeping here to + * translate between batch offsets and record offsets, and track the number of bytes we + * have read. Additionally, there is the chance that one of the records is a metadata + * version change which needs to be handled differently. + * + * If this batch starts a transaction, any records preceding the transaction in this + * batch will be implicitly added to the transaction. + * + * @param batch The reader which yields the batches. + * @return The time in nanoseconds that elapsed while loading this batch + */ + +public long loadBatch(Batch<ApiMessageAndVersion> batch) { +long startNs = time.nanoseconds(); +int indexWithinBatch = 0; + +lastContainedLogTimeMs =
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295077023 ## metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java: ## @@ -80,6 +137,7 @@ public LeaderAndEpoch leaderAndEpoch() { return leaderAndEpoch; } + Review Comment: Whitespace change not needed? ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -786,6 +798,7 @@ public void complete(Throwable exception) { } } + Review Comment: Looks like extra whitespace?
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295081634 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.loader; + +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.fault.FaultHandler; +import org.slf4j.Logger; + +import java.util.function.Supplier; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * Loads batches of metadata updates from Raft commits into MetadataDelta-s. Multiple batches from a commit + * are buffered into a MetadataDelta to achieve batching of records and reduce the number of times + * MetadataPublishers must be updated. 
This class also supports metadata transactions (KIP-866). + * + * + */ +public class MetadataBatchLoader { + +enum TransactionState { +NO_TRANSACTION, +STARTED_TRANSACTION, +CONTINUED_TRANSACTION, +ENDED_TRANSACTION, +ABORTED_TRANSACTION; +} + +@FunctionalInterface +public interface MetadataUpdater { +void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest manifest); +} + +private final Logger log; +private final Time time; +private final FaultHandler faultHandler; +private final Supplier leaderAndEpochSupplier; +private final MetadataUpdater callback; + +private MetadataImage image; +private MetadataDelta delta; +private long lastOffset; +private int lastEpoch; +private long lastContainedLogTimeMs; +private long numBytes; +private int numBatches; +private long totalBatchElapsedNs; +private TransactionState transactionState; + +public MetadataBatchLoader( +LogContext logContext, +Time time, +FaultHandler faultHandler, +Supplier leaderAndEpochSupplier, +MetadataUpdater callback +) { +this.log = logContext.logger(MetadataBatchLoader.class); +this.time = time; +this.faultHandler = faultHandler; +this.leaderAndEpochSupplier = leaderAndEpochSupplier; +this.callback = callback; +} + +/** + * Reset the state of this batch loader to the given image. Any un-flushed state will be + * discarded. + * + * @param image Metadata image to reset this batch loader's state to. + */ +public void resetToImage(MetadataImage image) { +this.image = image; +this.delta = new MetadataDelta.Builder().setImage(image).build(); +this.transactionState = TransactionState.NO_TRANSACTION; +this.lastOffset = image.provenance().lastContainedOffset(); +this.lastEpoch = image.provenance().lastContainedEpoch(); +this.lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs(); +this.numBytes = 0; +this.numBatches = 0; +this.totalBatchElapsedNs = 0; +} + +/** + * Load a batch of records from the log. 
We have to do some bookkeeping here to + * translate between batch offsets and record offsets, and track the number of bytes we + * have read. Additionally, there is the chance that one of the records is a metadata + * version change which needs to be handled differently. + * + * If this batch starts a transaction, any records preceding the transaction in this + * batch will be implicitly added to the transaction. + * + * @param batch The reader which yields the batches. + * @return The time in nanoseconds that elapsed while loading this batch + */ + +public long loadBatch(Batch<ApiMessageAndVersion> batch) { +long startNs = time.nanoseconds(); +int indexWithinBatch = 0; + +lastContainedLogTimeMs =
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295077448 ## metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java: ## @@ -66,6 +119,10 @@ public LogDeltaManifest( this.numBytes = numBytes; } +public static Builder newBuilder() { Review Comment: Curious why this is better than just having a public Builder constructor (I don't feel strongly, I guess...)
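For context on the question above, the sketch below shows both options side by side on a hypothetical `Manifest` class, which is not the real `LogDeltaManifest`. The two options behave the same. A static `newBuilder()` factory lets the `Builder` constructor stay private and leaves room to return a preconfigured builder later. A public constructor is the more direct choice and is equally idiomatic, so this is largely a style call.

```java
// Hypothetical immutable value class with a nested builder (not the PR's LogDeltaManifest).
final class Manifest {
    private final long numBytes;
    private final int numBatches;

    private Manifest(long numBytes, int numBatches) {
        this.numBytes = numBytes;
        this.numBatches = numBatches;
    }

    long numBytes() { return numBytes; }
    int numBatches() { return numBatches; }

    // Option A: static factory; the Builder constructor can stay private.
    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        private long numBytes = 0L;
        private int numBatches = 0;

        // Option B would make this constructor public and drop newBuilder().
        private Builder() { }

        Builder setNumBytes(long numBytes) {
            this.numBytes = numBytes;
            return this;
        }

        Builder setNumBatches(int numBatches) {
            this.numBatches = numBatches;
            return this;
        }

        Manifest build() {
            if (numBytes < 0 || numBatches < 0) {
                throw new IllegalStateException("numBytes and numBatches must be non-negative");
            }
            return new Manifest(numBytes, numBatches);
        }
    }
}
```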
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295074841 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1200,6 +1235,16 @@ public static List<ApiMessageAndVersion> generateActivationRecords( throw new RuntimeException("Should not have ZK migrations enabled on a cluster running metadata.version " + featureControl.metadataVersion()); } } + +if (inTransaction) { +if (!featureControl.metadataVersion().isMetadataTransactionSupported()) { +throw new RuntimeException("Detected in-progress transaction, but the metadata.version " + featureControl.metadataVersion() + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected in-progress transaction during controller activation. Aborting this transaction."); Review Comment: I think we should include the transaction's start offset in the log message (maybe pass an OptionalLong to this function rather than a boolean?). That helps match up the beginning of the transaction with where it ends. We should also fill in the "reason" field, presumably with something like "controller failover".
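The suggestion to pass an `OptionalLong` instead of the `inTransaction` boolean could look roughly like the sketch below. The class `ActivationSketch`, the method `activationMessage`, and the message wording are hypothetical. The sketch returns the warning text so that it can be checked; the real code would log it and also generate the abort record along with the other activation records.

```java
import java.util.OptionalLong;

final class ActivationSketch {
    // Hypothetical value for the "reason" field suggested in the review.
    static final String ABORT_REASON = "controller failover";

    // Returns the warning that would be logged, or throws if the metadata.version cannot
    // support transactions. An empty OptionalLong means "no transaction in progress".
    static String activationMessage(OptionalLong transactionStartOffset, boolean transactionsSupported) {
        if (!transactionStartOffset.isPresent()) {
            return "No in-progress transaction during controller activation.";
        }
        long startOffset = transactionStartOffset.getAsLong();
        if (!transactionsSupported) {
            throw new IllegalStateException("Detected in-progress transaction starting at offset " + startOffset
                + ", but the metadata.version does not support transactions. Cannot continue.");
        }
        // Including the start offset lets operators pair this abort with its begin record.
        return "Detected in-progress transaction starting at offset " + startOffset
            + " during controller activation. Aborting this transaction (reason: " + ABORT_REASON + ").";
    }
}
```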
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295072484 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +909,62 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { +public CompletableFuture beginMigration() { log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"Begin ZK Migration Transaction", +new MigrationWriteOperation(Collections.singletonList( +new ApiMessageAndVersion( +new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) +), eventFlags); +queue.append(batchEvent); +return batchEvent.future; } @Override public CompletableFuture acceptBatch(List recordBatch) { -if (queue.size() > 100) { // TODO configure this -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. 
Controller queue is too large")); -return future; -} -ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", -new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"ZK Migration Batch", +new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture completeMigration() { log.info("Completing ZK Migration"); -// TODO use KIP-868 transaction -ControllerWriteEvent event = new ControllerWriteEvent<>("Complete ZK Migration", +ControllerWriteEvent event = new ControllerWriteEvent<>( +"Complete ZK Migration", new MigrationWriteOperation( - Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())), -EnumSet.of(RUNS_IN_PREMIGRATION)); +Arrays.asList( +ZkMigrationState.MIGRATION.toRecord(), +new ApiMessageAndVersion( +new EndTransactionRecord(), (short) 0) +)), +eventFlags); queue.append(event); return event.future.thenApply(__ -> highestMigrationRecordOffset); } @Override -public void abortMigration() { +public CompletableFuture abortMigration() { fatalFaultHandler.handleFault("Aborting the ZK migration"); -// TODO use KIP-868 transaction +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( Review Comment: Check the MV here. Also, set the "reason" field to "aborting ZK migration"?
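The "check the MV" notes in this thread (on `abortMigration` here and on `completeMigration`) point at the same pattern: emit transaction marker records only when the metadata.version supports them. The sketch below shows that gating. Plain strings are hypothetical stand-ins for `BeginTransactionRecord`, `EndTransactionRecord`, and `AbortTransactionRecord`, and a boolean replaces `featureControl.metadataVersion().isMetadataTransactionSupported()`. The thread does not settle whether the real code should skip the markers or fail on an older MV, so the sketch simply assumes skipping them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class MigrationRecordsSketch {
    // Hypothetical stand-ins for the real transaction marker records.
    static final String BEGIN = "BeginTransactionRecord(name=ZK Migration)";
    static final String END = "EndTransactionRecord";

    static String abortMarker(String reason) {
        return "AbortTransactionRecord(reason=" + reason + ")";
    }

    // Assumption: without transaction support in the MV, the markers are simply omitted.
    static List<String> beginRecords(boolean transactionsSupported) {
        return transactionsSupported ? Collections.singletonList(BEGIN) : Collections.<String>emptyList();
    }

    static List<String> completeRecords(boolean transactionsSupported, String migrationStateRecord) {
        List<String> records = new ArrayList<>(Arrays.asList(migrationStateRecord));
        if (transactionsSupported) {
            records.add(END);
        }
        return records;
    }

    static List<String> abortRecords(boolean transactionsSupported) {
        return transactionsSupported
            ? Collections.singletonList(abortMarker("aborting ZK migration"))
            : Collections.<String>emptyList();
    }
}
```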
[GitHub] [kafka] gharris1727 merged pull request #14194: KAFKA-15336: Add ServiceLoader Javadocs for Connect plugins
gharris1727 merged PR #14194: URL: https://github.com/apache/kafka/pull/14194
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295073210 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -954,11 +985,14 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) { // so we don't need to do it here. log.debug("Completing purgatory items up to offset {} and epoch {}.", offset, epoch); -// Complete any events in the purgatory that were waiting for this offset. -deferredEventQueue.completeUpTo(offset); +// Advance the committed and stable offsets then complete any pending purgatory +// items that were waiting for these offsets. +offsetControl.handleCommitBatch(batch); + deferredEventQueue.completeUpTo(offsetControl.lastStableOffset()); + deferredUnstableEventQueue.completeUpTo(offsetControl.lastCommittedOffset()); // The active controller can delete up to the current committed offset. -snapshotRegistry.deleteSnapshotsUpTo(offset); + snapshotRegistry.deleteSnapshotsUpTo(offsetControl.lastStableOffset()); Review Comment: I guess we need to call this one out as a bug fix. Good find.
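The change above deletes snapshots only up to the last stable offset instead of the last committed offset. The following simplified, hypothetical model shows how the two offsets diverge. `OffsetTracker` and `Marker` are illustrative names, not the PR's `offsetControl`, and "delete snapshots below an offset" is this model's own assumed semantics. While a transaction is open, the stable offset stays at the last offset committed before the begin marker, so a snapshot taken there survives. That snapshot is presumably what an abort would roll back to, and deleting up to the committed offset would discard it. This is one plausible reading of why the change counts as a bug fix.

```java
import java.util.TreeSet;

// Hypothetical, simplified tracker of committed vs. stable offsets.
final class OffsetTracker {
    enum Marker { NONE, BEGIN, END, ABORT }

    private long lastCommittedOffset = -1L;
    private long lastStableOffset = -1L;
    private boolean inTransaction = false;
    // Offsets at which in-memory snapshots exist (stand-in for a snapshot registry).
    private final TreeSet<Long> snapshots = new TreeSet<>();

    void createSnapshot(long offset) { snapshots.add(offset); }
    boolean hasSnapshot(long offset) { return snapshots.contains(offset); }

    void handleCommittedRecord(long offset, Marker marker) {
        lastCommittedOffset = offset;
        if (marker == Marker.BEGIN) {
            inTransaction = true;
        } else if (marker == Marker.END || marker == Marker.ABORT) {
            inTransaction = false;
        }
        if (!inTransaction) {
            // Outside a transaction, everything committed is also stable.
            lastStableOffset = offset;
        }
    }

    // Keep the snapshot at the stable offset; drop only strictly older ones.
    void deleteSnapshotsBelowStableOffset() {
        snapshots.headSet(lastStableOffset).clear();
    }

    long lastCommittedOffset() { return lastCommittedOffset; }
    long lastStableOffset() { return lastStableOffset; }
}
```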
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295069992 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -623,7 +624,14 @@ enum ControllerOperationFlag { * even though the cluster really does have metadata. Very few operations should * use this flag. */ -RUNS_IN_PREMIGRATION +RUNS_IN_PREMIGRATION, + +/** + * This flag signifies that an event will be completed even if it is part of an unfinished transaction. + * This is needed for metadata transactions so that external callers can add records to a transaction Review Comment: Maybe mention ZK migration records as an example...
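The flag described above determines which purgatory an event waits in. The hypothetical sketch below uses two queues, in line with the `deferredEventQueue` / `deferredUnstableEventQueue` split that appears in this PR's `handleCommit` change. `SimpleDeferredQueue` and `CommitDispatcher` are illustrative names. Flagged events, such as ZK migration record batches, complete as soon as their offset is committed, while all other events wait for the last stable offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical simplified purgatory: callbacks keyed by the offset they wait for.
final class SimpleDeferredQueue {
    private final TreeMap<Long, List<Runnable>> waiting = new TreeMap<>();

    void add(long offset, Runnable onComplete) {
        waiting.computeIfAbsent(offset, k -> new ArrayList<>()).add(onComplete);
    }

    // Remove, then run, every callback waiting for an offset <= the given offset.
    void completeUpTo(long offset) {
        NavigableMap<Long, List<Runnable>> ready = waiting.headMap(offset, true);
        List<Runnable> toRun = new ArrayList<>();
        ready.values().forEach(toRun::addAll);
        ready.clear(); // headMap is a view, so this removes the entries from `waiting`
        toRun.forEach(Runnable::run);
    }
}

final class CommitDispatcher {
    private final SimpleDeferredQueue stableQueue = new SimpleDeferredQueue();
    private final SimpleDeferredQueue unstableQueue = new SimpleDeferredQueue();

    // completesInTransaction plays the role of the new ControllerOperationFlag.
    void defer(long offset, boolean completesInTransaction, Runnable onComplete) {
        (completesInTransaction ? unstableQueue : stableQueue).add(offset, onComplete);
    }

    void onCommit(long lastCommittedOffset, long lastStableOffset) {
        stableQueue.completeUpTo(lastStableOffset);
        unstableQueue.completeUpTo(lastCommittedOffset);
    }
}
```

For example, if a transaction began at offset 10 and offset 11 is now committed, a flagged migration batch at offset 11 completes immediately, while a regular event at offset 11 waits until the transaction ends.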
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295071752 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +909,62 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { +public CompletableFuture beginMigration() { log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"Begin ZK Migration Transaction", +new MigrationWriteOperation(Collections.singletonList( +new ApiMessageAndVersion( +new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) +), eventFlags); +queue.append(batchEvent); +return batchEvent.future; } @Override public CompletableFuture acceptBatch(List recordBatch) { -if (queue.size() > 100) { // TODO configure this -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. 
Controller queue is too large")); -return future; -} -ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", -new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"ZK Migration Batch", +new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture completeMigration() { log.info("Completing ZK Migration"); -// TODO use KIP-868 transaction -ControllerWriteEvent event = new ControllerWriteEvent<>("Complete ZK Migration", +ControllerWriteEvent event = new ControllerWriteEvent<>( +"Complete ZK Migration", new MigrationWriteOperation( - Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())), -EnumSet.of(RUNS_IN_PREMIGRATION)); +Arrays.asList( Review Comment: This is another case where we have to check the MV, I guess.
[GitHub] [kafka] lianetm commented on a diff in pull request #14218: KAFKA-14937; [2/N]: Refactoring for client code to reduce boilerplate
lianetm commented on code in PR #14218: URL: https://github.com/apache/kafka/pull/14218#discussion_r1295049009 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java: ## @@ -134,44 +132,20 @@ public static FetchMetricsManager createFetchMetricsManager(Metrics metrics) { } public static FetchConfig createFetchConfig(ConsumerConfig config, - Deserializer keyDeserializer, - Deserializer valueDeserializer) { -IsolationLevel isolationLevel = createIsolationLevel(config); -return new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel); + Deserializers deserializers) { +IsolationLevel isolationLevel = getConfiguredIsolationLevel(config); +return new FetchConfig<>(config, deserializers, isolationLevel); } -@SuppressWarnings("unchecked") -public static List> createConsumerInterceptors(ConsumerConfig config) { -return ClientUtils.createConfiguredInterceptors(config, -ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, -ConsumerInterceptor.class); +public static FetchConfig createFetchConfig(ConsumerConfig config) { +Deserializers deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer()); Review Comment: Fixed. Just for the record, this is unused here but will be used in the following PR with the FetchRequestManager changes.
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java: ## @@ -134,44 +132,20 @@ public static FetchMetricsManager createFetchMetricsManager(Metrics metrics) { } public static FetchConfig createFetchConfig(ConsumerConfig config, - Deserializer keyDeserializer, - Deserializer valueDeserializer) { -IsolationLevel isolationLevel = createIsolationLevel(config); -return new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel); + Deserializers deserializers) { +IsolationLevel isolationLevel = getConfiguredIsolationLevel(config); +return new FetchConfig<>(config, deserializers, isolationLevel); } -@SuppressWarnings("unchecked") -public static List> createConsumerInterceptors(ConsumerConfig config) { -return ClientUtils.createConfiguredInterceptors(config, -ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, -ConsumerInterceptor.class); +public static FetchConfig createFetchConfig(ConsumerConfig config) { +Deserializers deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer()); Review Comment: You're right, I don't think it is. This was defined this way in the commit we are cherry-picking, which is why it ended up here, but I'll include the latest changes with the fix.
[GitHub] [kafka] lianetm commented on a diff in pull request #14218: KAFKA-14937; [2/N]: Refactoring for client code to reduce boilerplate
lianetm commented on code in PR #14218: URL: https://github.com/apache/kafka/pull/14218#discussion_r1295036974 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java: ## @@ -134,44 +132,20 @@ public static FetchMetricsManager createFetchMetricsManager(Metrics metrics) { } public static FetchConfig createFetchConfig(ConsumerConfig config, - Deserializer keyDeserializer, - Deserializer valueDeserializer) { -IsolationLevel isolationLevel = createIsolationLevel(config); -return new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel); + Deserializers deserializers) { +IsolationLevel isolationLevel = getConfiguredIsolationLevel(config); +return new FetchConfig<>(config, deserializers, isolationLevel); } -@SuppressWarnings("unchecked") -public static List> createConsumerInterceptors(ConsumerConfig config) { -return ClientUtils.createConfiguredInterceptors(config, -ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, -ConsumerInterceptor.class); +public static FetchConfig createFetchConfig(ConsumerConfig config) { +Deserializers deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer()); Review Comment: You're right, I don't think it is. This was defined this way in the commit we are cherry-picking, though, which is why it ended up here; I'll include the latest changes with the fix.
[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
AndrewJSchofield commented on code in PR #14111: URL: https://github.com/apache/kafka/pull/14111#discussion_r1295024473 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -114,18 +125,39 @@ public synchronized Cluster fetch() { /** * Return the next time when the current cluster info can be updated (i.e., backoff time has elapsed). + * There are two calculations for backing off based on how many attempts to retrieve metadata have been made + * since the last successful response, and how many equivalent metadata responses have been received. + * The second of these allows backing off when there are errors to do with stale metadata, even though the + * metadata responses are clean. + * + * This can be used to check whether it's worth requesting an update in the knowledge that it will + * not be delayed if this method returns 0. * * @param nowMs current time in ms * @return remaining time in ms till the cluster info can be updated again */ public synchronized long timeToAllowUpdate(long nowMs) { -return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0); +// Calculate the backoff for attempts which acts when metadata responses fail +long backoffForAttempts = Math.max(this.lastRefreshMs + +this.refreshBackoff.backoff(this.attempts > 0 ? this.attempts - 1 : 0) - nowMs, 0); + +// Periodic updates based on expiration are not backed off based on equivalent responses +long backoffForEquivalentResponseCount; +if (Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0) == 0) { Review Comment: I've rearranged this slightly to reset `this.equivalentResponseCount` when the metadata expiration period has elapsed. This means that a sequence of periodic refreshes with unchanging metadata doesn't inexorably increase the delay (although that would anyway have been bounded by the max delay, which means by default the delay no longer increases after 4 iterations).
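The comment combines two backoff inputs (failed attempts and equivalent responses) and resets the equivalent-response count once the metadata has expired. A minimal sketch of that logic, assuming an initial backoff of 100 ms, a multiplier of 2 and a 1000 ms cap (which I believe match the KIP-580 defaults for `retry.backoff.ms` and `retry.backoff.max.ms`); `ExpBackoffSketch` and `MetadataBackoffSketch` are hypothetical and are not the classes in the PR.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical KIP-580-style exponential backoff (not Kafka's own class):
// backoff(n) = min(initialMs * multiplier^n, maxMs), optionally scaled by a
// random factor in [1 - jitter, 1 + jitter).
final class ExpBackoffSketch {
    private final long initialMs;
    private final double multiplier;
    private final long maxMs;
    private final double jitter;

    ExpBackoffSketch(long initialMs, double multiplier, long maxMs, double jitter) {
        this.initialMs = initialMs;
        this.multiplier = multiplier;
        this.maxMs = maxMs;
        this.jitter = jitter;
    }

    long backoff(long attempts) {
        double term = Math.min(initialMs * Math.pow(multiplier, attempts), (double) maxMs);
        if (jitter <= 0) {
            return (long) term;
        }
        return (long) (term * ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter));
    }
}

// Hypothetical reconstruction of the two-part calculation described in the comment.
final class MetadataBackoffSketch {
    private final ExpBackoffSketch refreshBackoff;
    private final long metadataExpireMs;
    long lastRefreshMs;
    long lastSuccessfulRefreshMs;
    int attempts;                // failed attempts since the last successful response
    int equivalentResponseCount; // consecutive responses that did not change the metadata

    MetadataBackoffSketch(ExpBackoffSketch refreshBackoff, long metadataExpireMs) {
        this.refreshBackoff = refreshBackoff;
        this.metadataExpireMs = metadataExpireMs;
    }

    long timeToAllowUpdate(long nowMs) {
        // Backoff driven by failed attempts, same shape as the diff.
        long backoffForAttempts = Math.max(
            lastRefreshMs + refreshBackoff.backoff(attempts > 0 ? attempts - 1 : 0) - nowMs, 0);

        long backoffForEquivalentResponses = 0;
        if (Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0) == 0) {
            // Metadata has expired: a periodic refresh is not delayed by equivalent
            // responses, and the count restarts so the delay cannot keep creeping up.
            equivalentResponseCount = 0;
        } else if (equivalentResponseCount > 0) {
            backoffForEquivalentResponses = Math.max(
                lastRefreshMs + refreshBackoff.backoff(equivalentResponseCount - 1) - nowMs, 0);
        }
        return Math.max(backoffForAttempts, backoffForEquivalentResponses);
    }
}
```

With these assumed values and no jitter, `backoff(n)` for n = 0..4 is 100, 200, 400, 800 and then 1000 ms, after which it stays at the cap. This is consistent with the comment's note that the delay stops growing after four iterations.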
[GitHub] [kafka] cmccabe merged pull request #14215: MINOR Install metadata publishers sooner in ControllerServer
cmccabe merged PR #14215: URL: https://github.com/apache/kafka/pull/14215
[GitHub] [kafka] cmccabe closed pull request #14199: MINOR Fix the ZkMigrationState metric in KafkaController
cmccabe closed pull request #14199: MINOR Fix the ZkMigrationState metric in KafkaController URL: https://github.com/apache/kafka/pull/14199
[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
AndrewJSchofield commented on code in PR #14111: URL: https://github.com/apache/kafka/pull/14111#discussion_r1295030179 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -140,17 +172,34 @@ public long metadataExpireMs() { } /** - * Request an update of the current cluster metadata info, return the current updateVersion before the update + * Request an update of the current cluster metadata info, permitting backoff based on the number of + * equivalent metadata responses, which indicates that responses did not make progress and may be stale. + * @param permitBackoffOnEquivalentResponses Whether to permit backoff when consecutive responses are equivalent. + * This should be set to true in situations where the update is + * being requested to retry an operation, such as when the leader has + * changed. It should be set to false in situations where new + * metadata is being requested, such as adding a topic to a subscription. + * In situations where it's not clear, it's best to use false. + * @return The current updateVersion before the update */ -public synchronized int requestUpdate() { +public synchronized int requestUpdate(final boolean permitBackoffOnEquivalentResponses) { this.needFullUpdate = true; +if (!permitBackoffOnEquivalentResponses) { Review Comment: I think the negation is a little clearer.
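The Javadoc explains when `permitBackoffOnEquivalentResponses` should be true or false, but the diff stops at the `if`. A minimal sketch of what the negated branch might do, assuming it clears an equivalent-response counter so that a request for genuinely new metadata is not delayed. `UpdateRequestSketch` and that reset are assumptions for illustration, not the PR's code.

```java
// Hypothetical sketch of requestUpdate(permitBackoffOnEquivalentResponses).
// The counter reset in the negated branch is an assumption about the PR.
final class UpdateRequestSketch {
    private boolean needFullUpdate;
    private int equivalentResponseCount;
    private int updateVersion; // bumped when a response is applied; not modelled here

    synchronized int requestUpdate(final boolean permitBackoffOnEquivalentResponses) {
        this.needFullUpdate = true;
        if (!permitBackoffOnEquivalentResponses) {
            // Genuinely new metadata is wanted (e.g. a topic was added to the
            // subscription), so earlier equivalent responses should not delay it.
            this.equivalentResponseCount = 0;
        }
        return this.updateVersion;
    }

    // Called when a response arrives that did not change the metadata.
    synchronized void recordEquivalentResponse() {
        this.equivalentResponseCount++;
    }

    synchronized int equivalentResponseCount() { return equivalentResponseCount; }

    synchronized boolean needFullUpdate() { return needFullUpdate; }
}
```

Callers retrying after a leader change would pass `true` and keep the accumulated backoff, while a subscription change would pass `false`, matching the guidance in the Javadoc.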
[GitHub] [kafka] cmccabe commented on pull request #14199: MINOR Fix the ZkMigrationState metric in KafkaController
cmccabe commented on PR #14199: URL: https://github.com/apache/kafka/pull/14199#issuecomment-1679483573 LGTM, committed
[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools
ruslankrivoshein commented on PR #13562: URL: https://github.com/apache/kafka/pull/13562#issuecomment-1679474019 @mimaison, please take a look.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property
divijvaidya commented on code in PR #14161: URL: https://github.com/apache/kafka/pull/14161#discussion_r1294991302 ## core/src/test/scala/unit/kafka/server/KafkaServerTest.scala: ## @@ -154,6 +155,96 @@ class KafkaServerTest extends QuorumTestHarness { server.shutdown() } + @Test + def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit = { +val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager") + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") + +val server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps)) +server.remoteLogManagerOpt match { + case Some(_) => + case None => fail("RemoteLogManager should be initialized") +} + +val topicProps = new Properties() +topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, true.toString) + +TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", servers = Seq(server), topicConfig = topicProps) + +server.shutdown() + +val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + +assertThrows(classOf[ConfigException], () => TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps))) + } + + @Test + def testClusterWideDisablementOfTieredStorageWithDisabledTieredTopic(): Unit = { +val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager") + 
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") + +var server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps)) +server.remoteLogManagerOpt match { + case Some(_) => + case None => fail("RemoteLogManager should be initialized") +} + +val topicProps = new Properties() +topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString) + +TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", servers = Seq(server), topicConfig = topicProps) + +server.shutdown() + +val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + +server = TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps)) + +server.shutdown() + } + + @Test + def testClusterWithoutTieredStorageFailsOnStartupIfTopicWithTieringEnabled(): Unit = { +val serverProps = TestUtils.createBrokerConfigs(1, zkConnect).head + +val server = TestUtils.createServer(KafkaConfig.fromProps(serverProps)) + +val topicProps = new Properties() +topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, true.toString) + +TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", servers = Seq(server), topicConfig = topicProps) + +server.shutdown() Review Comment: This should again be done in a finally block so that, if an exception is thrown, we still clean up the resources used by the test, i.e. close the server.
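The review asks for `server.shutdown()` to run in a `finally` block. A minimal Java sketch of that pattern, where `FakeServer` and `ServerTestCleanup.withServer` are hypothetical stand-ins for the test's server and harness; the Scala test would use the same try/finally shape.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for the broker under test; not Kafka's KafkaServer.
final class FakeServer {
    private boolean running = true;

    void shutdown() { running = false; }

    boolean isRunning() { return running; }
}

final class ServerTestCleanup {
    // Runs the test body and always shuts the server down, even when the body throws,
    // so a failing assertion does not leak a running server into later tests.
    static void withServer(FakeServer server, Consumer<FakeServer> body) {
        try {
            body.accept(server);
        } finally {
            server.shutdown();
        }
    }
}
```

The exception from the body still propagates to the test framework, so the failure is reported while cleanup happens anyway.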
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property
divijvaidya commented on code in PR #14161: URL: https://github.com/apache/kafka/pull/14161#discussion_r1294983667 ## core/src/test/scala/unit/kafka/server/KafkaServerTest.scala: ## @@ -154,6 +155,96 @@ class KafkaServerTest extends QuorumTestHarness { server.shutdown() } + @Test + def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit = { +val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head Review Comment: Please add tests for KRaft as well, covering each case: when the broker is a controller, when the broker is a KraftServer, and when the broker is a ZkServer. I'm asking because some of the initialization code paths vary among them, and we can't be sure that the config validation will always be called correctly in the future.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property
divijvaidya commented on code in PR #14161: URL: https://github.com/apache/kafka/pull/14161#discussion_r1294994048 ## core/src/main/scala/kafka/server/ConfigHandler.scala: ## @@ -62,6 +62,12 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC topicConfig.asScala.forKeyValue { (key, value) => if (!configNamesToExclude.contains(key)) props.put(key, value) } + +if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem() Review Comment: With this change, we are splitting the logic for the system-wide and topic-specific configs across two places: one in LogConfig and this one in ConfigHandler. Could we stick to one of them? I don't have a strong opinion on which one, as long as the logic lives in one consolidated place.
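The review asks for the system-wide versus topic-level check to live in one place. A minimal sketch of a single reusable check, assuming the broker-level switch is passed in as a boolean and the topic config is a plain map; `RemoteStorageConfigCheck` is hypothetical. Kafka itself would raise a `ConfigException`; `IllegalArgumentException` stands in here to keep the sketch dependency-free.

```java
import java.util.Map;

// Hypothetical single home for the "topic wants tiering but the broker has it
// disabled" rule, so LogConfig and ConfigHandler could both call it.
final class RemoteStorageConfigCheck {
    // Topic-level property named in the PR title.
    static final String REMOTE_STORAGE_ENABLE = "remote.storage.enable";

    static void validate(boolean systemRemoteStorageEnabled, String topic, Map<String, String> topicConfig) {
        boolean topicWantsTiering =
            Boolean.parseBoolean(topicConfig.getOrDefault(REMOTE_STORAGE_ENABLE, "false"));
        if (topicWantsTiering && !systemRemoteStorageEnabled) {
            throw new IllegalArgumentException("Topic " + topic + " has " + REMOTE_STORAGE_ENABLE
                + "=true, but remote storage is disabled on this broker");
        }
    }
}
```

With one method owning the rule, the startup path and the dynamic topic-config path cannot drift apart.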
[GitHub] [kafka] philipnee commented on a diff in pull request #14218: KAFKA-14937; [2/N]: Refactoring for client code to reduce boilerplate
philipnee commented on code in PR #14218: URL: https://github.com/apache/kafka/pull/14218#discussion_r1294990279 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java: ## @@ -134,44 +132,20 @@ public static FetchMetricsManager createFetchMetricsManager(Metrics metrics) { } public static FetchConfig createFetchConfig(ConsumerConfig config, - Deserializer keyDeserializer, - Deserializer valueDeserializer) { -IsolationLevel isolationLevel = createIsolationLevel(config); -return new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel); + Deserializers deserializers) { +IsolationLevel isolationLevel = getConfiguredIsolationLevel(config); +return new FetchConfig<>(config, deserializers, isolationLevel); } -@SuppressWarnings("unchecked") -public static List> createConsumerInterceptors(ConsumerConfig config) { -return ClientUtils.createConfiguredInterceptors(config, -ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, -ConsumerInterceptor.class); +public static FetchConfig createFetchConfig(ConsumerConfig config) { +Deserializers deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer()); Review Comment: Is this right?
[GitHub] [kafka] divijvaidya commented on pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on PR #14176: URL: https://github.com/apache/kafka/pull/14176#issuecomment-1679415679 Unrelated test failures: ``` [Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_17_and_Scala_2_13___testThreadPoolResize__/) [Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_17_and_Scala_2_13___testBalancePartitionLeaders__/) [Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/kafka.api/TransactionsTest/Build___JDK_11_and_Scala_2_13___testBumpTransactionalEpoch_String__quorum_kraft/) [Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_11_and_Scala_2_13___testThreadPoolResize__/) [Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/) [Build / JDK 20 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_20_and_Scala_2_13___testThreadPoolResize__/) [Build / JDK 20 and Scala 2.13 
/ org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_20_and_Scala_2_13___testBalancePartitionLeaders__/) ```
[GitHub] [kafka] divijvaidya merged pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya merged PR #14176: URL: https://github.com/apache/kafka/pull/14176
[GitHub] [kafka] divijvaidya commented on pull request #14212: MINOR: Add test case for follower fetch
divijvaidya commented on PR #14212: URL: https://github.com/apache/kafka/pull/14212#issuecomment-1679398284 Thank you for adding this. Please add what you wrote in the description as a comment in the test so that readers can quickly understand what the test is supposed to check. https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14212/1/testReport/kafka.server/ReplicaFetcherTierStateMachineTest/
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14212: MINOR: Add test case for follower fetch
divijvaidya commented on code in PR #14212: URL: https://github.com/apache/kafka/pull/14212#discussion_r1294964226 ## core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala: ## @@ -85,6 +85,64 @@ class ReplicaFetcherTierStateMachineTest { assertEquals(9L, replicaState.logEndOffset) } + @Test + def testFollowerFetchMovedToAndDeletedFromTieredStore(): Unit = { +val partition = new TopicPartition("topic", 0) + +val replicaLog = Seq( + mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)), + mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)), + mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes))) + +val replicaState = PartitionState(replicaLog, leaderEpoch = 7, highWatermark = 0L, rlmEnabled = true) + +val mockLeaderEndpoint = new MockLeaderEndPoint +val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) +val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) + +fetcher.setReplicaState(partition, replicaState) +fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 7))) + +val leaderLog = Seq( + mkBatch(baseOffset = 7, leaderEpoch = 7, new SimpleRecord("h".getBytes)), + mkBatch(baseOffset = 8, leaderEpoch = 7, new SimpleRecord("i".getBytes)), + mkBatch(baseOffset = 9, leaderEpoch = 7, new SimpleRecord("j".getBytes)), + mkBatch(baseOffset = 10, leaderEpoch = 7, new SimpleRecord("k".getBytes))) + +val leaderState = PartitionState(leaderLog, leaderEpoch = 7, highWatermark = 10L, rlmEnabled = true) +// Overriding the log start offset to 5 for mocking the scenario of segments 5-6 moved to remote store and +// segments 0-4 expired. 
+leaderState.logStartOffset = 5 +fetcher.mockLeader.setLeaderState(partition, leaderState) + fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState) + +assertEquals(3L, replicaState.logEndOffset) +val expectedState = if (truncateOnFetch) Option(Fetching) else Option(Truncating) +assertEquals(expectedState, fetcher.fetchState(partition).map(_.state)) + +fetcher.doWork() +// Verify that the out of range error is triggered and the fetch offset is reset to the global log start offset. +assertEquals(0L, replicaState.logStartOffset) Review Comment: Isn't this incorrect? logStartOffset should be 5 here after the first fetch call to the leader.
[GitHub] [kafka] lianetm opened a new pull request, #14218: KAFKA-14937; [2/N]: Refactoring for client code to reduce boilerplate
lianetm opened a new pull request, #14218: URL: https://github.com/apache/kafka/pull/14218 This is a continuation of the previous [PR#13990](https://github.com/apache/kafka/pull/13990). The main refactoring in this PR relates to: - serializers/deserializers used in clients, unified in a Deserializers class - logic for configuring ClusterResourceListeners, moved to ClientUtils - miscellaneous refactoring of the new async consumer in preparation for the upcoming Request Managers
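The PR description mentions unifying key and value deserializers in a `Deserializers` class. A minimal sketch of such a holder, assuming a simplified `SimpleDeserializer` interface in place of Kafka's `Deserializer`; `SimpleDeserializer`, `DeserializersSketch` and `DeserializersFactorySketch` are hypothetical and are not the classes in the PR. Keeping the pair in one generic object lets a factory such as `createFetchConfig` take one argument instead of two.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Simplified stand-in for a key/value deserializer (assumption, not Kafka's interface).
interface SimpleDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Hypothetical holder that keeps the key and value deserializers together.
final class DeserializersSketch<K, V> {
    final SimpleDeserializer<K> keyDeserializer;
    final SimpleDeserializer<V> valueDeserializer;

    DeserializersSketch(SimpleDeserializer<K> keyDeserializer, SimpleDeserializer<V> valueDeserializer) {
        this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "keyDeserializer");
        this.valueDeserializer = Objects.requireNonNull(valueDeserializer, "valueDeserializer");
    }

    K key(String topic, byte[] data) { return keyDeserializer.deserialize(topic, data); }

    V value(String topic, byte[] data) { return valueDeserializer.deserialize(topic, data); }
}

final class DeserializersFactorySketch {
    // Generic callers supply their own deserializers.
    static <K, V> DeserializersSketch<K, V> of(SimpleDeserializer<K> k, SimpleDeserializer<V> v) {
        return new DeserializersSketch<>(k, v);
    }

    // A String-only convenience states its concrete types in the return type
    // rather than appearing to work for arbitrary K and V.
    static DeserializersSketch<String, String> strings() {
        SimpleDeserializer<String> utf8 =
            (topic, data) -> data == null ? null : new String(data, StandardCharsets.UTF_8);
        return new DeserializersSketch<>(utf8, utf8);
    }
}
```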
[GitHub] [kafka] nizhikov opened a new pull request, #14217: KAFKA-14595 ReassignPartitionsCommandArgsTest rewritten in java
nizhikov opened a new pull request, #14217: URL: https://github.com/apache/kafka/pull/14217 This PR is part of #13247. It includes `ReassignPartitionsCommandArgsTest` rewritten in Java. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] nizhikov commented on pull request #13247: KAFKA-14595 Move ReassignPartitionsCommand to java
nizhikov commented on PR #13247: URL: https://github.com/apache/kafka/pull/13247#issuecomment-1679370921 @mimaison I created #14217, which includes `ReassignPartitionsCommandArgsTest` rewritten in Java. It is independent of the big PR and can be reviewed separately. Could you please take a look?
[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens
pprovenzano commented on code in PR #14083: URL: https://github.com/apache/kafka/pull/14083#discussion_r1294925543 ## metadata/src/main/resources/common/metadata/DelegationTokenRecord.json: ## @@ -22,8 +22,10 @@ "fields": [ { "name": "Owner", "type": "string", "versions": "0+", "about": "The delegation token owner." }, +{ "name": "Requester", "type": "string", "versions": "0+", + "about": "The delegation token requester." }, Review Comment: Understood. This is also a case where the original creator of the fields missed this field from TokenInformation, which is what ZK uses today.
[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens
pprovenzano commented on code in PR #14083: URL: https://github.com/apache/kafka/pull/14083#discussion_r1294918864 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -267,6 +270,10 @@ public boolean isLeaderEpochBumpRequiredOnIsrShrink() { return !this.isAtLeast(IBP_3_6_IV0); } +public boolean isDelegationTokenSupported() { +return this.isAtLeast(IBP_3_6_IV1); +} Review Comment: Done.
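The diff gates delegation-token support on `isAtLeast(IBP_3_6_IV1)`. A minimal sketch of how such an "at least" check can work when versions are declared in release order, using a heavily simplified, hypothetical `MetadataVersionSketch` enum that reuses only the two version names shown in the diff; it is not Kafka's `MetadataVersion`.

```java
// Hypothetical, simplified stand-in for MetadataVersion. Constants are declared
// in release order, so enum ordinal comparison gives "at least" semantics.
enum MetadataVersionSketch {
    IBP_3_6_IV0,
    IBP_3_6_IV1;

    boolean isAtLeast(MetadataVersionSketch other) {
        return this.compareTo(other) >= 0;
    }

    // Feature gates read as one-line comparisons against the first supporting version.
    boolean isDelegationTokenSupported() {
        return isAtLeast(IBP_3_6_IV1);
    }
}
```

The design relies on never reordering constants: inserting a new version in the middle would silently change every gate that compares against a later one.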
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14133: --- Description: {color:#de350b}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets, the tests are in the streams module. The following tests still need to be moved from EasyMock to Mockito as of the 2nd of August 2022 and, as far as I am aware, have neither a Jira issue nor an open pull request: {color:#ff8b00}In Review{color} {color:#00875a}Merged{color} # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo) # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo) # {color:#00875a}KStreamPrintTest{color} (owner: Christo) # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo) # {color:#00875a}MaterializedInternalTest{color} (owner: Christo) # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo) # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo) # {color:#00875a}ClientUtilsTest{color} (owner: Christo) # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: Christo) # {color:#00875a}TopologyTest{color} (owner: Christo) # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo) # 
{color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo) # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo) # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: Christo) # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo) # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo) # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo) # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: Christo) # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo) # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo) # {color:#00875a}RocksDBStoreTest{color} (owner: Christo) # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo) # {color:#ff8b00}TaskManagerTest{color} (owner: Christo) # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo) # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo) # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo) # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo) # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo) # {color:#00875a}AssignmentTestUtils{color} (owner: Christo) # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: Christo) # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew) # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew) # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew) # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) (takeover: Christo) # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in https://issues.apache.org/jira/browse/KAFKA-12947) # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: [~shekharrajak]) # 
{color:#00875a}TimeOrderedWindowStoreTest{color} (owner: [~shekharrajak]) [https://github.com/apache/kafka/pull/12777] # AbstractStreamTest # {color:#ff8b00}KStreamTransformValuesTest{color} (owner: Christo) # {color:#ff8b00}KTableImplTest{color} (owner: Christo) # {color:#ff8b00}KTableTransformValuesTest{color} (owner: Christo) # {color:#ff8b00}SessionCacheFlushListenerTest{color} (owner: Christo) # {color:#ff8b00}TimestampedCacheFlushListenerTest{color} (owner: Christo) # {color:#ff8b00}TimestampedTupleForwarderTest{color} (owner: Christo) # {color:#ff8b00}ActiveTaskCreatorTest{color} (owner: Christo) # {color:#ff8b00}ChangelogTopicsTest{color} (owner: Christo) # {color:#ff8b00}GlobalProcessorContextImplTest{color} (owner: Christo) # RecordCollectorTest (owner: Christo) # StateRestoreCallbackAdapterTest (owner: Christo) # StoreToProcessorContextAdapterTest (owner: Christo) # StreamsProducerTest (owner: Nelson)
[jira] [Updated] (KAFKA-15344) Kafka Streams should include the message leader epoch when committing offsets
[ https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Mao updated KAFKA-15344:
--
Description:
We noticed an application received an OFFSET_OUT_OF_RANGE error following a network partition and a Streams task rebalance, and subsequently reset its offsets to the beginning.

Inspecting the logs, we saw multiple consumer log messages like:
{code:java}
Setting offset for partition tp to the committed offset FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
{code}
Inspecting the Streams code, it looks like Kafka Streams calls `commitSync` with an explicit OffsetAndMetadata object but does not populate the offset leader epoch.

The offset leader epoch is required in the offset commit to ensure that all consumers in the consumer group have coherent metadata before fetching. Otherwise, after a consumer group rebalance, a consumer may fetch with a leader epoch that is stale with respect to the committed offset and get an offset-out-of-range error from a zombie partition leader.

An example of where this can cause issues:
1. We have a consumer group with consumer 1 and consumer 2. Partition P is assigned to consumer 1, which has up-to-date metadata for P. Consumer 2 has stale metadata for P.
2. Consumer 1 fetches partition P at offset 50, leader epoch 8, and commits offset 50 without an epoch.
3. The consumer group rebalances and P is now assigned to consumer 2. Consumer 2 has a stale leader epoch for P (let's say leader epoch 7), so it will try to fetch with leader epoch 7, offset 50. If there is a zombie leader due to a network partition, the zombie leader may accept consumer 2's fetch leader epoch and return OFFSET_OUT_OF_RANGE to consumer 2.

If in step 2 consumer 1 had committed the leader epoch for the message, then when consumer 2 received the assignment for P it would force a metadata refresh to discover a sufficiently new leader epoch for the committed offset.
The low-hanging-fruit fix would be to have Streams pass in the message leader epoch for each commit. Another fix, discussed with [~hachikuji], is to have the consumer cache leader epoch ranges, similar to how the broker maintains a leader epoch cache.

was:
We noticed an application received an OFFSET_OUT_OF_RANGE error following a network partition and streams task rebalance and subsequently reset its offsets to the beginning.

Inspecting the logs, we saw multiple consumer log messages like:
{code:java}
Setting offset for partition tp to the committed offset FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
{code}
Inspecting the streams code, it looks like kafka streams calls `commitSync` passing through an explicit OffsetAndMetadata object but does not populate the offset leader epoch.

The offset leader epoch is required in the offset commit to ensure that all consumers in the consumer group have coherent metadata before fetching. Otherwise after a consumer group rebalance, a consumer may fetch with a stale leader epoch with respect to the committed offset and get an offset out of range error from a zombie partition leader.

The low-hanging fruit fix would be to have streams pass in the message epoch for each commit. Another fix discussed with [~hachikuji] is to have the consumer cache leader epoch ranges, similar to how the broker maintains a leader epoch cache.

> Kafka Streams should include the message leader epoch when committing offsets
> -
>
> Key: KAFKA-15344
> URL: https://issues.apache.org/jira/browse/KAFKA-15344
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: David Mao
> Priority: Major
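The failure in the example can be sketched as a small, self-contained model. It is not the Kafka consumer implementation: `EpochAwareCommitSketch`, `CommittedPosition`, and `mustRefreshMetadataBeforeFetch` are hypothetical names, and the only rule modeled is the one the description states, namely that a committed leader epoch newer than the consumer's cached epoch forces a metadata refresh before fetching. In the real client, the epoch would travel in `OffsetAndMetadata(long offset, Optional<Integer> leaderEpoch, String metadata)`, populated from `ConsumerRecord#leaderEpoch()`.

```java
import java.util.Optional;

/**
 * Hypothetical model of the KAFKA-15344 scenario; not the Kafka consumer implementation.
 */
public class EpochAwareCommitSketch {

    /** A committed position: the offset plus the (optional) leader epoch of the record at that offset. */
    public static final class CommittedPosition {
        final long offset;
        final Optional<Integer> leaderEpoch;

        public CommittedPosition(long offset, Optional<Integer> leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    /**
     * Assumed rule from the issue text: if the commit carries an epoch newer than the consumer's
     * cached epoch, the consumer must refresh metadata before fetching. Without an epoch there is
     * nothing to compare, so the consumer fetches with whatever (possibly stale) epoch it has cached.
     */
    public static boolean mustRefreshMetadataBeforeFetch(CommittedPosition committed, int cachedLeaderEpoch) {
        return committed.leaderEpoch.map(epoch -> epoch > cachedLeaderEpoch).orElse(false);
    }

    public static void main(String[] args) {
        int consumer2CachedEpoch = 7; // consumer 2's stale metadata (step 3)

        // Step 2 as described: consumer 1 commits offset 50 without an epoch.
        CommittedPosition withoutEpoch = new CommittedPosition(50L, Optional.empty());
        // The proposed fix: consumer 1 commits offset 50 together with leader epoch 8.
        CommittedPosition withEpoch = new CommittedPosition(50L, Optional.of(8));

        System.out.println(mustRefreshMetadataBeforeFetch(withoutEpoch, consumer2CachedEpoch)); // false
        System.out.println(mustRefreshMetadataBeforeFetch(withEpoch, consumer2CachedEpoch));    // true
    }
}
```

With no epoch in the commit, consumer 2 has no signal that its cached epoch 7 is stale, which is exactly the gap the description attributes to the zombie-leader OFFSET_OUT_OF_RANGE.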
[GitHub] [kafka] AndrewJSchofield commented on pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
AndrewJSchofield commented on PR #14111: URL: https://github.com/apache/kafka/pull/14111#issuecomment-1679338770 @junrao I don't know what's causing so many test failures. 78 of them were due to "unexpected threads" in the broker. I'll take another look when there's another build. I expect they're transient, but we'll see. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1294877936

## clients/src/main/java/org/apache/kafka/clients/Metadata.java:
## @@ -267,11 +307,14 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         this.needPartialUpdate = requestVersion < this.requestVersion;
         this.lastRefreshMs = nowMs;
+        this.attempts = 0;
         this.updateVersion += 1;
         if (!isPartialUpdate) {
             this.needFullUpdate = false;
             this.lastSuccessfulRefreshMs = nowMs;
         }
+        this.backoffOnEquivalentResponses = true;
+        this.equivalentResponseCount++;

Review Comment:
1. equivalentResponseCount is only reset when a fresher metadata response is received. Suppose that for some time we only have periodic metadata refreshes and the metadata doesn't change. This will cause equivalentResponseCount to keep going up, so when a metadata refresh is eventually requested, it will unexpectedly back off exponentially.
2. This is a bit unintuitive since we haven't checked whether the response is equivalent or not. Could we add a comment that this will be reset later if the metadata response causes the metadata to change?
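A minimal sketch of the concern in point 1, assuming (as the diff suggests) that the counter is incremented on every successful response and reset to zero only when a response changes the metadata, and assuming a plain exponential backoff with multiplier 2 and no jitter. `EquivalentResponseCounterSketch`, `onSuccessfulResponse`, and `equivalentResponseBackoffMs` are hypothetical names; this is not Kafka's `Metadata` class.

```java
/**
 * Hypothetical model of the equivalentResponseCount concern; not Kafka's Metadata class.
 */
public class EquivalentResponseCounterSketch {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private int equivalentResponseCount = 0;

    public EquivalentResponseCounterSketch(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /** Mirrors the diff: always increment, then reset if the response changed the metadata. */
    public void onSuccessfulResponse(boolean metadataChanged) {
        equivalentResponseCount++;
        if (metadataChanged) {
            equivalentResponseCount = 0;
        }
    }

    /** Exponential backoff (initial * 2^(count - 1), capped, no jitter) driven by the count. */
    public long equivalentResponseBackoffMs() {
        if (equivalentResponseCount == 0) {
            return 0L;
        }
        int exponent = Math.min(equivalentResponseCount - 1, 62);
        return (long) Math.min((double) maxBackoffMs, initialBackoffMs * Math.pow(2, exponent));
    }

    public int equivalentResponseCount() {
        return equivalentResponseCount;
    }

    public static void main(String[] args) {
        EquivalentResponseCounterSketch metadata = new EquivalentResponseCounterSketch(100L, 10_000L);
        // Ten periodic refreshes in which nothing changed.
        for (int i = 0; i < 10; i++) {
            metadata.onSuccessfulResponse(false);
        }
        // The next explicitly requested refresh would now wait the maximum backoff.
        System.out.println(metadata.equivalentResponseBackoffMs()); // 10000
    }
}
```

Because periodic refreshes feed the same counter as retries, a long quiet period turns the next requested refresh into a maximum-length wait, which is the unexpected behavior the comment describes.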
[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1294883198

## clients/src/main/java/org/apache/kafka/clients/Metadata.java:
## @@ -140,17 +172,34 @@ public long metadataExpireMs() {
     }

     /**
-     * Request an update of the current cluster metadata info, return the current updateVersion before the update
+     * Request an update of the current cluster metadata info, permitting backoff based on the number of
+     * equivalent metadata responses, which indicates that responses did not make progress and may be stale.
+     * @param permitBackoffOnEquivalentResponses Whether to permit backoff when consecutive responses are equivalent.
+     *                                           This should be set to true in situations where the update is
+     *                                           being requested to retry an operation, such as when the leader has
+     *                                           changed. It should be set to false in situations where new
+     *                                           metadata is being requested, such as adding a topic to a subscription.
+     *                                           In situations where it's not clear, it's best to use false.
+     * @return The current updateVersion before the update
      */
-    public synchronized int requestUpdate() {
+    public synchronized int requestUpdate(final boolean permitBackoffOnEquivalentResponses) {
         this.needFullUpdate = true;
+        if (!permitBackoffOnEquivalentResponses) {

Review Comment:
Since we only take action when permitBackoffOnEquivalentResponses is false, would it be more intuitive to pass in its negation, as something like resetEquivalentResponseCount?
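The suggested rename can be sketched as follows. This assumes the truncated `if (!permitBackoffOnEquivalentResponses)` branch in the diff resets the equivalent-response counter, which is what the proposed name implies; `RequestUpdateSketch` and its helper methods are hypothetical and not Kafka's `Metadata` API.

```java
/**
 * Hypothetical sketch of the review suggestion: name the flag after the action it triggers.
 */
public class RequestUpdateSketch {
    private boolean needFullUpdate = false;
    private int equivalentResponseCount = 0;
    private int updateVersion = 0;

    /**
     * Request a full metadata update.
     *
     * @param resetEquivalentResponseCount true when new metadata is needed (for example, a topic was
     *        added to the subscription), so backoff built up from equivalent responses is dropped;
     *        false when retrying an operation (for example, after a leader change), so it is kept.
     *        This is the negation of the PR's permitBackoffOnEquivalentResponses.
     * @return the current updateVersion before the update
     */
    public synchronized int requestUpdate(final boolean resetEquivalentResponseCount) {
        this.needFullUpdate = true;
        if (resetEquivalentResponseCount) {
            this.equivalentResponseCount = 0;
        }
        return this.updateVersion;
    }

    // Helpers so the sketch can be exercised on its own.
    public synchronized void recordEquivalentResponse() {
        this.equivalentResponseCount++;
    }

    public synchronized int equivalentResponseCount() {
        return this.equivalentResponseCount;
    }

    public synchronized boolean updateRequested() {
        return this.needFullUpdate;
    }

    public static void main(String[] args) {
        RequestUpdateSketch metadata = new RequestUpdateSketch();
        metadata.recordEquivalentResponse();
        metadata.recordEquivalentResponse();
        metadata.requestUpdate(false);                          // retry: keep the backoff state
        System.out.println(metadata.equivalentResponseCount()); // 2
        metadata.requestUpdate(true);                           // new topic: start fresh
        System.out.println(metadata.equivalentResponseCount()); // 0
    }
}
```

With the positive name, the branch tests the flag directly instead of its negation, and the call site `requestUpdate(true)` reads as the action it triggers.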
[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1294873505

## clients/src/main/java/org/apache/kafka/clients/Metadata.java:
## @@ -114,18 +125,39 @@ public synchronized Cluster fetch() {
     /**
      * Return the next time when the current cluster info can be updated (i.e., backoff time has elapsed).
+     * There are two calculations for backing off based on how many attempts to retrieve metadata have been made
+     * since the last successful response, and how many equivalent metadata responses have been received.
+     * The second of these allows backing off when there are errors to do with stale metadata, even though the
+     * metadata responses are clean.
+     *
+     * This can be used to check whether it's worth requesting an update in the knowledge that it will
+     * not be delayed if this method returns 0.
      *
      * @param nowMs current time in ms
      * @return remaining time in ms till the cluster info can be updated again
      */
     public synchronized long timeToAllowUpdate(long nowMs) {
-        return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+        // Calculate the backoff for attempts which acts when metadata responses fail
+        long backoffForAttempts = Math.max(this.lastRefreshMs +
+                this.refreshBackoff.backoff(this.attempts > 0 ? this.attempts - 1 : 0) - nowMs, 0);
+
+        // Periodic updates based on expiration are not backed off based on equivalent responses
+        long backoffForEquivalentResponseCount;
+        if (Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0) == 0) {

Review Comment:
Instead of calculating the periodic refresh time, it's probably simpler to just check `updateRequested()`?
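A minimal sketch of `timeToAllowUpdate` with the reviewer's suggestion applied: the equivalent-response backoff is gated on whether an update was explicitly requested, instead of recomputing the periodic-expiry time. Assumptions, all hypothetical: the two backoffs are combined with `Math.max`, the backoff is exponential with multiplier 2 and no jitter (standing in for the PR's `refreshBackoff.backoff(...)`), and the state lives in public fields so the sketch can be driven directly. `TimeToAllowUpdateSketch` is not Kafka's `Metadata` class.

```java
/**
 * Hypothetical sketch of timeToAllowUpdate gated on updateRequested(); not Kafka's Metadata class.
 */
public class TimeToAllowUpdateSketch {
    private final long initialBackoffMs;
    private final long maxBackoffMs;

    // Public only so the sketch can be driven directly.
    public long lastRefreshMs = 0L;
    public int attempts = 0;                 // failed attempts since the last successful response
    public int equivalentResponseCount = 0;  // consecutive responses that did not change the metadata
    public boolean needFullUpdate = false;   // set when an update is explicitly requested

    public TimeToAllowUpdateSketch(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /** Stand-in exponential backoff: initial * 2^n, capped at the maximum, no jitter. */
    long backoff(int n) {
        return (long) Math.min((double) maxBackoffMs, initialBackoffMs * Math.pow(2, Math.min(n, 62)));
    }

    public synchronized boolean updateRequested() {
        return needFullUpdate;
    }

    public synchronized long timeToAllowUpdate(long nowMs) {
        // Backoff for failed attempts, computed as in the diff.
        long backoffForAttempts =
            Math.max(lastRefreshMs + backoff(attempts > 0 ? attempts - 1 : 0) - nowMs, 0);

        // Only explicitly requested updates back off on equivalent responses;
        // periodic (expiry-driven) refreshes do not.
        long backoffForEquivalentResponseCount = 0;
        if (updateRequested() && equivalentResponseCount > 0) {
            backoffForEquivalentResponseCount =
                Math.max(lastRefreshMs + backoff(equivalentResponseCount - 1) - nowMs, 0);
        }
        return Math.max(backoffForAttempts, backoffForEquivalentResponseCount);
    }

    public static void main(String[] args) {
        TimeToAllowUpdateSketch metadata = new TimeToAllowUpdateSketch(100L, 1_000L);
        metadata.equivalentResponseCount = 3;
        System.out.println(metadata.timeToAllowUpdate(50L)); // 50: periodic, base backoff only
        metadata.needFullUpdate = true;
        System.out.println(metadata.timeToAllowUpdate(50L)); // 350: requested, 100 * 2^2 - 50
    }
}
```

With `needFullUpdate` false, only the attempt-based backoff applies, so in this sketch periodic refreshes are never slowed by a long run of unchanged responses.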
[jira] [Updated] (KAFKA-15344) Kafka Streams should include the message leader epoch when committing offsets
[ https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15344:
Component/s: streams

> Kafka Streams should include the message leader epoch when committing offsets
> -
>
> Key: KAFKA-15344
> URL: https://issues.apache.org/jira/browse/KAFKA-15344
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: David Mao
> Priority: Major
>
> We noticed an application received an OFFSET_OUT_OF_RANGE error following a
> network partition and streams task rebalance and subsequently reset its
> offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like:
> {code:java}
> Setting offset for partition tp to the committed offset
> FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the streams code, it looks like kafka streams calls `commitSync`
> passing through an explicit OffsetAndMetadata object but does not populate
> the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all
> consumers in the consumer group have coherent metadata before fetching.
> Otherwise after a consumer group rebalance, a consumer may fetch with a stale
> leader epoch with respect to the committed offset and get an offset out of
> range error from a zombie partition leader.
> The low-hanging fruit fix would be to have streams pass in the message epoch
> for each commit. Another fix discussed with [~hachikuji] is to have the
> consumer cache leader epoch ranges, similar to how the broker maintains a
> leader epoch cache.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic
divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1294874991

## core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala:
## @@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.InvalidConfigurationException
+import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.{Collections, Properties}
+import scala.collection.Seq
+import scala.concurrent.ExecutionException
+import scala.util.Random
+
+@Tag("integration")
+class RemoteTopicCrudTest extends IntegrationTestHarness {
+
+  val numPartitions = 2
+  val numReplicationFactor = 2
+  var testTopicName: String = _
+  var sysRemoteStorageEnabled = true
+
+  override protected def brokerCount: Int = 2
+
+  override protected def modifyConfigs(props: Seq[Properties]): Unit = {
+    props.foreach(p => p.putAll(overrideProps()))
+  }
+
+  override protected def kraftControllerConfigs(): Seq[Properties] = {
+    Seq(overrideProps())
+  }
+
+  @BeforeEach
+  override def setUp(info: TestInfo): Unit = {
+    if (info.getTestMethod.get().getName.endsWith("SystemRemoteStorageIsDisabled")) {
+      sysRemoteStorageEnabled = false
+    }
+    super.setUp(info)
+    testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = {
+    val topicConfig = new Properties()
+    topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
+      topicConfig = topicConfig)
+    verifyRemoteLogTopicConfigs(topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = {
+    val topicConfig = new Properties()
+    topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256")
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
+      topicConfig = topicConfig)
+    verifyRemoteLogTopicConfigs(topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): Unit = {
+    // inherited local retention ms is 1000
+    val topicConfig = new Properties()
+    topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001")
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
+      topicConfig = topicConfig)
+    verifyRemoteLogTopicConfigs(topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): Unit = {
+
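The test cases in this diff pair a total retention with a smaller local retention (`retention.ms=200` with `local.retention.ms=100`, `retention.bytes=512` with `local.retention.bytes=256`, and `retention.ms=1001` over an inherited local retention of 1000). A minimal sketch of the kind of check they appear to exercise follows, written in Java for consistency with the other examples here. It is an assumption, not the PR's validation code: it assumes the effective local retention must not exceed the total retention, that `-1` means unlimited, and that `-2` on a `local.retention.*` config means "use the `retention.*` value". `RemoteRetentionCheckSketch` and `isValid` are hypothetical names.

```java
/**
 * Hypothetical sketch of a local-vs-total retention check for remote-storage-enabled topics.
 * Not the validation code from the PR.
 */
public final class RemoteRetentionCheckSketch {
    static final long UNLIMITED = -1L;            // retention.* = -1: keep forever
    static final long USE_TOTAL_RETENTION = -2L;  // local.retention.* = -2: inherit retention.*

    private RemoteRetentionCheckSketch() {}

    /**
     * Returns true if the local retention is compatible with the total retention.
     * Applies the same way to the time (ms) and size (bytes) variants.
     */
    public static boolean isValid(long totalRetention, long localRetention) {
        long effectiveLocal = (localRetention == USE_TOTAL_RETENTION) ? totalRetention : localRetention;
        if (totalRetention == UNLIMITED) {
            return true;  // any local retention fits under an unlimited total retention
        }
        if (effectiveLocal == UNLIMITED) {
            return false; // unlimited local retention cannot fit under a finite total retention
        }
        return effectiveLocal <= totalRetention;
    }

    public static void main(String[] args) {
        System.out.println(isValid(200L, 100L));   // true: retention.ms=200, local.retention.ms=100
        System.out.println(isValid(512L, 256L));   // true: retention.bytes=512, local.retention.bytes=256
        System.out.println(isValid(1001L, 1000L)); // true: inherited local retention of 1000
        System.out.println(isValid(100L, 200L));   // false: local exceeds total
        System.out.println(isValid(100L, -2L));    // true: local inherits the total
    }
}
```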
[jira] [Updated] (KAFKA-15257) Support interactive queries (IQv2) with versioned state store
[ https://issues.apache.org/jira/browse/KAFKA-15257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alieh Saeedi updated KAFKA-15257:
-
Description:
Query types to consider include:
* single-key latest-value lookup
* single-key lookup with timestamp bound
* single-key query with timestamp range
* single-key all versions query
* key-range latest-value query
* key-range query with timestamp bound
* key-range query with timestamp range
* key-range all versions query
* all-keys latest-value query
* all-keys all versions (i.e., entire store) query

was:
KIP-960: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+interactive+queries+%28IQv2%29+for+versioned+state+stores]

Query types to consider include:
* single-key latest-value lookup
* single-key lookup with timestamp bound
* single-key query with timestamp range
* single-key all versions query
* key-range latest-value query
* key-range query with timestamp bound
* key-range query with timestamp range
* key-range all versions query
* all-keys latest-value query
* all-keys all versions (i.e., entire store) query

> Support interactive queries (IQv2) with versioned state store
> -
>
> Key: KAFKA-15257
> URL: https://issues.apache.org/jira/browse/KAFKA-15257
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Alieh Saeedi
> Assignee: Alieh Saeedi
> Priority: Major
> Labels: kip
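To make the single-key query types concrete, here is a minimal in-memory model of a versioned key-value store: each key maps to its versions ordered by timestamp, and a version stays valid until the next one. It is a hypothetical sketch (`VersionedStoreSketch`, `latest`, `asOf`, and `allVersions` are illustrative names), not the IQv2 API proposed in KIP-960 or the Kafka Streams `VersionedKeyValueStore` interface, and it ignores deletes (tombstones) and history retention.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of a versioned key-value store; not Kafka Streams' implementation.
 * Deletes (tombstones) and history retention are not modeled.
 */
public class VersionedStoreSketch<K extends Comparable<K>, V> {
    // key -> (timestamp -> value); a version is valid from its timestamp until the next version's timestamp
    private final NavigableMap<K, NavigableMap<Long, V>> versions = new TreeMap<>();

    public void put(K key, V value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /** Single-key latest-value lookup. */
    public V latest(K key) {
        NavigableMap<Long, V> history = versions.get(key);
        return (history == null || history.isEmpty()) ? null : history.lastEntry().getValue();
    }

    /** Single-key lookup with a timestamp bound: the version valid at asOfTimestamp. */
    public V asOf(K key, long asOfTimestamp) {
        NavigableMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    /** Single-key all-versions query, oldest first. */
    public List<V> allVersions(K key) {
        NavigableMap<Long, V> history = versions.get(key);
        return history == null ? new ArrayList<>() : new ArrayList<>(history.values());
    }

    public static void main(String[] args) {
        VersionedStoreSketch<String, String> store = new VersionedStoreSketch<>();
        store.put("k", "v1", 10L);
        store.put("k", "v2", 20L);
        System.out.println(store.latest("k"));      // v2
        System.out.println(store.asOf("k", 15L));   // v1
        System.out.println(store.asOf("k", 5L));    // null
        System.out.println(store.allVersions("k")); // [v1, v2]
    }
}
```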
[jira] [Assigned] (KAFKA-15347) Single-Key_multi-timestamp IQs with versioned state stores
[ https://issues.apache.org/jira/browse/KAFKA-15347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alieh Saeedi reassigned KAFKA-15347:
    Assignee: Alieh Saeedi

> Single-Key_multi-timestamp IQs with versioned state stores
> ----------------------------------------------------------
>
> Key: KAFKA-15347
> URL: https://issues.apache.org/jira/browse/KAFKA-15347
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Alieh Saeedi
> Assignee: Alieh Saeedi
> Priority: Major
>
> [KIP-968|https://cwiki.apache.org/confluence/display/KAFKA/KIP-968%3A+Support+single-key_multi-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just four query types:
> *Key Queries with multiple timestamps:*
> # single-key query with upper bound timestamp
> # single-key query with lower bound timestamp
> # single-key query with timestamp range
> # single-key all versions query
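The four query types in this sub-task differ only in which side of the time window is bounded. Below is a minimal Java sketch over one key's history stored in a TreeMap. The class KeyHistory and its query(from, to) method are hypothetical, null stands for an open bound, and the sketch assumes (without confirming it against KIP-968) that a version is valid from its own timestamp until the next version's timestamp, so the version already in effect at the lower bound is included.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: one key's history as timestamp -> value.
// Assumed semantics: a version is valid from its timestamp until the next version's
// timestamp; a query returns every version valid at some point in [from, to].
final class KeyHistory {
    private final NavigableMap<Long, String> versions = new TreeMap<>();

    void put(long timestamp, String value) {
        versions.put(timestamp, value);
    }

    /** Versions valid within [fromTime, toTime]; a null bound is unbounded on that side. */
    List<String> query(Long fromTime, Long toTime) {
        long from = (fromTime == null) ? Long.MIN_VALUE : fromTime;
        long to = (toTime == null) ? Long.MAX_VALUE : toTime;
        List<String> out = new ArrayList<>();
        if (from > to) {
            return out; // empty window
        }
        // The version in effect at 'from' may have started earlier, so start there.
        Long start = versions.floorKey(from);
        long effectiveFrom = (start == null) ? from : start;
        for (Map.Entry<Long, String> e : versions.subMap(effectiveFrom, true, to, true).entrySet()) {
            out.add(e.getKey() + "=" + e.getValue());
        }
        return out;
    }
}
```

query(null, 20L) is the upper-bound query, query(25L, null) the lower-bound query, query(15L, 25L) the range query, and query(null, null) returns all versions. With versions at 10, 20 and 30, query(15L, 25L) returns [10=a, 20=b], because the version written at 10 is still in effect at 15.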
[jira] [Created] (KAFKA-15348) Range IQs with versioned state stores
Alieh Saeedi created KAFKA-15348:

Summary: Range IQs with versioned state stores
Key: KAFKA-15348
URL: https://issues.apache.org/jira/browse/KAFKA-15348
Project: Kafka
Issue Type: Sub-task
Reporter: Alieh Saeedi

[KIP-969|https://cwiki.apache.org/confluence/display/KAFKA/KIP-969%3A+Support+range+interactive+queries+%28IQv2%29+for+versioned+state+stores]
This ticket covers all types of range queries:
*Range Queries*
# key-range latest-value query
# key-range with lower bound latest-value query
# key-range with upper bound latest-value query
# all-keys (no bound) latest-value query
# key-range query with timestamp (upper) bound
# key-range with lower bound with timestamp (upper) bound
# key-range with upper bound with timestamp (upper) bound
# all-keys (no bound) with timestamp (upper) bound
# key-range query with timestamp range
# key-range query with lower bound with timestamp range
# key-range query with upper bound with timestamp range
# all-keys (no bound) with timestamp range
# key-range query all-versions
# key-range query with lower bound all-versions
# key-range query with upper bound all-versions
# all-keys query (no bound) all-versions (entire store)
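Most of the range queries above combine a key bound (lower, upper, both, or none) with one of the time scopes. The Java sketch below isolates the key-bound part on a latest-value view using TreeMap's tailMap and headMap. The class KeyRangeView is hypothetical, treats both bounds as inclusive (an assumption, since the ticket does not say), and leaves the time dimension out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the four key-bound shapes in KAFKA-15348, applied to a
// latest-value view (key -> latest value). A null bound means "open on that side".
final class KeyRangeView {
    private final NavigableMap<String, String> latest = new TreeMap<>();

    void put(String key, String value) {
        latest.put(key, value);
    }

    /** Keys in [lower, upper], both inclusive (assumed); null bounds are open-ended. */
    List<String> range(String lower, String upper) {
        if (lower != null && upper != null && lower.compareTo(upper) > 0) {
            return new ArrayList<>(); // inverted range: nothing matches
        }
        NavigableMap<String, String> view = latest;
        if (lower != null) {
            view = view.tailMap(lower, true); // keys >= lower
        }
        if (upper != null) {
            view = view.headMap(upper, true); // keys <= upper
        }
        return new ArrayList<>(view.keySet());
    }
}
```

range("b", "c") corresponds to the plain key-range query, range("b", null) to "key-range with lower bound", range(null, "b") to "key-range with upper bound", and range(null, null) to the unbounded all-keys query.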
[jira] [Assigned] (KAFKA-15348) Range IQs with versioned state stores
[ https://issues.apache.org/jira/browse/KAFKA-15348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alieh Saeedi reassigned KAFKA-15348:
    Assignee: Alieh Saeedi

> Range IQs with versioned state stores
> -------------------------------------
>
> Key: KAFKA-15348
> URL: https://issues.apache.org/jira/browse/KAFKA-15348
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Alieh Saeedi
> Assignee: Alieh Saeedi
> Priority: Major
>
> [KIP-969|https://cwiki.apache.org/confluence/display/KAFKA/KIP-969%3A+Support+range+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers all types of range queries:
> *Range Queries*
> # key-range latest-value query
> # key-range with lower bound latest-value query
> # key-range with upper bound latest-value query
> # all-keys (no bound) latest-value query
> # key-range query with timestamp (upper) bound
> # key-range with lower bound with timestamp (upper) bound
> # key-range with upper bound with timestamp (upper) bound
> # all-keys (no bound) with timestamp (upper) bound
> # key-range query with timestamp range
> # key-range query with lower bound with timestamp range
> # key-range query with upper bound with timestamp range
> # all-keys (no bound) with timestamp range
> # key-range query all-versions
> # key-range query with lower bound all-versions
> # key-range query with upper bound all-versions
> # all-keys query (no bound) all-versions (entire store)
[jira] [Created] (KAFKA-15347) Single-Key_multi-timestamp IQs with versioned state stores
Alieh Saeedi created KAFKA-15347:

Summary: Single-Key_multi-timestamp IQs with versioned state stores
Key: KAFKA-15347
URL: https://issues.apache.org/jira/browse/KAFKA-15347
Project: Kafka
Issue Type: Sub-task
Reporter: Alieh Saeedi

[KIP-968|https://cwiki.apache.org/confluence/display/KAFKA/KIP-968%3A+Support+single-key_multi-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
This ticket covers just four query types:
*Key Queries with multiple timestamps:*
# single-key query with upper bound timestamp
# single-key query with lower bound timestamp
# single-key query with timestamp range
# single-key all versions query
[jira] [Updated] (KAFKA-15346) Single-Key_single-timestamp IQs with versioned state stores
[ https://issues.apache.org/jira/browse/KAFKA-15346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alieh Saeedi updated KAFKA-15346:
---------------------------------
    Summary: Single-Key_single-timestamp IQs with versioned state stores (was: Single-Key_single-multi-timestamp IQs with versioned state stores)

> Single-Key_single-timestamp IQs with versioned state stores
> -----------------------------------------------------------
>
> Key: KAFKA-15346
> URL: https://issues.apache.org/jira/browse/KAFKA-15346
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Alieh Saeedi
> Assignee: Alieh Saeedi
> Priority: Major
>
> [KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just two query types:
> *Key Queries with single timestamp:*
> # single-key latest-value lookup
> # single-key lookup with timestamp (upper) bound
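The two single-timestamp query types map onto two sorted-map lookups: the latest value is the entry with the greatest timestamp, and a lookup with an upper timestamp bound returns the newest version at or before that bound. The hypothetical AsOfLookup class below is a Java sketch of those semantics for one key; it ignores tombstones and history retention. Kafka Streams' VersionedKeyValueStore already offers point lookups of this kind through get(key) and get(key, asOfTimestamp), but the class here is illustrative only.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch of the two KIP-960 query shapes on one key's history
// (timestamp -> value). Tombstones and history retention are deliberately ignored.
final class AsOfLookup {
    private final NavigableMap<Long, String> versions = new TreeMap<>();

    void put(long timestamp, String value) {
        versions.put(timestamp, value);
    }

    /** Latest-value lookup: the version with the greatest timestamp, if any. */
    Optional<String> latest() {
        Map.Entry<Long, String> e = versions.lastEntry();
        return (e == null) ? Optional.empty() : Optional.of(e.getValue());
    }

    /** Timestamp (upper) bound lookup: the newest version written at or before asOfTimestamp. */
    Optional<String> asOf(long asOfTimestamp) {
        Map.Entry<Long, String> e = versions.floorEntry(asOfTimestamp);
        return (e == null) ? Optional.empty() : Optional.of(e.getValue());
    }
}
```

With versions written at 10 ("a") and 20 ("b"), asOf(15) returns Optional[a], asOf(5) returns Optional.empty, and latest() returns Optional[b].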
[jira] [Updated] (KAFKA-15346) Single-Key_single-multi-timestamp IQs with versioned state stores
[ https://issues.apache.org/jira/browse/KAFKA-15346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alieh Saeedi updated KAFKA-15346:
---------------------------------
    Description: 
[KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
This ticket covers just two query types:
*Key Queries with single timestamp:*
# single-key latest-value lookup
# single-key lookup with timestamp (upper) bound

  was:
[KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]]
This ticket covers just two query types:
*Key Queries with single timestamp:*
# single-key latest-value lookup
# single-key lookup with timestamp (upper) bound

> Single-Key_single-multi-timestamp IQs with versioned state stores
> -----------------------------------------------------------------
>
> Key: KAFKA-15346
> URL: https://issues.apache.org/jira/browse/KAFKA-15346
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Alieh Saeedi
> Assignee: Alieh Saeedi
> Priority: Major
>
> [KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just two query types:
> *Key Queries with single timestamp:*
> # single-key latest-value lookup
> # single-key lookup with timestamp (upper) bound