[GitHub] [kafka] yashmayya commented on pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on PR #12984: URL: https://github.com/apache/kafka/pull/12984#issuecomment-1347872598 @C0urante could you please take a look at this whenever you get a chance? I'm sorry to keep pinging you for review requests, but I'm not aware of any other committers currently taking a look at Connect PRs :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14465) java.lang.NumberFormatException: For input string: "index"
jianbin.chen created KAFKA-14465: Summary: java.lang.NumberFormatException: For input string: "index" Key: KAFKA-14465 URL: https://issues.apache.org/jira/browse/KAFKA-14465 Project: Kafka Issue Type: Bug Affects Versions: 1.1.0 Reporter: jianbin.chen
{code:java}
[2022-12-13 07:12:20,369] WARN [Log partition=fp_sg_flow_copy-1, dir=/home/admin/output/kafka-logs] Found a corrupted index file corresponding to log file /home/admin/output/kafka-logs/fp_sg_flow_copy-1/0165.log due to Corrupt index found, index file (/home/admin/output/kafka-logs/fp_sg_flow_copy-1/0165.index) has non-zero size but the last offset is 165 which is no greater than the base offset 165.}, recovering segment and rebuilding index files... (kafka.log.Log)
[2022-12-13 07:12:20,369] ERROR There was an error in one of the threads during logs loading: java.lang.NumberFormatException: For input string: "index" (kafka.log.LogManager)
[2022-12-13 07:12:20,374] INFO [ProducerStateManager partition=fp_sg_flow_copy-1] Writing producer snapshot at offset 165 (kafka.log.ProducerStateManager)
[2022-12-13 07:12:20,378] INFO [Log partition=fp_sg_flow_copy-1, dir=/home/admin/output/kafka-logs] Loading producer state from offset 165 with message format version 2 (kafka.log.Log)
[2022-12-13 07:12:20,381] INFO [Log partition=fp_sg_flow_copy-1, dir=/home/admin/output/kafka-logs] Completed load of log with 1 segments, log start offset 165 and log end offset 165 in 13 ms (kafka.log.Log)
[2022-12-13 07:12:20,389] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.NumberFormatException: For input string: "index"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	at java.lang.Long.parseLong(Long.java:631)
	at scala.collection.immutable.StringLike.toLong(StringLike.scala:306)
	at scala.collection.immutable.StringLike.toLong$(StringLike.scala:306)
	at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
	at kafka.log.Log$.offsetFromFile(Log.scala:1846)
	at kafka.log.Log.$anonfun$loadSegmentFiles$3(Log.scala:331)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:191)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
	at kafka.log.Log.loadSegmentFiles(Log.scala:320)
	at kafka.log.Log.loadSegments(Log.scala:403)
	at kafka.log.Log.<init>(Log.scala:216)
	at kafka.log.Log$.apply(Log.scala:1748)
	at kafka.log.LogManager.loadLog(LogManager.scala:265)
	at kafka.log.LogManager.$anonfun$loadLogs$12(LogManager.scala:335)
	at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
[2022-12-13 07:12:20,401] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
{code}
When I restart the broker, it fails as shown above. I deleted the 000165.index file, but after restarting, other files fail with the same error. Please tell me how to fix this and what is causing it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
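A minimal diagnostic sketch, assuming (as the `Log$.offsetFromFile` and `StringOps.toLong` frames suggest) that the broker parses a segment's base offset from the file-name prefix before the first `.`. It lists the segment-like files in a partition directory whose names would hit the same `NumberFormatException`. The class, its methods, and the suffix list are hypothetical, not Kafka APIs.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; not part of Kafka. It assumes the broker derives a
// segment's base offset by parsing the file-name prefix before the first '.' as a long,
// as the offsetFromFile / StringOps.toLong frames in the trace suggest.
final class SegmentNameCheck {
    // Assumed list of segment-related suffixes; all other files are ignored.
    private static final List<String> SUFFIXES =
            List.of(".log", ".index", ".timeindex", ".txnindex");

    // "00000000000000000165.log" -> 165, while "index.index" throws NumberFormatException.
    static long offsetFromFileName(String fileName) {
        int dot = fileName.indexOf('.');
        return Long.parseLong(fileName.substring(0, dot));
    }

    // Returns the segment-like names whose prefix would not parse as an offset.
    static List<String> unparsableNames(List<String> fileNames) {
        List<String> bad = new ArrayList<>();
        for (String name : fileNames) {
            if (SUFFIXES.stream().noneMatch(name::endsWith)) {
                continue; // e.g. checkpoint files, which carry no offset prefix
            }
            try {
                offsetFromFileName(name);
            } catch (NumberFormatException e) {
                bad.add(name);
            }
        }
        return bad;
    }

    // Lists one partition directory, e.g. .../kafka-logs/fp_sg_flow_copy-1.
    static List<String> unparsableNames(File partitionDir) {
        String[] names = partitionDir.list();
        return names == null ? List.of() : unparsableNames(List.of(names));
    }
}
```

Running it over each partition directory before a restart may be quicker than deleting index files one at a time; it only reports suspicious names and says nothing about how such files appeared.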
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1046722667 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java: ## @@ -51,8 +52,9 @@ public interface ConfigBackingStore { * Update the configuration for a connector. * @param connector name of the connector * @param properties the connector configuration + * @param callback the callback to be invoked after the put is complete; can be {@code null} if no callback is desired */ -void putConnectorConfig(String connector, Map properties); +void putConnectorConfig(String connector, Map properties, Callback callback); Review Comment: Could potentially add a new overloaded method in `KafkaConfigBackingStore` and avoid touching this interface method, although that would require making `AbstractHerder` generic so that `DistributedHerder` can access an instance of `KafkaConfigBackingStore` rather than `ConfigBackingStore`. However, that would be a much noisier change IMO. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1058,14 +1058,20 @@ public void putConnectorConfig(final String connName, final Map } log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors()); -writeToConfigTopicAsLeader(() -> configBackingStore.putConnectorConfig(connName, config)); - -// Note that we use the updated connector config despite the fact that we don't have an updated -// snapshot yet. The existing task info should still be accurate.
-ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName), -// validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG -connectorType(config)); -callback.onCompletion(null, new Created<>(!exists, info)); +Callback cb = (err, result) -> { Review Comment: Another option could be to directly call `callback.onCompletion` in `KafkaConfigBackingStore::putConnectorConfig` itself, i.e. construct the `Created` object there using its in-memory maps directly rather than the snapshot; this would avoid this double-callback mechanism but would need some other refactoring.
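A minimal, dependency-free sketch of the "double callback" chain discussed above: the herder passes an inner callback to the store and only completes the REST-facing callback once the write outcome is known. `Callback` mirrors the shape of Connect's `Callback<V>` (`onCompletion(error, result)`); `ConfigStore`, `Herder`, and the `String` result (standing in for `Created<ConnectorInfo>`) are hypothetical simplifications.

```java
import java.util.Map;

// Mirrors the shape of Connect's Callback<V>; redeclared here so the sketch is self-contained.
interface Callback<V> {
    void onCompletion(Throwable error, V result);
}

// Hypothetical stand-in for ConfigBackingStore with the new callback parameter.
interface ConfigStore {
    // Invokes cb once the config-topic write has succeeded or failed.
    void putConnectorConfig(String connector, Map<String, String> props, Callback<Void> cb);
}

// Hypothetical stand-in for the herder side of the chain.
final class Herder {
    private final ConfigStore store;

    Herder(ConfigStore store) {
        this.store = store;
    }

    void putConnectorConfig(String name, Map<String, String> props, Callback<String> restCallback) {
        // Inner callback: translate the store's outcome into the REST response.
        Callback<Void> cb = (err, ignored) -> {
            if (err != null) {
                restCallback.onCompletion(err, null); // surface the write failure
            } else {
                restCallback.onCompletion(null, "Created " + name);
            }
        };
        store.putConnectorConfig(name, props, cb);
    }
}
```

The alternative raised in the comment would collapse the two callbacks by having the store build the response itself, at the cost of moving herder logic into the store.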
[GitHub] [kafka] yashmayya opened a new pull request, #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya opened a new pull request, #12984: URL: https://github.com/apache/kafka/pull/12984 - Kafka Connect's `POST /connectors` and `PUT /connectors/{connector}/config` REST APIs internally simply write a message to the Connect cluster's internal config topic (which is then processed asynchronously by the herder). - However, no callback is passed to the producer's send method and there is no error handling in place for producer send failures (see [here](https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716) / [here](https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726)). - Consider, for example, the case where the Connect worker's principal doesn't have a WRITE ACL on the cluster's config topic. Now suppose the user submits a connector's configs via one of the above two APIs. The producer send [here](https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716) / [here](https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726) won't succeed (due to a `TopicAuthorizationException`), but the API still returns a `201 Created` success response. - This is very poor UX: the connector is never actually created, yet the API response indicates success. Furthermore, this failure is only detectable if TRACE logs are enabled (via [this log](https://github.com/apache/kafka/blob/df29b17fc40f7c15460988d58bc652c3d66b60f8/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java)), making it nearly impossible for users to debug. - This PR uses producer callbacks to surface write failures back to the user via the API response. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
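The before/after behavior described in the PR can be simulated without a broker. In this sketch (hypothetical, not Connect code) a `CompletableFuture` stands in for the asynchronous producer send to the config topic; the fire-and-forget version always answers `201`, while the callback-driven version derives the status from the write outcome. Mapping failures to `500` is an assumption made for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical simulation: `send` stands in for an asynchronous producer send to the
// config topic whose future may fail (e.g. because of a missing WRITE ACL).
final class ConfigWriteResponse {

    // Before: the send result is never inspected, so the caller always sees 201.
    static int fireAndForget(Supplier<CompletableFuture<Void>> send) {
        send.get(); // a failed future is silently dropped
        return 201;
    }

    // After: the response status is derived from the outcome of the write.
    // The 500 status for failures is an illustrative assumption.
    static int awaitOutcome(Supplier<CompletableFuture<Void>> send) {
        return send.get()
                .handle((ignored, error) -> error == null ? 201 : 500)
                .join();
    }
}
```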
[GitHub] [kafka] ijuma closed pull request #7703: MINOR: Avoid unnecessary tuple allocations in index binary search
ijuma closed pull request #7703: MINOR: Avoid unnecessary tuple allocations in index binary search URL: https://github.com/apache/kafka/pull/7703
[GitHub] [kafka] ijuma commented on pull request #7703: MINOR: Avoid unnecessary tuple allocations in index binary search
ijuma commented on PR #7703: URL: https://github.com/apache/kafka/pull/7703#issuecomment-1347702799 Closing this for now.
[GitHub] [kafka] cmccabe opened a new pull request, #12983: MINOR: ControllerServer should use the new metadata loader and snapshot generator
cmccabe opened a new pull request, #12983: URL: https://github.com/apache/kafka/pull/12983 This PR introduces the new metadata loader and snapshot generator. For the time being, they are only used by the controller, but a PR for the broker will come soon. The new metadata loader supports adding and removing publishers dynamically. (In contrast, the old loader only supported adding a single publisher.) It also passes along more information about each new image that is published. This information can be found in the LogDeltaManifest and SnapshotManifest classes. The new snapshot generator replaces the previous logic for generating snapshots in QuorumController.java and associated classes. The new generator is intended to be shared between the broker and the controller, so it is decoupled from both. There are a few small changes to the old snapshot generator in this PR. Specifically, we move the batch processing time and batch size metrics out of BrokerMetadataListener.scala and into BrokerServerMetrics.scala.
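Adding and removing publishers at runtime can be sketched as a loader that holds a mutable publisher list and hands every new image to whoever is currently installed. Everything here is hypothetical and not Kafka's `MetadataLoader` API; in particular, catching a new publisher up with the latest image on installation is an assumed design choice.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch of a loader whose publishers can be added and removed at runtime.
// Names and behavior are illustrative; this is not Kafka's MetadataLoader.
final class DynamicLoader<I> {
    // CopyOnWriteArrayList lets a publisher remove itself while publish() is iterating.
    private final List<Consumer<I>> publishers = new CopyOnWriteArrayList<>();
    private I latest;

    // Assumed behavior: a newly installed publisher is caught up with the latest image.
    synchronized void installPublisher(Consumer<I> publisher) {
        publishers.add(publisher);
        if (latest != null) {
            publisher.accept(latest);
        }
    }

    synchronized boolean removePublisher(Consumer<I> publisher) {
        return publishers.remove(publisher);
    }

    // Each new image is handed to every currently installed publisher.
    synchronized void publish(I image) {
        latest = image;
        for (Consumer<I> p : publishers) {
            p.accept(image);
        }
    }
}
```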
[GitHub] [kafka] jsancio merged pull request #12975: MINOR; Improve high watermark log messages
jsancio merged PR #12975: URL: https://github.com/apache/kafka/pull/12975
[GitHub] [kafka] jsancio commented on pull request #12975: MINOR; Improve high watermark log messages
jsancio commented on PR #12975: URL: https://github.com/apache/kafka/pull/12975#issuecomment-1347573437 Merging. Unrelated test failures.
[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-14392: -- Affects Version/s: (was: 3.3.2) > KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms > -- > > Key: KAFKA-14392 > URL: https://issues.apache.org/jira/browse/KAFKA-14392 > Project: Kafka > Issue Type: Improvement > Components: kraft >Affects Versions: 3.3.0, 3.4.0, 3.3.1 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Minor > Fix For: 3.3.2, 3.5.0 > > > KRaft brokers maintain their liveness in the cluster by sending > BROKER_HEARTBEAT requests to the active controller; the active controller > fences a broker if it doesn't receive a heartbeat request from that broker > within the period defined by `broker.session.timeout.ms`. The broker should > use a request timeout for its BROKER_HEARTBEAT requests that is not larger > than the session timeout being used by the controller; using a larger timeout creates the > possibility that upon controller failover the broker might not cancel an > existing heartbeat request in time to heartbeat to the new > controller and maintain an uninterrupted session in the cluster. In other > words, a failure of the active controller could result in under-replicated > (or under-min ISR) partitions simply due to a delay in brokers heartbeating > to the new controller.
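The rule in this issue reduces to capping the heartbeat request timeout at the session timeout. A minimal sketch with hypothetical method and parameter names (not Kafka's) and illustrative values:

```java
// Hypothetical helper illustrating the KAFKA-14392 rule: the broker's BROKER_HEARTBEAT
// request timeout should never exceed the controller-side broker.session.timeout.ms.
final class HeartbeatTimeouts {
    static long heartbeatRequestTimeoutMs(long configuredRequestTimeoutMs, long sessionTimeoutMs) {
        if (configuredRequestTimeoutMs <= 0 || sessionTimeoutMs <= 0) {
            throw new IllegalArgumentException("timeouts must be positive");
        }
        // Cap the request timeout so an in-flight heartbeat is never waited on
        // for longer than the session itself lasts.
        return Math.min(configuredRequestTimeoutMs, sessionTimeoutMs);
    }
}
```

For example, with a 30000 ms request timeout and a 9000 ms session timeout, the heartbeat request would be abandoned after 9000 ms rather than 30000 ms.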
[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-14392: -- Fix Version/s: 3.3.2 > KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms > -- > > Key: KAFKA-14392 > URL: https://issues.apache.org/jira/browse/KAFKA-14392 > Project: Kafka > Issue Type: Improvement > Components: kraft >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Minor > Fix For: 3.3.2, 3.5.0 > > > KRaft brokers maintain their liveness in the cluster by sending > BROKER_HEARTBEAT requests to the active controller; the active controller > fences a broker if it doesn't receive a heartbeat request from that broker > within the period defined by `broker.session.timeout.ms`. The broker should > use a request timeout for its BROKER_HEARTBEAT requests that is not larger > than the session timeout being used by the controller; using a larger timeout creates the > possibility that upon controller failover the broker might not cancel an > existing heartbeat request in time to heartbeat to the new > controller and maintain an uninterrupted session in the cluster. In other > words, a failure of the active controller could result in under-replicated > (or under-min ISR) partitions simply due to a delay in brokers heartbeating > to the new controller.
[GitHub] [kafka] jsancio commented on a diff in pull request #12981: MINOR: Pass snapshot ID directly in `RaftClient.createSnapshot`
jsancio commented on code in PR #12981: URL: https://github.com/apache/kafka/pull/12981#discussion_r1046477051 ## raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java: ## @@ -32,7 +32,7 @@ * topic partition from offset 0 up to but not including the end offset in the snapshot * id. * - * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(long, int, long) + * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(OffsetAndEpoch, long) Review Comment: Maybe this should point to `RaftClient.createSnapshot` since that is what we documented. ``` * @see org.apache.kafka.raft.RaftClient#createSnapshot(OffsetAndEpoch, long) ```
[jira] [Resolved] (KAFKA-14271) Topic recreation fails in KRaft mode when topic contains collidable characters
[ https://issues.apache.org/jira/browse/KAFKA-14271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeffrey Tolar resolved KAFKA-14271. --- Fix Version/s: 3.4.0 3.3.2 Resolution: Duplicate Haven't tested the patch yet, but KAFKA-14337 appears to be the same as this issue; that was fixed with https://github.com/apache/kafka/pull/12790 > Topic recreation fails in KRaft mode when topic contains collidable characters > -- > > Key: KAFKA-14271 > URL: https://issues.apache.org/jira/browse/KAFKA-14271 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.3.0 >Reporter: Jeffrey Tolar >Priority: Major > Fix For: 3.4.0, 3.3.2 > > Attachments: topic.with.dots.log, topicwithoutdots.log > > > We recently updated one of our clusters from 3.2.0 to 3.3.0 (primarily to get > the fix for KAFKA-13909). This cluster is running KRaft mode. > This is a cluster used for some integration tests - each test deletes the > topics it uses before the test to ensure a clean slate for the test; the > brokers get restarted in-between tests, but the broker data isn't deleted. > With 3.3.0, this semi-crashes Kafka. The brokers stay running, but the topic > creation fails: > {noformat} > [2022-09-30 17:17:59,216] WARN [Controller 1] createTopics: failed with > unknown server exception NoSuchElementException at epoch 1 in 601 us. > Renouncing leadership and reverting to the last committed offset 18. 
> (org.apache.kafka.controller.QuorumController) > java.util.NoSuchElementException > at > org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:167) > at > org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:139) > at > org.apache.kafka.timeline.TimelineHashSet$ValueIterator.next(TimelineHashSet.java:120) > at > org.apache.kafka.controller.ReplicationControlManager.validateNewTopicNames(ReplicationControlManager.java:799) > at > org.apache.kafka.controller.ReplicationControlManager.createTopics(ReplicationControlManager.java:567) > at > org.apache.kafka.controller.QuorumController.lambda$createTopics$7(QuorumController.java:1832) > at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:767) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) > at java.base/java.lang.Thread.run(Thread.java:829) > {noformat} > This appears to be because our topic names contain {{.}}'s. 
Here's a quick > reproducer script: > {noformat} > #!/bin/bash > VERSION=3.3.0 > TOPIC=$1 > set -x > rm -rf -- kafka_2.13-${VERSION} kafka_2.13-${VERSION}.tgz > /tmp/kraft-combined-logs > trap 'kill -- "-$$" && wait' EXIT > curl -O https://dlcdn.apache.org/kafka/$VERSION/kafka_2.13-${VERSION}.tgz > tar -xzf kafka_2.13-${VERSION}.tgz > cd kafka_2.13-${VERSION} > id=$(./bin/kafka-storage.sh random-uuid) > ./bin/kafka-storage.sh format -t $id -c ./config/kraft/server.properties > ./bin/kafka-server-start.sh config/kraft/server.properties > broker.log 2>&1 & > sleep 1 > ./bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server > localhost:9092 > ./bin/kafka-topics.sh --delete --topic "$TOPIC" --bootstrap-server > localhost:9092 > ./bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server > localhost:9092 > sleep 1 > {noformat} > Running {{./test-kafka.sh topic.with.dots}} exhibits the failure; using > {{topicwithoutdots}} works as expected. > I'll attach the broker logs from each run.
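The reproducer only fails for names containing `.`, which Kafka treats as colliding with `_` (both map to the same metric name). The sketch below is a hypothetical collision index, not the controller's `ReplicationControlManager` logic, and it does not claim to reproduce the actual bug. It shows the property the create/delete/create sequence relies on: deleting a topic must leave no stale entry behind, so re-creating the same name succeeds while a genuinely colliding name is still rejected.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical collision index for topic names; illustrative only.
final class TopicCollisionIndex {
    private final Map<String, Set<String>> byNormalizedName = new HashMap<>();

    // '.' and '_' are treated as equivalent, so "a.b" and "a_b" share one bucket.
    static String normalize(String topic) {
        return topic.replace('.', '_');
    }

    // Returns false if the name already exists or collides with an existing topic.
    boolean tryCreate(String topic) {
        Set<String> existing = byNormalizedName.computeIfAbsent(normalize(topic), k -> new HashSet<>());
        if (!existing.isEmpty()) {
            return false;
        }
        existing.add(topic);
        return true;
    }

    // Removes the topic and drops the bucket once it is empty, so a later
    // re-creation of the same name sees no stale entry.
    void delete(String topic) {
        String key = normalize(topic);
        Set<String> existing = byNormalizedName.get(key);
        if (existing == null) {
            return;
        }
        existing.remove(topic);
        if (existing.isEmpty()) {
            byNormalizedName.remove(key);
        }
    }
}
```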
[GitHub] [kafka] jolshan commented on a diff in pull request #12968: KAFKA-14417: Addressing the error server side
jolshan commented on code in PR #12968: URL: https://github.com/apache/kafka/pull/12968#discussion_r1046506543 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int, if (nextProducerId > currentProducerIdBlock.lastProducerId) { val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS) if (block == null) { - throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block") Review Comment: Sure, we can do that, I suppose.
[GitHub] [kafka] jolshan commented on a diff in pull request #12968: KAFKA-14417: Addressing the error server side
jolshan commented on code in PR #12968: URL: https://github.com/apache/kafka/pull/12968#discussion_r1046505868 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -236,7 +236,7 @@ class RPCProducerIdManager(brokerId: Int, private[transaction] def handleTimeout(): Unit = { warn("Timed out when requesting AllocateProducerIds from the controller.") requestInFlight.set(false) -nextProducerIdBlock.put(Failure(Errors.REQUEST_TIMED_OUT.exception)) + nextProducerIdBlock.put(Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception)) Review Comment: I think it's so we return the error immediately instead of waiting for the poll. We also only send one request; once we get this response, we will drop into the block where we poll and get this error response out.
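The hand-off described above can be sketched with a `LinkedBlockingQueue`: the request path blocks in `poll(timeout, unit)`, which returns `null` only if nothing arrives in time, and the timeout handler enqueues a failure so the waiting `poll` returns right away. `BlockHandOff`, `Result`, and the string error labels are hypothetical stand-ins, not `ProducerIdManager` types.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the hand-off; not ProducerIdManager's code.
final class BlockHandOff {
    static final class Result {
        final long firstId;  // meaningful only when error == null
        final String error;  // stand-in for a Kafka Errors value
        Result(long firstId, String error) {
            this.firstId = firstId;
            this.error = error;
        }
    }

    private final LinkedBlockingQueue<Result> nextBlock = new LinkedBlockingQueue<>();

    // Request path: wait up to maxWaitMs for the next block or a failure.
    Result awaitNextBlock(long maxWaitMs) {
        try {
            Result r = nextBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS);
            // poll() returns null only if nothing arrived within maxWaitMs.
            return r != null ? r : new Result(-1, "REQUEST_TIMED_OUT");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new Result(-1, "INTERRUPTED"); // illustrative label
        }
    }

    // Controller RPC succeeded: hand the new block to the waiter.
    void handleResponse(long firstId) {
        nextBlock.offer(new Result(firstId, null));
    }

    // Controller RPC timed out: wake the waiter immediately with the error used in the diff.
    void handleTimeout() {
        nextBlock.offer(new Result(-1, "COORDINATOR_LOAD_IN_PROGRESS"));
    }
}
```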
[GitHub] [kafka] CalvinConfluent opened a new pull request, #12982: MINOR: Fix UT in KAFKA-14372
CalvinConfluent opened a new pull request, #12982: URL: https://github.com/apache/kafka/pull/12982 Even if the follower is in the ISR, it will be excluded because its logEndOffset (-1) is less than the fetchOffset (0):
```
if (partition.inSyncReplicaIds.contains(replica.brokerId) &&
    replicaState.logEndOffset >= fetchOffset &&
    replicaState.logStartOffset <= fetchOffset)
```
https://github.com/CalvinConfluent/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1252 So the test needs to update its logEndOffset by appending one message. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
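A Java restatement of the quoted Scala condition (a hypothetical helper, not `ReplicaManager` code) makes the arithmetic explicit. It assumes the follower's logStartOffset in the test is 0, which the PR does not state.

```java
// Hypothetical restatement of the quoted condition for a worked example.
final class FollowerEligibility {
    static boolean eligible(boolean inIsr, long logEndOffset, long logStartOffset, long fetchOffset) {
        return inIsr
                && logEndOffset >= fetchOffset     // -1 >= 0 is false before any append
                && logStartOffset <= fetchOffset;
    }
}
```

With fetchOffset 0, `eligible(true, -1, 0, 0)` is false, while after appending one message (logEndOffset 1) `eligible(true, 1, 0, 0)` is true.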
[GitHub] [kafka] hachikuji closed pull request #12963: MINOR: More consistent handling of snapshot IDs
hachikuji closed pull request #12963: MINOR: More consistent handling of snapshot IDs URL: https://github.com/apache/kafka/pull/12963
[GitHub] [kafka] hachikuji commented on pull request #12963: MINOR: More consistent handling of snapshot IDs
hachikuji commented on PR #12963: URL: https://github.com/apache/kafka/pull/12963#issuecomment-1347439806 Closing this patch since the issue with MetadataImage.EMPTY was addressed in https://github.com/apache/kafka/commit/b2dea17041157ceee741041d23783ff993b88ef1. I've opened a separate patch for the refactor of `RaftClient.createSnapshot`: https://github.com/apache/kafka/pull/12981.
[GitHub] [kafka] hachikuji opened a new pull request, #12981: MINOR: Pass snapshot ID directly in `RaftClient.createSnapshot`
hachikuji opened a new pull request, #12981: URL: https://github.com/apache/kafka/pull/12981 Let `RaftClient.createSnapshot` take the snapshotId directly instead of the committed offset/epoch (which may not exist). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
cmccabe commented on code in PR #12961: URL: https://github.com/apache/kafka/pull/12961#discussion_r1046455192 ## core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala: ## @@ -355,6 +375,13 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea case id if id < 0 => None case id => Some(id) } + val kraftControllerIdOpt = updateMetadataRequest.kraftControllerId() match { +case id if id < 0 => None +case id => Some(id) + } + if (controllerIdOpt.nonEmpty && kraftControllerIdOpt.nonEmpty) { Review Comment: This isn't an error, right? It's possible for both of them to be -1, e.g. when we are in the middle of electing a new controller.
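The Scala pattern in this diff maps a negative id to `None`. A hypothetical Java restatement with `Optional` (not Kafka code) shows what the guard sees when both ids are -1, the mid-election case the reviewer mentions: two empty values, which a "both present" check does not flag.

```java
import java.util.Optional;

// Hypothetical restatement of the diff's id handling; not Kafka code.
final class ControllerIds {
    // A negative id means "no controller known" and becomes an empty Optional.
    static Optional<Integer> toOptional(int id) {
        return id < 0 ? Optional.empty() : Optional.of(id);
    }

    // Mirrors the guard in the diff: true only when both ids are known.
    static boolean bothPresent(int zkControllerId, int kraftControllerId) {
        return toOptional(zkControllerId).isPresent() && toOptional(kraftControllerId).isPresent();
    }
}
```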
[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
cmccabe commented on code in PR #12961: URL: https://github.com/apache/kafka/pull/12961#discussion_r1046452882 ## core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala: ## @@ -60,7 +61,8 @@ trait ZkFinalizedFeatureCache { * A cache for the state (e.g., current leader) of each partition. This cache is updated through * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. */ -class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFeatures: BrokerFeatures) +class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, Review Comment: can we do an
```
abc(
  a
  b
  c
) {
}
```
thing here? Sorry, these long Scala declarations are ugly otherwise...
[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
cmccabe commented on code in PR #12961: URL: https://github.com/apache/kafka/pull/12961#discussion_r1046452203 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -544,6 +548,7 @@ class KafkaServer( private def controlledShutdown(): Unit = { val socketTimeoutMs = config.controllerSocketTimeoutMs +// TODO (KAFKA-14447): Handle controlled shutdown for zkBroker when we have KRaft controller. Review Comment: Can we just log a message and `return` from this function early, when we are in this situation? Currently, it looks like we just get stuck and time out, which is not great.
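The reviewer's suggestion amounts to a guard clause: detect the unsupported case, log, and return before any blocking work starts. A hypothetical Java sketch (names are illustrative, not `KafkaServer`'s; the log text is invented):

```java
import java.util.function.Consumer;

// Hypothetical sketch of an early-return guard for controlled shutdown.
final class ShutdownGuard {
    // Returns true if the ZK-style controlled shutdown was attempted.
    static boolean controlledShutdown(boolean kraftControllerInUse,
                                      Consumer<String> log,
                                      Runnable zkControlledShutdown) {
        if (kraftControllerInUse) {
            // Return early instead of blocking until the socket timeout expires.
            log.accept("Skipping controlled shutdown: not yet supported with a KRaft controller (KAFKA-14447)");
            return false;
        }
        zkControlledShutdown.run();
        return true;
    }
}
```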
[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
cmccabe commented on code in PR #12961: URL: https://github.com/apache/kafka/pull/12961#discussion_r1046450807 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -262,8 +263,11 @@ class KafkaServer( _brokerState = BrokerState.RECOVERY logManager.startup(zkClient.getAllTopicsInCluster()) -metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion, brokerFeatures) -val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache) +if (config.migrationEnabled) { + kraftControllerNodes = RaftConfig.voterConnectionsToNodes(RaftConfig.parseVoterConnections(config.quorumVoters)).asScala +} +metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion, brokerFeatures, kraftControllerNodes) Review Comment: can we put each parameter on its own line?
[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
cmccabe commented on code in PR #12961: URL: https://github.com/apache/kafka/pull/12961#discussion_r1046450254 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1665,6 +1665,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty val migrationEnabled: Boolean = getBoolean(KafkaConfig.MigrationEnabledProp) + def enableZkApiForwarding: Boolean = migrationEnabled && interBrokerProtocolVersion.isApiForwardingEnabled Review Comment: to clarify, it's fine to do this static check for now but it should be done in `Kafka.scala` and we should comment that it's OK because IBP is static in ZK mode
[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
cmccabe commented on code in PR #12961: URL: https://github.com/apache/kafka/pull/12961#discussion_r1046449684 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1665,6 +1665,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty val migrationEnabled: Boolean = getBoolean(KafkaConfig.MigrationEnabledProp) + def enableZkApiForwarding: Boolean = migrationEnabled && interBrokerProtocolVersion.isApiForwardingEnabled Review Comment: please don't add a reference to `KafkaConfig.interBrokerProtocolVersion` here. That is going away since we are moving away from statically configuring IBP (or indeed calling it IBP)
[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
cmccabe commented on code in PR #12961: URL: https://github.com/apache/kafka/pull/12961#discussion_r1046447468 ## clients/src/main/resources/common/message/LeaderAndIsrRequest.json: ## @@ -36,7 +36,7 @@ "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The current controller ID." }, -{ "name": "KRaftControllerId", "type": "int32", "versions": "7+", "entityType": "brokerId", "default": "-1", +{ "name": "KRaftControllerId", "type": "int32", "versions": "7+", "default": "-1", "entityType": "brokerId", "default": "-1", Review Comment: this already has a default value of -1. It doesn't need to be stated twice... ## clients/src/main/resources/common/message/StopReplicaRequest.json: ## @@ -31,7 +31,7 @@ "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The controller id." }, -{ "name": "KRaftControllerId", "type": "int32", "versions": "4+", "entityType": "brokerId", "default": "-1", +{ "name": "KRaftControllerId", "type": "int32", "versions": "4+", "default": "-1", "entityType": "brokerId", "default": "-1", Review Comment: this already has a default value of -1. It doesn't need to be stated twice...
[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
cmccabe commented on code in PR #12961: URL: https://github.com/apache/kafka/pull/12961#discussion_r1046447181 ## clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java: ## @@ -51,7 +51,16 @@ public static class Builder extends AbstractControlRequest.Builder partitionStates, List liveBrokers, Map topicIds) { -super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch); +super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch, false); Review Comment: same, define this in terms of the other ctor
[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
cmccabe commented on code in PR #12961: URL: https://github.com/apache/kafka/pull/12961#discussion_r1046447035 ## clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java: ## @@ -45,14 +45,23 @@ public static class Builder extends AbstractControlRequest.Builder topicStates) { -super(ApiKeys.STOP_REPLICA, version, controllerId, controllerEpoch, brokerEpoch); +super(ApiKeys.STOP_REPLICA, version, controllerId, controllerEpoch, brokerEpoch, false); Review Comment: this is another case where we should define this in terms of the other constructor to avoid duplication ``` this(version, controllerId, controllerEpoch, brokerEpoch, deletePartitions, topicStates, false); ```
[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
cmccabe commented on code in PR #12961: URL: https://github.com/apache/kafka/pull/12961#discussion_r1046446539 ## clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java: ## @@ -51,7 +51,16 @@ public static class Builder extends AbstractControlRequest.Builder partitionStates, Map topicIds, Collection liveLeaders) { -super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch); +super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch, false); Review Comment: let's define this in terms of the other constructor to avoid duplicating the below code. something like: ``` this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, topicIds, liveLeaders, false); ```
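The constructor-delegation pattern requested in the UpdateMetadataRequest, StopReplicaRequest, and LeaderAndIsrRequest comments can be sketched in Java as below: the old constructor forwards to the new one with `false`, so the field assignments live in exactly one place. The class name and the simplified parameter list are hypothetical; the real builders also call `super(...)` and take topic IDs, live leaders, and so on.

```java
import java.util.List;

// Hypothetical, simplified stand-in for an AbstractControlRequest.Builder subclass;
// field types are simplified for illustration.
class ControlRequestBuilderSketch {
    final short version;
    final int controllerId;
    final int controllerEpoch;
    final long brokerEpoch;
    final List<String> partitionStates;
    final boolean kraftController;

    // Pre-existing signature: delegates, so no field assignment is duplicated.
    ControlRequestBuilderSketch(short version, int controllerId, int controllerEpoch,
                                long brokerEpoch, List<String> partitionStates) {
        this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, false);
    }

    // New signature with the extra flag: the only constructor that assigns fields.
    ControlRequestBuilderSketch(short version, int controllerId, int controllerEpoch,
                                long brokerEpoch, List<String> partitionStates,
                                boolean kraftController) {
        this.version = version;
        this.controllerId = controllerId;
        this.controllerEpoch = controllerEpoch;
        this.brokerEpoch = brokerEpoch;
        this.partitionStates = partitionStates;
        this.kraftController = kraftController;
    }
}
```

Because the fields are `final`, the compiler also enforces the design: only the target constructor may assign them, so a second copy of the assignments cannot creep back in.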
[GitHub] [kafka] mumrah commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
mumrah commented on code in PR #12961: URL: https://github.com/apache/kafka/pull/12961#discussion_r1046370590 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1317,14 +1316,23 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","), brokers.mkString(","), request.header.correlationId, request.header.clientId)) +val controllerId = { + metadataCache match { +case cache: KRaftMetadataCache => cache.getControllerId.map(_.id) +case cache => cache.getControllerId.flatMap { Review Comment: Can we add the class for this case as well? ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3332,13 +3340,21 @@ class KafkaApis(val requestChannel: RequestChannel, } val brokers = metadataCache.getAliveBrokerNodes(request.context.listenerName) -val controllerId = metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID) +val controllerId = { + metadataCache match { +case cache: KRaftMetadataCache => cache.getControllerId.map(_.id) +case cache => cache.getControllerId.flatMap { Review Comment: And here ## core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala: ## @@ -315,7 +324,18 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea }.getOrElse(Map.empty[Int, Node]) } - def getControllerId: Option[Int] = metadataSnapshot.controllerId + def getControllerId: Option[CachedControllerId] = { +val snapshot = metadataSnapshot +snapshot.kraftControllerId match { + case Some(id) => Some(KRaftCachedControllerId(id)) + case None => snapshot.controllerId.map(ZkCachedControllerId) +} + } + + def getRandomAliveBrokerId: Option[Int] = { Review Comment: errant whitespace
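For readers following the `getControllerId` change, here is a Java rendering of the selection logic in the `ZkMetadataCache` hunk: a KRaft controller ID recorded in the snapshot wins; otherwise the ZK controller ID is used, and each is wrapped in a distinct type so callers can tell them apart. The class names mirror the Scala diff, but the Java shapes (an abstract base class, `Optional<Integer>` inputs) are assumptions made for illustration.

```java
import java.util.Optional;

// Java sketch of the Scala types in the diff; shapes are assumed, not copied from Kafka.
abstract class CachedControllerId {
    final int id;
    CachedControllerId(int id) { this.id = id; }
}

final class ZkCachedControllerId extends CachedControllerId {
    ZkCachedControllerId(int id) { super(id); }
}

final class KRaftCachedControllerId extends CachedControllerId {
    KRaftCachedControllerId(int id) { super(id); }
}

final class ControllerIdSelection {
    // Mirrors: snapshot.kraftControllerId match {
    //   case Some(id) => Some(KRaftCachedControllerId(id))
    //   case None     => snapshot.controllerId.map(ZkCachedControllerId) }
    static Optional<CachedControllerId> getControllerId(Optional<Integer> kraftControllerId,
                                                        Optional<Integer> zkControllerId) {
        if (kraftControllerId.isPresent()) {
            return Optional.<CachedControllerId>of(new KRaftCachedControllerId(kraftControllerId.get()));
        }
        return zkControllerId.<CachedControllerId>map(ZkCachedControllerId::new);
    }
}
```

How `KafkaApis` then handles a `KRaftCachedControllerId` for ZK-mode clients is elided in the quoted hunks (the `flatMap` bodies are cut off), so that part is deliberately not sketched here.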
[jira] [Resolved] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino resolved KAFKA-14392. --- Fix Version/s: 3.5.0 Resolution: Fixed > KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms > -- > > Key: KAFKA-14392 > URL: https://issues.apache.org/jira/browse/KAFKA-14392 > Project: Kafka > Issue Type: Improvement > Components: kraft >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Minor > Fix For: 3.5.0 > > > KRaft brokers maintain their liveness in the cluster by sending > BROKER_HEARTBEAT requests to the active controller; the active controller > fences a broker if it doesn't receive a heartbeat request from that broker > within the period defined by `broker.session.timeout.ms`. The broker should > use a request timeout for its BROKER_HEARTBEAT requests that is not larger > than the session timeout being used by the controller; a larger timeout creates > the possibility that, upon controller failover, the broker might not cancel an > existing heartbeat request in time to heartbeat to the new controller and > maintain an uninterrupted session in the cluster. In other > words, a failure of the active controller could result in under-replicated > (or under-min ISR) partitions simply due to a delay in brokers heartbeating > to the new controller. -- This message was sent by Atlassian Jira (v8.20.10#820010)
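The invariant in this issue amounts to clamping the heartbeat request timeout to the session timeout. Below is a minimal Java sketch under that reading; the class and method names are hypothetical, and this is not the change that resolved KAFKA-14392.

```java
// Hypothetical helper: the BROKER_HEARTBEAT request timeout never exceeds
// broker.session.timeout.ms, so a request stuck on a failed controller is
// abandoned no later than the session timeout.
final class HeartbeatTimeoutSketch {
    static long heartbeatRequestTimeoutMs(long configuredRequestTimeoutMs, long brokerSessionTimeoutMs) {
        if (configuredRequestTimeoutMs <= 0 || brokerSessionTimeoutMs <= 0) {
            throw new IllegalArgumentException("timeouts must be positive");
        }
        return Math.min(configuredRequestTimeoutMs, brokerSessionTimeoutMs);
    }
}
```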
[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-14392: -- Affects Version/s: 3.3.2 > KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms > -- > > Key: KAFKA-14392 > URL: https://issues.apache.org/jira/browse/KAFKA-14392 > Project: Kafka > Issue Type: Improvement > Components: kraft >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Minor > > KRaft brokers maintain their liveness in the cluster by sending > BROKER_HEARTBEAT requests to the active controller; the active controller > fences a broker if it doesn't receive a heartbeat request from that broker > within the period defined by `broker.session.timeout.ms`. The broker should > use a request timeout for its BROKER_HEARTBEAT requests that is not larger > than the session timeout being used by the controller; a larger timeout creates > the possibility that, upon controller failover, the broker might not cancel an > existing heartbeat request in time to heartbeat to the new controller and > maintain an uninterrupted session in the cluster. In other > words, a failure of the active controller could result in under-replicated > (or under-min ISR) partitions simply due to a delay in brokers heartbeating > to the new controller.
[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-14392: -- Affects Version/s: 3.4.0 > KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms > -- > > Key: KAFKA-14392 > URL: https://issues.apache.org/jira/browse/KAFKA-14392 > Project: Kafka > Issue Type: Improvement > Components: kraft >Affects Versions: 3.3.0, 3.4.0, 3.3.1 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Minor > > KRaft brokers maintain their liveness in the cluster by sending > BROKER_HEARTBEAT requests to the active controller; the active controller > fences a broker if it doesn't receive a heartbeat request from that broker > within the period defined by `broker.session.timeout.ms`. The broker should > use a request timeout for its BROKER_HEARTBEAT requests that is not larger > than the session timeout being used by the controller; a larger timeout creates > the possibility that, upon controller failover, the broker might not cancel an > existing heartbeat request in time to heartbeat to the new controller and > maintain an uninterrupted session in the cluster. In other > words, a failure of the active controller could result in under-replicated > (or under-min ISR) partitions simply due to a delay in brokers heartbeating > to the new controller.
[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-14392: -- Component/s: kraft > KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms > -- > > Key: KAFKA-14392 > URL: https://issues.apache.org/jira/browse/KAFKA-14392 > Project: Kafka > Issue Type: Improvement > Components: kraft >Affects Versions: 3.3.0, 3.3.1 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Minor > > KRaft brokers maintain their liveness in the cluster by sending > BROKER_HEARTBEAT requests to the active controller; the active controller > fences a broker if it doesn't receive a heartbeat request from that broker > within the period defined by `broker.session.timeout.ms`. The broker should > use a request timeout for its BROKER_HEARTBEAT requests that is not larger > than the session timeout being used by the controller; a larger timeout creates > the possibility that, upon controller failover, the broker might not cancel an > existing heartbeat request in time to heartbeat to the new controller and > maintain an uninterrupted session in the cluster. In other > words, a failure of the active controller could result in under-replicated > (or under-min ISR) partitions simply due to a delay in brokers heartbeating > to the new controller.
[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-14392: -- Affects Version/s: (was: 3.4.0) > KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms > -- > > Key: KAFKA-14392 > URL: https://issues.apache.org/jira/browse/KAFKA-14392 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.3.0, 3.3.1 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Minor > > KRaft brokers maintain their liveness in the cluster by sending > BROKER_HEARTBEAT requests to the active controller; the active controller > fences a broker if it doesn't receive a heartbeat request from that broker > within the period defined by `broker.session.timeout.ms`. The broker should > use a request timeout for its BROKER_HEARTBEAT requests that is not larger > than the session timeout being used by the controller; a larger timeout creates > the possibility that, upon controller failover, the broker might not cancel an > existing heartbeat request in time to heartbeat to the new controller and > maintain an uninterrupted session in the cluster. In other > words, a failure of the active controller could result in under-replicated > (or under-min ISR) partitions simply due to a delay in brokers heartbeating > to the new controller.
[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-14392: -- Affects Version/s: 3.3.1 3.3.0 3.4.0 > KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms > -- > > Key: KAFKA-14392 > URL: https://issues.apache.org/jira/browse/KAFKA-14392 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.3.0, 3.4.0, 3.3.1 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Minor > > KRaft brokers maintain their liveness in the cluster by sending > BROKER_HEARTBEAT requests to the active controller; the active controller > fences a broker if it doesn't receive a heartbeat request from that broker > within the period defined by `broker.session.timeout.ms`. The broker should > use a request timeout for its BROKER_HEARTBEAT requests that is not larger > than the session timeout being used by the controller; a larger timeout creates > the possibility that, upon controller failover, the broker might not cancel an > existing heartbeat request in time to heartbeat to the new controller and > maintain an uninterrupted session in the cluster. In other > words, a failure of the active controller could result in under-replicated > (or under-min ISR) partitions simply due to a delay in brokers heartbeating > to the new controller.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12968: KAFKA-14417: Addressing the error server side
hachikuji commented on code in PR #12968: URL: https://github.com/apache/kafka/pull/12968#discussion_r1046275111 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int, if (nextProducerId > currentProducerIdBlock.lastProducerId) { val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS) if (block == null) { - throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block") Review Comment: I was just trying to figure out if we wanted the "unexpected error" message to be logged or not. Usually we try to catch errors in the handler.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12968: KAFKA-14417: Addressing the error server side
hachikuji commented on code in PR #12968: URL: https://github.com/apache/kafka/pull/12968#discussion_r1046262250 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int, if (nextProducerId > currentProducerIdBlock.lastProducerId) { val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS) if (block == null) { - throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block") + throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block") Review Comment: It would be helpful to have a comment here since it's probably not obvious why we use this error instead of `REQUEST_TIMED_OUT`. ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -236,7 +236,7 @@ class RPCProducerIdManager(brokerId: Int, private[transaction] def handleTimeout(): Unit = { warn("Timed out when requesting AllocateProducerIds from the controller.") requestInFlight.set(false) -nextProducerIdBlock.put(Failure(Errors.REQUEST_TIMED_OUT.exception)) + nextProducerIdBlock.put(Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception)) Review Comment: I wonder why we do this. If we don't send anything back, we still get the timeout after `maxWaitMs`. The odd thing is that the timeout might occur in the middle of the wait for `maxWaitMs`, so the actual time we wait might be less than that. I wonder if we can just get rid of this?
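The wait under discussion can be sketched in Java with a `BlockingQueue`: the caller polls for the next block for at most `maxWaitMs` and, if nothing arrives, raises the error the PR switches to. The exception class is a hypothetical stand-in for `Errors.COORDINATOR_LOAD_IN_PROGRESS.exception(...)`, and blocks are modelled as `long[]{first, last}` for brevity. This also illustrates the second comment's point: anything else put into the queue (such as the failure from `handleTimeout`) wakes `poll` before `maxWaitMs` has elapsed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the wait in RPCProducerIdManager; names other than maxWaitMs are hypothetical.
final class ProducerIdBlockWaiter {
    /** Stand-in for the exception produced by Errors.COORDINATOR_LOAD_IN_PROGRESS. */
    static final class CoordinatorLoadInProgress extends RuntimeException {
        CoordinatorLoadInProgress(String message) { super(message); }
    }

    // Each element is {firstProducerId, lastProducerId}.
    private final BlockingQueue<long[]> nextProducerIdBlock = new LinkedBlockingQueue<>();

    void offerBlock(long firstProducerId, long lastProducerId) {
        nextProducerIdBlock.add(new long[] {firstProducerId, lastProducerId});
    }

    long[] awaitNextBlock(long maxWaitMs) {
        long[] block;
        try {
            block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new CoordinatorLoadInProgress("Interrupted waiting for next producer ID block");
        }
        if (block == null) { // nothing arrived within maxWaitMs
            throw new CoordinatorLoadInProgress("Timed out waiting for next producer ID block");
        }
        return block;
    }
}
```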
[GitHub] [kafka] jsancio commented on a diff in pull request #12975: MINOR; Improve high watermark log messages
jsancio commented on code in PR #12975: URL: https://github.com/apache/kafka/pull/12975#discussion_r1046207835 ## raft/src/main/java/org/apache/kafka/raft/FollowerState.java: ## @@ -169,4 +184,26 @@ public void close() { fetchingSnapshot.get().close(); } } + +private void logHighWatermarkUpdate( +Optional oldHighWatermark, +Optional newHighWatermark +) { +if (!oldHighWatermark.equals(newHighWatermark)) { +if (oldHighWatermark.isPresent()) { +log.trace( +"High watermark set to {} from {} for epoch {}", +newHighWatermark, +oldHighWatermark.get(), +epoch +); +} else { +log.info( +"High watermark set to {} for the firs time for epoch {}", Review Comment: Fixed. Thanks for the review @showuon
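The branching in `logHighWatermarkUpdate` is easy to check in isolation. Below is a Java sketch that returns the would-be log line instead of writing it. The `Optional<Long>` parameter type and the `TRACE:`/`INFO:` prefixes are assumptions (the element type is stripped from the quoted diff), and the message uses the corrected "first time" wording. As in the diff, the new value is formatted as the `Optional` itself.

```java
import java.util.Optional;

// Sketch of the logHighWatermarkUpdate branching; returns the message instead of
// calling an SLF4J logger so the behaviour can be checked directly.
final class HighWatermarkLogSketch {
    static Optional<String> describeUpdate(Optional<Long> oldHw, Optional<Long> newHw, int epoch) {
        if (oldHw.equals(newHw)) {
            return Optional.empty(); // unchanged: nothing is logged
        }
        if (oldHw.isPresent()) {
            // Routine advance: logged at TRACE in the diff.
            return Optional.of(String.format(
                "TRACE: High watermark set to %s from %s for epoch %d", newHw, oldHw.get(), epoch));
        }
        // First high watermark seen in this epoch: logged at INFO in the diff.
        return Optional.of(String.format(
            "INFO: High watermark set to %s for the first time for epoch %d", newHw, epoch));
    }
}
```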
[jira] [Updated] (KAFKA-14461) StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores logic to check for active partitions seems brittle.
[ https://issues.apache.org/jira/browse/KAFKA-14461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-14461: Component/s: streams unit tests > StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores logic to > check for active partitions seems brittle. > -- > > Key: KAFKA-14461 > URL: https://issues.apache.org/jira/browse/KAFKA-14461 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > {noformat} > StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores {noformat} > has logic to figure out active partitions: > > > {code:java} > final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;{code} > > > This is very brittle: when a new test gets added, this check would need to > be changed to `==0`. It's a hassle to change it every time a new test is > added. We should look to improve this. > Also, this test relies on JUnit 4 annotations, which can be migrated to JUnit 5 > so that we can use @BeforeAll to set up and @AfterAll to shut down the cluster > instead of the current way where it's being done before/after every test.
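One way to avoid the port-parity guess is to compare `keyQueryMetadata.activeHost()` against the endpoint each instance advertises. Below is a minimal Java sketch, assuming each test instance's advertised host and port are known. `Endpoint` is a hypothetical stand-in for Kafka Streams' `HostInfo`, and the instance names and ports are illustrative.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical test helper: identify the active instance by endpoint equality
// instead of by (port % 2), so adding tests never requires flipping the check.
final class ActiveInstanceLookup {
    // Stand-in for org.apache.kafka.streams.state.HostInfo.
    static final class Endpoint {
        final String host;
        final int port;

        Endpoint(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Endpoint)) return false;
            Endpoint other = (Endpoint) o;
            return port == other.port && host.equals(other.host);
        }

        @Override
        public int hashCode() {
            return Objects.hash(host, port);
        }
    }

    /** Returns the name of the instance whose advertised endpoint equals activeHost. */
    static String activeInstance(Map<String, Endpoint> advertisedByInstance, Endpoint activeHost) {
        for (Map.Entry<String, Endpoint> entry : advertisedByInstance.entrySet()) {
            if (entry.getValue().equals(activeHost)) {
                return entry.getKey();
            }
        }
        throw new IllegalStateException("No instance advertises " + activeHost.host + ":" + activeHost.port);
    }
}
```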
[GitHub] [kafka] cmccabe merged pull request #12964: MINOR: Introduce MetadataProvenance and ImageReWriter
cmccabe merged PR #12964: URL: https://github.com/apache/kafka/pull/12964
[GitHub] [kafka] gharris1727 opened a new pull request, #12980: KAFKA-14464: Make resource leaks for MM2 resources more difficult
gharris1727 opened a new pull request, #12980: URL: https://github.com/apache/kafka/pull/12980 For all AutoCloseable resources created by MM2 connectors & tasks, force their instantiation to take place within a BackgroundResources instance, which can be closed when the connector/task stops. Signed-off-by: Greg Harris This was split out of #12955 for scope reasons. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
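The idea behind this PR, routing every `AutoCloseable` through one owner that `stop()` closes, can be sketched in Java as follows. `ResourceRegistry` is a hypothetical illustration, not the PR's `BackgroundResources` class. Resources are closed in reverse creation order, and a failure in one `close()` does not prevent the others from being closed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical sketch: every resource is created through the registry, so a
// connector's stop() only has to call registry.close() and cannot forget one.
final class ResourceRegistry implements AutoCloseable {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    <T extends AutoCloseable> T open(Supplier<T> factory) {
        T resource = factory.get();
        resources.push(resource); // most recently opened is closed first
        return resource;
    }

    @Override
    public void close() {
        RuntimeException failure = null;
        while (!resources.isEmpty()) {
            AutoCloseable resource = resources.pop();
            try {
                resource.close();
            } catch (Exception e) {
                // Keep closing the rest; report the first failure, attach later ones.
                if (failure == null) {
                    failure = new RuntimeException("Failed to close resource", e);
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
```

If a factory throws partway through `start()`, everything opened before it is already registered, so closing the registry still releases those resources.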
[jira] [Updated] (KAFKA-14323) KRaft broker time based snapshots
[ https://issues.apache.org/jira/browse/KAFKA-14323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-14323: --- Fix Version/s: (was: 3.4.0) > KRaft broker time based snapshots > - > > Key: KAFKA-14323 > URL: https://issues.apache.org/jira/browse/KAFKA-14323 > Project: Kafka > Issue Type: New Feature >Reporter: José Armando García Sancio >Assignee: Colin McCabe >Priority: Major
[jira] [Updated] (KAFKA-14323) KRaft broker time based snapshots
[ https://issues.apache.org/jira/browse/KAFKA-14323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-14323: --- Fix Version/s: 3.5.0 > KRaft broker time based snapshots > - > > Key: KAFKA-14323 > URL: https://issues.apache.org/jira/browse/KAFKA-14323 > Project: Kafka > Issue Type: New Feature >Reporter: José Armando García Sancio >Assignee: Colin McCabe >Priority: Major > Fix For: 3.5.0
[jira] [Updated] (KAFKA-14403) Snapshot failure metrics
[ https://issues.apache.org/jira/browse/KAFKA-14403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-14403: --- Fix Version/s: 3.5.0 > Snapshot failure metrics > > > Key: KAFKA-14403 > URL: https://issues.apache.org/jira/browse/KAFKA-14403 > Project: Kafka > Issue Type: New Feature >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.5.0 > > > Implement the following metrics: > Controller: > kafka.controller:type=KafkaController,name=MetadataSnapshotGenerationErrors > Incremented any time the controller fails to generate a snapshot. Reset to > zero any time the controller restarts or a snapshot is successfully generated. > Broker: > kafka.server:type=broker-metadata-metrics,name=snapshot-generation-errors > Incremented any time the broker fails to generate a snapshot. Reset to zero > any time the broker restarts or a snapshot is successfully generated.
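The counter semantics described for both metrics can be sketched in Java as follows. `SnapshotErrorCounter` is hypothetical and omits metrics registration; the reset on restart falls out of the value living only in memory.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the described semantics: +1 per failed snapshot,
// reset to 0 when a snapshot succeeds (and implicitly on restart).
final class SnapshotErrorCounter {
    private final AtomicLong errors = new AtomicLong();

    void onSnapshotFailed() {
        errors.incrementAndGet();
    }

    void onSnapshotSucceeded() {
        errors.set(0);
    }

    long value() {
        return errors.get();
    }
}
```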
[jira] [Created] (KAFKA-14464) Refactor MM2 connectors to make resource leaks from code defects less likely
Greg Harris created KAFKA-14464: --- Summary: Refactor MM2 connectors to make resource leaks from code defects less likely Key: KAFKA-14464 URL: https://issues.apache.org/jira/browse/KAFKA-14464 Project: Kafka Issue Type: Task Components: mirrormaker Reporter: Greg Harris Assignee: Greg Harris The MirrorMaker2 connectors instantiate many closeable resources during start() which must be closed in the corresponding stop() method. When a new resource is added, a code defect may be introduced by not updating the stop() method to close it, which leaks that resource beyond the lifetime of the connector. We should refactor these connectors in a way that makes these defects less likely, and explicitly manage the lifetime of these background resources rather than relying on developer & reviewer due diligence.
[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
cadonna commented on code in PR #12821: URL: https://github.com/apache/kafka/pull/12821#discussion_r1046120121 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ## @@ -42,77 +40,56 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static org.easymock.EasyMock.createStrictControl; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; -import static org.powermock.api.easymock.PowerMock.mockStatic; -import static org.powermock.api.easymock.PowerMock.replayAll; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(Utils.class) +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class StateManagerUtilTest { -@Mock(type = MockType.NICE) +@Mock private ProcessorStateManager stateManager; -@Mock(type = MockType.NICE) +@Mock private StateDirectory stateDirectory; -@Mock(type = MockType.NICE) +@Mock private ProcessorTopology topology; -@Mock(type = MockType.NICE) +@Mock private InternalProcessorContext processorContext; -private IMocksControl ctrl; - private Logger logger = new LogContext("test").logger(AbstractTask.class); private final TaskId taskId = new TaskId(0, 0); -@Before -public void setup() { -ctrl = createStrictControl(); -topology = ctrl.createMock(ProcessorTopology.class); -processorContext = ctrl.createMock(InternalProcessorContext.class); - -stateManager = ctrl.createMock(ProcessorStateManager.class); -stateDirectory = ctrl.createMock(StateDirectory.class); -} - @Test public void 
testRegisterStateStoreWhenTopologyEmpty() { -expect(topology.stateStores()).andReturn(emptyList()); - -ctrl.checkOrder(true); -ctrl.replay(); +when(topology.stateStores()).thenReturn(emptyList()); StateManagerUtil.registerStateStores(logger, "logPrefix:", topology, stateManager, stateDirectory, processorContext); - -ctrl.verify(); } @Test public void testRegisterStateStoreFailToLockStateDirectory() { -expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false))); - -expect(stateManager.taskId()).andReturn(taskId); +when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false))); Review Comment: ```suggestion ```
[jira] [Created] (KAFKA-14463) ConnectorClientConfigOverridePolicy is not closed at worker shutdown
Greg Harris created KAFKA-14463: --- Summary: ConnectorClientConfigOverridePolicy is not closed at worker shutdown Key: KAFKA-14463 URL: https://issues.apache.org/jira/browse/KAFKA-14463 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.3.0 Reporter: Greg Harris The ConnectorClientConfigOverridePolicy is marked AutoCloseable, but is never closed by the worker on shutdown. This is currently not a critical issue, as all known implementations of the policy have a no-op close(). However, an implementation that instantiates background resources which must be released in close() would leak those resources in a test environment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
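The ticket implies that the worker's shutdown path should call `close()` on the policy. The following is a minimal sketch of that idea. It assumes a stand-in `OverridePolicy` interface and a hypothetical `SketchWorker` class, neither of which is the actual Connect API. A failing `close()` is logged rather than allowed to abort the rest of shutdown:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for ConnectorClientConfigOverridePolicy; only its AutoCloseable contract matters here.
interface OverridePolicy extends AutoCloseable {
}

// Hypothetical, heavily simplified worker showing where the policy would be closed.
final class SketchWorker {
    private final OverridePolicy policy;

    SketchWorker(OverridePolicy policy) {
        this.policy = policy;
    }

    void stop() {
        // ... stopping connectors, tasks and the offset store is elided ...
        try {
            // Release whatever background resources the policy may hold.
            policy.close();
        } catch (Exception e) {
            // Log and continue so a misbehaving policy cannot block the rest of shutdown.
            System.err.println("Failed to close override policy: " + e);
        }
    }
}

// Test double that records whether close() was called.
final class RecordingPolicy implements OverridePolicy {
    final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public void close() {
        closed.set(true);
    }
}
```

The sketch catches `Exception` because `AutoCloseable.close()` is declared to throw it, and one faulty plugin should not prevent the remaining worker resources from being released.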
[GitHub] [kafka] mumrah commented on a diff in pull request #12973: MINOR: Change KRaft controllership claiming algorithm
mumrah commented on code in PR #12973: URL: https://github.com/apache/kafka/pull/12973#discussion_r1046093236 ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -166,60 +166,74 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * the migration. * * To ensure that the KRaft controller epoch exceeds the current ZK controller epoch, this registration algorithm - * uses a conditional update on the /controller_epoch znode. If a new ZK controller is elected during this method, - * the conditional update on /controller_epoch fails which causes the whole multi-op transaction to fail. + * uses a conditional update on the /controller and /controller_epoch znodes. + * + * If a new controller is registered concurrently with this registration, one of the two will fail the CAS + * operation on /controller_epoch. For KRaft, we have an extra guard against the registered KRaft epoch going + * backwards. If a KRaft controller had previously registered, an additional CAS operation is done on the /controller + * ZNode to ensure that the KRaft epoch being registered is newer. * * @param kraftControllerId ID of the KRaft controller node * @param kraftControllerEpoch Epoch of the KRaft controller node - * @return An optional of the new zkVersion of /controller_epoch. None if we could not register the KRaft controller. + * @return An optional of the written epoch and new zkVersion of /controller_epoch. None if we could not register the KRaft controller. 
*/ - def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[Int] = { + def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[(Int, Int)] = { val timestamp = time.milliseconds() val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, e._2.getVersion)) -val controllerOpt = getControllerId -val controllerEpochToStore = kraftControllerEpoch + 1000 // TODO Remove this after KAFKA-14436 +val controllerOpt = getControllerRegistration + +// If we have a KRaft epoch registered in /controller, and it is not _older_ than the requested epoch, throw an error. +controllerOpt.flatMap(_.kraftEpoch).foreach { kraftEpochInZk => + if (kraftEpochInZk >= kraftControllerEpoch) { +throw new ControllerMovedException(s"Cannot register KRaft controller $kraftControllerId with epoch $kraftControllerEpoch " + + s"as the current controller registered in ZK has the same or newer epoch $kraftEpochInZk.") + } +} + curEpochOpt match { case None => throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " + s"since there is no ZK controller epoch present.") case Some((curEpoch: Int, curEpochZk: Int)) => -if (curEpoch >= controllerEpochToStore) { - // TODO KAFKA-14436 Need to ensure KRaft has a higher epoch an ZK - throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " + -s"in ZK since its epoch ${controllerEpochToStore} is not higher than the current ZK epoch ${curEpoch}.") +// TODO KAFKA-14436 Increase the KRaft epoch to be higher than the ZK epoch +val newControllerEpoch = if (kraftControllerEpoch >= curEpoch) { + kraftControllerEpoch +} else { + curEpoch + 1 } -val response = if (controllerOpt.isDefined) { - info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active " + -s"controller with epoch $controllerEpochToStore.
The previous controller was ${controllerOpt.get}.") - retryRequestUntilConnected( -MultiRequest(Seq( - SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk), - DeleteOp(ControllerZNode.path, ZkVersion.MatchAnyVersion), - CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp), -defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT))) - ) -} else { - info(s"KRaft controller $kraftControllerId creating ${ControllerZNode.path} to become the active " + -s"controller with epoch $controllerEpochToStore. There was no active controller.") - retryRequestUntilConnected( -MultiRequest(Seq( - SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk), - CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp), -defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT))) - ) +val response = controllerOpt match { + case Some(controller) =
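The registration rule in the doc comment can be modeled in memory. This is a sketch under the following assumptions:

- `VersionedZNode` is a hypothetical stand-in for a znode and its `zkVersion`.
- Only `/controller_epoch` is modeled; the multi-op on `/controller` is omitted.
- The epoch selection mirrors the truncated diff. Registration is refused if `/controller` already holds a KRaft epoch that is not older than the requested one. Otherwise, the requested epoch is kept when it is at least the current ZK epoch, and `curEpoch + 1` is used when it is not.

```java
import java.util.Optional;

// Hypothetical in-memory stand-in for a znode whose writes are guarded by its zkVersion.
final class VersionedZNode {
    private int data;
    private int version = 0;

    VersionedZNode(int initialData) {
        this.data = initialData;
    }

    // Returns {data, version} read atomically, like a getData call that also fills in the Stat.
    synchronized int[] snapshot() {
        return new int[] {data, version};
    }

    // Conditional write: succeeds only if the caller saw the latest version (compare-and-set).
    synchronized boolean setData(int newData, int expectedVersion) {
        if (expectedVersion != version) {
            return false; // someone else wrote in between
        }
        data = newData;
        version++;
        return true;
    }
}

final class KRaftRegistrationSketch {
    // Returns the epoch written to the epoch node, or throws if registration must be refused.
    static int tryRegister(VersionedZNode controllerEpochNode,
                           Optional<Integer> kraftEpochInControllerNode,
                           int kraftControllerEpoch) {
        // Guard against the registered KRaft epoch going backwards.
        kraftEpochInControllerNode.ifPresent(kraftEpochInZk -> {
            if (kraftEpochInZk >= kraftControllerEpoch) {
                throw new IllegalStateException("KRaft epoch " + kraftControllerEpoch
                        + " is not newer than registered epoch " + kraftEpochInZk);
            }
        });

        int[] current = controllerEpochNode.snapshot();
        int curEpoch = current[0];
        int curEpochZkVersion = current[1];

        // Same selection as the diff: keep the KRaft epoch unless it is behind ZK, else bump past ZK.
        int newControllerEpoch = kraftControllerEpoch >= curEpoch ? kraftControllerEpoch : curEpoch + 1;

        // Conditional update: fails if a ZK controller bumped the epoch concurrently.
        if (!controllerEpochNode.setData(newControllerEpoch, curEpochZkVersion)) {
            throw new IllegalStateException("Controller epoch changed concurrently; registration aborted");
        }
        return newControllerEpoch;
    }
}
```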
[jira] [Created] (KAFKA-14462) New Group Coordinator State Machine
David Jacot created KAFKA-14462: --- Summary: New Group Coordinator State Machine Key: KAFKA-14462 URL: https://issues.apache.org/jira/browse/KAFKA-14462 Project: Kafka Issue Type: Sub-task Reporter: David Jacot Assignee: David Jacot
[GitHub] [kafka] dajac closed pull request #12969: MINOR: Small refactor in ApiVersionManager
dajac closed pull request #12969: MINOR: Small refactor in ApiVersionManager URL: https://github.com/apache/kafka/pull/12969
[jira] [Commented] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned
[ https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646033#comment-17646033 ] Mikael commented on KAFKA-14419: Thanks, I'll try that out. I did see the same symptom but with a different sequence of log events the other day (and this was with my workaround in place so that was apparently not enough): {noformat} 2022-12-11 14:58:16,429 INFO [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer, groupId=messages.xms.mt] Request joining group due to: group is already rebalancing 2022-12-11 14:58:16,429 INFO [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer, groupId=messages.xms.mt] (Re-)joining group 2022-12-11 14:58:16,760 INFO [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer, groupId=messages.xms.mt] Successfully joined group with generation Generation{generationId=12752, memberId='messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer-2bacd65f-cfb5-4ed9-9705-1db3e7dbfc6c', protocol='stream'} 2022-12-11 14:58:16,778 INFO [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:802] [Consumer clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer, groupId=messages.xms.mt] Successfully synced group in generation Generation{generationId=12752, 
memberId='messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer-2bacd65f-cfb5-4ed9-9705-1db3e7dbfc6c', protocol='stream'} 2022-12-11 14:58:16,778 INFO [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:428] [Consumer clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer, groupId=messages.xms.mt] Updating assignment with 2022-12-11 14:58:16,778 INFO [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:300] [Consumer clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer, groupId=messages.xms.mt] Notifying assignor about the new Assignment(partitions=[messages.xms.mt.input-4], userDataSize=52) 2022-12-11 14:58:16,778 INFO [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] o.a.k.s.p.i.StreamsPartitionAssignor [StreamsPartitionAssignor.java:1361] stream-thread [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer] No followup rebalance was requested, resetting the rebalance schedule. 
2022-12-11 14:58:16,778 INFO [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] o.a.k.s.p.i.TaskManager [TaskManager.java:273] stream-thread [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] Handle new assignment with: 2022-12-11 14:58:16,778 INFO [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:312] [Consumer clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer, groupId=messages.xms.mt] Adding newly assigned partitions: 2022-12-11 14:58:16,779 INFO [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] State transition from RUNNING to PARTITIONS_ASSIGNED 2022-12-11 14:58:16,852 INFO [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] o.a.k.s.p.i.StreamThread [StreamThread.java:866] stream-thread [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] Restoration took 72 ms for all tasks [1_4] 2022-12-11 14:58:16,852 INFO [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] State transition from PARTITIONS_ASSIGNED to RUNNING {noformat} > Same message consumed again by the same stream task after partition is lost > and reassigned > -- > > Key: KAFKA-14419 > URL: https://issues.apache.org/jira/browse/KAFKA-14419 > Project: Kafka > Issue Type: Bug > Components: s
[GitHub] [kafka] chia7712 opened a new pull request, #12979: MINOR: remove "is-future" from metrics tags after replace current log…
chia7712 opened a new pull request, #12979: URL: https://github.com/apache/kafka/pull/12979 We don't remove the "is-future=true" tag from the future log's metrics after the future log becomes the "current" log. This causes two potential issues: 1. Metrics monitors can't find the Log metrics unless they also track the "is-future=true" tag. 2. All Log metrics of that partition get removed if the partition is moved to another folder again. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
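Both issues can be illustrated with an in-memory registry keyed by tag sets. This is a minimal sketch that assumes hypothetical helpers (`tags`, `simulateTwoMoves`) and the tag names `topic`, `partition`, and `is-future`. The last of these comes from the PR; the first two are assumptions. The sketch does not use Kafka's metrics classes. Without re-tagging on promotion, the promoted log and the next future log compete for the same tag set, and the plain tag set that monitors look for is missing:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class LogMetricTagsSketch {
    // Hypothetical tag builder: topic and partition, plus is-future=true for a future log.
    static Map<String, String> tags(String topic, int partition, boolean isFuture) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("topic", topic);
        tags.put("partition", Integer.toString(partition));
        if (isFuture) {
            tags.put("is-future", "true");
        }
        return tags;
    }

    // Simulates two consecutive moves of partition t-0 between log dirs and returns the registry
    // (tag set -> owning log). Map keys compare by content, as Map.equals/hashCode specify.
    static Map<Map<String, String>, String> simulateTwoMoves(boolean retagOnPromotion) {
        Map<Map<String, String>, String> registry = new HashMap<>();
        registry.put(tags("t", 0, false), "current-log");

        // First move: the future log registers its metrics under is-future=true.
        registry.put(tags("t", 0, true), "future-log-1");

        // Promotion: the old current log goes away and the future log becomes current.
        registry.remove(tags("t", 0, false));
        if (retagOnPromotion) {
            // Behaviour the PR asks for: drop the is-future tag once the log is current.
            registry.remove(tags("t", 0, true));
            registry.put(tags("t", 0, false), "future-log-1");
        }

        // Second move: a new future log registers under is-future=true again.
        registry.put(tags("t", 0, true), "future-log-2");
        return registry;
    }
}
```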
[jira] [Resolved] (KAFKA-14379) consumer should refresh preferred read replica on update metadata
[ https://issues.apache.org/jira/browse/KAFKA-14379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-14379. - Fix Version/s: 3.3.2 Resolution: Fixed > consumer should refresh preferred read replica on update metadata > - > > Key: KAFKA-14379 > URL: https://issues.apache.org/jira/browse/KAFKA-14379 > Project: Kafka > Issue Type: Bug >Reporter: Jeff Kim >Assignee: Jeff Kim >Priority: Blocker > Fix For: 3.4.0, 3.3.2 > > > The consumer (fetcher) refreshes the preferred read replica only on three > conditions: > # the consumer receives an OFFSET_OUT_OF_RANGE error > # the follower does not exist in the client's metadata (i.e., offline) > # after metadata.max.age.ms (5 min default) > For other errors, it will continue to reach out to the possibly unavailable > follower and only after 5 minutes will it refresh the preferred read replica > and go back to the leader. > A specific example is when a partition is reassigned. The consumer will get > NOT_LEADER_OR_FOLLOWER, which triggers a metadata update, but the preferred > read replica will not be refreshed as the follower is still online. It will > continue to reach out to the old follower until the preferred read replica > expires. > The consumer can instead refresh its preferred read replica whenever it makes > a metadata update request, so when the consumer receives e.g. > NOT_LEADER_OR_FOLLOWER it can find the new preferred read replica without > waiting for the expiration. >
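The proposed behaviour can be sketched as a small cache that supports both expiry and explicit invalidation. This is a minimal sketch that makes the following assumptions:

- `PreferredReplicaCache` is a hypothetical class keyed by `"topic-partition"` strings.
- `maxAgeMs` stands in for the 5-minute `metadata.max.age.ms` default mentioned in the ticket.

It is not the consumer's actual fetcher code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical cache of each partition's preferred read replica, keyed by "topic-partition".
final class PreferredReplicaCache {
    private static final class CachedReplica {
        final int nodeId;
        final long expiresAtMs;

        CachedReplica(int nodeId, long expiresAtMs) {
            this.nodeId = nodeId;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final long maxAgeMs; // plays the role of metadata.max.age.ms in the ticket
    private final Map<String, CachedReplica> entries = new HashMap<>();

    PreferredReplicaCache(long maxAgeMs) {
        this.maxAgeMs = maxAgeMs;
    }

    void update(String partition, int nodeId, long nowMs) {
        entries.put(partition, new CachedReplica(nodeId, nowMs + maxAgeMs));
    }

    // Empty means "no usable preferred replica": the caller fetches from the leader instead.
    Optional<Integer> get(String partition, long nowMs) {
        CachedReplica entry = entries.get(partition);
        if (entry == null || nowMs >= entry.expiresAtMs) {
            entries.remove(partition); // drop expired entries lazily
            return Optional.empty();
        }
        return Optional.of(entry.nodeId);
    }

    // The proposed change: forget the preferred replica whenever a metadata update is requested,
    // e.g. after NOT_LEADER_OR_FOLLOWER, instead of waiting for the entry to expire.
    void onMetadataUpdateRequested(String partition) {
        entries.remove(partition);
    }
}
```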
[GitHub] [kafka] dajac commented on pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata
dajac commented on PR #12956: URL: https://github.com/apache/kafka/pull/12956#issuecomment-1346125696 Merged to trunk, 3.4 and 3.3.
[GitHub] [kafka] dajac merged pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata
dajac merged PR #12956: URL: https://github.com/apache/kafka/pull/12956
[GitHub] [kafka] dajac commented on a diff in pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata
dajac commented on code in PR #12956: URL: https://github.com/apache/kafka/pull/12956#discussion_r1045547158 ## clients/src/main/java/org/apache/kafka/common/Cluster.java: ## @@ -253,7 +253,11 @@ public Node nodeById(int id) { public Optional<Node> nodeIfOnline(TopicPartition partition, int id) { Node node = nodeById(id); PartitionInfo partitionInfo = partition(partition); -if (node != null && partitionInfo != null && !Arrays.asList(partitionInfo.offlineReplicas()).contains(node)) { + +if (node != null && partitionInfo != null && +!Arrays.asList(partitionInfo.offlineReplicas()).contains(node) && Review Comment: Thanks for the clarification. It is actually not too bad like this. We can merge like this and follow up if needed.
[GitHub] [kafka] dajac opened a new pull request, #12978: MINOR: Small refactor in KafkaApis.handleHeartbeatRequest
dajac opened a new pull request, #12978: URL: https://github.com/apache/kafka/pull/12978 This is a small follow-up to https://github.com/apache/kafka/pull/12848. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] yashmayya commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic
yashmayya commented on code in PR #12947: URL: https://github.com/apache/kafka/pull/12947#discussion_r1045537542 ## connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java: ## @@ -21,41 +21,51 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.Connect; -import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Worker; -import org.apache.kafka.connect.runtime.WorkerConfig; -import org.apache.kafka.connect.runtime.WorkerInfo; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestServer; -import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; import org.apache.kafka.connect.storage.FileOffsetBackingStore; import org.apache.kafka.connect.storage.OffsetBackingStore; -import org.apache.kafka.connect.util.FutureCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URI; import java.util.Arrays; import java.util.Collections; import java.util.Map; /** * - * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not - * distributed. Instead, all the normal Connect machinery works within a single process. This is - * useful for ad hoc, small, or experimental jobs. - * - * - * By default, no job configs or offset data is persistent. You can make jobs persistent and - * fault tolerant by overriding the settings to use file storage for both. + * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not + * distributed. 
Instead, all the normal Connect machinery works within a single process. This is useful for development + * and testing Kafka Connect on a local machine. * */ -public class ConnectStandalone { +public class ConnectStandalone extends AbstractConnectCli { private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class); +@Override +protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins, + ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, + RestServer restServer, RestClient restClient) { + +OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore(); +offsetBackingStore.configure(config); + +Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore, +connectorClientConfigOverridePolicy); + +return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy); +} + +@Override +protected StandaloneConfig createConfig(Map workerProps) { +return new StandaloneConfig(workerProps); +} + public static void main(String[] args) { Review Comment: Nice! This looks much cleaner and I've made the changes (largely the same with minor tweaks). The only small qualm that I have is that the integration test framework also uses `startConnect` and it's a bit odd as it's initializing a CLI class with no args (since it passes the worker properties map directly to `startConnect`).
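The refactor follows the template-method pattern: the shared base class owns the startup sequence, and each mode supplies its own config and herder. Below is a heavily simplified sketch. It assumes hypothetical `SketchConnectCli` and `SketchStandaloneCli` classes whose herder is just a `String` placeholder. The real `AbstractConnectCli` methods in the PR take more parameters (`Plugins`, `RestServer`, `RestClient`, and so on):

```java
import java.util.Map;

// Hypothetical, simplified base class: the startup sequence lives here once, and each mode
// (standalone, distributed) supplies its own config type and herder.
abstract class SketchConnectCli<C> {
    protected abstract C createConfig(Map<String, String> workerProps);

    // The herder is a String placeholder; the real herder types are far richer.
    protected abstract String createHerder(C config, String workerId);

    // Template method: callers (main, or a test harness) pass worker properties directly.
    final String startConnect(Map<String, String> workerProps, String workerId) {
        C config = createConfig(workerProps);
        return createHerder(config, workerId);
    }
}

final class SketchStandaloneCli extends SketchConnectCli<Map<String, String>> {
    @Override
    protected Map<String, String> createConfig(Map<String, String> workerProps) {
        // Stand-in for new StandaloneConfig(workerProps).
        return Map.copyOf(workerProps);
    }

    @Override
    protected String createHerder(Map<String, String> config, String workerId) {
        return "standalone-herder[" + workerId + ", " + config.size() + " props]";
    }
}
```

Because `startConnect` takes a properties map directly, a test harness can call it without going through `main(String[] args)`. This is why the CLI class ends up being constructed with no args, as the review comment notes.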
[GitHub] [kafka] dajac commented on a diff in pull request #12858: KAFKA-14367; Add `DeleteGroups` to the new `GroupCoordinator` interface
dajac commented on code in PR #12858: URL: https://github.com/apache/kafka/pull/12858#discussion_r1045524817 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -1992,6 +1992,186 @@ class KafkaApisTest { testListOffsetFailedGetLeaderReplica(Errors.UNKNOWN_TOPIC_OR_PARTITION) } + @Test + def testHandleDeleteGroups(): Unit = { +val deleteGroupsRequest = new DeleteGroupsRequestData() + .setGroupsNames(List( +"group-1", +"group-2", +"group-3" + ).asJava) + +val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build()) + +val expectedRequestContext = new GroupCoordinatorRequestContext( + ApiKeys.DELETE_GROUPS.latestVersion, + requestChannelRequest.context.clientId, + requestChannelRequest.context.clientAddress, + RequestLocal.NoCaching.bufferSupplier +) + +val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]() +when(newGroupCoordinator.deleteGroups( + ArgumentMatchers.eq(expectedRequestContext), + ArgumentMatchers.eq(List("group-1", "group-2", "group-3").asJava) +)).thenReturn(future) + +createKafkaApis().handleDeleteGroupsRequest( + requestChannelRequest, + RequestLocal.NoCaching +) + +val results = new DeleteGroupsResponseData.DeletableGroupResultCollection(List( Review Comment: Thanks for your comment. I played a bit with your suggestion and I found that using a `Map` is not convenient here because the order matters. We could use a `LinkedHashMap`, but that makes things less readable in my opinion. I think that I will stick to the current approach, which is not that bad. We do this everywhere.
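The ordering point can be shown directly: iterating a `LinkedHashMap` follows insertion order, while `HashMap` makes no ordering guarantee. Below is a minimal sketch in which the group names and error names were chosen only for illustration:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OrderedResultsSketch {
    // Expected per-group results in request order; LinkedHashMap preserves insertion order,
    // whereas HashMap iteration order is unspecified.
    static Map<String, String> expectedErrors() {
        Map<String, String> errorsByGroup = new LinkedHashMap<>();
        errorsByGroup.put("group-1", "NONE");
        errorsByGroup.put("group-2", "GROUP_ID_NOT_FOUND");
        errorsByGroup.put("group-3", "NON_EMPTY_GROUP");
        return errorsByGroup;
    }

    // Flattens the map into "group:error" entries in the map's iteration order.
    static List<String> toResults(Map<String, String> errorsByGroup) {
        List<String> results = new ArrayList<>();
        errorsByGroup.forEach((group, error) -> results.add(group + ":" + error));
        return results;
    }
}
```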
[GitHub] [kafka] dajac commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
dajac commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r1044655133 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ## @@ -1933,11 +1943,79 @@ private Map topicPartitionTags(TopicPartition tp) { } } +// Visible for testing +void maybeCloseFetchSessions(final Timer timer) { +final Cluster cluster = metadata.fetch(); +final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>(); +for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) { +final FetchSessionHandler sessionHandler = entry.getValue(); +// set the session handler to notify close. This will set the next metadata request to send close message. +sessionHandler.notifyClose(); + +final int sessionId = sessionHandler.sessionId(); +final Integer fetchTargetNodeId = entry.getKey(); +// FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will +// skip sending the close request. +final Node fetchTarget = cluster.nodeById(fetchTargetNodeId); +if (fetchTarget == null || client.isUnavailable(fetchTarget)) { +log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget); +continue; +} + +final RequestFuture<ClientResponse> responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget); +responseFuture.addListener(new RequestFutureListener<ClientResponse>() { +@Override +public void onSuccess(ClientResponse value) { +log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget); +} + +@Override +public void onFailure(RuntimeException e) { +log.debug("Unable to send a close message for fetch session: {} to node: {}. " + +"This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e); +} +}); + +requestFutures.add(responseFuture); +} + +// Poll to ensure that request has been written to the socket. 
Wait until either the timer has expired or until +// all requests have received a response.
+do { +client.poll(timer, null, true); +} while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)); + +if (!requestFutures.stream().allMatch(RequestFuture::isDone)) { +// we ran out of time before completing all futures. It is ok since we don't want to block the shutdown +// here. +log.debug("All requests couldn't be sent in the specified timeout period {}ms. " + +"This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for " + +"KafkaConsumer.close(Duration timeout)", timer.timeoutMs()); +} +} + +public void close(final Timer timer) { +if (!isClosed.compareAndSet(false, true)) { +log.info("Fetcher {} is already closed.", this); +return; +} + +// Shared states (e.g. sessionHandlers) could be accessed by multiple threads (such as heartbeat thread), hence, +// it is necessary to acquire a lock on the fetcher instance before modifying the states. +synchronized (Fetcher.this) { +// we do not need to re-enable wakeups since we are closing already +client.disableWakeups(); Review Comment: Could you elaborate on why we need this here? It is something that we did not have before.
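The close path in the diff combines two ideas. The first is an idempotent close guarded by `compareAndSet`. The second is a poll loop that runs at least once and stops when either every request has completed or the timer expires. Below is a minimal sketch that uses `CompletableFuture` and `System.nanoTime()` in place of Kafka's `RequestFuture` and `Timer`; the `BoundedCloseSketch` class is hypothetical:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class BoundedCloseSketch {
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    // Polls at least once, then keeps polling until every future is done or the deadline passes.
    // Returns true if all futures completed in time.
    static boolean pollUntilDoneOrExpired(List<CompletableFuture<Void>> futures,
                                          long timeoutMs,
                                          Runnable pollOnce) {
        long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
        do {
            pollOnce.run();
        } while (System.nanoTime() - deadlineNanos < 0 && !allDone(futures));
        return allDone(futures);
    }

    private static boolean allDone(List<CompletableFuture<Void>> futures) {
        return futures.stream().allMatch(CompletableFuture::isDone);
    }

    // Idempotent close: only the first caller does any work, like the compareAndSet guard in the diff.
    boolean close(List<CompletableFuture<Void>> inFlight, long timeoutMs, Runnable pollOnce) {
        if (!isClosed.compareAndSet(false, true)) {
            return false; // already closed
        }
        if (!pollUntilDoneOrExpired(inFlight, timeoutMs, pollOnce)) {
            // Not fatal: shutdown should not block. As the diff's log message notes, this may
            // leave unnecessary fetch sessions on the broker.
            System.err.println("Not all close requests completed within " + timeoutMs + " ms");
        }
        return true;
    }
}
```

The `do`/`while` shape matters. Even with a zero timeout, the client is polled once, so already-queued close requests get a chance to be written to the socket, as the comment in the diff describes.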