[GitHub] [kafka] C0urante commented on a diff in pull request #11781: KAFKA-10000: Per-connector offsets topics (KIP-618)
C0urante commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r921839879

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1327,30 +1334,39 @@ public WorkerTask doBuild(Task task,
                 connectorClientConfigOverridePolicy, kafkaClusterId);
         KafkaProducer producer = new KafkaProducer<>(producerProps);
-        TopicAdmin topicAdmin;
+        // Prepare to create a topic admin if the task requires one, but do not actually create an instance
+        // until/unless one is needed
+        final AtomicReference<TopicAdmin> topicAdmin = new AtomicReference<>();
+        final Supplier<TopicAdmin> topicAdminCreator = () -> topicAdmin.updateAndGet(existingAdmin -> {
+            if (existingAdmin != null) {
+                return existingAdmin;
+            }
+            Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                    sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+            Admin adminClient = Admin.create(adminOverrides);
+            return new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+        });
+
         Map topicCreationGroups;
         if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
             topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
             // Create a topic admin that the task can use for topic creation
-            Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
-                    sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
-            topicAdmin = new TopicAdmin(adminOverrides);
+            topicAdminCreator.get();

Review Comment:
   👍 done.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
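The lazy-initialization pattern in the diff above can be illustrated in isolation. This is a hypothetical standalone sketch, not Connect's actual code: the class and field names are made up, and a `StringBuilder` stands in for the expensive `TopicAdmin`.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class LazyAdmin {
    // Starts empty; nothing is created until the supplier is first invoked.
    static final AtomicReference<StringBuilder> HOLDER = new AtomicReference<>();

    // updateAndGet applies the function atomically, so concurrent callers
    // all end up observing the single instance that won the race.
    static final Supplier<StringBuilder> CREATOR = () -> HOLDER.updateAndGet(existing -> {
        if (existing != null) {
            return existing; // already created: reuse, do not rebuild
        }
        return new StringBuilder("expensive-resource"); // stand-in for new TopicAdmin(...)
    });
}
```

One caveat with this design: `updateAndGet` may re-apply its update function under contention, so in the real Worker code the creation step's side effects (opening an `Admin` client) could in principle run more than once; the function passed to it should be safe to retry.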
[GitHub] [kafka] C0urante commented on pull request #11608: KAFKA-13533: Clean up resources on failed connector and task startup
C0urante commented on PR #11608:
URL: https://github.com/apache/kafka/pull/11608#issuecomment-1185204758

@tombentley FYI I've finished rebasing this
[jira] [Commented] (KAFKA-14058) Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest
[ https://issues.apache.org/jira/browse/KAFKA-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17567091#comment-17567091 ]

Christo Lolov commented on KAFKA-14058:
---------------------------------------

Okay, thank you for the suggestion, I will have a look at them!

> Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-14058
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14058
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: KafkaConnect
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Minor

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [kafka] hachikuji opened a new pull request, #12411: KAFKA-14078; Do leader/epoch validation in Fetch before checking for valid replica
hachikuji opened a new pull request, #12411:
URL: https://github.com/apache/kafka/pull/12411

After the fix for https://github.com/apache/kafka/pull/12150, if a follower receives a request from another replica, it will return UNKNOWN_LEADER_EPOCH even if the leader epoch matches. We need to do leader/epoch validation first before we check whether we have a valid replica.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-14078) Replica fetches to follower should return NOT_LEADER error
Jason Gustafson created KAFKA-14078:
---------------------------------------

             Summary: Replica fetches to follower should return NOT_LEADER error
                 Key: KAFKA-14078
                 URL: https://issues.apache.org/jira/browse/KAFKA-14078
             Project: Kafka
          Issue Type: Bug
            Reporter: Jason Gustafson
            Assignee: Jason Gustafson
             Fix For: 3.3.0

After the fix for KAFKA-13837, if a follower receives a request from another replica, it will return UNKNOWN_LEADER_EPOCH even if the leader epoch matches. We need to do leader/epoch validation first before we check whether we have a valid replica.
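The ordering described above can be boiled down to a tiny pure function. This is a hypothetical, heavily simplified sketch of the check ordering, not the broker's actual fetch path:

```java
public class FetchValidation {
    public enum Error { NONE, UNKNOWN_LEADER_EPOCH, NOT_LEADER_OR_FOLLOWER }

    // Epoch validation runs before the replica/leadership check, so a fetch
    // whose epoch matches is not rejected as UNKNOWN_LEADER_EPOCH merely
    // because it came from another replica; it gets NOT_LEADER instead.
    public static Error validate(int requestEpoch, int localEpoch, boolean isValidReplica) {
        if (requestEpoch != localEpoch) {
            return Error.UNKNOWN_LEADER_EPOCH;
        }
        if (!isValidReplica) {
            return Error.NOT_LEADER_OR_FOLLOWER;
        }
        return Error.NONE;
    }
}
```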
[GitHub] [kafka] showuon commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
showuon commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1185155731

@dajac @guozhangwang, do you want to have another look at this PR? You can also check this comment to know what we're trying to do in this PR: https://github.com/apache/kafka/pull/12349#pullrequestreview-1028116488. Thanks.
[jira] [Updated] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
[ https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-14024:
------------------------------
    Description: 
Hi, in https://issues.apache.org/jira/browse/KAFKA-13310 we tried to fix an issue where consumer#poll(duration) could return well after the provided duration. That happened because, if a rebalance is needed, we try to commit the current offset synchronously before rebalancing, and if the offset commit takes too long, consumer#poll spends more time than the provided duration. To fix that, we changed the synchronous commit to an asynchronous commit before rebalance (i.e. in onJoinPrepare).

However, in this ticket, we found the async commit keeps sending a new commit request during each Consumer#poll, because the offset commit never completes in time. The impact is that the existing consumer will be kicked out of the group after the rebalance timeout without joining the group. That is, suppose we have consumer A in group G, and now consumer B joins the group; after the rebalance, only consumer B is in the group.

The workaround for this issue is to change the assignor back to an eager assignor, e.g. StickyAssignor or RoundRobinAssignor.

To fix the issue, we came up with 2 solutions:
1. We can explicitly wait for the async commit to complete in onJoinPrepare, but that would let the KAFKA-13310 issue happen again.
2. We can keep the async commit offset future currently inflight, so that on each Consumer#poll we wait for the same future to complete.

Besides, another bug was found while fixing this one. Before KAFKA-13310, we committed offsets synchronously with the rebalanceTimeout, retrying on retriable errors until the timeout. After KAFKA-13310, we thought we still had retries, but we actually retry after revoking partitions. That is, even if the retried offset commit eventually succeeds, some partition offsets remain uncommitted, and after the rebalance other consumers will consume overlapping records.
===
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]

We don't wait for the client to receive the commit offset response here, so onJoinPrepareAsyncCommitCompleted will be false in a cooperative rebalance, and the client will loop invoking onJoinPrepare. I think EAGER mode doesn't have this problem because it revokes the partitions even if onJoinPrepareAsyncCommitCompleted=false and will not try to commit again in the next round.

To reproduce:
* single-node Kafka version 3.2.0 && client version 3.2.0
* topic1 has 5 partitions
* start a consumer1 (cooperative rebalance)
* start another consumer2 (same consumer group)
* consumer1 will hang for a long time before re-joining
* from the server log, consumer1 hits the rebalance timeout before joinGroup and re-joins with another memberId

consumer1's log keeps printing:

16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 54 and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 (ConsumerCoordinator.java:739)
16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} (ConsumerCoordinator.java:1143)

and the coordinator's log:

[2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance group xxx in state PreparingRebalance with old generation 56 (__consumer_offsets-30) (reason: Adding new member consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed dynamic members who haven't joined: Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a) (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx generation 57 (__consumer_offsets-30) with 3 members (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,048] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group xxx in CompletingRebalance state. Created a new member id consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,053] INFO [GroupCoordinator 0]: Assignment received from leader consumer-xxx-1-e842a14c-eff7-4b55-9463-72b9c2534afd for group xxx for generation 57. The group has 3 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,243] INFO [GroupCoordinator 0]: Preparing to rebalance group xxx in s
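Solution 2 above, keeping a single async commit future inflight across poll calls, can be sketched with a plain `CompletableFuture`. This is a hypothetical illustration of the idea (class and method names are made up), not the `ConsumerCoordinator` code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class InflightCommit {
    private CompletableFuture<Void> inflight; // survives across poll() calls

    // Issue a new commit only when no request is currently inflight, so
    // repeated polls wait on the same request instead of re-sending one
    // on every poll and never completing in time.
    public CompletableFuture<Void> maybeCommit(Supplier<CompletableFuture<Void>> send) {
        if (inflight == null || inflight.isDone()) {
            inflight = send.get();
        }
        return inflight;
    }
}
```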
[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r921765863

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map ...
-        future = maybeAutoCommitOffsetsAsync();
-        // return true when
-        // 1. future is null, which means no commit request sent, so it is still considered completed
-        // 2. offset commit completed
-        // 3. offset commit failed with non-retriable exception
-        if (future == null)
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.succeeded())
-            onJoinPrepareAsyncCommitCompleted = true;
-        else if (future.failed() && !future.isRetriable()) {
-            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-            onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+            autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+        }
+
+        // wait for commit offset response before timer.
+        if (autoCommitOffsetRequestFuture != null) {
+            Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+                    timer : joinPrepareTimer;
+            client.poll(autoCommitOffsetRequestFuture, pollTimer);
         }
+
+        // return false when:
+        // 1. offset commit haven't done
+        // 2. offset commit failed with retriable exception and joinPrepare haven't expired
+        boolean onJoinPrepareAsyncCommitCompleted = true;
+        if (autoCommitOffsetRequestFuture != null) {
+            if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                onJoinPrepareAsyncCommitCompleted = joinPrepareTimer.isExpired();
+            } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+                log.error("Asynchronous auto-commit of offsets failed: {}", autoCommitOffsetRequestFuture.exception().getMessage());

Review Comment:
   These 2 else if conditions are the same. Please fix it.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ [same hunk as above, up to:]
+            if (!autoCommitOffsetRequestFuture.isDone()) {
+                onJoinPrepareAsyncCommitCompleted = false;

Review Comment:
   I think we should also check `joinPrepareTimer.isExpired();` here, right?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +743,45 @@ [same hunk, truncated in the archive:]
-            log.error("Asynch
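The fix the reviewer is asking for can be restated as a small pure function. This is a hypothetical helper, not the actual `ConsumerCoordinator` code: the second branch tests for a non-retriable failure instead of duplicating the first, and an unfinished commit also completes once the join-prepare timer expires.

```java
public class CommitCheck {
    // done: the commit future completed; failed/retriable: its outcome;
    // timerExpired: the joinPrepareTimer has run out.
    public static boolean isJoinPrepareCommitComplete(boolean done, boolean failed,
                                                      boolean retriable, boolean timerExpired) {
        if (!done) {
            return timerExpired;   // still inflight: only give up once the timer expires
        }
        if (failed && retriable) {
            return timerExpired;   // retriable failure: keep retrying until the timer expires
        }
        // success, or a non-retriable failure (which would be logged): complete
        return true;
    }
}
```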
[GitHub] [kafka] showuon commented on pull request #11783: KAFKA-10000: System tests (KIP-618)
showuon commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1185110992

> I've been able to get one green run of the test locally, but all other attempts have failed with timeouts, even when bumping the permitted duration for worker startup from one minute to five.
>
> I also fixed a typo that would have broken the `test_bounce` case.

Yes, that's what I saw when running in my local env. I think we need to make sure it works well before we can merge it.

@jsancio, this is the last PR for KIP-618. We'd like to put this into v3.3, but need to make sure the newly added/updated system tests don't break any test. Could you help run it and confirm? Thanks.
[GitHub] [kafka] showuon commented on pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on PR #12347:
URL: https://github.com/apache/kafka/pull/12347#issuecomment-1185100303

@junrao, thanks for your review! @tombentley, do you want to have another look? Thanks.
[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-10000: System tests (KIP-618)
C0urante commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1185094040

I've been able to get one green run of the test locally, but all other attempts have failed with timeouts, even when bumping the permitted duration for worker startup from one minute to five.

I also fixed a typo that would have broken the `test_bounce` case.
[GitHub] [kafka] C0urante opened a new pull request, #12410: MINOR: Remove unused ShutdownableThread class and ineffective ThreadedTest classes
C0urante opened a new pull request, #12410:
URL: https://github.com/apache/kafka/pull/12410

The `ShutdownableThread` class isn't used anywhere outside of tests, and the `ThreadedTest` class only works in cases where the logic that's being tested uses a `ShutdownableThread`. Both of these classes are removed, and the `DistributedHerderTest` class is updated to properly report unexpected exceptions that take place on other threads (which appears to be the original purpose of the `ThreadedTest` class, although it was not actually doing this anywhere).

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] C0urante commented on pull request #12409: KAFKA-14058: Migrate ExactlyOnceWorkerSourceTaskTest from EasyMock and PowerMock to Mockito
C0urante commented on PR #12409:
URL: https://github.com/apache/kafka/pull/12409#issuecomment-1185021831

@clolov since you've been working on the JUnit migration for Streams, would you be interested in reviewing this?
[GitHub] [kafka] C0urante opened a new pull request, #12409: KAFKA-14058: Migrate ExactlyOnceWorkerSourceTaskTest from EasyMock and PowerMock to Mockito
C0urante opened a new pull request, #12409:
URL: https://github.com/apache/kafka/pull/12409

[Jira](https://issues.apache.org/jira/browse/KAFKA-14058)

Some notes:
1. Introduced new `ConcurrencyUtils` and `MockitoUtils` classes for reusable testing logic that will likely be used in the near future for [KAFKA-14059](https://issues.apache.org/jira/browse/KAFKA-14059) and [KAFKA-14060](https://issues.apache.org/jira/browse/KAFKA-14060).
2. Refactored a lot of common logic into dedicated methods, which reduces test size and should make tests easier to write.
3. Doubled the default record batch size from 1 to 2. This provides better coverage and, after all the refactoring from step 2, required no modifications to mocking, verification, or assertion logic anywhere in the test suite.
4. Stopped inheriting from the `ThreadedTest` class as it does nothing.
5. Once this looks good enough to merge, I'll begin applying the same changes to the `WorkerSourceTaskTest` and possibly `AbstractWorkerSourceTaskTest` test suites.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-13888) KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
[ https://issues.apache.org/jira/browse/KAFKA-13888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-13888:
------------------------------------
    Priority: Blocker  (was: Major)

> KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-13888
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13888
>             Project: Kafka
>          Issue Type: Improvement
>          Components: kraft
>            Reporter: Niket Goel
>            Assignee: lqjacklee
>            Priority: Blocker
>             Fix For: 3.3.0
>
> Tracking issue for the implementation of KIP:836
[jira] [Created] (KAFKA-14077) KRaft should support recovery from failed disk
Jason Gustafson created KAFKA-14077:
---------------------------------------

             Summary: KRaft should support recovery from failed disk
                 Key: KAFKA-14077
                 URL: https://issues.apache.org/jira/browse/KAFKA-14077
             Project: Kafka
          Issue Type: Bug
            Reporter: Jason Gustafson
             Fix For: 3.3.0

If one of the nodes in the metadata quorum has a disk failure, there is no way currently to safely bring the node back into the quorum. When we lose disk state, we are at risk of losing committed data even if the failure only affects a minority of the cluster.

Here is an example. Suppose that a metadata quorum has 3 members: v1, v2, and v3. Initially, v1 is the leader and writes a record at offset 1. After v2 acknowledges replication of the record, it becomes committed. Suppose that v1 fails before v3 has a chance to replicate this record. As long as v1 remains down, the raft protocol guarantees that only v2 can become leader, so the record cannot be lost. The raft protocol expects that when v1 returns, it will still have that record. But what if there is a disk failure, the state cannot be recovered, and v1 participates in leader election? Then we would have committed data on a minority of the voters.

The main problem here concerns how we recover from this impaired state without risking the loss of this data. Consider a naive solution which brings v1 back with an empty disk. Since the node has lost its prior knowledge of the state of the quorum, it will vote for any candidate that comes along. If v3 becomes a candidate, then it will vote for itself and it just needs the vote from v1 to become leader. If that happens, then the committed data on v2 will be lost.

This is just one scenario. In general, the invariants that the raft protocol is designed to preserve go out the window when disk state is lost. For example, it is also possible to contrive a scenario where the loss of disk state leads to multiple leaders. There is a good reason why raft requires that any vote cast by a voter is written to disk, since otherwise the voter may vote for different candidates in the same epoch.

Many systems solve this problem with a unique identifier which is generated automatically and stored on disk. This identifier is then committed to the raft log. If a disk changes, we would see a new identifier and we can prevent the node from breaking raft invariants. Then recovery from a failed disk requires a quorum reconfiguration. We need something like this in KRaft to make disk recovery possible.
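The "unique identifier stored on disk" idea can be sketched minimally. This is a hypothetical illustration of the mechanism (names and file layout made up), not KRaft's actual metadata handling: the same file yields the same id across restarts, while a wiped disk yields a fresh id that the quorum can detect.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class DiskId {
    // Return the identifier persisted by a previous run, or generate and
    // persist a fresh one. If the stored id ever differs from the id the
    // quorum committed for this node, the node's disk state was lost and
    // it must not participate in elections until reconfigured back in.
    public static UUID loadOrCreate(Path file) throws IOException {
        if (Files.exists(file)) {
            return UUID.fromString(Files.readString(file).trim());
        }
        UUID id = UUID.randomUUID();
        Files.writeString(file, id.toString());
        return id;
    }
}
```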
[jira] [Commented] (KAFKA-13846) Add an overloaded metricOrElseCreate function in Metrics
[ https://issues.apache.org/jira/browse/KAFKA-13846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566989#comment-17566989 ]

Guozhang Wang commented on KAFKA-13846:
---------------------------------------

Hi Jose, the PR has actually been merged, so we should close this ticket. I will go ahead and do it.

> Add an overloaded metricOrElseCreate function in Metrics
> --------------------------------------------------------
>
>                 Key: KAFKA-13846
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13846
>             Project: Kafka
>          Issue Type: Improvement
>          Components: metrics
>            Reporter: Guozhang Wang
>            Assignee: Sagar Rao
>            Priority: Major
>              Labels: newbie
>
> The `Metrics` registry is often used by concurrent threads, however its get/create APIs are not well suited for it. A common pattern from the user today is:
> {code}
> metric = metrics.metric(metricName);
> if (metric == null) {
>     try {
>         metrics.createMetric(..)
>     } catch (IllegalArgumentException e) {
>         // another thread may create the metric in the meantime
>     }
> }
> {code}
> Otherwise the caller would need to synchronize the whole block trying to get the metric. However, the `createMetric` function call itself does synchronize internally on updating the metric map.
> So we could consider adding a metricOrElseCreate function which is similar to createMetric, but instead of throwing an illegal argument exception within the internal synchronization block, it would just return the already existing metric.
[jira] [Resolved] (KAFKA-13846) Add an overloaded metricOrElseCreate function in Metrics
[ https://issues.apache.org/jira/browse/KAFKA-13846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-13846.
---------------------------------------
    Fix Version/s: 3.3.0
       Resolution: Fixed

> Add an overloaded metricOrElseCreate function in Metrics
> --------------------------------------------------------
>
>                 Key: KAFKA-13846
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13846
>             Project: Kafka
>          Issue Type: Improvement
>          Components: metrics
>            Reporter: Guozhang Wang
>            Assignee: Sagar Rao
>            Priority: Major
>              Labels: newbie
>             Fix For: 3.3.0
>
> The `Metrics` registry is often used by concurrent threads, however its get/create APIs are not well suited for it. A common pattern from the user today is:
> {code}
> metric = metrics.metric(metricName);
> if (metric == null) {
>     try {
>         metrics.createMetric(..)
>     } catch (IllegalArgumentException e) {
>         // another thread may create the metric in the meantime
>     }
> }
> {code}
> Otherwise the caller would need to synchronize the whole block trying to get the metric. However, the `createMetric` function call itself does synchronize internally on updating the metric map.
> So we could consider adding a metricOrElseCreate function which is similar to createMetric, but instead of throwing an illegal argument exception within the internal synchronization block, it would just return the already existing metric.
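The get-or-create behavior the ticket proposes, returning the existing metric instead of throwing inside the synchronized block, is analogous to `ConcurrentHashMap.computeIfAbsent`. This is a hypothetical toy registry, not Kafka's `Metrics` class:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SimpleMetrics {
    private final ConcurrentMap<String, Object> metrics = new ConcurrentHashMap<>();

    // Atomic get-or-create: callers racing on the same name all receive
    // the single instance that won the race, and no IllegalArgumentException
    // is thrown for the losers.
    public Object metricOrElseCreate(String name) {
        return metrics.computeIfAbsent(name, n -> new Object());
    }
}
```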
[jira] [Commented] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions
[ https://issues.apache.org/jira/browse/KAFKA-14076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566986#comment-17566986 ]

Jim Hughes commented on KAFKA-14076:
------------------------------------

CloseOptions was introduced in https://github.com/apache/kafka/commit/9dc332f5ca34b80af369646f767c40c6b189f831.

> Fix issues with KafkaStreams.CloseOptions
> -----------------------------------------
>
>                 Key: KAFKA-14076
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14076
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jim Hughes
>            Priority: Major
>
> The new `close(CloseOptions)` function has a few bugs. (https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561)
> Notably, it needs to remove CGs per StreamThread.
[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r921566920

##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {

Review Comment:
   I'm not wild about this IT as written. I copied from the `AbstractResetIntegrationTest` and I'd be happy to hear a suggestion on how to make a more minimal test.
[GitHub] [kafka] jnh5y opened a new pull request, #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions
jnh5y opened a new pull request, #12408: URL: https://github.com/apache/kafka/pull/12408 * Addresses issues with `KafkaStreams.close(CloseOptions)`. * Adds an integration test for this new functionality. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions
Jim Hughes created KAFKA-14076: -- Summary: Fix issues with KafkaStreams.CloseOptions Key: KAFKA-14076 URL: https://issues.apache.org/jira/browse/KAFKA-14076 Project: Kafka Issue Type: Bug Reporter: Jim Hughes The new `close(CloseOptions)` function has a few bugs (https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561). Notably, it needs to remove consumer groups per StreamThread. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] peternied opened a new pull request, #12407: [BUG] Remove duplicate common.message.* from clients:test jar file
peternied opened a new pull request, #12407: URL: https://github.com/apache/kafka/pull/12407 When consuming both `kafka-client:3.0.1` and `kafka-client:3.0.1:test` through Maven, a hygiene tool detected multiple instances of the same class loaded into the classpath. Verified this change by building locally before and after with `./gradlew clients:publishToMavenLocal`, then used Beyond Compare to verify the contents. ### Committer Checklist (excluded from commit message) - [X] Verify design and implementation - Minor change to the existing build process; the duplicated Java classes were unused. - [X] Verify test coverage and CI build status - There should be no changes in test coverage or CI build status. - [X] Verify documentation (including upgrade notes) - No documentation updates need to be made
[jira] [Created] (KAFKA-14075) Consumer Group deletion does not delete pending transactional offset commits
Jeff Kim created KAFKA-14075: Summary: Consumer Group deletion does not delete pending transactional offset commits Key: KAFKA-14075 URL: https://issues.apache.org/jira/browse/KAFKA-14075 Project: Kafka Issue Type: Bug Reporter: Jeff Kim Assignee: Jeff Kim In [GroupMetadata.removeAllOffsets()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L729-L740] we pass in the offsets cache to delete pendingTransactionalOffsetCommits upon group deletion, so only transactional offset commits for topic partitions already in the offsets cache will be deleted. However, we add a transactional offset commit to the offsets cache only after the commit/abort marker is written to the log in [GroupMetadata.completePendingTxnOffsetCommit()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L692]. So even after a group deletion we can still have pending transactional offset commits for a group that is supposed to be deleted. The group metadata manager will throw an IllegalStateException [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L740] while loading the group into memory. We will hit this exception on every group load as long as the hanging transaction never completes. We should delete all pending transactional offset commits (instead of only those for topic partitions that exist in the offsets cache) when a group is deleted in GroupMetadata.removeOffsets()
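The gap described in this ticket can be illustrated with a small, self-contained sketch. Plain Java maps stand in for the Scala group state here; all names are illustrative, not Kafka's actual fields or methods:

```java
import java.util.HashMap;
import java.util.Map;

public class PendingTxnOffsetsSketch {

    public static void main(String[] args) {
        // Offsets already materialized into the group's offsets cache.
        Map<String, Long> offsetsCache = new HashMap<>(Map.of("topic-0", 42L));

        // Transactional commits still waiting for a commit/abort marker;
        // "topic-1" has no entry in the offsets cache yet.
        Map<String, Long> pendingTxnCommits =
            new HashMap<>(Map.of("topic-0", 43L, "topic-1", 7L));

        // Buggy deletion: only pending commits whose partitions are already in
        // the offsets cache are removed, so topic-1's commit dangles forever.
        pendingTxnCommits.keySet().removeAll(offsetsCache.keySet());
        System.out.println(pendingTxnCommits); // {topic-1=7}: survives group deletion

        // Proposed fix: clear ALL pending transactional commits on group deletion.
        pendingTxnCommits.clear();
        System.out.println(pendingTxnCommits.isEmpty()); // true
    }
}
```

The dangling entry is exactly what later trips the IllegalStateException when the group is reloaded into memory.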
[GitHub] [kafka] C0urante commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2
C0urante commented on code in PR #12281: URL: https://github.com/apache/kafka/pull/12281#discussion_r921456034

## streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
## @@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
   public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
     final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-    if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+    final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment: This is a nice improvement in readability, but are we certain it's necessary? I commented on the change to the plugin scanning logic in Connect because I'm familiar with that part of the code base; I don't have the same familiarity with Streams, though. I think it's fine to merge this change, but if this method isn't intended to be invoked concurrently, we should modify the PR title so that the commit message doesn't imply this is a bug fix and instead recognizes it as a cosmetic improvement.
[GitHub] [kafka] C0urante commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2
C0urante commented on code in PR #12281: URL: https://github.com/apache/kafka/pull/12281#discussion_r921322234

## streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
## @@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
   public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
     final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-    if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+    final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment: ~Does this change behavior? Potential concurrency bug aside (which I'm not sure is actually a bug, since it's unclear if we expect this method to be called concurrently), it looks like we're going from failing _before_ overwriting values to now failing _after_ overwriting them. Is there any fallout from that or is it a benign change?~ Never mind, had to refresh my understanding of `Map::putIfAbsent`. This does not change behavior.
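Since the thread above hinges on `Map::putIfAbsent` semantics, a short standalone example (the class name is invented for illustration) confirms that an existing mapping is never overwritten:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentDemo {
    public static void main(String[] args) {
        ConcurrentMap<String, String> recorders = new ConcurrentHashMap<>();

        // First call: no existing mapping, so the value is stored and null is returned.
        String first = recorders.putIfAbsent("recorder-1", "A");

        // Second call: a mapping already exists, so it is NOT overwritten,
        // and the existing value is returned instead.
        String second = recorders.putIfAbsent("recorder-1", "B");

        System.out.println(first);                       // null
        System.out.println(second);                      // A
        System.out.println(recorders.get("recorder-1")); // A
    }
}
```

In `ConcurrentHashMap` the check-and-insert is also performed atomically, which is why replacing the `containsKey` check with `putIfAbsent` does not change behavior even under concurrent callers.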
[GitHub] [kafka] rajinisivaram opened a new pull request, #12406: MINOR: Fix options for old-style Admin.listConsumerGroupOffsets
rajinisivaram opened a new pull request, #12406: URL: https://github.com/apache/kafka/pull/12406 This fixes the options set in commit https://github.com/apache/kafka/commit/beac86f049385932309158c1cb49c8657e53f45f for the old style API. Timeout was not being copied to the new options. The copy is error-prone, so changing to use the provided options directly. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] C0urante commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2
C0urante commented on code in PR #12281: URL: https://github.com/apache/kafka/pull/12281#discussion_r921322234

## streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
## @@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
   public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
     final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-    if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+    final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment: Does this change behavior? Potential concurrency bug aside (which I'm not sure is valid, since it's unclear if we expect this method to be called concurrently), it looks like we're going from failing _before_ overwriting values to now failing _after_ overwriting them. Is there any fallout from that or is it a benign change?

## streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
## @@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
   public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
     final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-    if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+    final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment: Does this change behavior? Potential concurrency bug aside (which I'm not sure is actually a bug, since it's unclear if we expect this method to be called concurrently), it looks like we're going from failing _before_ overwriting values to now failing _after_ overwriting them. Is there any fallout from that or is it a benign change?
[jira] [Commented] (KAFKA-10000) Atomic commit of source connector records and offsets
[ https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566896#comment-17566896 ] Chris Egerton commented on KAFKA-10000: --- Hi [~jagsancio]! We can keep it. All of the functional changes for this ticket have been merged, as well as integration tests for it; the only remaining PR is one to add system tests: [https://github.com/apache/kafka/pull/11783], which will likely be merged this week or next. If it makes for easier bookkeeping, I can file a separate non-blocker ticket for those system tests and mark this one done. > Atomic commit of source connector records and offsets > - > > Key: KAFKA-10000 > URL: https://issues.apache.org/jira/browse/KAFKA-10000 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Labels: needs-kip > Fix For: 3.3.0 > > > It'd be nice to be able to configure source connectors such that their > offsets are committed if and only if all records up to that point have been > ack'd by the producer. This would go a long way towards EOS for source > connectors. > > This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is > marked as {{WONTFIX}} since it only concerns enabling the idempotent producer > for source connectors and is not concerned with source connector offsets. > This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, > which had a lot of discussion around allowing connector-defined transaction > boundaries. The suggestion in this ticket is to only use source connector > offset commits as the transaction boundaries for connectors; allowing > connector-specified transaction boundaries can be addressed separately.
[jira] [Commented] (KAFKA-14074) Restarting a broker during re-assignment can leave log directory entries
[ https://issues.apache.org/jira/browse/KAFKA-14074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566885#comment-17566885 ] Justine Olshan commented on KAFKA-14074: Thanks for bringing this up [~prestona]. This issue has been brought to my attention for ZK clusters as well, but not for KRaft. It seems KRaft was built to handle this issue better. I have been looking into ways to mitigate the issue in ZK clusters. > Restarting a broker during re-assignment can leave log directory entries > > > Key: KAFKA-14074 > URL: https://issues.apache.org/jira/browse/KAFKA-14074 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0, 3.1.0 >Reporter: Adrian Preston >Priority: Major > > Re-starting a broker while replicas are being assigned away from the broker > can result in topic partition directories being left in the broker’s log > directory. This can trigger further problems if such a topic is deleted and > re-created. These problems occur when replicas for the new topic are placed > on a broker that hosts a “stale” topic partition directory of the same name, > causing the on-disk topic partition state held by different brokers in the > cluster to diverge. > We have also been able to reproduce variants of this problem using Kafka 2.8 > and 3.1, as well as Kafka built from the head of the apache/kafka repository > (at the time of writing this is commit: > 94d4fdeb28b3cd4d474d943448a7ef653eaa145d). We have *not* been able to > reproduce this problem with Kafka running in KRaft mode. 
> A minimal re-create for topic directories being left on disk is as follows: > # Start ZooKeeper and a broker (both using the sample config) > # Create 100 topics: each with 1 partition, and with replication factor 1 > # Add a second broker to the Kafka cluster (with minor edits to the sample > config for: {{{}broker.id{}}}, {{{}listeners{}}}, and {{{}log.dirs{}}}) > # Issue a re-assignment that moves all of the topic partition replicas from > the first broker to the second broker > # While this re-assignment is taking place shutdown the first broker (you > need to be quick with only two brokers and 100 topics…) > # Wait a few seconds for the re-assignment to stall > # Restart the first broker and wait for the re-assignment to complete and it > to remove any partially deleted topics (e.g. those with a “-delete” suffix). > Inspecting the logs directory for the first broker should show directories > corresponding to topic partitions that are owned by the second broker. These > are not cleaned up when the re-assignment completes, and also remain in the > logs directory even if the first broker is restarted. Deleting the topic > also does not clean up the topic partitions left behind on the first broker - > which leads to a second potential problem. > For topics that have more than one replica: a new topic that has the same > name as a previously deleted topic might have replicas created on a broker > with “stale” topic partition directories. If this happens these topics will > remain in an under-replicated state. 
> A minimal re-create for this is as follows: > # Create a three node Kafka cluster (backed by ZK) based off the sample > config (to avoid confusion let’s call these kafka-0, kafka-1, and kafka-2) > # Create 100 topics: each with 1 partition, and with replication factor 2 > # Submit a re-assignment to move all of the topic partition replicas to > kafka-0 and kafka-1, and wait for it to complete > # Submit a re-assignment to move all of the topic partition replicas on > kafka-0 to kafka-2. > # While this re-assignment is taking place shutdown and re-start kafka-0. > # Wait for the re-assignment to complete, and check that there’s unexpected > topic partition directories in kafka-0’s logs directory > # Delete all 100 topics, and re-create 100 new topics with the same name and > configuration as the deleted topics. > In this state kafka-1 and kafka-2 continually generate log messages similar > to: > {{[2022-07-14 13:07:49,118] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Received INCONSISTENT_TOPIC_ID from the leader for partition > test-039-0. This error may be returned transiently when the partition is > being created or deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread)}} > Topics that have had replicas created on kafka-0 are under-replicated with > kafka-0 missing from the ISR list. Performing a rolling restart of each > broker in turn does not resolve the problem, in fact more partitions are > listed as under-replicated, as before kafka-0 is missing from their ISR list. > I also tried to re-create this with Kafka running in Kraft mode, but was > unable to do so. My test configuration
[jira] [Commented] (KAFKA-14074) Restarting a broker during re-assignment can leave log directory entries
[ https://issues.apache.org/jira/browse/KAFKA-14074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566887#comment-17566887 ] Justine Olshan commented on KAFKA-14074: Is this the same issue as https://issues.apache.org/jira/browse/KAFKA-13972? > Restarting a broker during re-assignment can leave log directory entries > > > Key: KAFKA-14074 > URL: https://issues.apache.org/jira/browse/KAFKA-14074 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0, 3.1.0 >Reporter: Adrian Preston >Priority: Major > > Re-starting a broker while replicas are being assigned away from the broker > can result in topic partition directories being left in the broker’s log > directory. This can trigger further problems if such a topic is deleted and > re-created. These problems occur when replicas for the new topic are placed > on a broker that hosts a “stale” topic partition directory of the same name, > causing the on-disk topic partition state held by different brokers in the > cluster to diverge. > We have also been able to reproduce variants of this problem using Kafka 2.8 > and 3.1, as well as Kafka built from the head of the apache/kafka repository > (at the time of writing this is commit: > 94d4fdeb28b3cd4d474d943448a7ef653eaa145d). We have *not* been able to > reproduce this problem with Kafka running in KRaft mode. 
> A minimal re-create for topic directories being left on disk is as follows: > # Start ZooKeeper and a broker (both using the sample config) > # Create 100 topics: each with 1 partition, and with replication factor 1 > # Add a second broker to the Kafka cluster (with minor edits to the sample > config for: {{{}broker.id{}}}, {{{}listeners{}}}, and {{{}log.dirs{}}}) > # Issue a re-assignment that moves all of the topic partition replicas from > the first broker to the second broker > # While this re-assignment is taking place shutdown the first broker (you > need to be quick with only two brokers and 100 topics…) > # Wait a few seconds for the re-assignment to stall > # Restart the first broker and wait for the re-assignment to complete and it > to remove any partially deleted topics (e.g. those with a “-delete” suffix). > Inspecting the logs directory for the first broker should show directories > corresponding to topic partitions that are owned by the second broker. These > are not cleaned up when the re-assignment completes, and also remain in the > logs directory even if the first broker is restarted. Deleting the topic > also does not clean up the topic partitions left behind on the first broker - > which leads to a second potential problem. > For topics that have more than one replica: a new topic that has the same > name as a previously deleted topic might have replicas created on a broker > with “stale” topic partition directories. If this happens these topics will > remain in an under-replicated state. 
> A minimal re-create for this is as follows: > # Create a three node Kafka cluster (backed by ZK) based off the sample > config (to avoid confusion let’s call these kafka-0, kafka-1, and kafka-2) > # Create 100 topics: each with 1 partition, and with replication factor 2 > # Submit a re-assignment to move all of the topic partition replicas to > kafka-0 and kafka-1, and wait for it to complete > # Submit a re-assignment to move all of the topic partition replicas on > kafka-0 to kafka-2. > # While this re-assignment is taking place shutdown and re-start kafka-0. > # Wait for the re-assignment to complete, and check that there’s unexpected > topic partition directories in kafka-0’s logs directory > # Delete all 100 topics, and re-create 100 new topics with the same name and > configuration as the deleted topics. > In this state kafka-1 and kafka-2 continually generate log messages similar > to: > {{[2022-07-14 13:07:49,118] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Received INCONSISTENT_TOPIC_ID from the leader for partition > test-039-0. This error may be returned transiently when the partition is > being created or deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread)}} > Topics that have had replicas created on kafka-0 are under-replicated with > kafka-0 missing from the ISR list. Performing a rolling restart of each > broker in turn does not resolve the problem, in fact more partitions are > listed as under-replicated, as before kafka-0 is missing from their ISR list. > I also tried to re-create this with Kafka running in Kraft mode, but was > unable to do so. My test configuration was three brokers configured based on > /config/kraft/server.properties. All three brokers were part of the > controller quorum. Interestingly I see log lines like the fo
[GitHub] [kafka] ocadaruma opened a new pull request, #12405: KAFKA-13572 Fix negative preferred replica imbalanced count metric
ocadaruma opened a new pull request, #12405: URL: https://github.com/apache/kafka/pull/12405 * Currently, the `preferredReplicaImbalanceCount` calculation has a race condition that can make the count negative when topic deletions are initiated simultaneously. * Please refer to [KAFKA-13572's comment](https://issues.apache.org/jira/browse/KAFKA-13572?focusedCommentId=17566872&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17566872) for the details. * This PR addresses the problem by fixing `cleanPreferredReplicaImbalanceMetric` so that it is called only once per topic-deletion procedure. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-13572) Negative value for 'Preferred Replica Imbalance' metric
[ https://issues.apache.org/jira/browse/KAFKA-13572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566872#comment-17566872 ] Haruki Okada commented on KAFKA-13572: -- We experienced a similar phenomenon in our Kafka cluster, and we found that the following scenario can cause a negative metric. Let's say there are topic-A and topic-B. # Initiate topic deletion of topic-A ** TopicDeletionManager#enqueueTopicsForDeletion is called with argument Set(topic-A) *** [https://github.com/apache/kafka/blob/3.2.0/core/src/main/scala/kafka/controller/KafkaController.scala#L1771] # During topic-A's deletion procedure, all of topic-A's partitions are marked as Offline (Leader = -1) ** [https://github.com/apache/kafka/blob/3.2.0/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala#L368] # Before topic-A's deletion procedure completes, initiate topic deletion of topic-B ** Since topic-A's ZK delete-topic node still exists, TopicDeletionManager#enqueueTopicsForDeletion is called with argument Set(topic-A, topic-B) ** ControllerContext#cleanPreferredReplicaImbalanceMetric is called for both topic-A and topic-B *** [https://github.com/apache/kafka/blob/3.2.0/core/src/main/scala/kafka/controller/ControllerContext.scala#L496] *** Since topic-A is now NoLeader, `!hasPreferredLeader(replicaAssignment, leadershipInfo)` evaluates to true, so `preferredReplicaImbalanceCount` is decremented unexpectedly > Negative value for 'Preferred Replica Imbalance' metric > --- > > Key: KAFKA-13572 > URL: https://issues.apache.org/jira/browse/KAFKA-13572 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0 >Reporter: Siddharth Ahuja >Priority: Major > Attachments: > kafka_negative_preferred-replica-imbalance-count_jmx_2.JPG > > > A negative value (-822) for the metric > {{kafka_controller_kafkacontroller_preferredreplicaimbalancecount}} has been > observed - please see the attached screenshot and the output below: > {code:java} > $ curl -s 
http://localhost:9101/metrics | fgrep > 'kafka_controller_kafkacontroller_preferredreplicaimbalancecount' > # HELP kafka_controller_kafkacontroller_preferredreplicaimbalancecount > Attribute exposed for management (kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value) > # TYPE kafka_controller_kafkacontroller_preferredreplicaimbalancecount gauge > kafka_controller_kafkacontroller_preferredreplicaimbalancecount -822.0 > {code} > The issue appeared after an operation where the number of partitions for > some topics was increased, and some topics were deleted/created in order to > decrease the number of their partitions. > Ran the following command to check if there are any instances where the > preferred leader (the 1st broker in the Replica list) is not the current Leader: > > {code:java} > % grep ".*Topic:.*Partition:.*Leader:.*Replicas:.*Isr:.*Offline:.*" > kafka-topics_describe.out | awk '{print $6 " " $8}' | cut -d "," -f1 | awk > '{print $0, ($1==$2?_:"NOT") "MATCHED"}'|grep NOT | wc -l > 0 > {code} > but could not find any such instances. > {{leader.imbalance.per.broker.percentage=2}} is set for all the brokers in > the cluster, which means that we are allowed to have an imbalance of up to 2% > for preferred leaders. This seems to be a valid value; as such, this setting > should not contribute towards a negative metric. > The metric is subtracted in the code > [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerContext.scala#L474-L503], > however it is not clear when it can become negative (i.e. subtracted more than > added) in the absence of any comments or debug/trace level logs in the code. > However, one thing is for sure: you either have no imbalance (0) or have > imbalance (> 0); it doesn’t make sense for the metric to be < 0. > FWIW, no other anomalies besides this have been detected. 
> Considering these metrics get actively monitored, we should look at adding > DEBUG/TRACE logging around the addition/subtraction of these metrics (and > elsewhere where appropriate) to identify any potential issues.
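The double-decrement scenario laid out in the comment above can be reduced to a few lines. This is a hypothetical sketch, not the actual controller code; the class, variable, and loop are invented for illustration:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class ImbalanceMetricSketch {

    public static void main(String[] args) {
        // Stand-in for preferredReplicaImbalanceCount: topic-A starts out imbalanced.
        AtomicInteger buggyCount = new AtomicInteger(1);

        // Buggy behavior: metric cleanup runs every time topic-A appears in a
        // deletion batch. Because topic-A's ZK delete-topic node still exists
        // when topic-B's deletion is enqueued, topic-A is processed twice.
        buggyCount.decrementAndGet(); // topic-A's own deletion
        buggyCount.decrementAndGet(); // re-processed alongside topic-B's deletion
        System.out.println(buggyCount.get()); // -1: the metric has gone negative

        // Fixed behavior (the approach PR #12405 describes): clean the metric
        // at most once per topic-deletion procedure.
        AtomicInteger fixedCount = new AtomicInteger(1);
        Set<String> alreadyCleaned = new HashSet<>();
        for (String topic : new String[]{"topic-A", "topic-A"}) {
            if (alreadyCleaned.add(topic)) {
                fixedCount.decrementAndGet();
            }
        }
        System.out.println(fixedCount.get()); // 0
    }
}
```

The fix keeps the invariant that each imbalanced topic contributes exactly one decrement, so the gauge can never drop below zero.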
[GitHub] [kafka] dajac commented on pull request #12348: KAFKA-14016: Revoke more partitions than expected in Cooperative rebalance
dajac commented on PR #12348: URL: https://github.com/apache/kafka/pull/12348#issuecomment-1184491780 @aiquestion Any update on this one?
[GitHub] [kafka] dengziming commented on pull request #12318: MINOR: Support --release in FeatureCommand
dengziming commented on PR #12318: URL: https://github.com/apache/kafka/pull/12318#issuecomment-1184476081 @mumrah Thank you for your reminder, I added `--release` support for describing and added a unit test for it.
[GitHub] [kafka] showuon commented on pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on PR #12347: URL: https://github.com/apache/kafka/pull/12347#issuecomment-1184476011 @junrao , PR updated in this commit: https://github.com/apache/kafka/commit/53f7ebe3ec519bd1f4c876f21228dfce7ce42403 . Thanks.
[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r921131155 ## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ## @@ -638,6 +641,221 @@ class LogManagerTest { assertTrue(logManager.partitionsInitializing.isEmpty) } + private def appendRecordsToLog(time: MockTime, parentLogDir: File, partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: Int): Unit = { +def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) +val tpFile = new File(parentLogDir, s"$name-$partitionId") + +val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0, + 5 * 60 * 1000, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs) + +val numMessages = 20 +try { + for (_ <- 0 until numMessages) { +log.appendAsLeader(createRecords, leaderEpoch = 0) + } + + assertEquals(expectedSegmentsPerLog, log.numberOfSegments) +} finally { + log.close() +} + } + + private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, expectedParams: Map[String, Int]): Unit = { +val spyLogManagerClassName = spyLogManager.getClass().getSimpleName +// get all `remainingLogsToRecover` metrics +val logMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" } + .map { case (_, gauge) => gauge } + .asInstanceOf[ArrayBuffer[Gauge[Int]]] + +assertEquals(expectedParams.size, logMetrics.size) + +val capturedPath: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String]) +val capturedNumRemainingLogs: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int]) + +// Since we'll update numRemainingLogs from totalLogs to 0 for each log dir, so we need to add 1 here +val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum +verify(spyLogManager, 
times(expectedCallTimes)).updateNumRemainingLogs(any, capturedPath.capture(), capturedNumRemainingLogs.capture()); + +val paths = capturedPath.getAllValues +val numRemainingLogs = capturedNumRemainingLogs.getAllValues + +// expected the end value is 0 +logMetrics.foreach { gauge => assertEquals(0, gauge.value()) } + +expectedParams.foreach { + case (path, totalLogs) => +// make sure we update the numRemainingLogs from totalLogs to 0 in order for each log dir +var expectedCurRemainingLogs = totalLogs + 1 +for (i <- 0 until paths.size()) { + if (paths.get(i).contains(path)) { +expectedCurRemainingLogs -= 1 +assertEquals(expectedCurRemainingLogs, numRemainingLogs.get(i)) + } +} +assertEquals(0, expectedCurRemainingLogs) +} + } + + private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager, + logDirs: Seq[File], + recoveryThreadsPerDataDir: Int, + mockMap: ConcurrentHashMap[String, Int], + expectedParams: Map[String, Int]): Unit = { +val spyLogManagerClassName = spyLogManager.getClass().getSimpleName +// get all `remainingSegmentsToRecover` metrics +val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" } + .map { case (_, gauge) => gauge } + .asInstanceOf[ArrayBuffer[Gauge[Int]]] + +// expected each log dir has 2 metrics for each thread +assertEquals(recoveryThreadsPerDataDir * logDirs.size, logSegmentMetrics.size) + +val capturedThreadName: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String]) +val capturedNumRemainingSegments: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int]) + +// Since we'll update numRemainingSegments from totalSegments to 0 for each thread, so we need to add 1 here +val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum +verify(mockMap, times(expectedCallTimes)).put(capturedThreadName.capture(), 
capturedNumRemainingSegments.capture()); + +// expected the end value is 0 +logSegmentMetrics.foreach { gauge => assertEquals(0, gauge.value()) } + +val threadNames = capturedThreadName.getAllValues +val numRemainingSegments = capturedNumRemainingSegments.getAllValues + +expectedParams.foreach { + case (threadName, totalSegments) => +// make sure we update the numRemainingSegments from totalSegments to 0 in order for each thread +var expectedCurRema
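The per-path countdown check in `verifyRemainingLogsToRecoverMetric` above can be illustrated without Mockito. A minimal sketch (the `CountdownCheck` class and its method names are hypothetical, not from the PR): given the captured `(path, value)` pairs, the values for each log dir must count down from that dir's total number of logs to 0, even when entries for different dirs interleave.

```java
import java.util.Arrays;
import java.util.List;

public class CountdownCheck {
    // Restricted to entries whose path matches `path`, the captured values must
    // count down totalLogs, totalLogs - 1, ..., 0 (totalLogs + 1 entries in all),
    // mirroring the loop in verifyRemainingLogsToRecoverMetric.
    static boolean countsDown(List<String> paths, List<Integer> values, String path, int totalLogs) {
        int expected = totalLogs + 1;
        for (int i = 0; i < paths.size(); i++) {
            if (paths.get(i).contains(path)) {
                expected -= 1;
                if (values.get(i) != expected) {
                    return false;
                }
            }
        }
        return expected == 0; // every step down to 0 was observed
    }

    public static void main(String[] args) {
        // Captured entries for two log dirs interleaved: dirA holds 2 logs, dirB holds 1.
        List<String> paths = Arrays.asList("/tmp/dirA", "/tmp/dirB", "/tmp/dirA", "/tmp/dirB", "/tmp/dirA");
        List<Integer> values = Arrays.asList(2, 1, 1, 0, 0);
        System.out.println(countsDown(paths, values, "dirA", 2)); // true
        System.out.println(countsDown(paths, values, "dirB", 1)); // true
    }
}
```

Filtering by path rather than index is what lets the check tolerate interleaving across dirs while still enforcing strict ordering within each dir.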
[GitHub] [kafka] dengziming commented on a diff in pull request #12318: MINOR: Support --release in FeatureCommand
dengziming commented on code in PR #12318: URL: https://github.com/apache/kafka/pull/12318#discussion_r921176571 ## core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala: ## @@ -107,4 +116,17 @@ class FeatureCommandTest extends BaseRequestTest { assertTrue(downgradeDescribeOutput.contains(expectedOutput)) } } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("kraft")) + def testUpdateFeatureByReleaseVersion(quorum: String): Unit = { +val initialDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe"))) +assertTrue(initialDescribeOutput.contains("Feature: metadata.version\tSupportedMinVersion: 1\tSupportedMaxVersion: 7\tFinalizedVersionLevel: 1")) + +FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "upgrade", "--release", IBP_3_3_IV3.version())) Review Comment: I moved the original `FeatureCommandTest` to `FeatureCommandIntegrationTest` and added a new `FeatureCommandTest` for these unit tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r921173647 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -363,28 +390,32 @@ class LogManager(logDirs: Seq[File], val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir => logDir.isDirectory && UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic) -val numLogsLoaded = new AtomicInteger(0) numTotalLogs += logsToLoad.length +numRemainingLogs.put(dir.getAbsolutePath, new AtomicInteger(logsToLoad.length)) val jobsForDir = logsToLoad.map { logDir => val runnable: Runnable = () => { +debug(s"Loading log $logDir") +var log = None: Option[UnifiedLog] +val logLoadStartMs = time.hiResClockMs() try { - debug(s"Loading log $logDir") - - val logLoadStartMs = time.hiResClockMs() - val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, -defaultConfig, topicConfigOverrides) - val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs - val currentNumLoaded = numLogsLoaded.incrementAndGet() - - info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " + -s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)") + log = Some(loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, +defaultConfig, topicConfigOverrides, numRemainingSegments)) } catch { case e: IOException => handleIOException(logDirAbsolutePath, e) case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] => // KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache // And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here. 
+} finally { + val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs + val remainingLogs = decNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath) + val currentNumLoaded = logsToLoad.length - remainingLogs + log match { +case Some(loadedLog) => info(s"Completed load of $loadedLog with ${loadedLog.numberOfSegments} segments in ${logLoadDurationMs}ms " + + s"($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)") +case None => info(s"Error while loading logs in $logDir in ${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)") Review Comment: The log output might not be in order, e.g.: ... (11/100 completed in /tmp/kafkaLogs) ... (10/100 completed in /tmp/kafkaLogs) ... (12/100 completed in /tmp/kafkaLogs) but I think that's less important. Otherwise, we would need a lock in the `finally` block, which I think would affect log recovery performance. That said, since we can make sure the metric result is in the correct order, the out-of-order log output should be tolerable. WDYT? One thing to add: the out-of-order log output behavior already existed before my change. FYI -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
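The trade-off described in that reply can be demonstrated with a small sketch (the `RemainingLogsDemo` class is hypothetical, not Kafka code): each recovery thread decrements a shared `AtomicInteger` in its finally block with no lock, so the progress log lines may interleave across threads, yet each thread still observes a unique progress value and the counter ends at exactly 0.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RemainingLogsDemo {
    // Simulate many recovery threads decrementing a shared remaining-logs
    // counter in their finally blocks, with no lock around the log line.
    static int[] runDemo() {
        final int totalLogs = 100;
        AtomicInteger remaining = new AtomicInteger(totalLogs);
        Set<Integer> progressValues = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < totalLogs; i++) {
            pool.submit(() -> {
                // ... loadLog(...) would run here ...
                int left = remaining.decrementAndGet();   // atomic, so no lock is needed
                progressValues.add(totalLogs - left);     // the "currentNumLoaded" value
                // an info() line like "(x/100 completed)" printed here may appear
                // out of order across threads, but each x is unique
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { remaining.get(), progressValues.size() };
    }

    public static void main(String[] args) {
        int[] result = runDemo();
        System.out.println("remaining=" + result[0] + " distinctProgress=" + result[1]);
    }
}
```

This is why the metric stays correct even though the human-readable log lines are only approximately ordered: the atomicity lives in the counter, not in the logging.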
[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r921168895 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -307,6 +309,27 @@ class LogManager(logDirs: Seq[File], log } + // factory class for naming the log recovery threads used in metrics + class LogRecoveryThreadFactory(val dirPath: String) extends ThreadFactory { +val threadNum = new AtomicInteger(0) + +override def newThread(runnable: Runnable): Thread = { + KafkaThread.nonDaemon(logRecoveryThreadName(dirPath, threadNum.getAndIncrement()), runnable) +} + } + + // create a unique log recovery thread name for each log dir as the format: prefix-dirPath-threadNum, ex: "log-recovery-/tmp/kafkaLogs-0" + private def logRecoveryThreadName(dirPath: String, threadNum: Int, prefix: String = "log-recovery"): String = s"$prefix-$dirPath-$threadNum" + + /* + * decrement the number of remaining logs + * @return the number of remaining logs after decremented 1 + */ + private[log] def decNumRemainingLogs(numRemainingLogs: ConcurrentMap[String, AtomicInteger], path: String): Int = { +require(path != null, "path cannot be null to update remaining logs metric.") +numRemainingLogs.get(path).decrementAndGet() Review Comment: Decrement the `AtomicInteger` by 1 when a log has been loaded, so that we can make sure the metric reflects the correct number. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
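The per-dir bookkeeping behind `decNumRemainingLogs` can be sketched in Java as follows (names are adapted from the diff for illustration; this is not the actual `LogManager` code): one `AtomicInteger` per log dir, held in a concurrent map and decremented atomically as each log finishes loading.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RemainingLogsMetric {
    // One AtomicInteger per log dir, initialized to the number of logs to load.
    static final ConcurrentMap<String, AtomicInteger> numRemainingLogs = new ConcurrentHashMap<>();

    // Decrement the counter for `path` and return the remaining count,
    // mirroring decNumRemainingLogs in the patch.
    static int decNumRemainingLogs(String path) {
        if (path == null) {
            throw new IllegalArgumentException("path cannot be null to update remaining logs metric.");
        }
        return numRemainingLogs.get(path).decrementAndGet();
    }

    public static void main(String[] args) {
        numRemainingLogs.put("/tmp/kafkaLogs", new AtomicInteger(3));
        System.out.println(decNumRemainingLogs("/tmp/kafkaLogs")); // 2
        System.out.println(decNumRemainingLogs("/tmp/kafkaLogs")); // 1
        System.out.println(decNumRemainingLogs("/tmp/kafkaLogs")); // 0
    }
}
```

A gauge for the dir can then simply read the map entry's current value, which is why mutating the stored `AtomicInteger` (rather than replacing an `Int` value) keeps the metric consistent across threads.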
[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r921159244 ## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ## @@ -638,6 +641,221 @@ class LogManagerTest { assertTrue(logManager.partitionsInitializing.isEmpty) } + private def appendRecordsToLog(time: MockTime, parentLogDir: File, partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: Int): Unit = { +def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) +val tpFile = new File(parentLogDir, s"$name-$partitionId") + +val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0, + 5 * 60 * 1000, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs) + +val numMessages = 20 +try { + for (_ <- 0 until numMessages) { +log.appendAsLeader(createRecords, leaderEpoch = 0) + } + + assertEquals(expectedSegmentsPerLog, log.numberOfSegments) Review Comment: > how do we enforce the expected number of segments? We can be sure of the number of segments because we set `segment.bytes=1024` and each dummy record is 72 bytes, so we can predict how many segments will be created. I've updated the test and added comments. > should we explicitly call log.roll()? No, I don't think we need `log.roll()` here, because we only need log segments filled with records for recovery. Besides, we don't want to update the recovery checkpoint and affect the remaining-segments metric results. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
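The segment-count arithmetic in that reply can be sketched as follows (assuming, per the comment, a flat 72 bytes per record and `segment.bytes=1024`; Kafka's actual roll logic also accounts for batch overhead, so treat this as an approximation, and `SegmentCountSketch` as a hypothetical helper):

```java
public class SegmentCountSketch {
    // A segment holds roughly floor(segmentBytes / recordSize) records before a
    // roll, so n records need ceil(n / recordsPerSegment) segments.
    static int expectedSegments(int numRecords, int recordSize, int segmentBytes) {
        int recordsPerSegment = segmentBytes / recordSize;
        return (numRecords + recordsPerSegment - 1) / recordsPerSegment;
    }

    public static void main(String[] args) {
        // 1024 / 72 = 14 records per segment, so 20 records span 2 segments
        System.out.println(expectedSegments(20, 72, 1024));
    }
}
```

With these numbers, appending 20 records to each log would be expected to leave 2 segments per log, which is the kind of fixed `expectedSegmentsPerLog` value the test asserts on.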
[jira] [Created] (KAFKA-14074) Restarting a broker during re-assignment can leave log directory entries
Adrian Preston created KAFKA-14074: -- Summary: Restarting a broker during re-assignment can leave log directory entries Key: KAFKA-14074 URL: https://issues.apache.org/jira/browse/KAFKA-14074 Project: Kafka Issue Type: Bug Affects Versions: 3.1.0, 2.8.0 Reporter: Adrian Preston Re-starting a broker while replicas are being assigned away from the broker can result in topic partition directories being left in the broker’s log directory. This can trigger further problems if such a topic is deleted and re-created. These problems occur when replicas for the new topic are placed on a broker that hosts a “stale” topic partition directory of the same name, causing the on-disk topic partition state held by different brokers in the cluster to diverge. We have also been able to reproduce variants of this problem using Kafka 2.8 and 3.1, as well as Kafka built from the head of the apache/kafka repository (at the time of writing this is commit: 94d4fdeb28b3cd4d474d943448a7ef653eaa145d). We have *not* been able to reproduce this problem with Kafka running in KRaft mode. A minimal re-create for topic directories being left on disk is as follows: # Start ZooKeeper and a broker (both using the sample config) # Create 100 topics: each with 1 partition, and with replication factor 1 # Add a second broker to the Kafka cluster (with minor edits to the sample config for: {{{}broker.id{}}}, {{{}listeners{}}}, and {{{}log.dirs{}}}) # Issue a re-assignment that moves all of the topic partition replicas from the first broker to the second broker # While this re-assignment is taking place shut down the first broker (you need to be quick with only two brokers and 100 topics…) # Wait a few seconds for the re-assignment to stall # Restart the first broker and wait for the re-assignment to complete and for it to remove any partially deleted topics (e.g. those with a “-delete” suffix). 
Inspecting the logs directory for the first broker should show directories corresponding to topic partitions that are owned by the second broker. These are not cleaned up when the re-assignment completes, and also remain in the logs directory even if the first broker is restarted. Deleting the topic also does not clean up the topic partitions left behind on the first broker - which leads to a second potential problem. For topics that have more than one replica: a new topic that has the same name as a previously deleted topic might have replicas created on a broker with “stale” topic partition directories. If this happens, these topics will remain in an under-replicated state. A minimal re-create for this is as follows: # Create a three node Kafka cluster (backed by ZK) based off the sample config (to avoid confusion let’s call these kafka-0, kafka-1, and kafka-2) # Create 100 topics: each with 1 partition, and with replication factor 2 # Submit a re-assignment to move all of the topic partition replicas to kafka-0 and kafka-1, and wait for it to complete # Submit a re-assignment to move all of the topic partition replicas on kafka-0 to kafka-2. # While this re-assignment is taking place shut down and re-start kafka-0. # Wait for the re-assignment to complete, and check that there are unexpected topic partition directories in kafka-0’s logs directory # Delete all 100 topics, and re-create 100 new topics with the same name and configuration as the deleted topics. In this state kafka-1 and kafka-2 continually generate log messages similar to: {{[2022-07-14 13:07:49,118] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Received INCONSISTENT_TOPIC_ID from the leader for partition test-039-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread)}} Topics that have had replicas created on kafka-0 are under-replicated with kafka-0 missing from the ISR list. 
Performing a rolling restart of each broker in turn does not resolve the problem; in fact more partitions are listed as under-replicated, and as before kafka-0 is missing from their ISR list. I also tried to re-create this with Kafka running in KRaft mode, but was unable to do so. My test configuration was three brokers configured based on /config/kraft/server.properties. All three brokers were part of the controller quorum. Interestingly, I see log lines like the following when re-starting the broker that I stopped mid-reassignment: {{[2022-07-14 13:44:42,705] INFO Found stray log dir Log(dir=/tmp/kraft-2/test-029-0, topicId=DMGA3zxyQqGUfeV6cmkcmg, topic=test-029, partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=0): the current replica assignment [I@530d4c70 does not contain the local brokerId 2. (kafka.server.metadata.BrokerMetadataPublisher$)}} With later log lines showing the topic be
[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r921127854 ## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ## @@ -638,6 +641,221 @@ class LogManagerTest { assertTrue(logManager.partitionsInitializing.isEmpty) } + private def appendRecordsToLog(time: MockTime, parentLogDir: File, partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: Int): Unit = { +def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) +val tpFile = new File(parentLogDir, s"$name-$partitionId") + +val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0, + 5 * 60 * 1000, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs) + +val numMessages = 20 +try { + for (_ <- 0 until numMessages) { +log.appendAsLeader(createRecords, leaderEpoch = 0) + } + + assertEquals(expectedSegmentsPerLog, log.numberOfSegments) +} finally { + log.close() +} + } + + private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, expectedParams: Map[String, Int]): Unit = { +val spyLogManagerClassName = spyLogManager.getClass().getSimpleName +// get all `remainingLogsToRecover` metrics +val logMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" } + .map { case (_, gauge) => gauge } + .asInstanceOf[ArrayBuffer[Gauge[Int]]] + +assertEquals(expectedParams.size, logMetrics.size) + +val capturedPath: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String]) +val capturedNumRemainingLogs: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int]) + +// Since we'll update numRemainingLogs from totalLogs to 0 for each log dir, so we need to add 1 here +val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum +verify(spyLogManager, 
times(expectedCallTimes)).updateNumRemainingLogs(any, capturedPath.capture(), capturedNumRemainingLogs.capture()); + +val paths = capturedPath.getAllValues +val numRemainingLogs = capturedNumRemainingLogs.getAllValues + +// expected the end value is 0 +logMetrics.foreach { gauge => assertEquals(0, gauge.value()) } + +expectedParams.foreach { + case (path, totalLogs) => +// make sure we update the numRemainingLogs from totalLogs to 0 in order for each log dir +var expectedCurRemainingLogs = totalLogs + 1 +for (i <- 0 until paths.size()) { + if (paths.get(i).contains(path)) { +expectedCurRemainingLogs -= 1 +assertEquals(expectedCurRemainingLogs, numRemainingLogs.get(i)) + } +} +assertEquals(0, expectedCurRemainingLogs) +} + } + + private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager, + logDirs: Seq[File], + recoveryThreadsPerDataDir: Int, + mockMap: ConcurrentHashMap[String, Int], + expectedParams: Map[String, Int]): Unit = { +val spyLogManagerClassName = spyLogManager.getClass().getSimpleName +// get all `remainingSegmentsToRecover` metrics +val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" } + .map { case (_, gauge) => gauge } + .asInstanceOf[ArrayBuffer[Gauge[Int]]] + +// expected each log dir has 2 metrics for each thread Review Comment: Good catch! Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r921127366 ## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ## @@ -638,6 +641,221 @@ class LogManagerTest { assertTrue(logManager.partitionsInitializing.isEmpty) } + private def appendRecordsToLog(time: MockTime, parentLogDir: File, partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: Int): Unit = { +def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) +val tpFile = new File(parentLogDir, s"$name-$partitionId") + +val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0, + 5 * 60 * 1000, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs) + +val numMessages = 20 +try { + for (_ <- 0 until numMessages) { +log.appendAsLeader(createRecords, leaderEpoch = 0) + } + + assertEquals(expectedSegmentsPerLog, log.numberOfSegments) +} finally { + log.close() +} + } + + private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, expectedParams: Map[String, Int]): Unit = { +val spyLogManagerClassName = spyLogManager.getClass().getSimpleName +// get all `remainingLogsToRecover` metrics +val logMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" } + .map { case (_, gauge) => gauge } + .asInstanceOf[ArrayBuffer[Gauge[Int]]] + +assertEquals(expectedParams.size, logMetrics.size) + +val capturedPath: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String]) +val capturedNumRemainingLogs: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int]) + +// Since we'll update numRemainingLogs from totalLogs to 0 for each log dir, so we need to add 1 here +val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum +verify(spyLogManager, 
times(expectedCallTimes)).updateNumRemainingLogs(any, capturedPath.capture(), capturedNumRemainingLogs.capture()); + +val paths = capturedPath.getAllValues +val numRemainingLogs = capturedNumRemainingLogs.getAllValues + +// expected the end value is 0 +logMetrics.foreach { gauge => assertEquals(0, gauge.value()) } + +expectedParams.foreach { + case (path, totalLogs) => +// make sure we update the numRemainingLogs from totalLogs to 0 in order for each log dir +var expectedCurRemainingLogs = totalLogs + 1 +for (i <- 0 until paths.size()) { + if (paths.get(i).contains(path)) { +expectedCurRemainingLogs -= 1 +assertEquals(expectedCurRemainingLogs, numRemainingLogs.get(i)) + } +} +assertEquals(0, expectedCurRemainingLogs) +} + } + + private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager, + logDirs: Seq[File], + recoveryThreadsPerDataDir: Int, + mockMap: ConcurrentHashMap[String, Int], + expectedParams: Map[String, Int]): Unit = { +val spyLogManagerClassName = spyLogManager.getClass().getSimpleName +// get all `remainingSegmentsToRecover` metrics +val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" } + .map { case (_, gauge) => gauge } + .asInstanceOf[ArrayBuffer[Gauge[Int]]] + +// expected each log dir has 2 metrics for each thread +assertEquals(recoveryThreadsPerDataDir * logDirs.size, logSegmentMetrics.size) + +val capturedThreadName: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String]) +val capturedNumRemainingSegments: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int]) + +// Since we'll update numRemainingSegments from totalSegments to 0 for each thread, so we need to add 1 here +val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum +verify(mockMap, times(expectedCallTimes)).put(capturedThreadName.capture(), 
capturedNumRemainingSegments.capture()); + +// expected the end value is 0 +logSegmentMetrics.foreach { gauge => assertEquals(0, gauge.value()) } + +val threadNames = capturedThreadName.getAllValues +val numRemainingSegments = capturedNumRemainingSegments.getAllValues + +expectedParams.foreach { + case (threadName, totalSegments) => +// make sure we update the numRemainingSegments from totalSegments to 0 in order for each thread +var expectedCurRema
[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r921125134

## core/src/main/scala/kafka/log/LogManager.scala: ##

@@ -366,25 +392,31 @@ class LogManager(logDirs: Seq[File],

     val numLogsLoaded = new AtomicInteger(0)
     numTotalLogs += logsToLoad.length
+    updateNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath, logsToLoad.length)
+
     val jobsForDir = logsToLoad.map { logDir =>
       val runnable: Runnable = () => {
+        debug(s"Loading log $logDir")
+        var log = None: Option[UnifiedLog]
+        val logLoadStartMs = time.hiResClockMs()
         try {
-          debug(s"Loading log $logDir")
-
-          val logLoadStartMs = time.hiResClockMs()
-          val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets,
-            defaultConfig, topicConfigOverrides)
-          val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
-          val currentNumLoaded = numLogsLoaded.incrementAndGet()
-
-          info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
-            s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
+          log = Some(loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets,
+            defaultConfig, topicConfigOverrides, numRemainingSegments))
         } catch {
           case e: IOException =>
             handleIOException(logDirAbsolutePath, e)
           case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] =>
             // KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache
             // And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here.
+        } finally {
+          val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
+          val currentNumLoaded = numLogsLoaded.incrementAndGet()
+          updateNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath, logsToLoad.length - currentNumLoaded)

Review Comment: Good point! I've changed the type in the map from `Map[String, Int]` to `Map[String, AtomicInteger]`, and we decrement it by one each time we update `numRemainingLogs`, so that we can make sure the metric is decremented in the correct order. Thanks.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
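The pattern discussed in the comment above — a per-directory `AtomicInteger` counter in a concurrent map, decremented by each loader thread so a metrics gauge always reads a monotonically decreasing value — can be sketched in plain Java. This is only an illustration of the idea, not the actual `LogManager` code; the class and method names here are invented for the example:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RemainingLogsMetric {
    // One counter per log dir; AtomicInteger lets concurrent loader
    // threads decrement safely without external locking.
    private final Map<String, AtomicInteger> numRemainingLogs = new ConcurrentHashMap<>();

    // Called once per log dir before loading starts.
    public void startLoading(String dir, int totalLogs) {
        numRemainingLogs.put(dir, new AtomicInteger(totalLogs));
    }

    // Called by each loader thread (in its finally block) when one log
    // finishes loading; returns the new remaining count.
    public int logLoaded(String dir) {
        return numRemainingLogs.get(dir).decrementAndGet();
    }

    // What a remainingLogsToRecover gauge would report for the dir.
    public int remaining(String dir) {
        AtomicInteger counter = numRemainingLogs.get(dir);
        return counter == null ? 0 : counter.get();
    }
}
```

Because each thread decrements the shared counter rather than writing `total - loaded` from its own view, the published values can only go down, which is the ordering guarantee the review comment is after.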
[GitHub] [kafka] rajinisivaram merged pull request #10964: KAFKA-13043: Implement Admin APIs for offsetFetch batching
rajinisivaram merged PR #10964: URL: https://github.com/apache/kafka/pull/10964 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on pull request #10964: KAFKA-13043: Implement Admin APIs for offsetFetch batching
rajinisivaram commented on PR #10964: URL: https://github.com/apache/kafka/pull/10964#issuecomment-1184403492 @dajac Thanks for the reviews and changes. Merging to trunk and 3.3. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout
[ https://issues.apache.org/jira/browse/KAFKA-14053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566808#comment-17566808 ] Daniel Urban commented on KAFKA-14053:
--
I understand that increasing the epoch on the client side would probably violate the contract in the protocol. I refactored my change so that client-side timeouts (both delivery and request timeout) become fatal errors in transactional producers, resulting in a last, best-effort epoch bump.
> Transactional producer should bump the epoch when a batch encounters delivery timeout
> -
>
> Key: KAFKA-14053
> URL: https://issues.apache.org/jira/browse/KAFKA-14053
> Project: Kafka
> Issue Type: Bug
> Reporter: Daniel Urban
> Assignee: Daniel Urban
> Priority: Major
>
> When a batch fails due to delivery timeout, it is possible that the batch is still in flight. Due to underlying infra issues, it is possible that an EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight batch is processed on the leader. This can cause transactional batches to be appended to the log after the corresponding abort marker.
> This can cause the LSO to be infinitely blocked in the partition, or can even violate processing guarantees, as the out-of-order batch can become part of the next transaction.
> Because of this, the producer should skip aborting the partition, and bump the epoch to fence the in-flight requests.
>
> More detail can be found here: [https://lists.apache.org/thread/8d2oblsjtdv7740glc37v79f0r7p99dp]
-- This message was sent by Atlassian Jira (v8.20.10#820010)
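The failure mode in this issue — a late in-flight batch landing after the abort marker unless the producer epoch is bumped — can be illustrated with a toy model of a partition log that fences stale epochs. This is illustrative Java only, not Kafka's log or producer implementation; all names here are invented for the example:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a partition log that rejects appends carrying a stale producer epoch.
class PartitionLog {
    private final List<String> entries = new ArrayList<>();
    private int producerEpoch = 0;

    // Fencing point: after a bump, any batch with an older epoch is rejected.
    void bumpEpoch(int epoch) { producerEpoch = Math.max(producerEpoch, epoch); }

    // Returns false (fenced) if the batch's epoch is older than the current one.
    boolean append(String batch, int epoch) {
        if (epoch < producerEpoch) return false; // fenced
        producerEpoch = epoch;
        entries.add(batch);
        return true;
    }

    List<String> entries() { return entries; }
}
```

Without a bump, the delayed batch (still at the old epoch) is accepted after the abort marker and pollutes the next transaction; with a bump before the marker is written, the stale-epoch append is fenced, which is the behavior the issue asks for.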
[jira] [Updated] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-13868: - External issue URL: (was: https://lists.apache.org/thread/8drsbn0hgdhq4g1qgvm9g8pb5t4x42px) > Website updates to satisfy Apache privacy policies > -- > > Key: KAFKA-13868 > URL: https://issues.apache.org/jira/browse/KAFKA-13868 > Project: Kafka > Issue Type: Bug > Components: website >Reporter: Mickael Maison >Assignee: Divij Vaidya >Priority: Critical > > The ASF has updated its privacy policy and all websites must be compliant. > The full guidelines can be found in > [https://privacy.apache.org/faq/committers.html] > The Kafka website has a few issues, including: > - It's missing a link to the privacy policy: > [https://privacy.apache.org/policies/privacy-policy-public.html] > - It's using Google Analytics > - It's using Google Fonts > - It's using scripts hosted on Cloudflare CDN > - Embedded videos don't have an image placeholder > As per the email sent to the PMC, all updates have to be done by July 22. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-13868:
-
External issue URL: https://lists.apache.org/thread/8drsbn0hgdhq4g1qgvm9g8pb5t4x42px
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #10964: KAFKA-13043: Implement Admin APIs for offsetFetch batching
rajinisivaram commented on code in PR #10964: URL: https://github.com/apache/kafka/pull/10964#discussion_r920997757 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -3263,6 +3282,95 @@ public void testListConsumerGroupOffsets() throws Exception { } } +@Test +public void testBatchedListConsumerGroupOffsets() throws Exception { +Cluster cluster = mockCluster(1, 0); +Time time = new MockTime(); +Map groupSpecs = batchedListConsumerGroupOffsetsSpec(); + +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRIES_CONFIG, "0")) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE, env.cluster().controller(), groupSpecs.keySet())); + +ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions()); +sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true); + +verifyListOffsetsForMultipleGroups(groupSpecs, result); +} +} + +@Test +public void testBatchedListConsumerGroupOffsetsWithNoBatchingSupport() throws Exception { Review Comment: @dajac Thank you! I realize now that I was only going through the path where we were looking up the coordinator. But you are right, that isn't sufficient. Your changes look good. Thank you for checking this out and adding the implementation and tests! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lkokhreidze commented on pull request #12404: [MINOR] Fix QueryResult Javadocs
lkokhreidze commented on PR #12404: URL: https://github.com/apache/kafka/pull/12404#issuecomment-1184249327 Call for review @vvcephei @cadonna @mjsax -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lkokhreidze opened a new pull request, #12404: [MINOR] Fix QueryResult Javadocs
lkokhreidze opened a new pull request, #12404: URL: https://github.com/apache/kafka/pull/12404 Fixes the `QueryResult` javadocs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #10964: KAFKA-13043: Implement Admin APIs for offsetFetch batching
dajac commented on code in PR #10964: URL: https://github.com/apache/kafka/pull/10964#discussion_r920973675 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -3263,6 +3282,95 @@ public void testListConsumerGroupOffsets() throws Exception { } } +@Test +public void testBatchedListConsumerGroupOffsets() throws Exception { +Cluster cluster = mockCluster(1, 0); +Time time = new MockTime(); +Map groupSpecs = batchedListConsumerGroupOffsetsSpec(); + +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRIES_CONFIG, "0")) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE, env.cluster().controller(), groupSpecs.keySet())); + +ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions()); +sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true); + +verifyListOffsetsForMultipleGroups(groupSpecs, result); +} +} + +@Test +public void testBatchedListConsumerGroupOffsetsWithNoBatchingSupport() throws Exception { Review Comment: @rajinisivaram As I had to build the code to prove my theory, I pushed the commit so you can reuse it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ashwinpankaj commented on a diff in pull request #8690: KAFKA-9965: Fix uneven distribution in RoundRobinPartitioner
ashwinpankaj commented on code in PR #8690: URL: https://github.com/apache/kafka/pull/8690#discussion_r920869174

## clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java: ##

@@ -65,12 +65,20 @@ public int partition(String topic, Object key, byte[] keyBytes, Object value, by
 }

 private int nextValue(String topic) {
-    AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
-        return new AtomicInteger(0);
-    });
+    AtomicInteger counter = topicCounterMap.
+        computeIfAbsent(topic, k -> new AtomicInteger(0));
     return counter.getAndIncrement();
 }

+@Override
+public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
+    // After onNewBatch is called, we will call partition() again.
+    // So 'rewind' the counter for this topic.
+    AtomicInteger counter = topicCounterMap.
+        computeIfAbsent(topic, k -> new AtomicInteger(0));
+    counter.getAndDecrement();

Review Comment: I feel that the fix lies in RecordAccumulator, as it currently always returns `abortForNewBatch`=true from append() for a partition which does not have a Deque created. If a partition does not have a deque, [accumulator.getOrCreateDeque()](https://github.com/apache/kafka/blob/94d4fdeb28b3cd4d474d943448a7ef653eaa145d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L940) simply creates an empty ArrayDeque. When the accumulator tries to append a new record, [tryAppend()](https://github.com/apache/kafka/blob/94d4fdeb28b3cd4d474d943448a7ef653eaa145d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L383) will return null since the ProducerBatch has not been created. Here is the sequence of events if no key is set for the record:
1. partitioner.partition() is invoked - the partition id for the topic is incremented
2. recordaccumulator.append() is invoked with the `abortOnNewBatch` arg set to true. Since the accumulator is unable to append the record to a batch, it returns RecordAppendResult with abortForNewBatch set to true.
3. partitioner.onNewBatch() is invoked
4. partitioner.partition() is invoked again - the partition id for the topic is incremented again
5. recordaccumulator.append() is invoked again with the `abortOnNewBatch` arg set to false. This time the accumulator allocates a new ProducerBatch and appends the record.

Probable fix: in accumulator.getOrCreateDeque(), in addition to creating a Deque, we should also initialize an empty ProducerBatch for the topicPartition.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
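The counter "rewind" from the diff under discussion can be isolated into a self-contained sketch: `partition()` advances a per-topic counter, and `onNewBatch()` decrements it so that the repeated `partition()` call after an aborted batch does not skip a partition. This mirrors the logic of the patch above but is not the actual `RoundRobinPartitioner` class (the class name and `numPartitions` parameter are simplified for the example):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinCounter {
    private final Map<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    // Advances the per-topic counter and maps it onto a partition.
    int partition(String topic, int numPartitions) {
        int next = topicCounterMap
            .computeIfAbsent(topic, k -> new AtomicInteger(0))
            .getAndIncrement();
        return Math.floorMod(next, numPartitions);
    }

    // Called when the accumulator aborts the append for a new batch;
    // rewinds the counter so the follow-up partition() call repeats
    // the same partition instead of skipping one.
    void onNewBatch(String topic) {
        topicCounterMap
            .computeIfAbsent(topic, k -> new AtomicInteger(0))
            .getAndDecrement();
    }
}
```

In the five-step sequence above, steps 1 and 4 both increment the counter; without the rewind in step 3, every first record to a fresh partition would consume two counter values and distribution would be uneven.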
[jira] [Commented] (KAFKA-14069) Allow custom configuration of foreign key join internal topics
[ https://issues.apache.org/jira/browse/KAFKA-14069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566714#comment-17566714 ] Emmanuel Brard commented on KAFKA-14069:
--
I think it might be a bug; I don't see any Delete requests being processed in the Kafka cluster log for topics with the pattern "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-".
> Allow custom configuration of foreign key join internal topics
> --
>
> Key: KAFKA-14069
> URL: https://issues.apache.org/jira/browse/KAFKA-14069
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.8.0
> Reporter: Emmanuel Brard
> Priority: Minor
>
> Internal topics supporting foreign key joins (-subscription-registration-topic and -subscription-response-topic) are automatically created with _infinite retention_ (retention.ms=-1, retention.bytes=-1).
> As far as I understand, those topics are used for communication between tasks that are involved in the FK join; the intermediate result, though, is persisted in a compacted topic (-subscription-store-changelog).
> This means, if I understood right, that during normal operation of the stream application, once a message is read from the registration/subscription topic, it will not be read again, even in case of recovery (the position in those topics is committed).
> Because we have very large tables being joined this way with very high change frequency, we end up with FK internal topics in the order of 1 or 2 TB. This is complicated to maintain, especially in terms of disk space.
> I was wondering if:
> - this infinite retention is really a required configuration, and if not
> - this infinite retention could be replaced with a configurable one (for example 1 week, meaning that I accept that in case of failure I must restart my app within one week)
-- This message was sent by Atlassian Jira (v8.20.10#820010)
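As a possible workaround for the retention concern raised here: Kafka Streams forwards configs given with the "topic." prefix (see `StreamsConfig.topicPrefix`) to the internal topics it creates. Whether such an override actually takes effect for the FK-join subscription topics in a given Streams version is an assumption that would need verifying; the sketch below only shows how the override would be expressed, with example values throughout:

```java
import java.util.Properties;

class FkJoinTopicConfig {
    // Hedged sketch: builds Streams properties with a "topic."-prefixed
    // retention override for internal topics. Application id and bootstrap
    // servers are placeholder example values.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "fk-join-app");       // example value
        props.put("bootstrap.servers", "localhost:9092"); // example value
        // Equivalent to StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG):
        props.put("topic.retention.ms", Long.toString(7L * 24 * 60 * 60 * 1000)); // 1 week
        props.put("topic.retention.bytes", Long.toString(100L * 1024 * 1024 * 1024)); // 100 GiB
        return props;
    }
}
```

If the override does not apply to these particular internal topics, the remaining options are altering the topic configs directly with the admin client, which risks being reverted by Streams on rebalance, or the configurable retention this ticket requests.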
[jira] [Commented] (KAFKA-14069) Allow custom configuration of foreign key join internal topics
[ https://issues.apache.org/jira/browse/KAFKA-14069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566695#comment-17566695 ] Emmanuel Brard commented on KAFKA-14069:
--
One thing I did not mention yet is that we use version 2.8.0. I guess those API calls are triggered by the streaming application itself; should we run it in debug log mode to see the calls?
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14069) Allow custom configuration of foreign key join internal topics
[ https://issues.apache.org/jira/browse/KAFKA-14069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Emmanuel Brard updated KAFKA-14069:
---
Affects Version/s: 2.8.0
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on a diff in pull request #10964: KAFKA-13043: Implement Admin APIs for offsetFetch batching
dajac commented on code in PR #10964: URL: https://github.com/apache/kafka/pull/10964#discussion_r920841796 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -3263,6 +3282,95 @@ public void testListConsumerGroupOffsets() throws Exception { } } +@Test +public void testBatchedListConsumerGroupOffsets() throws Exception { +Cluster cluster = mockCluster(1, 0); +Time time = new MockTime(); +Map groupSpecs = batchedListConsumerGroupOffsetsSpec(); + +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RETRIES_CONFIG, "0")) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE, env.cluster().controller(), groupSpecs.keySet())); + +ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions()); +sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true); + +verifyListOffsetsForMultipleGroups(groupSpecs, result); +} +} + +@Test +public void testBatchedListConsumerGroupOffsetsWithNoBatchingSupport() throws Exception { Review Comment: @rajinisivaram I was a bit puzzled by this downgrade logic, so I spent a bit more time understanding it. In short, it seems that the current implementation does not work. You can make it fail by failing the first offset fetch response with COORDINATOR_LOAD_IN_PROGRESS, for instance. When the api driver retries, it groups together all the groups targeting the same coordinator. The current test seems to work because the driver executes steps sequentially. The right approach is likely to make `ListConsumerGroupOffsetsHandler` implement `AdminApiHandler` instead of `Batched`. Then in `buildRequest`, we can look at the `batch` flag in the `lookupStrategy`. Ideally, this should be decoupled from the `lookupStrategy`, but that might be enough in our case. Otherwise, we need to add another flag in the `ListConsumerGroupOffsetsHandler`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
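The downgrade behavior under discussion — batching OffsetFetch requests per coordinator only when the broker supports it, and falling back to one request per group otherwise — can be illustrated with a small toy model. This is not the `AdminApiDriver`/`ListConsumerGroupOffsetsHandler` code; the names and the integer "coordinator id" are invented for the example:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class OffsetFetchBatching {
    // Groups the given consumer groups by coordinator. With batching support,
    // all groups sharing a coordinator go into a single request; without it,
    // each group gets its own request (the pre-KIP-709 behavior).
    static List<List<String>> buildRequests(Map<String, Integer> groupToCoordinator,
                                            boolean batchSupported) {
        Map<Integer, List<String>> byCoordinator = new TreeMap<>();
        for (Map.Entry<String, Integer> e : groupToCoordinator.entrySet())
            byCoordinator.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        List<List<String>> requests = new ArrayList<>();
        for (List<String> groups : byCoordinator.values()) {
            if (batchSupported) {
                requests.add(groups);
            } else {
                for (String group : groups) {
                    requests.add(Collections.singletonList(group));
                }
            }
        }
        return requests;
    }
}
```

The bug dajac points at lives in the retry path: after an error such as COORDINATOR_LOAD_IN_PROGRESS, the driver re-groups all groups targeting the same coordinator, so a handler that cannot actually batch must keep emitting singleton requests on retry as well.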