[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified
[ https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744877#comment-17744877 ]

Tom Lee commented on KAFKA-10337:
---

Thanks for pushing this through [~erikvanoosten], nice to see this finally land. :)

> Wait for pending async commits in commitSync() even if no offsets are specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: Tom Lee
> Assignee: Erik van Oosten
> Priority: Major
> Fix For: 3.6.0
>
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
> (or similar) are guaranteed to have their callbacks invoked prior to
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete
> async commits will not be invoked because of an early return in
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a
> barrier for in-flight async commits prior to a rebalance, this could be an
> important (though somewhat implementation-dependent) detail.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
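The barrier behaviour requested in this issue can be illustrated without a broker. The sketch below is a hypothetical model, not the real client: `ModelConsumer` is not a Kafka class, and integer keys stand in for `TopicPartition`. Pending `commitAsync` calls are tracked as `CompletableFuture`s. `commitSync` waits for all of them, and therefore for their callbacks, before it takes the early return for an empty offset map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical model of a consumer's commit path; this is NOT the real KafkaConsumer.
// Integer keys stand in for TopicPartition and Long values for offsets.
final class ModelConsumer {
    // One entry per async commit whose callback may not have run yet.
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    // Simulates commitAsync: completing the returned future plays the role of the
    // broker response, after which the callback runs.
    CompletableFuture<Void> commitAsync(Map<Integer, Long> offsets,
                                        Consumer<Map<Integer, Long>> callback) {
        CompletableFuture<Void> response = new CompletableFuture<>();
        inFlight.add(response.thenRun(() -> callback.accept(offsets)));
        return response;
    }

    // Barrier first: wait until every pending async commit and its callback have
    // finished, and only then take the early return for an empty map.
    void commitSync(Map<Integer, Long> offsets) {
        for (CompletableFuture<Void> pending : inFlight) {
            pending.join();
        }
        inFlight.clear();
        if (offsets.isEmpty()) {
            return;
        }
        // A real client would send a synchronous OffsetCommit request here.
    }
}
```

In the real client, the waiting happens by polling the network client rather than by blocking on futures. The model only captures the ordering: drain pending async commits first, then check whether there is anything new to commit.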
[GitHub] [kafka] ivanyu commented on pull request #13984: KAFKA-15107: Support custom metadata for remote log segment
ivanyu commented on PR #13984: URL: https://github.com/apache/kafka/pull/13984#issuecomment-1643169441 Conflicts resolved -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim opened a new pull request, #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim opened a new pull request, #14056: URL: https://github.com/apache/kafka/pull/14056 Built on top of https://github.com/apache/kafka/pull/14017, this PR implements the existing Heartbeat API in the new Group Coordinator. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] satishd commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache
satishd commented on code in PR #14004: URL: https://github.com/apache/kafka/pull/14004#discussion_r1268911909 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java: ## @@ -100,17 +100,29 @@ void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId, Long leaderEpochEndOffset) { +// If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced. +// Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the +// segment for the same leader-epoch which is a super-set of previously uploaded segments. +// (eg) +// case-1: Duplicate segment +// L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200. +// We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds. +// case-2: Overlapping segments +// L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2-101-200, +// and so on. When the consumer request for segment with offset 95, it should get the segment S1 and not S0. +Map.Entry<Long, RemoteLogSegmentId> lastEntry = offsetToId.lastEntry(); +while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) { Review Comment: Thanks @kamalcph for the detailed explanation. It would be better to return the epoch along with the highest offset, and then check the minimum of the respective end offset in the local leader epoch chain for that epoch and the highest offset uploaded to remote storage for that epoch.
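The two cases in the code comment can be checked with a small model. This is a simplified, hypothetical sketch, not the PR's `RemoteLogLeaderEpochState`. It keeps a `TreeMap` from start offset to segment id. When a segment finishes copying, it walks back from the last entry and marks as unreferenced every segment that starts at or after the new one and ends at or before it. Unlike the diff, which uses `highestLogOffset`, it tracks an end offset per segment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified, hypothetical model of one leader epoch's remote segment index.
final class EpochSegmentIndex {
    private final NavigableMap<Long, String> offsetToId = new TreeMap<>();
    private final Map<String, Long> endOffsetById = new HashMap<>();
    final List<String> unreferencedSegmentIds = new ArrayList<>();

    void onCopyFinished(String segmentId, long startOffset, long endOffset) {
        // Walk back from the newest entry: a segment that starts at or after the new
        // segment's start and ends at or before its end is fully covered by it.
        Map.Entry<Long, String> last = offsetToId.lastEntry();
        while (last != null && last.getKey() >= startOffset
                && endOffsetById.get(last.getValue()) <= endOffset) {
            unreferencedSegmentIds.add(last.getValue());
            offsetToId.remove(last.getKey());
            last = offsetToId.lastEntry();
        }
        offsetToId.put(startOffset, segmentId);
        endOffsetById.put(segmentId, endOffset);
    }

    // Segment with the greatest start offset <= offset, provided it also covers offset.
    String segmentFor(long offset) {
        Map.Entry<Long, String> e = offsetToId.floorEntry(offset);
        if (e == null || offset > endOffsetById.get(e.getValue())) {
            return null;
        }
        return e.getValue();
    }
}
```

Replaying case-2 (S0 = 10-90, then S1 = 5-100, then S2 = 101-200) leaves S0 unreferenced and resolves offset 95 to S1, as the comment requires.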
[GitHub] [kafka] splett2 commented on a diff in pull request #14053: [KAFKA-15221] Fix the race between fetch requests from a rebooted follower.
splett2 commented on code in PR #14053: URL: https://github.com/apache/kafka/pull/14053#discussion_r1268889660 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -858,13 +858,28 @@ class Partition(val topicPartition: TopicPartition, // No need to calculate low watermark if there is no delayed DeleteRecordsRequest val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset -replica.updateFetchState( - followerFetchOffsetMetadata, - followerStartOffset, - followerFetchTimeMs, - leaderEndOffset, - brokerEpoch -) + +// Acquire the lock for the fetch state update. A race can happen between fetch requests from a rebooted broker. +// The requests before and after the reboot can carry different fetch metadata especially offsets and broker epoch. +// It can particularly affect the ISR expansion where we decide to expand based on stale fetch request but use the +// latest broker epoch to fill in the AlterPartition request. +inReadLock(leaderIsrUpdateLock) { + // Fence the fetch request with stale broker epoch from a rebooted follower. + val currentBrokerEpoch = replica.stateSnapshot.brokerEpoch.getOrElse(-1L) + if (brokerEpoch != -1 && brokerEpoch < currentBrokerEpoch) { +error(s"Received fetch request for $topicPartition with stale broker epoch=$brokerEpoch. The expected" + + s" broker epoch= $currentBrokerEpoch.\"") +return + } Review Comment: My understanding is that the epoch check needs to be done in the `updateFetchState` call, under the atomic `updateAndGet`. I don't think error logging is worthwhile here: error logs usually indicate an unexpected failure that may require intervention. If we are silently fencing a stale fetch, a log is probably not needed.
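One way to read the suggestion to do the epoch check inside `updateAndGet` is to make the check and the write a single atomic step, and to drop stale fetches silently. The Java sketch below assumes a hypothetical immutable `FetchState` held in an `AtomicReference`; Kafka's actual replica state class and its fields differ. As in the quoted diff, a request epoch of -1 bypasses the check.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable snapshot of a follower's fetch state (not Kafka's class).
final class FetchState {
    final long fetchOffset;
    final long brokerEpoch; // -1 means the epoch is unknown

    FetchState(long fetchOffset, long brokerEpoch) {
        this.fetchOffset = fetchOffset;
        this.brokerEpoch = brokerEpoch;
    }
}

final class ReplicaModel {
    private final AtomicReference<FetchState> state =
        new AtomicReference<>(new FetchState(0L, -1L));

    // Returns true if the fetch was applied, false if it was fenced as stale.
    boolean updateFetchState(long fetchOffset, long fetchBrokerEpoch) {
        boolean[] applied = new boolean[1];
        state.updateAndGet(current -> {
            // Check and write happen in one atomic step, so a newer fetch cannot slip
            // in between them. A request epoch of -1 bypasses the check.
            boolean stale = fetchBrokerEpoch != -1 && fetchBrokerEpoch < current.brokerEpoch;
            applied[0] = !stale; // rewritten on every retry, so it reflects the winning attempt
            return stale ? current : new FetchState(fetchOffset, fetchBrokerEpoch);
        });
        return applied[0];
    }

    FetchState snapshot() {
        return state.get();
    }
}
```

The function passed to `updateAndGet` may run more than once under contention. It therefore has no side effects beyond recording which branch the winning attempt took.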
[GitHub] [kafka] showuon commented on a diff in pull request #13999: KAFKA-15176: add tests for tiered storage metrics
showuon commented on code in PR #13999: URL: https://github.com/apache/kafka/pull/13999#discussion_r1268886912 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -3379,6 +3396,246 @@ class ReplicaManagerTest { testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false, Errors.NONE) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(booleans = Array(true, false)) + def testOffsetOutOfRangeExceptionWhenReadFromLog(isFromFollower: Boolean): Unit = { +val replicaId = if (isFromFollower) 1 else -1 +val tp0 = new TopicPartition(topic, 0) +val tidp0 = new TopicIdPartition(topicId, tp0) +// create a replicaManager with remoteLog enabled +val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true) +try { + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + val partition0Replicas = Seq[Integer](0, 1).asJava + val topicIds = Map(tp0.topic -> topicId).asJava + val leaderEpoch = 0 + val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, +Seq( + new LeaderAndIsrPartitionState() +.setTopicName(tp0.topic) +.setPartitionIndex(tp0.partition) +.setControllerEpoch(0) +.setLeader(leaderEpoch) +.setLeaderEpoch(0) +.setIsr(partition0Replicas) +.setPartitionEpoch(0) +.setReplicas(partition0Replicas) +.setIsNew(true) +).asJava, +topicIds, +Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) + + val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 0, 100, FetchIsolation.LOG_END, None.asJava) + // when reading log, it'll throw OffsetOutOfRangeException, which will be handled separately + val result = replicaManager.readFromLog(params, Seq(tidp0 -> new PartitionData(topicId, 1, 0, 10, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, false) + + if (isFromFollower) { +// expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from follower, since the data is already available in remote log +assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, result.head._2.error) + } else { +assertEquals(Errors.NONE, result.head._2.error) + } + assertEquals(startOffset, result.head._2.leaderLogStartOffset) + assertEquals(endOffset, result.head._2.leaderLogEndOffset) + assertEquals(highHW, result.head._2.highWatermark) + if (isFromFollower) { +assertFalse(result.head._2.info.delayedRemoteStorageFetch.isPresent) + } else { +// for consumer fetch, we should return a delayedRemoteStorageFetch to wait for remote fetch +assertTrue(result.head._2.info.delayedRemoteStorageFetch.isPresent) + } +} finally { + replicaManager.shutdown(checkpointHW = false) +} + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(booleans = Array(true, false)) + def testOffsetOutOfRangeExceptionWhenFetchMessages(isFromFollower: Boolean): Unit = { +val replicaId = if (isFromFollower) 1 else -1 +val tp0 = new TopicPartition(topic, 0) +val tidp0 = new TopicIdPartition(topicId, tp0) +// create a replicaManager with remoteLog enabled +val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true) +try { + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + val partition0Replicas = Seq[Integer](0, 1).asJava + val topicIds = Map(tp0.topic -> topicId).asJava + val leaderEpoch = 0 + val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, +Seq( + new LeaderAndIsrPartitionState() +.setTopicName(tp0.topic) +.setPartitionIndex(tp0.partition) +.setControllerEpoch(0) +.setLeader(leaderEpoch) +.setLeaderEpoch(0) +.setIsr(partition0Replicas) +.setPartitionEpoch(0) +.setReplicas(partition0Replicas) +.setIsNew(true) +).asJava, +topicIds, +Set(new Node(0, "host1", 0), new Node(1, "host2",
[GitHub] [kafka] showuon commented on a diff in pull request #14051: KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently
showuon commented on code in PR #14051: URL: https://github.com/apache/kafka/pull/14051#discussion_r1268862349 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -998,7 +998,13 @@ class Partition(val topicPartition: TopicPartition, // 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch //request broker epoch is -1 which bypasses the epoch verification. case kRaftMetadataCache: KRaftMetadataCache => -val storedBrokerEpoch = remoteReplicasMap.get(followerReplicaId).stateSnapshot.brokerEpoch +val mayBeReplica = getReplica(followerReplicaId) +// The topic is already deleted and we don't have any replica information. In this case, we can return false +// so as to avoid NPE +if (mayBeReplica.isEmpty) { + return false Review Comment: We should log something here. Maybe: `warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist in the leader node. It might be because the topic is already deleted.")` WDYT?
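The null-safe lookup can be sketched in Java by wrapping the map lookup in `Optional.ofNullable`. The map, class and method names are hypothetical stand-ins for the leader's replica bookkeeping, and `java.util.logging` stands in for Kafka's logger. The warning text follows the reviewer's suggestion.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

// Hypothetical stand-in for the leader's per-follower bookkeeping, which a topic
// deletion may clear while a fetch from a follower is still being processed.
final class BrokerEpochValidator {
    private static final Logger LOG = Logger.getLogger(BrokerEpochValidator.class.getName());
    final Map<Integer, Long> storedBrokerEpochs = new ConcurrentHashMap<>();

    boolean isBrokerEpochValid(int followerReplicaId, long requestBrokerEpoch) {
        Optional<Long> stored = Optional.ofNullable(storedBrokerEpochs.get(followerReplicaId));
        if (!stored.isPresent()) {
            // No replica state: fail the check instead of dereferencing null.
            LOG.warning("The replica state of replica ID:[" + followerReplicaId
                + "] doesn't exist in the leader node. It might be because the topic is already deleted.");
            return false;
        }
        // A request epoch of -1 bypasses the verification.
        return requestBrokerEpoch == -1 || stored.get() == requestBrokerEpoch;
    }
}
```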
[GitHub] [kafka] showuon commented on a diff in pull request #14045: refactor(storage): topic-based RLMM consumer-manager/task related improvements
showuon commented on code in PR #14045: URL: https://github.com/apache/kafka/pull/14045#discussion_r1268858531 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java: ## @@ -87,36 +96,41 @@ public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) throws T } /** - * Waits if necessary for the consumption to reach the offset of the given {@code recordMetadata}. + * Waits if necessary for the consumption to reach the {@code offset} of the given record + * at a certain {@code partition} of the metadata topic. Review Comment: This change should be reverted since the parameter is not updated. ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java: ## @@ -182,13 +182,12 @@ private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition topicIdP CompletableFuture<RecordMetadata> produceFuture = producerManager.publishMessage(remoteLogMetadata); // Create and return a `CompletableFuture` instance which completes when the consumer is caught up with the produced record's offset. -return produceFuture.thenApplyAsync(recordMetadata -> { +return produceFuture.thenAcceptAsync(recordMetadata -> { try { - consumerManager.waitTillConsumptionCatchesUp(recordMetadata); + consumerManager.waitTillConsumptionCatchesUp(recordMetadata.partition(), recordMetadata.offset()); Review Comment: We should pass `recordMetadata` only, right?
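The switch from `thenApplyAsync` to `thenAcceptAsync` suits a step that only waits and produces no value. `thenAcceptAsync` takes a `Consumer` and yields `CompletableFuture<Void>`, whereas `thenApplyAsync` needs a `Function` whose result would be a dummy value. A minimal sketch follows, with a hypothetical `StoreMetadataSketch` class and a `Long` offset standing in for `RecordMetadata`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a Long offset stands in for RecordMetadata, and the
// "consumer" just records the highest offset it has caught up to.
final class StoreMetadataSketch {
    final AtomicLong caughtUpOffset = new AtomicLong(-1L);

    void waitTillConsumptionCatchesUp(long offset) {
        caughtUpOffset.accumulateAndGet(offset, Math::max);
    }

    // thenAcceptAsync: a side-effect-only step whose result type is CompletableFuture<Void>.
    CompletableFuture<Void> store(CompletableFuture<Long> produceFuture) {
        return produceFuture.thenAcceptAsync(this::waitTillConsumptionCatchesUp);
    }
}
```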
[GitHub] [kafka] bmscomp commented on pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1
bmscomp commented on PR #14032: URL: https://github.com/apache/kafka/pull/14032#issuecomment-1642924837 @divijvaidya The `retry_zinc` bash script in the Kafka root directory mentions this issue, though for an older version of Gradle: ``` # Hacky workaround for https://github.com/gradle/gradle/issues/3777 # There is currently no configurable timeout, so we retry builds jenkins when we can't get a lock on the zinc compiler cache # Hopefully we can remove this in the future, but this will save us from having to manually rebuild for the time being. # Example: # [2021-10-19T17:25:07.234Z] * What went wrong: # [2021-10-19T17:25:07.234Z] Execution failed for task ':streams:streams-scala:compileScala'. # [2021-10-19T17:25:07.234Z] > Timeout waiting to lock zinc-1.3.5_2.13.6_8 compiler cache (/home/jenkins/.gradle/caches/7.0.2/zinc-1.3.5_2.13.6_8). It is currently in use by another Gradle instance. # [2021-10-19T17:25:07.234Z] Owner PID: 3999 # [2021-10-19T17:25:07.234Z] Our PID: 3973 # [2021-10-19T17:25:07.234Z] Owner Operation: # [2021-10-19T17:25:07.234Z] Our operation: # [2021-10-19T17:25:07.234Z] Lock file: /home/jenkins/.gradle/caches/7.0.2/zinc-1.3.5_2.13.6_8/zinc-1.3.5_2.13.6_8.lock ``` Note that we are using an old version of the Zinc incremental compiler, so it may be a good idea to consider upgrading it to a recent version: https://github.com/sbt/zinc/releases
[GitHub] [kafka] gharris1727 commented on pull request #14055: KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898)
gharris1727 commented on PR #14055: URL: https://github.com/apache/kafka/pull/14055#issuecomment-1642919694 Here's an example error log output when in HYBRID_FAIL mode (I shortened the listing for brevity; the actual prints include every non-migrated plugin) ``` Plugins are missing ServiceLoader manifests, these plugins will not be visible with plugin.discovery=SERVICE_LOAD: [ classpath org.apache.kafka.connect.converters.ByteArrayConverter undefined classpath org.apache.kafka.connect.converters.DoubleConverter undefined classpath org.apache.kafka.connect.converters.FloatConverter undefined classpath org.apache.kafka.connect.converters.IntegerConverter undefined classpath org.apache.kafka.connect.converters.LongConverter undefined classpath org.apache.kafka.connect.converters.ShortConverter undefined classpath org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingConnector 0.0.0 classpath org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSinkConnector 0.0.0 ... classpath org.apache.kafka.connect.transforms.predicates.TopicNameMatches undefined ] at app//org.apache.kafka.connect.runtime.isolation.Plugins.maybeReportHybridDiscoveryIssue(Plugins.java:131) at app//org.apache.kafka.connect.runtime.isolation.Plugins.initLoaders(Plugins.java:92) at app//org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:74) at app//org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:64) at app//org.apache.kafka.connect.cli.AbstractConnectCli.startConnect(AbstractConnectCli.java:121) at app//org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50) at app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:187) at app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.startConnect(EmbeddedConnectCluster.java:283) at app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.start(EmbeddedConnectCluster.java:148) ... (test suite) ``` This is what appears if the manifests included in this PR are not present. The same, but with HYBRID_WARN: ``` [2023-07-19 17:03:44,143] WARN Plugins are missing ServiceLoader manifests, these plugins will not be visible with plugin.discovery=SERVICE_LOAD: [ classpath org.apache.kafka.connect.converters.ByteArrayConverter undefined ... classpath org.apache.kafka.connect.transforms.predicates.TopicNameMatches undefined ] (org.apache.kafka.connect.runtime.isolation.Plugins:128) ``` I did notice that if logging is OFF or ERROR for the runtime (such as in the mirror tests) then the warning doesn't print if the caller overrides the configuration. The failure does propagate, so they will certainly see that in HYBRID_FAIL. I'm going to push the required manifests, so just delete/revert them if you'd like to see these errors on the classpath.
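The "missing ServiceLoader manifests" message refers to the standard `java.util.ServiceLoader` mechanism. An implementation is only discovered if a provider-configuration file named `META-INF/services/<fully-qualified interface name>` lists it. The sketch below uses a hypothetical `GreeterPlugin` interface and `EnglishGreeter` class. It shows that a class on the classpath without such a file is invisible to `ServiceLoader`, which is what happens to unmigrated plugins under `plugin.discovery=SERVICE_LOAD`.

```java
import java.util.ServiceLoader;

// Hypothetical plugin interface and implementation. The implementation is on the
// classpath, but no META-INF/services/GreeterPlugin file lists it.
interface GreeterPlugin {
    String greet();
}

final class EnglishGreeter implements GreeterPlugin {
    public String greet() {
        return "hello";
    }
}

final class ServiceLoadDemo {
    // Counts the GreeterPlugin providers that java.util.ServiceLoader can see.
    static int visibleProviders() {
        int count = 0;
        for (GreeterPlugin ignored : ServiceLoader.load(GreeterPlugin.class)) {
            count++;
        }
        return count;
    }
}
```

Adding a classpath resource `META-INF/services/GreeterPlugin` containing the line `EnglishGreeter` would make the provider discoverable. The provider would then also need to be a public class with a public no-argument constructor.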
[GitHub] [kafka] gharris1727 opened a new pull request, #14055: KAFKA-15031: Add configurable scanning modes to Connect worker config (KIP-898)
gharris1727 opened a new pull request, #14055: URL: https://github.com/apache/kafka/pull/14055 This is the primary feature of KIP-898, which allows users to switch the Connect worker between the different scanning modes. The modes, their behavior, and their intended use are described in https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
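The modes can be summarised as a small decision table. The Java sketch below is a hypothetical model of ONLY_SCAN, HYBRID_WARN, HYBRID_FAIL and SERVICE_LOAD. It is built from the behaviour shown in the log output in this thread and described in KIP-898, and is not Kafka's `Plugins` code. It also assumes that every plugin with a manifest is found by the reflective scan too.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of how each plugin.discovery mode treats plugins that lack a
// ServiceLoader manifest. Assumes withManifest is a subset of scanned.
enum DiscoveryMode { ONLY_SCAN, HYBRID_WARN, HYBRID_FAIL, SERVICE_LOAD }

final class DiscoveryModel {
    // Returns the plugins that end up visible; any warning is appended to `warnings`.
    static Set<String> visiblePlugins(DiscoveryMode mode, Set<String> scanned,
                                      Set<String> withManifest, List<String> warnings) {
        Set<String> missing = new TreeSet<>(scanned);
        missing.removeAll(withManifest);
        switch (mode) {
            case ONLY_SCAN:
                // Reflective scan only; manifests are not consulted.
                return new TreeSet<>(scanned);
            case HYBRID_WARN:
                if (!missing.isEmpty()) {
                    warnings.add("Plugins are missing ServiceLoader manifests: " + missing);
                }
                return new TreeSet<>(scanned);
            case HYBRID_FAIL:
                if (!missing.isEmpty()) {
                    throw new IllegalStateException("Plugins are missing ServiceLoader manifests: " + missing);
                }
                return new TreeSet<>(scanned);
            case SERVICE_LOAD:
                // Only manifest-declared plugins are seen.
                return new TreeSet<>(withManifest);
            default:
                throw new AssertionError(mode);
        }
    }
}
```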
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #14053: [KAFKA-15221] Fix the race between fetch requests from a rebooted follower.
CalvinConfluent commented on code in PR #14053: URL: https://github.com/apache/kafka/pull/14053#discussion_r1268783217 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -1366,6 +1376,17 @@ class Partition(val topicPartition: TopicPartition, fetchParams.replicaId, fetchPartitionData ) + +// Fence the fetch request with stale broker epoch from a rebooted follower. +if (metadataCache.isInstanceOf[KRaftMetadataCache]) { + val brokerEpoch = fetchParams.replicaEpoch + val currentBrokerEpoch = replica.stateSnapshot.brokerEpoch.getOrElse(-1L) + if (brokerEpoch != -1 && brokerEpoch < currentBrokerEpoch) { +throw new StaleBrokerEpochException(s"Received fetch request for $topicPartition with stale broker " + + s"epoch=$brokerEpoch. The expected broker epoch= $currentBrokerEpoch.") + } +} Review Comment: Makes sense. Then just abort the fetch state update.
[GitHub] [kafka] philipnee opened a new pull request, #14054: KAFKA-14648: Moving bootstrap to NetworkClient Poll.
philipnee opened a new pull request, #14054: URL: https://github.com/apache/kafka/pull/14054 Motivation: Instantiating a new client may result in a fatal failure if the bootstrap server cannot be resolved due to misconfiguration or transient network issues such as slow DNS. This is suboptimal because it might take a long time for the address to become available at the DNS server, and users will need to keep retrying. Also, the ConfigException exception type does not accurately reflect the root cause of the problem, which makes this failure case hard to handle. We think it is reasonable to give users a grace period to retry if the address cannot be resolved immediately. Also, poisoning the clients during construction can be obstructive; I think it is better to fail the client on its first attempt to connect to the network. Changes: The `NetworkClient` constructor now accepts a set of bootstrap configurations, which are then used for bootstrapping during the `NetworkClient.poll()` operation. To avoid adding more parameters to the existing list, I wrapped the configurations into a `BootstrapConfiguration` class. On the client side, bootstrapping is removed, since the motivation of the KIP is to avoid bootstrapping during instance construction.
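The idea of moving address resolution out of the constructor can be sketched as follows. Everything here is hypothetical: `LazyBootstrapClient` is not `NetworkClient`, the `resolver` function stands in for DNS lookup, and the grace-period deadline is one possible policy for when to give up, not necessarily the one chosen in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of deferring bootstrap address resolution from construction to poll().
// `resolver` stands in for DNS lookup: it returns addresses or throws a RuntimeException.
final class LazyBootstrapClient {
    private final List<String> bootstrapServers;
    private final Function<String, List<String>> resolver;
    private final long deadlineMs; // end of the grace period
    private List<String> resolved = null;

    LazyBootstrapClient(List<String> bootstrapServers, Function<String, List<String>> resolver,
                        long nowMs, long gracePeriodMs) {
        // The constructor only stores configuration; it never touches the network.
        this.bootstrapServers = bootstrapServers;
        this.resolver = resolver;
        this.deadlineMs = nowMs + gracePeriodMs;
    }

    // Each poll retries resolution until it succeeds or the grace period runs out.
    // Returns true once bootstrapped, false if the caller should poll again later.
    boolean poll(long nowMs) {
        if (resolved != null) {
            return true;
        }
        List<String> addresses = new ArrayList<>();
        for (String server : bootstrapServers) {
            try {
                addresses.addAll(resolver.apply(server));
            } catch (RuntimeException e) {
                // Not resolvable yet; try the remaining servers.
            }
        }
        if (!addresses.isEmpty()) {
            resolved = addresses;
            return true;
        }
        if (nowMs >= deadlineMs) {
            throw new IllegalStateException("Could not resolve any bootstrap server within the grace period");
        }
        return false;
    }
}
```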
[GitHub] [kafka] gharris1727 merged pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 merged PR #13971: URL: https://github.com/apache/kafka/pull/13971
[GitHub] [kafka] gharris1727 commented on pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on PR #13971: URL: https://github.com/apache/kafka/pull/13971#issuecomment-1642887109 Flaky test failures appear unrelated.
[jira] [Created] (KAFKA-15221) Potential race condition between requests from rebooted followers
Calvin Liu created KAFKA-15221:
--
Summary: Potential race condition between requests from rebooted followers
Key: KAFKA-15221
URL: https://issues.apache.org/jira/browse/KAFKA-15221
Project: Kafka
Issue Type: Bug
Affects Versions: 3.5.0
Reporter: Calvin Liu
Assignee: Calvin Liu
Fix For: 3.6.0, 3.5.1

When the leader processes a fetch request, it does not acquire locks when updating the replica fetch state, so fetch requests from a rebooted follower can race with each other:
T0: Broker 1 sends a fetch to broker 0 (the leader). At this moment, broker 1 is not in the ISR.
T1: Broker 1 crashes.
T2: Broker 1 comes back online and receives a new broker epoch. It also sends a new Fetch request.
T3: Broker 0 receives the old fetch request and decides to expand the ISR.
T4: Right before broker 0 starts to fill in the AlterPartition request, the new fetch request comes in and overwrites the fetch state. Broker 0 then uses the new broker epoch in the AlterPartition request.
In this way, the AlterPartition request can get around KIP-903 and wrongly update the ISR.
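Steps T3 and T4 form a check-then-act race: the expansion decision and the broker epoch are read at different times. The sketch below is a hypothetical illustration and not the fix in PR #14053, which fences stale epochs and takes `leaderIsrUpdateLock`. Reading one immutable snapshot for both the decision and the epoch means the AlterPartition request would carry the old epoch, which the KIP-903 broker-epoch check is meant to reject.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable view of what the leader last learned from a follower's fetch.
final class FollowerSnapshot {
    final long fetchOffset;
    final long brokerEpoch;

    FollowerSnapshot(long fetchOffset, long brokerEpoch) {
        this.fetchOffset = fetchOffset;
        this.brokerEpoch = brokerEpoch;
    }
}

final class IsrExpansionModel {
    final AtomicReference<FollowerSnapshot> state = new AtomicReference<>();

    // Returns the broker epoch to put in AlterPartition, or -1 if no expansion is needed.
    // `state` is read exactly once, so a concurrent overwrite by the post-reboot fetch
    // cannot pair the old (caught-up) fetch offset with the new broker epoch.
    long epochForExpansion(long highWatermark) {
        FollowerSnapshot snap = state.get();
        if (snap == null || snap.fetchOffset < highWatermark) {
            return -1L;
        }
        return snap.brokerEpoch;
    }
}
```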
[GitHub] [kafka] kirktrue commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue commented on PR #13990: URL: https://github.com/apache/kafka/pull/13990#issuecomment-1642861657 @junrao The tests ran again (with no code change in between), and now there are only four test failures, in seemingly unrelated areas of the code.
[GitHub] [kafka] ijuma commented on pull request #13949: KAFKA-15141: init logger statically on hot codepaths
ijuma commented on PR #13949: URL: https://github.com/apache/kafka/pull/13949#issuecomment-1642838445 I merged the PR. Thanks for the contribution!
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1268717074 ## metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java: ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.loader.metrics; + +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; + +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * These are the metrics which are managed by the SnapshotEmitter class. 
+ */ +public final class SnapshotEmitterMetrics implements AutoCloseable { +private final static MetricName LATEST_SNAPSHOT_GENERATED_BYTES = getMetricName( +"SnapshotEmitter", "LatestSnapshotGeneratedBytes"); +private final static MetricName LATEST_SNAPSHOT_GENERATED_AGE_MS = getMetricName( +"SnapshotEmitter", "LatestSnapshotGeneratedAgeMs"); + +private final Optional<MetricsRegistry> registry; +private final Time time; +private final AtomicLong latestSnapshotGeneratedBytes; +private final AtomicLong latestSnapshotGeneratedTimeMs; + +/** + * Create a new LoaderMetrics object. + * + * @param registry The metrics registry, or Optional.empty if this is a test and we don't have one. + */ +public SnapshotEmitterMetrics( +Optional<MetricsRegistry> registry, +Time time, +long initialLatestSnapshotGeneratedBytes +) { +this.registry = registry; +this.time = time; +this.latestSnapshotGeneratedBytes = new AtomicLong(initialLatestSnapshotGeneratedBytes); +this.latestSnapshotGeneratedTimeMs = new AtomicLong(monoTimeInMs()); +registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_BYTES, new Gauge<Long>() { +@Override +public Long value() { +return latestSnapshotGeneratedBytes(); +} +})); +registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_AGE_MS, new Gauge<Long>() { +@Override +public Long value() { +return latestSnapshotGeneratedAgeMs(); +} +})); +} + +long monoTimeInMs() { +return TimeUnit.NANOSECONDS.toMillis(time.nanoseconds()); +} + +public void setLatestSnapshotGeneratedBytes(long value) { +this.latestSnapshotGeneratedBytes.set(value); +} Review Comment: Fixed, and this is now tested. ## metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java: ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.loader.metrics; + +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; + +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * These are the metrics which are managed by the SnapshotEmitter class. + */ +public final class SnapshotEmitterMetrics implements AutoCloseable {
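The diff registers two gauges: one for the size of the latest snapshot and one for its age, where time is read through `monoTimeInMs()` (a monotonic nanosecond clock converted to milliseconds). The body of `latestSnapshotGeneratedAgeMs()` is not part of the quoted hunk, so the following is only a minimal sketch of how such an age gauge could be derived, assuming it subtracts the recorded generation time from the current monotonic time. The class `SnapshotAgeSketch`, its `recordSnapshot` hook, and the `LongSupplier` clock (standing in for Kafka's `Time`) are hypothetical; the Yammer registry is left out.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical stand-in for SnapshotEmitterMetrics: no Yammer registry, and
// Kafka's Time abstraction is replaced by a LongSupplier returning nanoseconds.
final class SnapshotAgeSketch {
    private final LongSupplier nanoClock;
    private final AtomicLong latestSnapshotGeneratedBytes;
    private final AtomicLong latestSnapshotGeneratedTimeMs;

    SnapshotAgeSketch(LongSupplier nanoClock, long initialLatestSnapshotGeneratedBytes) {
        this.nanoClock = nanoClock;
        this.latestSnapshotGeneratedBytes = new AtomicLong(initialLatestSnapshotGeneratedBytes);
        this.latestSnapshotGeneratedTimeMs = new AtomicLong(monoTimeInMs());
    }

    // Same conversion as monoTimeInMs() in the diff: monotonic nanoseconds to milliseconds.
    long monoTimeInMs() {
        return TimeUnit.NANOSECONDS.toMillis(nanoClock.getAsLong());
    }

    // Hypothetical hook: record the size and generation time of a newly emitted snapshot.
    void recordSnapshot(long bytes) {
        latestSnapshotGeneratedBytes.set(bytes);
        latestSnapshotGeneratedTimeMs.set(monoTimeInMs());
    }

    // What a LatestSnapshotGeneratedBytes gauge would read.
    long latestSnapshotGeneratedBytes() {
        return latestSnapshotGeneratedBytes.get();
    }

    // What a LatestSnapshotGeneratedAgeMs gauge would read (assumed formula).
    long latestSnapshotGeneratedAgeMs() {
        return monoTimeInMs() - latestSnapshotGeneratedTimeMs.get();
    }
}
```

Basing the age on a monotonic clock rather than wall-clock time means the gauge cannot go negative or jump if the system clock is adjusted; injecting the clock also makes the gauge testable with a fake time source.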
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1268716163 ## metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java: ## @@ -54,6 +54,14 @@ public class QuorumControllerMetrics implements AutoCloseable { "KafkaController", "LastAppliedRecordTimestamp"); private final static MetricName LAST_APPLIED_RECORD_LAG_MS = getMetricName( "KafkaController", "LastAppliedRecordLagMs"); Review Comment: I'd prefer it this way since we're initializing a bunch of constants here, and sometimes we want to grep for stuff. It's kind of annoying when you have "matryoshka doll initialization" of constants. Like rather than having:
```
static final String FOO = "foo";
static final String BAR = "bar";
static final String FOO_BAR = FOO + BAR;
static final String FOO_BAR_BAZ = FOO_BAR + "baz";
```
I'd rather just have:
```
static final String FOO = "foo";
static final String BAR = "bar";
static final String FOO_BAR = "foobar";
static final String FOO_BAR_BAZ = "foobarbaz";
```
since then I can grep for the constant and it works as expected. Although maybe other people disagree?
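As a side note on the trade-off in this comment: in Java, concatenating compile-time `String` constants is itself a constant expression (JLS §15.29), so the compiler folds it and both styles produce the same interned value. The choice is therefore purely about readability and greppability, not runtime behavior. A small sketch reusing the comment's example names (the class name `ConstantStyles` and the `_LITERAL` suffix are illustrative only):

```java
// Both declaration styles from the review comment, side by side.
final class ConstantStyles {
    // "Matryoshka doll" style: constants built from other constants.
    static final String FOO = "foo";
    static final String BAR = "bar";
    static final String FOO_BAR = FOO + BAR;
    static final String FOO_BAR_BAZ = FOO_BAR + "baz";

    // Grep-friendly style: every value spelled out in full.
    static final String FOO_BAR_LITERAL = "foobar";
    static final String FOO_BAR_BAZ_LITERAL = "foobarbaz";
}
```

Because constant expressions of type `String` are interned, even reference equality holds between the two styles; only the literal style lets a plain text search for `foobarbaz` find the declaration.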
[GitHub] [kafka] splett2 commented on a diff in pull request #14053: Fence follower fetch with stale broker epoch
splett2 commented on code in PR #14053: URL: https://github.com/apache/kafka/pull/14053#discussion_r1268702825 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -1366,6 +1376,17 @@ class Partition(val topicPartition: TopicPartition, fetchParams.replicaId, fetchPartitionData ) + +// Fence the fetch request with stale broker epoch from a rebooted follower. +if (metadataCache.isInstanceOf[KRaftMetadataCache]) { + val brokerEpoch = fetchParams.replicaEpoch + val currentBrokerEpoch = replica.stateSnapshot.brokerEpoch.getOrElse(-1L) + if (brokerEpoch != -1 && brokerEpoch < currentBrokerEpoch) { +throw new StaleBrokerEpochException(s"Received fetch request for $topicPartition with stale broker " + + s"epoch=$brokerEpoch. The expected broker epoch= $currentBrokerEpoch.") + } +} Review Comment: We would also need to update the `Replica.updateFetchState` call to check the brokerEpoch before trying to apply a fetch update. Otherwise we can pass this check and race for an update. I don't think we need to check whether we are in KRaft mode or not. The broker epoch should be monotonic in both ZooKeeper and KRaft mode. I also don't think we can throw `StaleBrokerEpochException` since this would introduce a new error code returned on fetch responses without a KIP.
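The race described here is a classic check-then-act problem: a fetch can pass the epoch check, and then a newer incarnation of the follower updates the state before the stale fetch applies its own update. One way to close it is to perform the check inside the update itself, atomically. The sketch below is hypothetical (`ReplicaSketch` and `FetchState` are not Kafka's `Replica` classes) and uses an `AtomicReference` compare-and-set loop; it reuses the diff's convention that an epoch of `-1` means "unknown". It returns `false` for a stale request instead of throwing, leaving open which error code to surface, since a new one would need a KIP.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified follower state; not Kafka's actual Replica class.
final class ReplicaSketch {
    static final long UNKNOWN_EPOCH = -1L;

    static final class FetchState {
        final long brokerEpoch;
        final long logEndOffset;

        FetchState(long brokerEpoch, long logEndOffset) {
            this.brokerEpoch = brokerEpoch;
            this.logEndOffset = logEndOffset;
        }
    }

    private final AtomicReference<FetchState> state =
        new AtomicReference<>(new FetchState(UNKNOWN_EPOCH, 0L));

    // Checks the epoch and applies the update as one atomic step, so a stale
    // fetch cannot pass the check and then overwrite newer state.
    boolean updateFetchState(long requestEpoch, long logEndOffset) {
        while (true) {
            FetchState current = state.get();
            if (requestEpoch != UNKNOWN_EPOCH && requestEpoch < current.brokerEpoch) {
                return false; // stale broker epoch: fence the request
            }
            long newEpoch = requestEpoch == UNKNOWN_EPOCH ? current.brokerEpoch : requestEpoch;
            FetchState next = new FetchState(newEpoch, logEndOffset);
            if (state.compareAndSet(current, next)) {
                return true;
            }
            // Another thread updated the state first; re-read and re-check.
        }
    }

    FetchState current() {
        return state.get();
    }
}
```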
[GitHub] [kafka] CalvinConfluent opened a new pull request, #14053: Fence follower fetch with stale broker epoch
CalvinConfluent opened a new pull request, #14053: URL: https://github.com/apache/kafka/pull/14053 There are two changes here: 1. Acquire the read lock when updating the follower fetch state. This can prevent a race condition with fetch requests from a rebooted follower. 2. Fence follower fetch requests that carry a stale broker epoch in KRaft mode. The broker epoch increases monotonically in KRaft mode, so a fetch request with a smaller broker epoch than the current one must be stale. Unit tests updated.
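Change 1 relies on a read/write lock: fetch-state updates take the shared (read) lock, so concurrent fetches do not block one another, while anything holding the exclusive (write) lock cannot interleave with them. The sketch below is a hypothetical illustration of that pattern, not the PR's code. It assumes that changes to the expected broker epoch (modeled by an invented `resetFollower` method) happen under the write lock. Per-follower fields written under the read lock still have to be atomic, because several readers can hold that lock at once.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration of the locking pattern; not Kafka's Partition class.
final class PartitionLockSketch {
    private static final long UNKNOWN_EPOCH = -1L;

    private final ReentrantReadWriteLock partitionStateLock = new ReentrantReadWriteLock();
    // Written only under the write lock, read under the read lock.
    private long expectedBrokerEpoch = UNKNOWN_EPOCH;
    // Atomic because several fetch handlers may hold the read lock concurrently.
    private final AtomicLong followerLogEndOffset = new AtomicLong(0L);

    // Fetch path: shared lock, so concurrent fetches proceed in parallel,
    // but none of them can interleave with resetFollower().
    boolean updateFollowerFetchState(long requestEpoch, long logEndOffset) {
        partitionStateLock.readLock().lock();
        try {
            if (requestEpoch != UNKNOWN_EPOCH && requestEpoch < expectedBrokerEpoch) {
                return false; // stale broker epoch: fence the request
            }
            followerLogEndOffset.set(logEndOffset);
            return true;
        } finally {
            partitionStateLock.readLock().unlock();
        }
    }

    // Hypothetical state-change path: exclusive lock, e.g. when the follower
    // comes back with a new broker epoch after a restart.
    void resetFollower(long newBrokerEpoch) {
        partitionStateLock.writeLock().lock();
        try {
            expectedBrokerEpoch = newBrokerEpoch;
            followerLogEndOffset.set(0L);
        } finally {
            partitionStateLock.writeLock().unlock();
        }
    }

    long followerLogEndOffset() {
        return followerLogEndOffset.get();
    }
}
```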
[jira] [Updated] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
[ https://issues.apache.org/jira/browse/KAFKA-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-10579: Fix Version/s: 3.6.0 > Flaky test > connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy > > > Key: KAFKA-10579 > URL: https://issues.apache.org/jira/browse/KAFKA-10579 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: A. Sophie Blee-Goldman >Assignee: Greg Harris >Priority: Major > Labels: flaky-test > Fix For: 3.6.0 > > > > {{java.lang.NullPointerException > at > java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) > at org.reflections.Store.getAllIncluding(Store.java:82) > at org.reflections.Store.getAll(Store.java:93) > at org.reflections.Reflections.getSubTypesOf(Reflections.java:404) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209) > at > org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) > at > org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91) > at > org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167) > at > 
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)}} > https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222
[jira] [Resolved] (KAFKA-12842) Failing test: org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic
[ https://issues.apache.org/jira/browse/KAFKA-12842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-12842. - Fix Version/s: 3.6.0 Resolution: Fixed > Failing test: > org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic > -- > > Key: KAFKA-12842 > URL: https://issues.apache.org/jira/browse/KAFKA-12842 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: John Roesler >Assignee: Greg Harris >Priority: Major > Fix For: 3.6.0 > > > This test failed during a PR build, which means that it failed twice in a > row, due to the test-retry logic in PR builds. > > [https://github.com/apache/kafka/pull/10744/checks?check_run_id=2643417209] > > {noformat} > java.lang.NullPointerException > at > java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) > at org.reflections.Store.getAllIncluding(Store.java:82) > at org.reflections.Store.getAll(Store.java:93) > at org.reflections.Reflections.getSubTypesOf(Reflections.java:404) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:352) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:337) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209) > at > org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) > at > org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:93) > at > org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50) > at > 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:174) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.startConnect(EmbeddedConnectCluster.java:260) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.start(EmbeddedConnectCluster.java:141) > at > org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic(ConnectWorkerIntegrationTest.java:303) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) 
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at >
[jira] [Resolved] (KAFKA-8690) Flakey test ConnectWorkerIntegrationTest#testAddAndRemoveWorke
[ https://issues.apache.org/jira/browse/KAFKA-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-8690. Fix Version/s: 3.6.0 Resolution: Fixed > Flakey test ConnectWorkerIntegrationTest#testAddAndRemoveWorke > --- > > Key: KAFKA-8690 > URL: https://issues.apache.org/jira/browse/KAFKA-8690 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Greg Harris >Priority: Major > Fix For: 3.6.0 > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23570/consoleFull] > org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest > > testAddAndRemoveWorker STARTED*02:56:46* > org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker.test.stdout*02:56:46* > *02:56:46* org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest > > testAddAndRemoveWorker FAILED*02:56:46* java.lang.AssertionError: > Condition not met within timeout 15000. Connector tasks did not start in > time.*02:56:46* at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)*02:56:46* > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)*02:56:46* > at > org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker(ConnectWorkerIntegrationTest.java:118)
[jira] [Resolved] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
[ https://issues.apache.org/jira/browse/KAFKA-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-10579. - Resolution: Fixed > Flaky test > connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy > > > Key: KAFKA-10579 > URL: https://issues.apache.org/jira/browse/KAFKA-10579 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: A. Sophie Blee-Goldman >Assignee: Greg Harris >Priority: Major > Labels: flaky-test > Fix For: 3.6.0 > > > > {{java.lang.NullPointerException > at > java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) > at org.reflections.Store.getAllIncluding(Store.java:82) > at org.reflections.Store.getAll(Store.java:93) > at org.reflections.Reflections.getSubTypesOf(Reflections.java:404) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209) > at > org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) > at > org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91) > at > org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167) > at > 
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)}} > https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222
[GitHub] [kafka] gharris1727 merged pull request #14029: KAFKA-10579: Upgrade reflections from 0.9.12 to 0.10.2
gharris1727 merged PR #14029: URL: https://github.com/apache/kafka/pull/14029
[GitHub] [kafka] gharris1727 commented on pull request #14029: KAFKA-10579: Upgrade reflections from 0.9.12 to 0.10.2
gharris1727 commented on PR #14029: URL: https://github.com/apache/kafka/pull/14029#issuecomment-1642744154 Test failures appear unrelated, and this has passed unit tests, system tests, and stress testing already.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors
gharris1727 commented on code in PR #14005: URL: https://github.com/apache/kafka/pull/14005#discussion_r1268647699 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java: ## @@ -46,4 +51,49 @@ public void startClusters() throws Exception { super.startClusters(); } +@Override +@Test +public void testReplication() throws Exception { +super.testReplication(); + +// Augment the base replication test case with some extra testing of the offset management +// API introduced in KIP-875 +// We do this only when exactly-once support is enabled in order to avoid having to worry about +// zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic + +String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS); +String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS); + +// Explicitly move back to offset 0 +// Note that the connector treats the offset as the last-consumed offset, +// so it will start reading the topic partition from offset 1 when it resumes +alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1"); +// Reset the offsets for test-topic-2 +resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2"); +resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class); + +int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS); +assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(), +"Records were not re-replicated to backup cluster after altering offsets."); +int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2; +assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(), +"New topic was not re-replicated to backup cluster after altering offsets."); + +@SuppressWarnings({"unchecked", "rawtypes"}) +Class[] 
connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]); +// Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect +// on their behavior, but users may want to wipe offsets from them to prevent the offsets topic +// from growing infinitely. So, we include them in the list of connectors to reset as a sanity check Review Comment: I think the currently proposed validation is reasonable, and I agree with the points raised above. I didn't even know that the Checkpoint and Heartbeat connectors emitted offsets in the first place, since the operation of the connector never reads them back and I never had any use to trace their data flow. I think the GET portion of the API will be the first time any user sees these offsets, and the PUT / alterOffsets methods will be the first time that they see how they (don't) affect the operation of the connectors. Since it is harmless to have the users change these offsets, the alterOffsets calls should guide users to well-formed inputs but not restrict them more than that. For the connector, the biggest impact will be keeping the offsets store clean in case a future extension wishes to use the offsets in a meaningful way.
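The expected count for `test-topic-1` in the test above follows from the comment that the stored source offset is the last-consumed offset: after rewinding it to 0, replication resumes at offset 1, so each partition re-sends `NUM_RECORDS_PER_PARTITION - 1` records on top of everything already replicated. A small worked sketch of that arithmetic, assuming (as the formula implies) that `NUM_RECORDS_PRODUCED` equals `NUM_RECORDS_PER_PARTITION * NUM_PARTITIONS`; the class and method names and the sample numbers are hypothetical:

```java
// Hypothetical helper reproducing the test's arithmetic for a rewound topic.
final class RewindMath {
    // Assumes each partition holds recordsPerPartition records at offsets
    // 0..recordsPerPartition-1, all of which were already replicated once.
    static long expectedRecords(long recordsPerPartition, int partitions, long lastConsumedOffset) {
        long alreadyReplicated = recordsPerPartition * partitions;
        // The stored offset is the last-consumed one, so reading resumes at lastConsumedOffset + 1.
        long replayedPerPartition = recordsPerPartition - (lastConsumedOffset + 1);
        return alreadyReplicated + replayedPerPartition * partitions;
    }
}
```

For example, with 100 records in each of 10 partitions and a rewind to offset 0, the backup topic should end up with 1000 + 99 × 10 = 1990 records.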
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors
gharris1727 commented on code in PR #14005: URL: https://github.com/apache/kafka/pull/14005#discussion_r1268624984 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -933,12 +938,94 @@ protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster conne } } -private static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List> connectorClasses) { +protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List> connectorClasses) { for (Class connector : connectorClasses) { connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false); } } +@SafeVarargs +protected static void resumeMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, Class... connectorClasses) throws InterruptedException { +for (Class connectorClass : connectorClasses) { +connectCluster.resumeConnector(connectorClass.getSimpleName()); +} +for (Class connectorClass : connectorClasses) { +String connectorName = connectorClass.getSimpleName(); + connectCluster.assertions().assertConnectorAndExactlyNumTasksAreRunning( +connectorName, +1, +"Connector '" + connectorName + "' and/or task did not resume in time" +); +} +} + +protected static void alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, LongUnaryOperator alterOffset, String... topics) { Review Comment: This also stops the connectors, which isn't described in the function name. Can you split this into two functions, or change the name to something like `stopAndAlter...`?
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -933,12 +938,94 @@ protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster conne } } -private static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List> connectorClasses) { +protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List> connectorClasses) { for (Class connector : connectorClasses) { connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false); } } +@SafeVarargs +protected static void resumeMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, Class... connectorClasses) throws InterruptedException { +for (Class connectorClass : connectorClasses) { +connectCluster.resumeConnector(connectorClass.getSimpleName()); +} +for (Class connectorClass : connectorClasses) { +String connectorName = connectorClass.getSimpleName(); + connectCluster.assertions().assertConnectorAndExactlyNumTasksAreRunning( +connectorName, +1, +"Connector '" + connectorName + "' and/or task did not resume in time" +); +} +} + +protected static void alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, LongUnaryOperator alterOffset, String... topics) { +Set topicsSet = new HashSet<>(Arrays.asList(topics)); + +String connectorName = MirrorSourceConnector.class.getSimpleName(); +connectCluster.stopConnector(connectorName); Review Comment: Should this additionally wait for the connectors to stop? It appears that it just waits for the REST API to return a 200, which may complete before the tasks have stopped executing and committed offsets.
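The concern is that a 200 from the stop request does not prove the tasks have finished and committed their offsets. A common remedy is to poll an observable condition (for example, the connector's reported status) until it holds or a deadline passes. Kafka's own tests use `org.apache.kafka.test.TestUtils.waitForCondition` for this kind of polling; the self-contained helper below is only a hypothetical stand-in, and its name, signature, and behavior are assumptions rather than that utility's API.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper; not Kafka's TestUtils API.
final class WaitUtil {
    // Polls until the condition holds or the timeout elapses.
    // Returns true if the condition was observed, false on timeout or interrupt.
    static boolean waitForCondition(BooleanSupplier condition, long timeoutMs, long pollIntervalMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

In the test, the condition would be something like "the connector and all of its tasks report a stopped state", checked after `stopConnector(...)` and before altering offsets.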
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java: ## @@ -46,4 +51,49 @@ public void startClusters() throws Exception { super.startClusters(); } +@Override +@Test +public void testReplication() throws Exception { +super.testReplication(); + +// Augment the base replication test case with some extra testing of the offset management +// API introduced in KIP-875 +// We do this only when exactly-once support is enabled in order to avoid having to worry about +// zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic + +String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS); +String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS); + +// Explicitly move back to offset 0 +// Note that the connector treats the offset as the last-consumed offset, +// so it will start reading the topic partition from offset 1 when it resumes +
[GitHub] [kafka] divijvaidya commented on pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1
divijvaidya commented on PR #14032: URL: https://github.com/apache/kafka/pull/14032#issuecomment-1642721299 We have had a large number of failures [1] since this was merged, with `Timeout waiting to lock * cache (/home/jenkins/.gradle/caches/*/* It is currently in use by another Gradle instance.` Please investigate if you get a chance; otherwise we may have to revert this commit tomorrow until we find and fix the cause. [1] https://ge.apache.org/scans/failures?failures.failureMessage=Execution%20failed%20for%20task%20*%0A%3E%20Timeout%20waiting%20to%20lock%20*%20cache%20(/home/jenkins/.gradle/caches/*/*%20It%20is%20currently%20in%20use%20by%20another%20Gradle%20instance.%0A%20%20Owner%20PID:%20*%0A%20%20Our%20PID:%20*%0A%20%20Owner%20Operation:%20%0A%20%20Our%20operation:%20%0A%20%20Lock%20file:%20/home/jenkins/.gradle/caches/*/*/*=kafka=Europe/Berlin
[GitHub] [kafka] jolshan commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
jolshan commented on PR #14046: URL: https://github.com/apache/kafka/pull/14046#issuecomment-1642696317 Thanks for the fix @dajac let's just confirm the build and we should be good :)
[GitHub] [kafka] philipnee opened a new pull request, #14052: KAFKA-14960: [Part I]TopicMetadataRequestManager Implementation (#7)
philipnee opened a new pull request, #14052: URL: https://github.com/apache/kafka/pull/14052 TopicMetadataRequestManager implementation ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] gharris1727 commented on pull request #13977: KAFKA-15162: Reflectively find plugins in parent ClassLoaders that aren't on the classpath
gharris1727 commented on PR #13977: URL: https://github.com/apache/kafka/pull/13977#issuecomment-1642662589 Test failures in CI appear unrelated.
[GitHub] [kafka] dajac commented on a diff in pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
dajac commented on code in PR #14046: URL: https://github.com/apache/kafka/pull/14046#discussion_r1268592972 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java: ## @@ -89,33 +88,29 @@ public Map offsets() { return offsets; } -public static List getErrorResponseTopics( -List requestTopics, Review Comment: The usage below was the only usage of this method, so we always used it with all topics. For context, I refactored it because I use the refactored version in one of the next patches.
[GitHub] [kafka] dajac commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
dajac commented on PR #14046: URL: https://github.com/apache/kafka/pull/14046#issuecomment-1642653815 > Something strange is going on with AuthorizerIntegrationTest in the build, but that might be unrelated. I will look into that. @jolshan I found the issue related to AuthorizerIntegrationTest failures. I just pushed a fix. See last commit.
[GitHub] [kafka] ijuma merged pull request #13949: KAFKA-15141: init logger statically on hot codepaths
ijuma merged PR #13949: URL: https://github.com/apache/kafka/pull/13949
[GitHub] [kafka] C0urante commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
C0urante commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1268520485 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,10 +280,33 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } Review Comment: Probably time to update this now? ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,8 +284,51 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.containsValue(null); + +// If there are tombstone offsets, then the failure to write to secondary store will +// not be ignored. Also, for tombstone records, we first write to secondary store and +// then to primary stores. +if (secondaryStore != null && containsTombstones) { +AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>(); Review Comment: We shouldn't be using an `AtomicReference` here. The reason it's used in the [linked snippet](https://github.com/apache/kafka/blob/6368d14a1d8c37305290b8b89fb5990ad07aa4db/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L430-L484) in the `ConsumerCoordinator` class is to ensure that, in the event that multiple errors occur, we don't overwrite the first exception that we saw. That's not a possibility here since `secondaryStoreTombstoneWriteError` is only ever updated in separate `catch` clauses for the same `try` block, which means that it's guaranteed to never be updated more than once. 
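The reviewer's point can be seen in a reduced form: when the error is only ever assigned in mutually exclusive `catch` clauses of a single `try`, at most one assignment can happen, so a plain local variable is enough. An `AtomicReference` only earns its keep when several callbacks might race to record an error and the first one must win. A hypothetical sketch (the class and method names are invented, and the bounded wait stands in for the non-exactly-once path; this is not the PR's code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: wait for the secondary-store tombstone write and capture its error.
final class SecondaryWriteSketch {
    // Returns the write's error, or null on success. Exactly one of the catch
    // clauses can run, so the local is assigned at most once after initialization.
    static Throwable awaitSecondaryWrite(Future<Void> write, long timeoutMs) {
        Throwable error = null;
        try {
            write.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            error = e.getCause(); // the failure reported by the store
        } catch (TimeoutException e) {
            error = e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            error = e;
        }
        return error;
    }
}
```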
## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,8 +284,51 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.containsValue(null); + +// If there are tombstone offsets, then the failure to write to secondary store will +// not be ignored. Also, for tombstone records, we first write to secondary store and +// then to primary stores. +if (secondaryStore != null && containsTombstones) { +AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>(); +FutureCallback secondaryWriteFuture = new FutureCallback<>(); +secondaryStore.set(values, secondaryWriteFuture); +try { +// For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for +// commits. We still need to wait because we want to fail the offset commit for cases when Review Comment: I like the general idea here right now: block indefinitely for exactly-once, block within the offset timeout otherwise. We also note in the [docs](https://kafka.apache.org/documentation.html#connectconfigs_offset.flush.timeout.ms) for the `offset.flush.timeout.ms` property that it "has no effect for source connectors running with exactly-once support". I don't think we need to worry about placing an upper bound on the time we take with exactly-once support enabled. If we did, it would make tasks more brittle (remember, we fail tasks when offset commits fail in this mode), and preemptively writing tombstone records to the secondary offsets topic shouldn't corrupt the offsets that a connector sees even if the current transaction (including a write to the connector-specific offsets topic) fails. 
We may end up writing garbage to the secondary offsets topic, but guarantees for exactly-once support are lost as soon as a connector switches over to reading exclusively from that topic, and tombstones in the secondary topic don't overwrite non-tombstone offsets for the same partition in the primary topic. That said, I don't love how we've made this method synchronously await the write to the secondary store. We should return a `Future` to the caller that corresponds to all of the offset flushes that we'd need to block on for an offset commit (i.e., the existing flush that we're performing, possibly preceded by a preemptive flush of tombstones to the secondary store). ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java: ## @@ -192,6 +197,220 @@ public void testCancelAfterAwaitFlush() throws Exception { flushFuture.get(1000, TimeUnit.MILLISECONDS); } +@Test Review Comment: +1 for moving these tests to a `ConnectorOffsetBackingStoreTest`; the changes to the main code are entirely contained within the
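One way to return a single `Future` instead of blocking is to chain the two writes. This sketch is hypothetical. It uses `CompletableFuture` and a simplified `OffsetStore` interface rather than Connect's `OffsetBackingStore` and `Callback` types. It only illustrates the ordering: tombstones go to the secondary store first, then to the primary store.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ChainedOffsetFlushSketch {

    // Hypothetical, simplified store abstraction; not Connect's OffsetBackingStore API.
    interface OffsetStore {
        CompletableFuture<Void> set(Map<String, String> values);
    }

    // Returns one future covering every write the caller has to wait on for an offset commit.
    static CompletableFuture<Void> set(OffsetStore primary, OffsetStore secondary, Map<String, String> values) {
        boolean containsTombstones = values.containsValue(null);
        if (secondary == null || !containsTombstones) {
            return primary.set(values);
        }
        // Write tombstones to the secondary store first; the primary write starts only if that
        // succeeds, and a secondary failure fails the returned future.
        return secondary.set(values).thenCompose(ignored -> primary.set(values));
    }
}
```

Chaining with `thenCompose` keeps the method non-blocking. The caller decides how long to wait, whether indefinitely for exactly-once or within `offset.flush.timeout.ms` otherwise.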
[GitHub] [kafka] jolshan commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
jolshan commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1268535712 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; + +import java.util.Objects; +import java.util.OptionalInt; +import java.util.OptionalLong; + +/** + * Represents a committed offset with its metadata. + */ +public class OffsetAndMetadata { +public static final String NO_METADATA = ""; Review Comment: In what cases do we use this constant?
[GitHub] [kafka] jolshan commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
jolshan commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1268526325 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -386,6 +386,18 @@ public short groupMetadataValueVersion() { } } +public short offsetCommitValueVersion() { +if (isLessThan(MetadataVersion.IBP_2_1_IV0)) { +return 1; +} else if (isLessThan(MetadataVersion.IBP_2_1_IV1)) { +return 2; +} else { +// Serialize with the highest supported non-flexible version +// until a tagged field is introduced or the version is bumped. +return 3; +} Review Comment: Also, do we expect to have lower IBPs if this is used for the new group coordinator? I didn't think we could go that low for KRaft.
[GitHub] [kafka] jolshan commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
jolshan commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1268525603 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -386,6 +386,18 @@ public short groupMetadataValueVersion() { } } +public short offsetCommitValueVersion() { +if (isLessThan(MetadataVersion.IBP_2_1_IV0)) { +return 1; +} else if (isLessThan(MetadataVersion.IBP_2_1_IV1)) { +return 2; +} else { +// Serialize with the highest supported non-flexible version +// until a tagged field is introduced or the version is bumped. +return 3; +} Review Comment: > // Serialize with the highest supported non-flexible version // until a tagged field is introduced or the version is bumped. This comment confused me a bit. Do we plan to manually update this method when new versions come in? Why is there a callout for flexible versions?
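For reference, the branching in the quoted `offsetCommitValueVersion()` can be exercised with a small stand-in. The `Ibp` enum below is hypothetical and is not `MetadataVersion`. It relies on its constants being declared in release order, so that ordinal comparison plays the role of `isLessThan()`. Every version at or above `IBP_2_1_IV1` maps to record version 3.

```java
public class OffsetCommitValueVersionSketch {

    // Hypothetical stand-in for MetadataVersion: constants are declared in release order,
    // so comparing ordinals mirrors isLessThan().
    enum Ibp {
        IBP_2_0_IV1, IBP_2_1_IV0, IBP_2_1_IV1, IBP_3_5_IV2;

        boolean isLessThan(Ibp other) {
            return compareTo(other) < 0;
        }
    }

    // Same branching as the quoted diff.
    static short offsetCommitValueVersion(Ibp ibp) {
        if (ibp.isLessThan(Ibp.IBP_2_1_IV0)) {
            return 1;
        } else if (ibp.isLessThan(Ibp.IBP_2_1_IV1)) {
            return 2;
        } else {
            // Highest non-flexible version; stays fixed until the schema changes.
            return 3;
        }
    }
}
```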
[GitHub] [kafka] yashmayya commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors
yashmayya commented on code in PR #14005: URL: https://github.com/apache/kafka/pull/14005#discussion_r1268520364 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java: ## @@ -46,4 +51,49 @@ public void startClusters() throws Exception { super.startClusters(); } +@Override +@Test +public void testReplication() throws Exception { +super.testReplication(); + +// Augment the base replication test case with some extra testing of the offset management +// API introduced in KIP-875 +// We do this only when exactly-once support is enabled in order to avoid having to worry about +// zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic + +String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS); +String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS); + +// Explicitly move back to offset 0 +// Note that the connector treats the offset as the last-consumed offset, +// so it will start reading the topic partition from offset 1 when it resumes +alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1"); +// Reset the offsets for test-topic-2 +resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2"); +resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class); + +int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS); +assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(), +"Records were not re-replicated to backup cluster after altering offsets."); +int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2; +assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(), +"New topic was not re-replicated to backup cluster after altering offsets."); + +@SuppressWarnings({"unchecked", "rawtypes"}) +Class[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]); +// Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect +// on their behavior, but users may want to wipe offsets from them to prevent the offsets topic +// from growing infinitely. So, we include them in the list of connectors to reset as a sanity check Review Comment: > I've pushed a change that allows arbitrary source partitions to be used with null source offsets; LMKWYT. Thanks Chris, looks good. We can probably update the `FileStreamSourceConnector::alterOffsets` method with a similar change in a follow-up as well.
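The expected-count arithmetic in the quoted test can be checked with concrete numbers. The constant values below are assumptions chosen for illustration and are not necessarily those in the test base class. The key step is that rewinding to offset 0 resumes reading at offset 1. Each partition therefore re-replicates `NUM_RECORDS_PER_PARTITION - 1` records.

```java
public class ReplicationCountSketch {
    // Hypothetical values for illustration; the real constants live in the integration test base class.
    static final int NUM_PARTITIONS = 10;
    static final int NUM_RECORDS_PER_PARTITION = 10;
    static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION;

    // The stored offset is the last-consumed offset, so resuming after n reads from n + 1.
    static int resumePosition(int lastConsumedOffset) {
        return lastConsumedOffset + 1;
    }

    // Backup-topic record count after rewinding every partition to offset 0:
    // the original copy plus a re-replication of the records from the resume position onward.
    static int expectedRecordsAfterRewind() {
        int reReplicatedPerPartition = NUM_RECORDS_PER_PARTITION - resumePosition(0);
        return NUM_RECORDS_PRODUCED + reReplicatedPerPartition * NUM_PARTITIONS;
    }
}
```

With these values the count is 100 + 9 × 10 = 190, matching `NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS)`.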
[GitHub] [kafka] yashmayya commented on pull request #14044: KAFKA-15216: InternalSinkRecord::newRecord should not ignore new headers
yashmayya commented on PR #14044: URL: https://github.com/apache/kafka/pull/14044#issuecomment-1642566534 > The change from private to protected technically counts as a change to public interface, so we'd need a KIP for that Ah, I did wonder about this but wasn't entirely certain, thanks for the clarification! > I'm also a little hesitant to upgrade the visibility of these members regardless since that would limit the compatibility of plugins that rely on them (most likely by subclassing ConnectRecord, SinkRecord, etc.), since that would render them [binary incompatible](https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html#jls-13.4.7) with older versions of Connect where the fields were still private. I considered the binary compatibility impact of this change directly on plugins themselves (and there shouldn't be any), but good point on the backward compatibility restriction that would be imposed on any potential external subclasses of `ConnectRecord` due to this change. > Can we reduce the scope here to use fields instead of methods wherever possible, but without altering the visibility of any parts of our public API? Done
[GitHub] [kafka] jolshan commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
jolshan commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1268514084 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; + +import java.util.Objects; +import java.util.OptionalInt; +import java.util.OptionalLong; + +/** + * Represents a committed offset with its metadata. + */ +public class OffsetAndMetadata { +public static final String NO_METADATA = ""; + +/** + * The committed offset. + */ +public final long offset; + +/** + * The leader epoch in use when the offset was committed. + */ +public final OptionalInt leaderEpoch; Review Comment: For my understanding, this is optional because versions < 6 won't have leader epochs?
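A common way to model an optional leader epoch is to map the `-1` sentinel used by `RecordBatch.NO_PARTITION_LEADER_EPOCH` to an empty `OptionalInt`. This sketch assumes `OffsetAndMetadata` does something similar, which is suggested but not confirmed by its `RecordBatch` import. The helper names are hypothetical.

```java
import java.util.OptionalInt;

public class LeaderEpochSketch {
    // Same sentinel value as RecordBatch.NO_PARTITION_LEADER_EPOCH (-1) in the Kafka clients library.
    static final int NO_PARTITION_LEADER_EPOCH = -1;

    // A commit that carries no epoch (the sentinel) becomes an empty OptionalInt.
    static OptionalInt toOptionalEpoch(int rawEpoch) {
        return rawEpoch == NO_PARTITION_LEADER_EPOCH ? OptionalInt.empty() : OptionalInt.of(rawEpoch);
    }

    // Converts back to the sentinel form for fields that use -1 to mean "unknown".
    static int toRawEpoch(OptionalInt epoch) {
        return epoch.orElse(NO_PARTITION_LEADER_EPOCH);
    }
}
```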
[GitHub] [kafka] yashmayya commented on pull request #14036: MINOR: Fix Javadocs for SourceTaskContext::transactionContext and SinkTaskContext::errantRecordReporter to use NoSuchMethodError instead of
yashmayya commented on PR #14036: URL: https://github.com/apache/kafka/pull/14036#issuecomment-1642557203 Thanks Greg!
[GitHub] [kafka] philipnee commented on pull request #13797: KAFKA-14950: implement assign() and assignment()
philipnee commented on PR #13797: URL: https://github.com/apache/kafka/pull/13797#issuecomment-1642545962 @junrao - thanks for the review. I've addressed the two issues you pointed out. Would you be able to take another look at it?
[GitHub] [kafka] vamossagar12 opened a new pull request, #14051: KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently
vamossagar12 opened a new pull request, #14051: URL: https://github.com/apache/kafka/pull/14051 When deleting topics, we first clear the remoteReplicaMap in `stopPartitions` [here](https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/server/ReplicaManager.scala#L554). At the same time, however, a fetch request from a follower may arrive and check whether the replica is eligible to be added to the ISR [here](https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/cluster/Partition.scala#L1001). When that happens, an NPE is thrown. Although this is harmless since the topic is already being deleted, it would be better to avoid it.
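The race described above is a check-then-dereference on a map that another thread clears. Below is a minimal sketch of the defensive pattern. It uses hypothetical names (`ReplicaState`, `isEligibleForIsr`) rather than the actual `Partition` code. The idea is to read the entry once and treat a missing replica as not eligible.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentReplicaLookupSketch {

    // Hypothetical replica state; not Kafka's kafka.cluster.Replica.
    static final class ReplicaState {
        final long logEndOffset;

        ReplicaState(long logEndOffset) {
            this.logEndOffset = logEndOffset;
        }
    }

    final Map<Integer, ReplicaState> remoteReplicas = new ConcurrentHashMap<>();

    // Called when the partition is stopped, e.g. during topic deletion.
    void stop() {
        remoteReplicas.clear();
    }

    // Called from the fetch path. The entry is read exactly once, and a replica removed by a
    // concurrent stop() is reported as not eligible instead of causing a NullPointerException.
    boolean isEligibleForIsr(int replicaId, long highWatermark) {
        ReplicaState state = remoteReplicas.get(replicaId);
        if (state == null) {
            return false;
        }
        return state.logEndOffset >= highWatermark;
    }
}
```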
[GitHub] [kafka] kamalcph commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache
kamalcph commented on code in PR #14004: URL: https://github.com/apache/kafka/pull/14004#discussion_r1268420993 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java: ## @@ -100,17 +100,29 @@ void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId, Long leaderEpochEndOffset) { +// If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced. +// Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the +// segment for the same leader-epoch which is a super-set of previously uploaded segments. +// (eg) +// case-1: Duplicate segment +// L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200. +// We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds. +// case-2: Overlapping segments +// L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2-101-200, +// and so on. When the consumer request for segment with offset 95, it should get the segment S1 and not S0. +Map.Entry lastEntry = offsetToId.lastEntry(); +while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) { Review Comment: > But are there any drawbacks to removing highestLogOffset <= leaderEpochEndOffset from the while? What kind of problem can we face, and why is having this check safer than removing it? Assume there are multiple back-to-back unclean leader elections and only **one replica** is available at any point in time. Both B1 and B2 may not be aware of all the leader epochs. If the consumer reads the data from the beginning of the topic, then we should be able to serve the respective remote log segments for the (epoch, start_offset) present in both B1 and B2.
This patch only marks a segment as unreferenced if the current segment is a superset of all the previously uploaded segments for the same epoch, which means the messages are the same across those segments.
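The two cases in the quoted code comment can be modeled with a `TreeMap` keyed by start offset. This single-epoch sketch reuses the names from the diff (`offsetToId`, `highestLogOffset`, `unreferencedSegmentIds`). The loop body is an assumption, since the diff excerpt stops at the `while` condition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class OverlappingSegmentsSketch {
    // Start offset -> segment id, for a single leader epoch.
    final NavigableMap<Long, String> offsetToId = new TreeMap<>();
    final List<String> unreferencedSegmentIds = new ArrayList<>();
    long highestLogOffset = -1L;

    void onCopyFinished(long startOffset, long endOffset, String segmentId) {
        // Earlier segments starting at or after the new start are fully contained in the new segment
        // when the new segment reaches at least as far as anything seen so far, so drop them.
        Map.Entry<Long, String> lastEntry = offsetToId.lastEntry();
        while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= endOffset) {
            unreferencedSegmentIds.add(lastEntry.getValue());
            offsetToId.remove(lastEntry.getKey());
            lastEntry = offsetToId.lastEntry();
        }
        offsetToId.put(startOffset, segmentId);
        highestLogOffset = Math.max(highestLogOffset, endOffset);
    }

    // The segment serving a fetch at `offset`: the one with the greatest start offset <= offset.
    String segmentFor(long offset) {
        Map.Entry<Long, String> entry = offsetToId.floorEntry(offset);
        return entry == null ? null : entry.getValue();
    }
}
```

In case 2, S0 (10-90) is dropped once S1 (5-100) arrives. A fetch at offset 95 then resolves to S1, as the code comment requires.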
[GitHub] [kafka] kamalcph commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache
kamalcph commented on code in PR #14004: URL: https://github.com/apache/kafka/pull/14004#discussion_r1268399339 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java: ## @@ -100,17 +100,29 @@ void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId, Long leaderEpochEndOffset) { +// If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced. +// Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the +// segment for the same leader-epoch which is a super-set of previously uploaded segments. +// (eg) +// case-1: Duplicate segment +// L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200. +// We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds. +// case-2: Overlapping segments +// L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2-101-200, +// and so on. When the consumer request for segment with offset 95, it should get the segment S1 and not S0. +Map.Entry lastEntry = offsetToId.lastEntry(); +while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) { Review Comment: @satishd @Nickstery Correct me if I'm wrong: the logic to find the `copiedOffset` from remote storage doesn't take into account the current leader-epoch checkpoint file. In the above test, when B2 becomes leader, its leader-epoch-checkpoint file will look like this (the case mentioned in the test can happen when acks is set to 1):
```
0
2
0 0
1 151
```
The logic to find the copied offset traverses the checkpoint file starting from the latest epoch. So, when B2 tries to find the copied offset: For epoch(1), there won't be any uploaded segments, so it returns empty.
For epoch(0), the highest copied offset will be 200. So B2 will skip segment S2 (101-190), which means the data in [151-190] is lost. @Nickstery This can be fixed if we update the logic to find the `copiedOffset`: ``` find-highest-remote-offset = min(end-offset-for-epoch-in-the-checkpoint, highest-remote-offset-for-epoch) ```
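The proposed `min(...)` fix can be sketched as follows. The method and parameter names are hypothetical. The sketch assumes an epoch's end offset is one less than the next epoch's start offset in the checkpoint, or one less than the log end offset for the latest epoch. Take the checkpoint entries `0 0` and `1 151` with 200 as the highest remote offset for epoch 0. The result is 150, so copying would resume at 151 instead of skipping [151-190].

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;

public class CopiedOffsetSketch {

    // epochStartOffsets: leader epoch -> start offset, as listed in a leader-epoch-checkpoint file.
    // Returns the highest offset of `epoch` that can safely be treated as already copied.
    static OptionalLong highestCopiedOffset(NavigableMap<Integer, Long> epochStartOffsets,
                                            long logEndOffset,
                                            int epoch,
                                            OptionalLong highestRemoteOffsetForEpoch) {
        if (!highestRemoteOffsetForEpoch.isPresent()) {
            return OptionalLong.empty();
        }
        // Assumed: an epoch ends just before the next epoch starts, or before the log end for the latest epoch.
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(epoch);
        long endOffsetForEpoch = (next != null ? next.getValue() : logEndOffset) - 1;
        return OptionalLong.of(Math.min(endOffsetForEpoch, highestRemoteOffsetForEpoch.getAsLong()));
    }
}
```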
[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744726#comment-17744726 ] Matthias J. Sax commented on KAFKA-15190: - {quote}but although {{StreamsPartitionAssignor}} sometimes calls it a client ID and sometimes a process ID it's a {{UUID}} so I assume it really is the process ID. {quote} Thanks for calling this out. You are right; I missed this point. As you mentioned "max recovery lag", I assume you have a stateful app that uses in-memory stores only? Another thing that comes to mind: the `client.id` actually has a different purpose and should not be unique per `KafkaStreams` instance, but should be the _same_ for all instances (the name is a little bit misleading). For example, if you configure quotas, they are based on `client.id`, and you usually want quotas to be set per application, not per instance. > Allow configuring a streams process ID > -- > > Key: KAFKA-15190 > URL: https://issues.apache.org/jira/browse/KAFKA-15190 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: Joe Wreschnig >Priority: Major > Labels: needs-kip > > We run our Kafka Streams applications in containers with no persistent > storage, and therefore the mitigation of persisting the process ID in the state > directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during > restarts. > However, we do have a persistent container ID (from a Kubernetes > StatefulSet). Would it be possible to expose a configuration option to let us > set the streams process ID ourselves? > We are already using this ID as our group.instance.id - would it make sense > to have the process ID be automatically derived from this (plus > application/client IDs) if it's set? The two IDs seem to have overlapping > goals of identifying "this consumer" across restarts.
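To illustrate the `client.id` point, here is a sketch of per-instance properties. `client.id` is shared by every instance of the application, while `group.instance.id` comes from a stable per-pod name such as a StatefulSet ordinal. The application and pod names are hypothetical. The sketch also assumes the non-prefixed `group.instance.id` is passed through to the embedded consumer. It does not set a process ID, since that is what this ticket asks for.

```java
import java.util.Properties;

public class StreamsInstanceConfigSketch {

    // Per-instance properties: client.id is identical across instances (so per-client quotas
    // apply to the whole application), while group.instance.id is unique and stable per pod.
    static Properties forInstance(String applicationId, String podName) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("client.id", applicationId);
        props.put("group.instance.id", podName);
        return props;
    }
}
```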
[GitHub] [kafka] vamossagar12 commented on pull request #14000: [MINOR] Fixing comment with IncrementalCooperativeAssignor#handleLostAssignments
vamossagar12 commented on PR #14000: URL: https://github.com/apache/kafka/pull/14000#issuecomment-1642481356 Thanks @gharris1727. Hmm, the meaning of the variable and its usage in the comment are slightly off in this case. `revokedInPrevious` being true doesn't just signify successive revoking rebalances (which is what the original comment reflects) but also signifies that *just* the previous round had one. I just realised that my updated comment isn't accurate either, and ideally both should be included. Something along the lines of ``` There are no lost assignments and there have been no revoking rebalances in the previous round(s) ``` would be more accurate IMO. WDYT? Also, there is a bug where empty lost assignments and a follow-up rebalance after connector deletions can lead to these lines getting printed: https://github.com/apache/kafka/pull/14000/files#diff-e24067b121eb960feebfa099bd9c30382e330eaf5db39302a9d7a50e29b3acb4L459-R462 Ideally nothing should happen if lost assignments are empty. I haven't had the chance to look at fixing it, though. I will file a ticket later when I have some time.
[GitHub] [kafka] kamalcph commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache
kamalcph commented on code in PR #14004: URL: https://github.com/apache/kafka/pull/14004#discussion_r1268399339 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java: ## @@ -100,17 +100,29 @@ void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId, Long leaderEpochEndOffset) { +// If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced. +// Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the +// segment for the same leader-epoch which is a super-set of previously uploaded segments. +// (eg) +// case-1: Duplicate segment +// L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200. +// We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds. +// case-2: Overlapping segments +// L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2-101-200, +// and so on. When the consumer request for segment with offset 95, it should get the segment S1 and not S0. +Map.Entry lastEntry = offsetToId.lastEntry(); +while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) { Review Comment: @satishd The logic to find the `copiedOffset` from remote storage doesn't take into account the current leader-epoch checkpoint file. In the above test, when B2 becomes leader, its leader-epoch-checkpoint file will look like this (the case mentioned in the test can happen when acks is set to 1):
```
0
2
0 0
1 151
```
The logic to find the copied offset traverses the checkpoint file starting from the latest epoch. So, when B2 tries to find the copied offset: For epoch(1), there won't be any uploaded segments, so it returns empty.
For epoch(0), the highest copied offset will be 200. So B2 will skip segment S2 (101-190), which means the data in [151-190] is lost. @Nickstery This can be fixed if we update the logic to find the `copiedOffset`: ``` find-highest-remote-offset = min(end-offset-for-epoch-in-the-checkpoint, highest-remote-offset-for-epoch) ```
[GitHub] [kafka] gharris1727 merged pull request #14036: MINOR: Fix Javadocs for SourceTaskContext::transactionContext and SinkTaskContext::errantRecordReporter to use NoSuchMethodError instead of NoS
gharris1727 merged PR #14036: URL: https://github.com/apache/kafka/pull/14036
[GitHub] [kafka] satishd commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache
satishd commented on code in PR #14004: URL: https://github.com/apache/kafka/pull/14004#discussion_r1268373435 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java: ## @@ -100,17 +100,29 @@ void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId, Long leaderEpochEndOffset) { +// If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced. +// Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the +// segment for the same leader-epoch which is a super-set of previously uploaded segments. +// (eg) +// case-1: Duplicate segment +// L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200. +// We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds. +// case-2: Overlapping segments +// L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2-101-200, +// and so on. When the consumer request for segment with offset 95, it should get the segment S1 and not S0. +Map.Entry lastEntry = offsetToId.lastEntry(); +while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) { Review Comment: Why would the segment data for [151L, 190L] be lost? It will be part of the next leader epoch state (viz. 1), which should contain this segment. @kamalcph @Nickstery Am I missing anything here?
[GitHub] [kafka] jolshan commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
jolshan commented on PR #14046: URL: https://github.com/apache/kafka/pull/14046#issuecomment-1642437560 Something strange is going on with AuthorizerIntegrationTest in the build, but that might be unrelated. I will look into that.
[GitHub] [kafka] splett2 opened a new pull request, #14050: KAFKA-15220: Do not returned fenced brokers from getAliveBrokerNode
splett2 opened a new pull request, #14050: URL: https://github.com/apache/kafka/pull/14050 `getAliveBrokerNode` returns fenced brokers as alive, which is inconsistent with methods like `getAliveBrokerNodes`. This PR adds a filter so that fenced brokers are not returned, and adds a test to validate that `getAliveBrokerNode` is consistent with `getAliveBrokerNodes`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
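Below is a minimal sketch of the intent of this fix. It uses a hypothetical `Broker` type instead of the real KRaft metadata classes. The single-broker lookup applies the same "not fenced" filter as the bulk lookup, so the two stay consistent.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class AliveBrokerSketch {

    // Hypothetical, simplified broker registration; not KRaftMetadataCache's types.
    static final class Broker {
        final int id;
        final boolean fenced;

        Broker(int id, boolean fenced) {
            this.id = id;
            this.fenced = fenced;
        }
    }

    final Map<Integer, Broker> brokers = new TreeMap<>();

    // Single lookup: a fenced broker is treated the same as an unknown one.
    Optional<Broker> getAliveBrokerNode(int id) {
        return Optional.ofNullable(brokers.get(id)).filter(b -> !b.fenced);
    }

    // Bulk lookup using the same predicate.
    List<Broker> getAliveBrokerNodes() {
        return brokers.values().stream().filter(b -> !b.fenced).collect(Collectors.toList());
    }
}
```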
[GitHub] [kafka] gharris1727 commented on pull request #14000: [MINOR] Fixing comment with IncrementalCooperativeAssignor#handleLostAssignments
gharris1727 commented on PR #14000: URL: https://github.com/apache/kafka/pull/14000#issuecomment-1642429559 Hey @vamossagar12 I don't think that this comment is incorrect or confusing enough to warrant a stand-alone PR. If you have other substantive changes in this area, then we can re-examine this comment at that time.
[jira] [Created] (KAFKA-15220) KRaftMetadataCache returns fenced brokers from getAliveBrokerNode
David Mao created KAFKA-15220: - Summary: KRaftMetadataCache returns fenced brokers from getAliveBrokerNode Key: KAFKA-15220 URL: https://issues.apache.org/jira/browse/KAFKA-15220 Project: Kafka Issue Type: Bug Reporter: David Mao Assignee: David Mao
[GitHub] [kafka] jolshan commented on a diff in pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
jolshan commented on code in PR #14046: URL: https://github.com/apache/kafka/pull/14046#discussion_r1268350437 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java: ## @@ -89,33 +88,29 @@ public Map offsets() { return offsets; } -public static List getErrorResponseTopics( -List requestTopics, Review Comment: Did we always just pass in all the topics here? I see the usage below that was removed, but I'm curious whether we would ever want to make a response for just some topics.
[GitHub] [kafka] jolshan commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
jolshan commented on PR #14046: URL: https://github.com/apache/kafka/pull/14046#issuecomment-1642424251 > Note the use of "latestVersionUnstable": true in the request schema. This means that this new version is not available yet unless activated. This also leaves us room to add the topic ID to this version if we so choose?
[GitHub] [kafka] dajac commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
dajac commented on PR #14046: URL: https://github.com/apache/kafka/pull/14046#issuecomment-1642425069 > > Note the use of "latestVersionUnstable": true in the request schema. This means that this new version is not available yet unless activated. > > This also leaves us room to add the topic ID to this version if we so choose? Yeah, that's right.
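The point of `"latestVersionUnstable": true` is that the new version stays hidden until it is explicitly activated, which is why its schema can still change (for example, to add the topic ID). A minimal sketch of that gating rule follows, assuming that a hidden version simply causes a fallback to the previous one. `ApiVersionGate` and the `unstableVersionsEnabled` flag are hypothetical and are not Kafka's actual API or config names.

```java
// Hypothetical sketch of hiding an unstable latest API version; the flag name is illustrative.
final class ApiVersionGate {
    private ApiVersionGate() {}

    static short highestUsableVersion(short latestVersion,
                                      boolean latestVersionUnstable,
                                      boolean unstableVersionsEnabled) {
        // An unstable latest version is hidden unless explicitly enabled,
        // so the highest usable version falls back to the previous one.
        if (latestVersionUnstable && !unstableVersionsEnabled) {
            return (short) (latestVersion - 1);
        }
        return latestVersion;
    }
}
```

Under this sketch, OffsetCommit version 9 flagged as unstable would leave version 8 as the highest usable version until activation.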
[GitHub] [kafka] gharris1727 commented on pull request #14020: KAFKA-10579: Make Reflections thread safe to resolve flaky NPE scanning failure
gharris1727 commented on PR #14020: URL: https://github.com/apache/kafka/pull/14020#issuecomment-1642410512 Closing this in favor of upgrading the library.
[GitHub] [kafka] gharris1727 closed pull request #14020: KAFKA-10579: Make Reflections thread safe to resolve flaky NPE scanning failure
gharris1727 closed pull request #14020: KAFKA-10579: Make Reflections thread safe to resolve flaky NPE scanning failure URL: https://github.com/apache/kafka/pull/14020
[jira] [Commented] (KAFKA-15219) Support delegation tokens in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744700#comment-17744700 ] Proven Provenzano commented on KAFKA-15219: --- I'm sorry, I seem to have forgotten to create the Jira. I am almost done with this work. My WIP PR is https://github.com/apache/kafka/pull/13916/files#diff-70391f7b23b5528f11808d38481254c5e697e531e1d962f6f03bf759a2cca5fc > Support delegation tokens in KRaft > -- > > Key: KAFKA-15219 > URL: https://issues.apache.org/jira/browse/KAFKA-15219 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.6.0 >Reporter: Viktor Somogyi-Vass >Assignee: Viktor Somogyi-Vass >Priority: Critical > > Delegation tokens have been created in KIP-48 and improved in KIP-373. KRaft > enabled the way to supporting them in KIP-900 by adding SCRAM support but > delegation tokens still don't support KRaft. > There are multiple issues: > - TokenManager still would try to create tokens in Zookeeper. Instead of this > we should forward admin requests to the controller that would store them in > the metadata similarly to SCRAM. We probably won't need new protocols just > enveloping similarly to other existing controller requests. > - TokenManager should run on Controller nodes only (or in mixed mode). > - Integration tests will need to be adapted as well and parameterize them > with Zookeeper/KRaft. > - Documentation needs to be improved to factor in KRaft.
[jira] [Resolved] (KAFKA-15211) DistributedConfigTest#shouldFailWithInvalidKeySize fails when run after TestSslUtils#generate
[ https://issues.apache.org/jira/browse/KAFKA-15211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-15211. - Fix Version/s: 3.6.0 Resolution: Fixed > DistributedConfigTest#shouldFailWithInvalidKeySize fails when run after > TestSslUtils#generate > - > > Key: KAFKA-15211 > URL: https://issues.apache.org/jira/browse/KAFKA-15211 > Project: Kafka > Issue Type: Test > Components: clients, KafkaConnect >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > Fix For: 3.6.0 > > > The DistributedConfigTest#shouldFailWithInvalidKeySize attempts to configure > a hashing algorithm with a key size of 0. When run alone, this test passes, > as the default Java hashing algorithm used rejects the key size. > However, when TestSslUtils#generate runs first, such as via the > RestForwardingIntegrationTest, the BouncyCastleProvider is loaded, which > provides an alternative hashing algorithm. This implementation does _not_ > reject the key size, causing the test to fail. > We should either prevent TestSslUtils#generate from leaving the > BouncyCastleProvider loaded after use, or adjust the test to pass when the > BouncyCastleProvider is loaded.
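The ticket proposes two remedies. The merged PR #14039 took the second (per its title, it mocks `InvalidParameterException` in `DistributedConfigTest`). The first remedy, never leaving a JCA provider installed after use, can be sketched with the standard `java.security.Security` API. `ProviderScope` and `DummyProvider` are hypothetical; the dummy provider stands in for `BouncyCastleProvider`, which is not used here.

```java
import java.security.Provider;
import java.security.Security;

// Sketch: install a JCA provider only for the duration of an action.
// DummyProvider is a hypothetical stand-in for BouncyCastleProvider.
final class ProviderScope {
    private ProviderScope() {}

    static final class DummyProvider extends Provider {
        @SuppressWarnings("deprecation") // Provider(String, double, String) is deprecated since Java 9
        DummyProvider() {
            super("DummyTestProvider", 1.0, "Stand-in provider for illustration");
        }
    }

    // Installs the provider, runs the action, then removes the provider again,
    // so code that runs later sees the JDK's default provider list.
    static void withProvider(Provider provider, Runnable action) {
        boolean added = Security.addProvider(provider) != -1; // -1 means it was already installed
        try {
            action.run();
        } finally {
            if (added) {
                Security.removeProvider(provider.getName());
            }
        }
    }
}
```

The sketch only removes a provider that it added itself, so it does not uninstall one that some other code had already registered.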
[GitHub] [kafka] nizhikov commented on pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
nizhikov commented on PR #13278: URL: https://github.com/apache/kafka/pull/13278#issuecomment-1642362522 @mimaison all your comments are addressed except the one about the Tuple. Please review.
[GitHub] [kafka] gharris1727 merged pull request #14039: KAFKA-15211: Mock InvalidParameterException in DistributedConfigTest
gharris1727 merged PR #14039: URL: https://github.com/apache/kafka/pull/14039
[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
nizhikov commented on code in PR #13278: URL: https://github.com/apache/kafka/pull/13278#discussion_r1268293020 ## tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java: ## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import joptsimple.OptionSpec; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DeleteRecordsResult; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.apache.kafka.server.common.AdminOperationException; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.Json; +import org.apache.kafka.server.util.json.DecodeJson; +import org.apache.kafka.server.util.json.JsonObject; +import org.apache.kafka.server.util.json.JsonValue; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.StringJoiner; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * A command for delete records of the given partitions down to the specified offset. 
+ */ +public class DeleteRecordsCommand { +private static final int EARLIEST_VERSION = 1; + +private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger(); + +private static final DecodeJson.DecodeLong LONG = new DecodeJson.DecodeLong(); + +private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString(); + +public static void main(String[] args) throws Exception { +execute(args, System.out); +} + +static Collection> parseOffsetJsonStringWithoutDedup(String jsonData) throws JsonProcessingException { +JsonValue js = Json.parseFull(jsonData) +.orElseThrow(() -> new AdminOperationException("The input string is not a valid JSON")); + +Optional version = js.asJsonObject().get("version"); + +return parseJsonData(version.isPresent() ? version.get().to(INT) : EARLIEST_VERSION, js); +} + +private static Collection> parseJsonData(int version, JsonValue js) throws JsonMappingException { +if (version == 1) { +JsonValue partitions = js.asJsonObject().get("partitions") +.orElseThrow(() -> new AdminOperationException("Missing partitions field")); + +Collection> res = new ArrayList<>(); + +Iterator iterator = partitions.asJsonArray().iterator(); + +while (iterator.hasNext()) { +JsonObject partitionJs = iterator.next().asJsonObject(); + +String topic = partitionJs.apply("topic").to(STRING); +int partition = partitionJs.apply("partition").to(INT); +long offset = partitionJs.apply("offset").to(LONG); + +res.add(new Tuple<>(new TopicPartition(topic, partition), offset)); +} + +return res; +} + +throw new AdminOperationException("Not supported version field value " + version); +} + +public static void execute(String[] args, PrintStream out) throws IOException { +DeleteRecordsCommandOptions opts = new DeleteRecordsCommandOptions(args); + +try (Admin adminClient = createAdminClient(opts)) { +execute(adminClient, Utils.readFileAsString(opts.options.valueOf(opts.offsetJsonFileOpt)), out); +} +} + +static void execute(Admin adminClient, String 
offsetJsonString, PrintStream out) throws JsonProcessingException { +Collection> offsetSeq = parseOffsetJsonStringWithoutDedup(offsetJsonString); + +Set
[GitHub] [kafka] gharris1727 commented on pull request #14039: KAFKA-15211: Mock InvalidParameterException in DistributedConfigTest
gharris1727 commented on PR #14039: URL: https://github.com/apache/kafka/pull/14039#issuecomment-1642357913 Test failures look unrelated, and tests pass locally.
[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
nizhikov commented on code in PR #13278: URL: https://github.com/apache/kafka/pull/13278#discussion_r1268286710 ## tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java: ## @@ -0,0 +1,217 @@
[jira] [Updated] (KAFKA-14094) KIP-853: KRaft controller memebership changes
[ https://issues.apache.org/jira/browse/KAFKA-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-14094: --- Summary: KIP-853: KRaft controller memebership changes (was: KIP-853: KRaft Voters Change) > KIP-853: KRaft controller memebership changes > - > > Key: KAFKA-14094 > URL: https://issues.apache.org/jira/browse/KAFKA-14094 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: José Armando García Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major >
[jira] [Updated] (KAFKA-14094) KIP-853: KRaft Voters Change
[ https://issues.apache.org/jira/browse/KAFKA-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-14094: --- Fix Version/s: (was: 3.6.0) > KIP-853: KRaft Voters Change > > > Key: KAFKA-14094 > URL: https://issues.apache.org/jira/browse/KAFKA-14094 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: José Armando García Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major >
[jira] [Updated] (KAFKA-15219) Support delegation tokens in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass updated KAFKA-15219: Description: Delegation tokens have been created in KIP-48 and improved in KIP-373. KRaft enabled the way to supporting them in KIP-900 by adding SCRAM support but delegation tokens still don't support KRaft. There are multiple issues: - TokenManager still would try to create tokens in Zookeeper. Instead of this we should forward admin requests to the controller that would store them in the metadata similarly to SCRAM. We probably won't need new protocols just enveloping similarly to other existing controller requests. - TokenManager should run on Controller nodes only (or in mixed mode). - Integration tests will need to be adapted as well and parameterize them with Zookeeper/KRaft. - Documentation needs to be improved to factor in KRaft. was: Delegation tokens have been created in KIP-48 and improved in KIP-373. KRaft enabled the way to supporting them in KIP-900 by adding SCRAM support but delegation tokens still don't support KRaft. There are multiple issues: - TokenManager still would try to create tokens in Zookeeper. Instead of this we should forward admin requests to the controller that would store them in the metadata similarly to SCRAM. - TokenManager should run on Controller nodes only (or in mixed mode). - Integration tests will need to be adapted as well and parameterize them with Zookeeper/KRaft. - Documentation needs to be improved to factor in KRaft. > Support delegation tokens in KRaft > -- > > Key: KAFKA-15219 > URL: https://issues.apache.org/jira/browse/KAFKA-15219 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.6.0 >Reporter: Viktor Somogyi-Vass >Assignee: Viktor Somogyi-Vass >Priority: Critical > > Delegation tokens have been created in KIP-48 and improved in KIP-373. 
KRaft > enabled the way to supporting them in KIP-900 by adding SCRAM support but > delegation tokens still don't support KRaft. > There are multiple issues: > - TokenManager still would try to create tokens in Zookeeper. Instead of this > we should forward admin requests to the controller that would store them in > the metadata similarly to SCRAM. We probably won't need new protocols just > enveloping similarly to other existing controller requests. > - TokenManager should run on Controller nodes only (or in mixed mode). > - Integration tests will need to be adapted as well and parameterize them > with Zookeeper/KRaft. > - Documentation needs to be improved to factor in KRaft.
[jira] [Created] (KAFKA-15219) Support delegation tokens in KRaft
Viktor Somogyi-Vass created KAFKA-15219: --- Summary: Support delegation tokens in KRaft Key: KAFKA-15219 URL: https://issues.apache.org/jira/browse/KAFKA-15219 Project: Kafka Issue Type: Improvement Affects Versions: 3.6.0 Reporter: Viktor Somogyi-Vass Assignee: Viktor Somogyi-Vass Delegation tokens have been created in KIP-48 and improved in KIP-373. KRaft enabled the way to supporting them in KIP-900 by adding SCRAM support but delegation tokens still don't support KRaft. There are multiple issues: - TokenManager still would try to create tokens in Zookeeper. Instead of this we should forward admin requests to the controller that would store them in the metadata similarly to SCRAM. - TokenManager should run on Controller nodes only (or in mixed mode). - Integration tests will need to be adapted as well and parameterize them with Zookeeper/KRaft. - Documentation needs to be improved to factor in KRaft.
[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
nizhikov commented on code in PR #13278: URL: https://github.com/apache/kafka/pull/13278#discussion_r1268278161 ## tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java: ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.apache.kafka.server.common.AdminOperationException; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL) +@Tag("integration") +public class DeleteRecordsCommandTest { + +private final ClusterInstance cluster; +public DeleteRecordsCommandTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testCommandZk() throws Exception { +Properties adminProps = new Properties(); + +adminProps.put(AdminClientConfig.RETRIES_CONFIG, 1); + +try (Admin admin = cluster.createAdminClient(adminProps)) { +assertThrows( +AdminCommandFailedException.class, +() -> DeleteRecordsCommand.execute0(admin, "{\"partitions\":[" + +"{\"topic\":\"t\", \"partition\":0, 
\"offset\":1}," + +"{\"topic\":\"t\", \"partition\":0, \"offset\":1}]" + +"}", System.out), +"Offset json file contains duplicate topic partitions: t-0" +); + +admin.createTopics(Collections.singleton(new NewTopic("t", 1, (short) 1))).all().get(); + +Properties props = new Properties(); + +props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); +props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); +props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + +try (KafkaProducer producer = new KafkaProducer<>(props)) { +producer.send(new ProducerRecord<>("t", "1")).get(); +producer.send(new ProducerRecord<>("t", "2")).get(); +producer.send(new ProducerRecord<>("t", "3")).get(); +} + +executeAndAssertOutput( +"{\"partitions\":[{\"topic\":\"t\", \"partition\":0, \"offset\":1}]}", +"partition: t-0\tlow_watermark: 1", +admin +); + +executeAndAssertOutput( +"{\"partitions\":[{\"topic\":\"t\", \"partition\":42, \"offset\":42}]}", +"partition: t-42\terror", +admin +); +} +} + +private static void executeAndAssertOutput(String json, String expOut, Admin admin) { +String output = +ToolsTestUtils.captureStandardOut(() -> DeleteRecordsCommand.execute0(admin, json, System.out)); +assertTrue(output.contains(expOut)); +} +} + +/** + * Unit test of {@link DeleteRecordsCommand} tool. + */ +class DeleteRecordsCommandUnitTest { +@Test +public void
[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
nizhikov commented on code in PR #13278: URL: https://github.com/apache/kafka/pull/13278#discussion_r1268270865 ## tools/src/test/java/org/apache/kafka/tools/CoreUtilsTest.java: ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertIterableEquals; + +public class CoreUtilsTest { +@Test +public void testDuplicates() { +assertIterableEquals( Review Comment: Done
[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
nizhikov commented on code in PR #13278: URL: https://github.com/apache/kafka/pull/13278#discussion_r1268252459 ## tools/src/main/java/org/apache/kafka/tools/CoreUtils.java: ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * General helper functions! + * + * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in + * the standard library etc. + * + * If you are making a new helper function and want to add it to this class please ensure the following: + * 1. It has documentation + * 2. It is the most general possible utility, not just the thing you needed in one particular place + * 3. You have tests for it if it is nontrivial in any way + */ +public class CoreUtils { Review Comment: Renamed
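The imports in this helper class (`HashSet`, `Set`, `Collectors`, `StreamSupport`) and the `testDuplicates` test suggest a duplicate-detection utility. A minimal sketch of such a helper, assuming it should return each element that occurs more than once in an `Iterable`; the class name `DuplicatesSketch` is hypothetical and this is not the code under review:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class DuplicatesSketch {
    private DuplicatesSketch() { }

    /**
     * Returns the elements that appear more than once in {@code items}.
     * Each duplicate is reported once, however often it repeats.
     */
    static <T> Set<T> duplicates(Iterable<T> items) {
        Set<T> seen = new HashSet<>();
        // Set.add returns false for an element that is already present,
        // so the filter keeps exactly the second and later occurrences.
        return StreamSupport.stream(items.spliterator(), false)
                .filter(item -> !seen.add(item))
                .collect(Collectors.toSet());
    }
}
```

The stream is deliberately sequential (`false`): the `seen` set is not thread-safe, so a parallel stream could report wrong results.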
[GitHub] [kafka] mimaison commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1
mimaison commented on PR #13260: URL: https://github.com/apache/kafka/pull/13260#issuecomment-1642274928 My point is that this really contradicts the KIP. In the KIP we say "it does not work", but then we have a test that relies on the upgrade working. This is a bit counterintuitive.
[GitHub] [kafka] C0urante commented on pull request #13853: KAFKA-15088: Fixing Incorrect Reference Usage in Connector State Changes
C0urante commented on PR #13853: URL: https://github.com/apache/kafka/pull/13853#issuecomment-1642271686 The refactor that @gharris1727 is suggesting here would also be nice since it would allow us to establish states that are only applicable to `Connector` or `Task` instances. There's a use case for this that exists today: the `STOPPED` state will only ever be visible for `Connector` instances, and never for `Tasks`, so it doesn't really make sense to use the same state enum for the two.
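The suggested split can be pictured as two separate enums, so that `STOPPED` is only expressible for connectors. This is a hypothetical sketch: the class name, enum names and the exact list of states are illustrative assumptions; only the asymmetry around `STOPPED` comes from the comment.

```java
import java.util.Locale;

/** Hypothetical split of one shared state enum into connector- and task-specific enums. */
final class ConnectStatesSketch {
    private ConnectStatesSketch() { }

    /** States a connector can report, including the connector-only STOPPED state. */
    enum ConnectorState { UNASSIGNED, RUNNING, PAUSED, STOPPED, FAILED, RESTARTING, DESTROYED }

    /** States a task can report; STOPPED is deliberately absent. */
    enum TaskState { UNASSIGNED, RUNNING, PAUSED, FAILED, RESTARTING, DESTROYED }

    /** Code that accepts only task states cannot be handed STOPPED by mistake. */
    static String describe(TaskState state) {
        return "task is " + state.name().toLowerCase(Locale.ROOT);
    }
}
```

With a single shared enum, code that handles task statuses has to treat `STOPPED` as an impossible case at runtime; separate types rule it out at compile time.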
[GitHub] [kafka] C0urante commented on pull request #14044: KAFKA-15216: InternalSinkRecord::newRecord should not ignore new headers
C0urante commented on PR #14044: URL: https://github.com/apache/kafka/pull/14044#issuecomment-1642230322 The change from `private` to `protected` technically counts as a change to the public interface, so we'd need a KIP for it. I'm also a little hesitant to widen the visibility of these members regardless, since that would limit the compatibility of plugins that rely on them (most likely by subclassing `ConnectRecord`, `SinkRecord`, etc.): it would render them [binary incompatible](https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html#jls-13.4.7) with older versions of Connect where the fields were still private. Can we reduce the scope here to use fields instead of methods wherever possible, but without altering the visibility of any parts of our public API?
[GitHub] [kafka] fvaleri commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
fvaleri commented on code in PR #13278: URL: https://github.com/apache/kafka/pull/13278#discussion_r1268183773 ## tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java: ## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import joptsimple.OptionSpec; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DeleteRecordsResult; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.apache.kafka.server.common.AdminOperationException; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.Json; +import org.apache.kafka.server.util.json.DecodeJson; +import org.apache.kafka.server.util.json.JsonObject; +import org.apache.kafka.server.util.json.JsonValue; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.StringJoiner; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * A command for delete records of the given partitions down to the specified offset. 
+ */ +public class DeleteRecordsCommand { +private static final int EARLIEST_VERSION = 1; + +private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger(); + +private static final DecodeJson.DecodeLong LONG = new DecodeJson.DecodeLong(); + +private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString(); + +public static void main(String[] args) throws Exception { +execute(args, System.out); +} + +static Collection<Tuple<TopicPartition, Long>> parseOffsetJsonStringWithoutDedup(String jsonData) throws JsonProcessingException { +JsonValue js = Json.parseFull(jsonData) +.orElseThrow(() -> new AdminOperationException("The input string is not a valid JSON")); + +Optional<JsonValue> version = js.asJsonObject().get("version"); + +return parseJsonData(version.isPresent() ? version.get().to(INT) : EARLIEST_VERSION, js); +} + +private static Collection<Tuple<TopicPartition, Long>> parseJsonData(int version, JsonValue js) throws JsonMappingException { +if (version == 1) { +JsonValue partitions = js.asJsonObject().get("partitions") +.orElseThrow(() -> new AdminOperationException("Missing partitions field")); + +Collection<Tuple<TopicPartition, Long>> res = new ArrayList<>(); + +Iterator<JsonValue> iterator = partitions.asJsonArray().iterator(); + +while (iterator.hasNext()) { +JsonObject partitionJs = iterator.next().asJsonObject(); + +String topic = partitionJs.apply("topic").to(STRING); +int partition = partitionJs.apply("partition").to(INT); +long offset = partitionJs.apply("offset").to(LONG); + +res.add(new Tuple<>(new TopicPartition(topic, partition), offset)); +} + +return res; +} + +throw new AdminOperationException("Not supported version field value " + version); +} + +public static void execute(String[] args, PrintStream out) throws IOException { +DeleteRecordsCommandOptions opts = new DeleteRecordsCommandOptions(args); + +try (Admin adminClient = createAdminClient(opts)) { +execute(adminClient, Utils.readFileAsString(opts.options.valueOf(opts.offsetJsonFileOpt)), out); +} +} + +static void execute(Admin adminClient, String
offsetJsonString, PrintStream out) throws JsonProcessingException { +Collection<Tuple<TopicPartition, Long>> offsetSeq = parseOffsetJsonStringWithoutDedup(offsetJsonString); + +Set duplicatePartitions
[GitHub] [kafka] fvaleri commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
fvaleri commented on code in PR #13278: URL: https://github.com/apache/kafka/pull/13278#discussion_r1268183325 ## tools/src/main/java/org/apache/kafka/tools/CoreUtils.java: ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * General helper functions! + * + * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in + * the standard library etc. + * + * If you are making a new helper function and want to add it to this class please ensure the following: + * 1. It has documentation + * 2. It is the most general possible utility, not just the thing you needed in one particular place + * 3. You have tests for it if it is nontrivial in any way + */ +public class CoreUtils { Review Comment: We already have ToolsUtils in server-common, and maybe we should think about moving it to the tools module in a separate PR.
[jira] [Commented] (KAFKA-8128) Dynamic delegation token change possibility for consumer/producer
[ https://issues.apache.org/jira/browse/KAFKA-8128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744640#comment-17744640 ] Viktor Somogyi-Vass commented on KAFKA-8128: So after 4 years (yes) I think I'll pick this up. The problem came up more recently with OAuth, but I believe the core problem here is that JAAS contexts don't get reloaded. Making it a dynamic configuration would solve it, but that probably requires a KIP too. I'll see what's up. [~gsomogyi] does this seem correct? Did you have problems because you had to change the JAAS config, or were there other problems that you experienced? > Dynamic delegation token change possibility for consumer/producer > - > > Key: KAFKA-8128 > URL: https://issues.apache.org/jira/browse/KAFKA-8128 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.2.0 >Reporter: Gabor Somogyi >Assignee: Viktor Somogyi-Vass >Priority: Major > > The re-authentication feature on the broker side is under implementation; it will > force consumer/producer instances to re-authenticate from time to time. It would > be good to be able to set the latest delegation token dynamically instead of re-creating > consumer/producer instances. -- This message was sent by Atlassian Jira (v8.20.10#820010)
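For context, a client that authenticates with a delegation token usually receives the token ID and HMAC through `sasl.jaas.config`, using the SCRAM login module with `tokenauth="true"`. The sketch below builds such client properties. The helper name, bootstrap address, token values and the choice of `SCRAM-SHA-256` over `SCRAM-SHA-512` are assumptions for illustration. Because the token is embedded in this configuration and the JAAS context is not reloaded, picking up a new token currently means creating a new producer or consumer, which is the limitation the issue describes.

```java
import java.util.Properties;

/** Hypothetical helper that builds client properties for delegation-token authentication. */
final class DelegationTokenClientConfigSketch {
    private DelegationTokenClientConfigSketch() { }

    static Properties withToken(String bootstrapServers, String tokenId, String tokenHmac) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // SASL_SSL is assumed here; SCRAM also works over SASL_PLAINTEXT.
        props.setProperty("security.protocol", "SASL_SSL");
        // Delegation tokens authenticate through a SCRAM mechanism.
        props.setProperty("sasl.mechanism", "SCRAM-SHA-256");
        // The token ID goes in as the username, the HMAC as the password,
        // and tokenauth="true" marks the credentials as a delegation token.
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"" + tokenId + "\" "
                        + "password=\"" + tokenHmac + "\" "
                        + "tokenauth=\"true\";");
        return props;
    }
}
```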
[jira] [Assigned] (KAFKA-8128) Dynamic delegation token change possibility for consumer/producer
[ https://issues.apache.org/jira/browse/KAFKA-8128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass reassigned KAFKA-8128: -- Assignee: Viktor Somogyi-Vass
[GitHub] [kafka] mimaison commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
mimaison commented on code in PR #13278: URL: https://github.com/apache/kafka/pull/13278#discussion_r1268090551 ## tools/src/main/java/org/apache/kafka/tools/CoreUtils.java: ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * General helper functions! + * + * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in + * the standard library etc. + * + * If you are making a new helper function and want to add it to this class please ensure the following: + * 1. It has documentation + * 2. It is the most general possible utility, not just the thing you needed in one particular place + * 3. You have tests for it if it is nontrivial in any way + */ +public class CoreUtils { Review Comment: Maybe `ToolsUtils` would be a better name? ## tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java: ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.apache.kafka.server.common.AdminOperationException; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static 
org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL) +@Tag("integration") +public class DeleteRecordsCommandTest { + +private final ClusterInstance cluster; +public DeleteRecordsCommandTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testCommandZk() throws Exception { +Properties adminProps = new Properties(); + +adminProps.put(AdminClientConfig.RETRIES_CONFIG, 1); + +try (Admin admin = cluster.createAdminClient(adminProps)) { +assertThrows( +AdminCommandFailedException.class, +() -> DeleteRecordsCommand.execute0(admin, "{\"partitions\":[" + +"{\"topic\":\"t\", \"partition\":0, \"offset\":1}," + +"{\"topic\":\"t\", \"partition\":0, \"offset\":1}]" + +"}", System.out), +"Offset json file contains
[GitHub] [kafka] viktorsomogyi commented on pull request #13975: KAFKA-15161: Fix InvalidReplicationFactorException at connect startup
viktorsomogyi commented on PR #13975: URL: https://github.com/apache/kafka/pull/13975#issuecomment-1642093548 Captured two screen recordings to demonstrate the reproduction of the problem. This is the bug: https://drive.google.com/file/d/1Itq_Fv9bwNtRtPsD9KLUnD9I-6BX6Tki/view?usp=sharing This is how the fix would help: https://drive.google.com/file/d/10Otvj880EF--BOARmhULhvixwWsAhIt9/view?usp=sharing
[GitHub] [kafka] clolov opened a new pull request, #14049: KAFKA-14038: Optimise calculation of size for log in remote tier
clolov opened a new pull request, #14049: URL: https://github.com/apache/kafka/pull/14049 ### Summary This pull request aims to address sub-points 1 and 2 from https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier#KIP852:Optimizecalculationofsizeforloginremotetier-Codechanges. Add a new method, `remoteLogSize`, to the `RemoteLogMetadataManager` interface. Implement `remoteLogSize` in `TopicBasedRemoteLogMetadataManager`. Extend `TopicBasedRemoteLogMetadataManagerTest` to cover two scenarios (size calculation with no remote log; size calculation with a remote log).
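The summary names the new method but not its body. A minimal sketch of what such a size calculation could look like, assuming it sums the byte sizes of the remote segments the metadata manager already tracks for a partition. `SegmentMetadata` and the method shape are simplified, hypothetical stand-ins, not the real `RemoteLogMetadataManager` signature. The two test scenarios from the PR description correspond to an empty segment list and a non-empty one.

```java
import java.util.List;

/** Hypothetical, simplified model of a remote log size calculation. */
final class RemoteLogSizeSketch {
    private RemoteLogSizeSketch() { }

    /** Stand-in for remote segment metadata; only the size matters here. */
    static final class SegmentMetadata {
        final long segmentSizeInBytes;

        SegmentMetadata(long segmentSizeInBytes) {
            this.segmentSizeInBytes = segmentSizeInBytes;
        }
    }

    /** Sums segment sizes; an empty list (no remote log yet) yields 0. */
    static long remoteLogSize(List<SegmentMetadata> segments) {
        return segments.stream()
                .mapToLong(segment -> segment.segmentSizeInBytes)
                .sum();
    }
}
```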
[jira] [Updated] (KAFKA-15129) Clean up all metrics that were forgotten to be closed
[ https://issues.apache.org/jira/browse/KAFKA-15129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-15129: --- Description: In the current Kafka code, there are still many modules whose metrics are not removed when the module stops, although some of these have already been fixed, such as KAFKA-14866 and KAFKA-14868. These metric leaks carry a potential risk of OOM. In addition, many unit and integration tests call `close` without removing the metrics, which also makes CI tests unstable. Cleaning up these leaked metrics eliminates these risks and improves the security and stability of the code. I will find all the metrics in the current version that are not removed on close and fix them in separate subtasks. was: In the current Kafka code, there are still many modules whose metrics are not removed when the module stops, although some of these have already been fixed, such as KAFKA-14866 and KAFKA-14868. I will find all the metrics in the current version that are not removed on close and fix them in separate subtasks. > Clean up all metrics that were forgotten to be closed > - > > Key: KAFKA-15129 > URL: https://issues.apache.org/jira/browse/KAFKA-15129 > Project: Kafka > Issue Type: Improvement > Components: controller, core, log >Affects Versions: 3.5.0 >Reporter: hudeqi >Assignee: hudeqi >Priority: Major
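The leak described above comes from components that register metrics but never remove them on shutdown. A minimal sketch of the intended pattern, using a hypothetical in-memory registry instead of Kafka's real metrics classes (all class and metric names are illustrative): the component remembers what it registered and removes exactly those entries in `close()`.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical metrics registry standing in for a real metrics library. */
final class MetricRegistrySketch {
    private final Set<String> names = new LinkedHashSet<>();

    void register(String name) { names.add(name); }

    void remove(String name) { names.remove(name); }

    int size() { return names.size(); }
}

/** A component that removes every metric it registered when it is closed. */
final class MeteredComponentSketch implements AutoCloseable {
    private static final String[] METRIC_NAMES = {"bytes-in-rate", "request-latency-avg"};
    private final MetricRegistrySketch registry;

    MeteredComponentSketch(MetricRegistrySketch registry) {
        this.registry = registry;
        for (String name : METRIC_NAMES) {
            registry.register(name);
        }
    }

    @Override
    public void close() {
        // Without this loop the registry would keep the entries after the
        // component stops: the leak pattern described in the issue.
        for (String name : METRIC_NAMES) {
            registry.remove(name);
        }
    }
}
```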
[GitHub] [kafka] msn-tldr opened a new pull request, #14048: On metadata-update, mark paritions for immediate retry
msn-tldr opened a new pull request, #14048: URL: https://github.com/apache/kafka/pull/14048 Alternative POC
[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
dajac commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1268008501 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord( ); } +/** + * Creates an OffsetCommit record. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partitionId The partition id. + * @param offsetAndMetadata The offset and metadata. + * @param metadataVersion The metadata version. + * @return The record. + */ +public static Record newOffsetCommitRecord( +String groupId, +String topic, +int partitionId, +OffsetAndMetadata offsetAndMetadata, +MetadataVersion metadataVersion +) { +short version = offsetAndMetadata.expireTimestampMs.isPresent() ? +(short) 1 : metadataVersion.offsetCommitValueVersion(); Review Comment: This corresponds to the logic [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1088).
[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
dajac commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1268008967 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord( ); } +/** + * Creates an OffsetCommit record. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partitionId The partition id. + * @param offsetAndMetadata The offset and metadata. + * @param metadataVersion The metadata version. + * @return The record. + */ +public static Record newOffsetCommitRecord( +String groupId, +String topic, +int partitionId, +OffsetAndMetadata offsetAndMetadata, +MetadataVersion metadataVersion +) { +short version = offsetAndMetadata.expireTimestampMs.isPresent() ? +(short) 1 : metadataVersion.offsetCommitValueVersion(); + +return new Record( +new ApiMessageAndVersion( +new OffsetCommitKey() +.setGroup(groupId) +.setTopic(topic) +.setPartition(partitionId), +(short) 1 +), +new ApiMessageAndVersion( +new OffsetCommitValue() +.setOffset(offsetAndMetadata.offset) + .setLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH)) +.setMetadata(offsetAndMetadata.metadata) +.setCommitTimestamp(offsetAndMetadata.commitTimestampMs) +// Version 1 has a non-empty expireTimestamp field + .setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)), Review Comment: This comes from [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1094).
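The builder above turns the optional fields of `OffsetAndMetadata` into sentinel values before writing the value record. A standalone sketch of that conversion, assuming `leaderEpoch` is an `OptionalInt` and `expireTimestampMs` an `OptionalLong` (the real field types are not visible in the hunk). Both sentinels are `-1` in Kafka and are redeclared locally so the sketch has no Kafka dependency; the class names are hypothetical.

```java
import java.util.OptionalInt;
import java.util.OptionalLong;

/** Hypothetical, simplified mapping from a committed offset to OffsetCommit value fields. */
final class OffsetCommitValueSketch {
    private OffsetCommitValueSketch() { }

    // Local copies of RecordBatch.NO_PARTITION_LEADER_EPOCH and
    // OffsetCommitRequest.DEFAULT_TIMESTAMP, both -1 in Kafka.
    static final int NO_PARTITION_LEADER_EPOCH = -1;
    static final long DEFAULT_TIMESTAMP = -1L;

    /** Simplified committed offset: optional fields stay optional until serialization. */
    static final class OffsetAndMetadata {
        final long offset;
        final OptionalInt leaderEpoch;
        final String metadata;
        final long commitTimestampMs;
        final OptionalLong expireTimestampMs;

        OffsetAndMetadata(long offset, OptionalInt leaderEpoch, String metadata,
                          long commitTimestampMs, OptionalLong expireTimestampMs) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.metadata = metadata;
            this.commitTimestampMs = commitTimestampMs;
            this.expireTimestampMs = expireTimestampMs;
        }
    }

    /** Flat value fields as they would be written; absent optionals become sentinels. */
    static final class Value {
        final long offset;
        final int leaderEpoch;
        final String metadata;
        final long commitTimestamp;
        final long expireTimestamp;

        Value(OffsetAndMetadata om) {
            this.offset = om.offset;
            this.leaderEpoch = om.leaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH);
            this.metadata = om.metadata;
            this.commitTimestamp = om.commitTimestampMs;
            // Only version 1 of the value carries a meaningful expire timestamp.
            this.expireTimestamp = om.expireTimestampMs.orElse(DEFAULT_TIMESTAMP);
        }
    }
}
```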
[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
dajac commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1268008079 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -386,6 +386,18 @@ public short groupMetadataValueVersion() { } } +public short offsetCommitValueVersion() { +if (isLessThan(MetadataVersion.IBP_2_1_IV0)) { +return 1; +} else if (isLessThan(MetadataVersion.IBP_2_1_IV1)) { +return 2; +} else { +// Serialize with the highest supported non-flexible version +// until a tagged field is introduced or the version is bumped. +return 3; +} Review Comment: This is taken from [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1088).
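Together with the `expireTimestampMs.isPresent()` check in `newOffsetCommitRecord`, this gives a small decision table: an explicit expire timestamp pins the value to version 1; otherwise the IBP picks 1, 2 or 3. A standalone sketch of that decision, in which the `Ibp` enum is a hypothetical stand-in for `MetadataVersion` that keeps only the ordering the comparisons need:

```java
/** Hypothetical, standalone restatement of the OffsetCommit value version choice. */
final class OffsetCommitVersionSketch {
    private OffsetCommitVersionSketch() { }

    /** Declaration order defines "older than", like isLessThan in the diff. */
    enum Ibp { BEFORE_2_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, LATEST }

    /** Mirrors the thresholds in offsetCommitValueVersion(). */
    static short offsetCommitValueVersion(Ibp ibp) {
        if (ibp.compareTo(Ibp.IBP_2_1_IV0) < 0) {
            return 1;
        } else if (ibp.compareTo(Ibp.IBP_2_1_IV1) < 0) {
            return 2;
        } else {
            // Highest non-flexible version, per the comment in the diff.
            return 3;
        }
    }

    /** An explicit expire timestamp forces version 1, the version that has that field. */
    static short valueVersion(boolean hasExpireTimestamp, Ibp ibp) {
        return hasExpireTimestamp ? (short) 1 : offsetCommitValueVersion(ibp);
    }
}
```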
[GitHub] [kafka] dajac opened a new pull request, #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
dajac opened a new pull request, #14047: URL: https://github.com/apache/kafka/pull/14047 This patch does a few things: 1) It introduces the `OffsetAndMetadata` class, which holds the committed offsets in the group coordinator. 2) It adds methods to deal with OffsetCommit records to `RecordHelpers`. 3) It adds `MetadataVersion#offsetCommitValueVersion` to get the version of the OffsetCommit value record that should be used. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac opened a new pull request, #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
dajac opened a new pull request, #14046: URL: https://github.com/apache/kafka/pull/14046 This patch does a few things: 1) It introduces version 9 of the OffsetCommit API. This new version has no schema changes, but it can return a StaleMemberEpochException if the new consumer group protocol is used. 2) It renames the `generationId` field in the request to `GenerationIdOrMemberEpoch`. This is a backward-compatible change. 3) It introduces the new StaleMemberEpochException error. 4) It does a minor refactoring of the `OffsetCommitRequest` class. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] Nickstery commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache
Nickstery commented on code in PR #14004: URL: https://github.com/apache/kafka/pull/14004#discussion_r1267948092 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java: ## @@ -100,17 +100,29 @@ void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId, Long leaderEpochEndOffset) { +// If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced. +// Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the +// segment for the same leader-epoch which is a super-set of previously uploaded segments. +// (eg) +// case-1: Duplicate segment +// L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200. +// We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds. +// case-2: Overlapping segments +// L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2-101-200, +// and so on. When the consumer request for segment with offset 95, it should get the segment S1 and not S0. +Map.Entry lastEntry = offsetToId.lastEntry(); +while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) { Review Comment: Hi, thank you for the explanation; it sounds reasonable. Sorry for asking so many questions.
One more point: What could happen if we instead: ``` while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) if (highestLogOffset == null || leaderEpochEndOffset > highestLogOffset) { highestLogOffset = leaderEpochEndOffset; } ``` will do ``` while (lastEntry != null && lastEntry.getKey() >= startOffset) highestLogOffset = leaderEpochEndOffset; ``` How I see it: - In case we sent twice same segment [somehow], It will be replaced with the same one - In case unclean.leader.election set to true, we are going to have up to date data, in that case TS will work same as local storage function wise [data loss due to dead leader]. Less discrepancy - easier to maintain and understand - I do believe we do not send from leader and all the followers same segments. The leader is someone who sends data to tiered storage, so we expect at the same time send only 1 segment for partition-X, in case second request comes for same partition, it means that new leader is elected and it is in charge and its data is more valuable. - It is possible that even ISR won't be synced fully `replica.lag.time.max.ms` [default= 30s] allows that, it means that it is possible that few messages wont be synced and potentially deleted in case traffic stops coming few messages after Leader broker died and new leader got them. What if Broker 1 uploaded segment [0, 100], Broker-2 was insync by replicated [0,90]. Broker-1 died, Broker-2 elected as a Leader and got 5 messages, Tiered storage won't be in charge of those 5 messages and segment could be deleted. After traffic being restored new segment is going to roll and its offset starts with 96, when last offset in TS is 100, so we delete segment and push new one starting from 96, and we could have a gap afterwards. Of course this example is an edge case and configuration should be quite agressive for that in terms for retention times etc. 
But are there any drawbacks to removing `highestLogOffset <= leaderEpochEndOffset` from the `while` condition? What problems could we run into, and why is keeping this check safer than removing it?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Nickstery commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache
Nickstery commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1267954353

## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java: ##
@@ -100,17 +100,29 @@ void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId, Long leaderEpochEndOffset) {
+// If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced.
+// Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the
+// segment for the same leader-epoch which is a super-set of previously uploaded segments.
+// (eg)
+// case-1: Duplicate segment
+// L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200.
+// We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds.
+// case-2: Overlapping segments
+// L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2-101-200,
+// and so on. When the consumer request for segment with offset 95, it should get the segment S1 and not S0.
+Map.Entry lastEntry = offsetToId.lastEntry();
+while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
Test example

```
@Test
void handleSegmentWithCopySegmentFinishedStateForDuplicateSegmentsWithSecondSegmentEarlierEndOffset() {
    // Broker-1 is the leader for Partition-0; Broker-2 is a follower.
    RemoteLogSegmentId segmentId1Broker1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
    // Leader and follower are in sync.
    epochState.handleSegmentWithCopySegmentFinishedState(10L, segmentId1Broker1, 100L);

    // Broker-1 received more messages and only part of them were replicated by the follower:
    // Broker-2 could not replicate all of them because Broker-1 died.
    // Before dying, Broker-1 managed to upload segment [101, 200].
    RemoteLogSegmentId segmentId2Broker1 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
    epochState.handleSegmentWithCopySegmentFinishedState(101L, segmentId2Broker1, 200L);

    // Broker-2 is still in sync according to `replica.lag.time.max.ms`;
    // its last in-sync offset is 150L.
    // Broker-2 takes over leadership, fills segment 101L-190L and uploads it to tiered storage.
    RemoteLogSegmentId segmentId2Broker2 = new RemoteLogSegmentId(tpId, Uuid.randomUuid());
    epochState.handleSegmentWithCopySegmentFinishedState(101L, segmentId2Broker2, 190L);

    // Traffic stops.
    // Since the leader epoch changed, I expect tiered storage to hold the fresher data.
    assertEquals(190L, epochState.highestLogOffset());
    // Segment 2 uploaded by Broker-1 is expected to be unreferenced.
    assertTrue(epochState.unreferencedSegmentIds().contains(segmentId2Broker1));
    assertEquals(1, epochState.unreferencedSegmentIds().size());
}
```

This could then become part of an integration test: we can upload a new segment [191L, 300L] and consume from tiered storage from the beginning. With the current logic we lose the data in segment 2 written while Broker-2 was leader [151L, 190L], whereas with Kafka's local storage this would not happen.
[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1
clolov commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1641957263

Hey @forlack! Thank you for the suggestion; I have upgraded the dependency so that the tests can run.

Heya @mimaison, I have addressed your comment about adding the latest version to the LICENSE-binary file. I would be grateful for a final review, as this has been open for some time.