[GitHub] [kafka] yashmayya commented on pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2022-12-12 Thread GitBox


yashmayya commented on PR #12984:
URL: https://github.com/apache/kafka/pull/12984#issuecomment-1347872598

   @C0urante could you please take a look at this whenever you get a chance? 
I'm sorry to keep pinging you for review requests, but I'm not aware of any 
other committers currently taking a look at Connect PRs :)





[jira] [Created] (KAFKA-14465) java.lang.NumberFormatException: For input string: "index"

2022-12-12 Thread jianbin.chen (Jira)
jianbin.chen created KAFKA-14465:


 Summary: java.lang.NumberFormatException: For input string: 
"index" 
 Key: KAFKA-14465
 URL: https://issues.apache.org/jira/browse/KAFKA-14465
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: jianbin.chen


{code:java}
[2022-12-13 07:12:20,369] WARN [Log partition=fp_sg_flow_copy-1, dir=/home/admin/output/kafka-logs] Found a corrupted index file corresponding to log file /home/admin/output/kafka-logs/fp_sg_flow_copy-1/0165.log due to Corrupt index found, index file (/home/admin/output/kafka-logs/fp_sg_flow_copy-1/0165.index) has non-zero size but the last offset is 165 which is no greater than the base offset 165.}, recovering segment and rebuilding index files... (kafka.log.Log)
[2022-12-13 07:12:20,369] ERROR There was an error in one of the threads during logs loading: java.lang.NumberFormatException: For input string: "index" (kafka.log.LogManager)
[2022-12-13 07:12:20,374] INFO [ProducerStateManager partition=fp_sg_flow_copy-1] Writing producer snapshot at offset 165 (kafka.log.ProducerStateManager)
[2022-12-13 07:12:20,378] INFO [Log partition=fp_sg_flow_copy-1, dir=/home/admin/output/kafka-logs] Loading producer state from offset 165 with message format version 2 (kafka.log.Log)
[2022-12-13 07:12:20,381] INFO [Log partition=fp_sg_flow_copy-1, dir=/home/admin/output/kafka-logs] Completed load of log with 1 segments, log start offset 165 and log end offset 165 in 13 ms (kafka.log.Log)
[2022-12-13 07:12:20,389] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.NumberFormatException: For input string: "index"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at scala.collection.immutable.StringLike.toLong(StringLike.scala:306)
    at scala.collection.immutable.StringLike.toLong$(StringLike.scala:306)
    at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
    at kafka.log.Log$.offsetFromFile(Log.scala:1846)
    at kafka.log.Log.$anonfun$loadSegmentFiles$3(Log.scala:331)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:191)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
    at kafka.log.Log.loadSegmentFiles(Log.scala:320)
    at kafka.log.Log.loadSegments(Log.scala:403)
    at kafka.log.Log.<init>(Log.scala:216)
    at kafka.log.Log$.apply(Log.scala:1748)
    at kafka.log.LogManager.loadLog(LogManager.scala:265)
    at kafka.log.LogManager.$anonfun$loadLogs$12(LogManager.scala:335)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
[2022-12-13 07:12:20,401] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)


{code}
When I restart the broker it fails like this. I deleted the 000165.index file, but after starting up again other files hit the same error. Please tell me what is causing this and how to fix it.
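For reference, the stack trace fails inside Log$.offsetFromFile, where a segment file-name component is parsed as a numeric offset; parsing the literal string "index" is what throws. A minimal, self-contained sketch of that failure (illustrative only, not Kafka code):
{code:java}
public class OffsetParseSketch {
    public static void main(String[] args) {
        // Parsing a non-numeric file-name component as an offset fails exactly
        // like the broker log above.
        long offset = Long.parseLong("index");
        // -> java.lang.NumberFormatException: For input string: "index"
        System.out.println(offset);
    }
}
{code}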





[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2022-12-12 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1046722667


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java:
##
@@ -51,8 +52,9 @@ public interface ConfigBackingStore {
  * Update the configuration for a connector.
  * @param connector name of the connector
  * @param properties the connector configuration
+ * @param callback the callback to be invoked after the put is complete; can be {@code null} if no callback is desired
  */
-void putConnectorConfig(String connector, Map<String, String> properties);
+void putConnectorConfig(String connector, Map<String, String> properties, Callback callback);

Review Comment:
   Could potentially add a new overloaded method in `KafkaConfigBackingStore` and avoid touching this interface method, although that would require refactoring `AbstractHerder` to make it generic in order to allow `DistributedHerder` to access an instance of `KafkaConfigBackingStore` rather than `ConfigBackingStore`. However, that would be a much noisier change IMO.
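   A rough, purely illustrative sketch of that alternative (the class shape and generics here are assumptions, not the actual PR code):

```java
import java.util.Map;
import org.apache.kafka.connect.util.Callback;

// Hypothetical sketch: keep the interface method untouched and add the
// callback-accepting overload only on the concrete KafkaConfigBackingStore.
class KafkaConfigBackingStoreSketch {
    // existing interface-level method, unchanged
    public void putConnectorConfig(String connector, Map<String, String> properties) {
        putConnectorConfig(connector, properties, null);
    }

    // new overload, visible only to callers that hold the concrete type
    public void putConnectorConfig(String connector, Map<String, String> properties,
                                   Callback<Void> callback) {
        // ... write to the config topic and complete `callback` from the
        //     producer's send callback once the write finishes ...
    }
}
```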



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1058,14 +1058,20 @@ public void putConnectorConfig(final String connName, final Map
 }
 
 log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
-writeToConfigTopicAsLeader(() -> configBackingStore.putConnectorConfig(connName, config));
-
-// Note that we use the updated connector config despite the fact that we don't have an updated
-// snapshot yet. The existing task info should still be accurate.
-ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
-// validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG
-connectorType(config));
-callback.onCompletion(null, new Created<>(!exists, info));
+Callback cb = (err, result) -> {

Review Comment:
   Another option could be to directly call `callback.onCompletion` in `KafkaConfigBackingStore::putConnectorConfig` itself, i.e. construct the `Created` object there using the store's in-memory maps directly rather than the snapshot; this would avoid the double-callback mechanism but would need some other refactoring.
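   For illustration, the general shape of the double-callback wiring being discussed; all names other than `Callback` are made up for this sketch, it is not the PR's code:

```java
import org.apache.kafka.connect.util.Callback;

class DoubleCallbackSketch {
    // stand-in for the config store's asynchronous write API
    interface ConfigStore {
        void putConnectorConfig(String connector, Callback<Void> onWrite);
    }

    // The herder wraps the REST caller's callback in a second callback that it
    // hands to the config store, and only completes the outer one once the
    // config-topic write has actually succeeded or failed.
    static void putConnectorConfig(ConfigStore store, String connector,
                                   Callback<String> restCallback) {
        Callback<Void> onWrite = (error, ignored) -> {
            if (error != null) {
                restCallback.onCompletion(error, null);              // surface the write failure
            } else {
                restCallback.onCompletion(null, "created " + connector);
            }
        };
        store.putConnectorConfig(connector, onWrite);
    }
}
```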






[GitHub] [kafka] yashmayya opened a new pull request, #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2022-12-12 Thread GitBox


yashmayya opened a new pull request, #12984:
URL: https://github.com/apache/kafka/pull/12984

   - Kafka Connect's `POST /connectors` and `PUT 
/connectors/{connector}/config` REST APIs internally simply write a message to 
the Connect cluster's internal config topic (which is then processed 
asynchronously by the herder). 
   - However, no callback is passed to the producer's send method and there is 
no error handling in place for producer send failures (see 
[here](https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716)
 / 
[here](https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726)).
   - Consider one such case where the Connect worker's principal doesn't have a 
WRITE ACL on the cluster's config topic. Now suppose the user submits a 
connector's configs via one of the above two APIs. The producer send 
[here](https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716)
 / 
[here](https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726)
 won't succeed (due to a TopicAuthorizationException) but the API responses 
will be `201 Created` success responses anyway.
   - This is a very poor UX because the connector will actually never be created even though the API response indicates success. Furthermore, this failure would only be detectable if TRACE logs are enabled (via [this log](https://github.com/apache/kafka/blob/df29b17fc40f7c15460988d58bc652c3d66b60f8/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java)), making it nearly impossible for users to debug.
   - This PR uses producer callbacks to surface write failures back to the user via the API response (see the sketch below).
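   A minimal sketch of the underlying idea, assuming a plain `KafkaProducer` writing to the config topic (illustrative only, not the PR's actual code):

```java
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: propagate config-topic write failures instead of silently ignoring
// them, so e.g. a TopicAuthorizationException can reach the REST response.
public class ConfigTopicWriteSketch {
    static void writeConnectorConfig(KafkaProducer<String, byte[]> producer,
                                     String configTopic, String key, byte[] value) throws Exception {
        try {
            // Blocking on the Future returned by send() (or equivalently using the
            // send callback) surfaces producer-side errors to the caller.
            producer.send(new ProducerRecord<>(configTopic, key, value)).get();
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to write connector config to " + configTopic, e.getCause());
        }
    }
}
```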
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] ijuma closed pull request #7703: MINOR: Avoid unnecessary tuple allocations in index binary search

2022-12-12 Thread GitBox


ijuma closed pull request #7703: MINOR: Avoid unnecessary tuple allocations in 
index binary search
URL: https://github.com/apache/kafka/pull/7703





[GitHub] [kafka] ijuma commented on pull request #7703: MINOR: Avoid unnecessary tuple allocations in index binary search

2022-12-12 Thread GitBox


ijuma commented on PR #7703:
URL: https://github.com/apache/kafka/pull/7703#issuecomment-1347702799

   Closing this for now.





[GitHub] [kafka] cmccabe opened a new pull request, #12983: MINOR: ControllerServer should use the new metadata loader and snapshot generator

2022-12-12 Thread GitBox


cmccabe opened a new pull request, #12983:
URL: https://github.com/apache/kafka/pull/12983

   This PR introduces the new metadata loader and snapshot generator. For the 
time being, they are only used by the controller, but a PR for the broker will 
come soon.
   
   The new metadata loader supports adding and removing publishers dynamically. 
(In contrast, the old loader only supported adding a single publisher.) It also 
passes along more information about each new image that is published. This 
information can be found in the LogDeltaManifest and SnapshotManifest classes.
   
   The new snapshot generator replaces the previous logic for generating 
snapshots in QuorumController.java and associated classes. The new generator is 
intended to be shared between the broker and the controller, so it is decoupled 
from both.
   
   There are a few small changes to the old snapshot generator in this PR. 
Specifically, we move the batch processing time and batch size metrics out of 
BrokerMetadataListener.scala and into BrokerServerMetrics.scala.
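   A purely illustrative sketch of the "dynamic publishers" idea described above; the interface and class names here are made up and are not the actual Kafka classes:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Toy model of a loader that lets publishers be registered and removed at
// runtime, with every newly published image fanned out to whoever is
// currently registered.
class DynamicLoaderSketch<T> {
    interface Publisher<U> {
        void publish(U image);
    }

    private final List<Publisher<T>> publishers = new CopyOnWriteArrayList<>();

    void addPublisher(Publisher<T> publisher)    { publishers.add(publisher); }
    void removePublisher(Publisher<T> publisher) { publishers.remove(publisher); }

    void onNewImage(T image) {
        for (Publisher<T> publisher : publishers) {
            publisher.publish(image);
        }
    }
}
```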





[GitHub] [kafka] jsancio merged pull request #12975: MINOR; Improve high watermark log messages

2022-12-12 Thread GitBox


jsancio merged PR #12975:
URL: https://github.com/apache/kafka/pull/12975





[GitHub] [kafka] jsancio commented on pull request #12975: MINOR; Improve high watermark log messages

2022-12-12 Thread GitBox


jsancio commented on PR #12975:
URL: https://github.com/apache/kafka/pull/12975#issuecomment-1347573437

   Merging. Unrelated test failures.





[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms

2022-12-12 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-14392:
--
Affects Version/s: (was: 3.3.2)

> KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
> --
>
> Key: KAFKA-14392
> URL: https://issues.apache.org/jira/browse/KAFKA-14392
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.3.0, 3.4.0, 3.3.1
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Minor
> Fix For: 3.3.2, 3.5.0
>
>
> KRaft brokers maintain their liveness in the cluster by sending 
> BROKER_HEARTBEAT requests to the active controller; the active controller 
> fences a broker if it doesn't receive a heartbeat request from that broker 
> within the period defined by `broker.session.timeout.ms`.  The broker should 
> use a request timeout for its BROKER_HEARTBEAT requests that is not larger 
> than the session timeout being used by the controller; doing so creates the 
> possibility that upon controller failover the broker might not cancel an 
> existing heartbeat request in time and then subsequently heartbeat to the new 
> controller to maintain an uninterrupted session in the cluster.  In other 
> words, a failure of the active controller could result in under-replicated 
> (or under-min ISR) partitions simply due to a delay in brokers heartbeating 
> to the new controller.





[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms

2022-12-12 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-14392:
--
Fix Version/s: 3.3.2

> KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
> --
>
> Key: KAFKA-14392
> URL: https://issues.apache.org/jira/browse/KAFKA-14392
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Minor
> Fix For: 3.3.2, 3.5.0
>
>
> KRaft brokers maintain their liveness in the cluster by sending 
> BROKER_HEARTBEAT requests to the active controller; the active controller 
> fences a broker if it doesn't receive a heartbeat request from that broker 
> within the period defined by `broker.session.timeout.ms`.  The broker should 
> use a request timeout for its BROKER_HEARTBEAT requests that is not larger 
> than the session timeout being used by the controller; doing so creates the 
> possibility that upon controller failover the broker might not cancel an 
> existing heartbeat request in time and then subsequently heartbeat to the new 
> controller to maintain an uninterrupted session in the cluster.  In other 
> words, a failure of the active controller could result in under-replicated 
> (or under-min ISR) partitions simply due to a delay in brokers heartbeating 
> to the new controller.





[GitHub] [kafka] jsancio commented on a diff in pull request #12981: MINOR: Pass snapshot ID directly in `RaftClient.createSnapshot`

2022-12-12 Thread GitBox


jsancio commented on code in PR #12981:
URL: https://github.com/apache/kafka/pull/12981#discussion_r1046477051


##
raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java:
##
@@ -32,7 +32,7 @@
  * topic partition from offset 0 up to but not including the end offset in the 
snapshot
  * id.
  *
- * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(long, int, long)
+ * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(OffsetAndEpoch, 
long)

Review Comment:
   Maybe this should point to `RaftClient.createSnapshot` since that is what we 
documented.
   ```
* @see org.apache.kafka.raft.RaftClient#createSnapshot(OffsetAndEpoch, long)
   ```






[jira] [Resolved] (KAFKA-14271) Topic recreation fails in KRaft mode when topic contains collidable characters

2022-12-12 Thread Jeffrey Tolar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeffrey Tolar resolved KAFKA-14271.
---
Fix Version/s: 3.4.0
   3.3.2
   Resolution: Duplicate

Haven't tested the patch yet, but KAFKA-14337 appears to be the same as this 
issue; that was fixed with https://github.com/apache/kafka/pull/12790

> Topic recreation fails in KRaft mode when topic contains collidable characters
> --
>
> Key: KAFKA-14271
> URL: https://issues.apache.org/jira/browse/KAFKA-14271
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.0
>Reporter: Jeffrey Tolar
>Priority: Major
> Fix For: 3.4.0, 3.3.2
>
> Attachments: topic.with.dots.log, topicwithoutdots.log
>
>
> We recently updated one of our clusters from 3.2.0 to 3.3.0 (primarily to get 
> the fix for KAFKA-13909). This cluster is running KRaft mode.
> This is a cluster used for some integration tests - each test deletes the 
> topics it uses before the test to ensure a clean slate for the test; the 
> brokers get restarted in-between tests, but the broker data isn't deleted.
> With 3.3.0, this semi-crashes Kafka. The brokers stay running, but the topic 
> creation fails:
> {noformat}
> [2022-09-30 17:17:59,216] WARN [Controller 1] createTopics: failed with 
> unknown server exception NoSuchElementException at epoch 1 in 601 us.  
> Renouncing leadership and reverting to the last committed offset 18. 
> (org.apache.kafka.controller.QuorumController)
> java.util.NoSuchElementException
> at 
> org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:167)
> at 
> org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:139)
> at 
> org.apache.kafka.timeline.TimelineHashSet$ValueIterator.next(TimelineHashSet.java:120)
> at 
> org.apache.kafka.controller.ReplicationControlManager.validateNewTopicNames(ReplicationControlManager.java:799)
> at 
> org.apache.kafka.controller.ReplicationControlManager.createTopics(ReplicationControlManager.java:567)
> at 
> org.apache.kafka.controller.QuorumController.lambda$createTopics$7(QuorumController.java:1832)
> at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:767)
> at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
> at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
> at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
> at java.base/java.lang.Thread.run(Thread.java:829)
> {noformat}
> This appears to be because our topic names contain {{.}}'s. Here's a quick 
> reproducer script:
> {noformat}
> #!/bin/bash
> VERSION=3.3.0
> TOPIC=$1
> set -x
> rm -rf -- kafka_2.13-${VERSION} kafka_2.13-${VERSION}.tgz 
> /tmp/kraft-combined-logs
> trap 'kill -- "-$$" && wait' EXIT
> curl -O https://dlcdn.apache.org/kafka/$VERSION/kafka_2.13-${VERSION}.tgz
> tar -xzf kafka_2.13-${VERSION}.tgz
> cd kafka_2.13-${VERSION}
> id=$(./bin/kafka-storage.sh random-uuid)
> ./bin/kafka-storage.sh format -t $id -c ./config/kraft/server.properties
> ./bin/kafka-server-start.sh config/kraft/server.properties > broker.log 2>&1 &
> sleep 1
> ./bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server 
> localhost:9092
> ./bin/kafka-topics.sh --delete --topic "$TOPIC" --bootstrap-server 
> localhost:9092
> ./bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server 
> localhost:9092
> sleep 1
> {noformat}
> Running {{./test-kafka.sh topic.with.dots}} exhibits the failure; using 
> {{topicwithoutdots}} works as expected.
> I'll attach the broker logs from each run.





[GitHub] [kafka] jolshan commented on a diff in pull request #12968: KAFKA-14417: Addressing the error server side

2022-12-12 Thread GitBox


jolshan commented on code in PR #12968:
URL: https://github.com/apache/kafka/pull/12968#discussion_r1046506543


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int,
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
 val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
 if (block == null) {
-  throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next 
producer ID block")

Review Comment:
   Sure we can do that I suppose.






[GitHub] [kafka] jolshan commented on a diff in pull request #12968: KAFKA-14417: Addressing the error server side

2022-12-12 Thread GitBox


jolshan commented on code in PR #12968:
URL: https://github.com/apache/kafka/pull/12968#discussion_r1046505868


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -236,7 +236,7 @@ class RPCProducerIdManager(brokerId: Int,
   private[transaction] def handleTimeout(): Unit = {
 warn("Timed out when requesting AllocateProducerIds from the controller.")
 requestInFlight.set(false)
-nextProducerIdBlock.put(Failure(Errors.REQUEST_TIMED_OUT.exception))
+nextProducerIdBlock.put(Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception))

Review Comment:
   I think it's so we return the error immediately instead of waiting for the poll.
   We also only send one request -- once we get this response we will drop to the block where we poll and get this error response out.
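   For illustration, a small self-contained sketch (not the Kafka code) of why putting a failure into the queue wakes the waiting caller immediately rather than letting it sit in `poll(maxWaitMs)` until the timeout expires:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueWakeupSketch {
    public static void main(String[] args) throws Exception {
        BlockingQueue<Object> nextBlock = new ArrayBlockingQueue<>(1);

        Thread waiter = new Thread(() -> {
            try {
                // stands in for the poll(maxWaitMs) in the producer-ID code above
                Object result = nextBlock.poll(30_000, TimeUnit.MILLISECONDS);
                System.out.println("woke up with: " + result);
            } catch (InterruptedException ignored) {
                // not expected in this sketch
            }
        });
        waiter.start();

        Thread.sleep(100);
        // offering the failure wakes the waiter right away instead of after 30s
        nextBlock.put(new RuntimeException("simulated controller timeout"));
        waiter.join();
    }
}
```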






[GitHub] [kafka] jsancio commented on a diff in pull request #12981: MINOR: Pass snapshot ID directly in `RaftClient.createSnapshot`

2022-12-12 Thread GitBox


jsancio commented on code in PR #12981:
URL: https://github.com/apache/kafka/pull/12981#discussion_r1046477051


##
raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java:
##
@@ -32,7 +32,7 @@
  * topic partition from offset 0 up to but not including the end offset in the 
snapshot
  * id.
  *
- * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(long, int, long)
+ * @see org.apache.kafka.raft.KafkaRaftClient#createSnapshot(OffsetAndEpoch, 
long)

Review Comment:
   Maybe this should point to `RaftClient.createSnapshot` since that is what we 
documented.
   ```
* @see org.apache.kafka.raft.RaftClient#createSnapshot(OffsetAndEpoch, long)
   ```






[GitHub] [kafka] CalvinConfluent opened a new pull request, #12982: MINOR: Fix UT in KAFKA-14372

2022-12-12 Thread GitBox


CalvinConfluent opened a new pull request, #12982:
URL: https://github.com/apache/kafka/pull/12982

   Even if the follower is in the ISR, it will be excluded because its logEndOffset (-1) is less than the fetchOffset (0):
   ```
   if (partition.inSyncReplicaIds.contains(replica.brokerId) &&
   replicaState.logEndOffset >= fetchOffset &&
   replicaState.logStartOffset <= fetchOffset)
   ```
   
https://github.com/CalvinConfluent/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1252
   
   So the test needs to update its logEndOffset by appending one message.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] hachikuji closed pull request #12963: MINOR: More consistent handling of snapshot IDs

2022-12-12 Thread GitBox


hachikuji closed pull request #12963: MINOR: More consistent handling of 
snapshot IDs
URL: https://github.com/apache/kafka/pull/12963





[GitHub] [kafka] hachikuji commented on pull request #12963: MINOR: More consistent handling of snapshot IDs

2022-12-12 Thread GitBox


hachikuji commented on PR #12963:
URL: https://github.com/apache/kafka/pull/12963#issuecomment-1347439806

   Closing this patch since the issue with MetadataImage.EMPTY was addressed in 
https://github.com/apache/kafka/commit/b2dea17041157ceee741041d23783ff993b88ef1.
 I've opened a separate patch for the refactor of `RaftClient.createSnapshot`: 
https://github.com/apache/kafka/pull/12981.





[GitHub] [kafka] hachikuji opened a new pull request, #12981: MINOR: Pass snapshot ID directly in `RaftClient.createSnapshot`

2022-12-12 Thread GitBox


hachikuji opened a new pull request, #12981:
URL: https://github.com/apache/kafka/pull/12981

   Let `RaftClient.createSnapshot` take the snapshotId directly instead of the 
committed offset/epoch (which may not exist). 
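   A rough before/after sketch of the signature change, inferred from the `@see` references quoted elsewhere in this thread; the types and parameter names below are stubbed for illustration, not the real raft-module declarations:

```java
import java.util.Optional;

// Stub types so the sketch is self-contained; the real ones live in the raft module.
interface CreateSnapshotSketch<T> {
    final class OffsetAndEpoch {
        final long offset;
        final int epoch;
        OffsetAndEpoch(long offset, int epoch) { this.offset = offset; this.epoch = epoch; }
    }

    interface SnapshotWriter<U> extends AutoCloseable { }

    // before: Optional<SnapshotWriter<T>> createSnapshot(long committedOffset, int committedEpoch, long lastContainedLogTimeMs);
    // after: the caller passes the snapshot ID directly
    Optional<SnapshotWriter<T>> createSnapshot(OffsetAndEpoch snapshotId, long lastContainedLogTimeMs);
}
```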
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

2022-12-12 Thread GitBox


cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1046455192


##
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##
@@ -355,6 +375,13 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
 case id if id < 0 => None
 case id => Some(id)
   }
+  val kraftControllerIdOpt = updateMetadataRequest.kraftControllerId() match {
+case id if id < 0 => None
+case id => Some(id)
+  }
+  if (controllerIdOpt.nonEmpty && kraftControllerIdOpt.nonEmpty) {

Review Comment:
   This isn't an error, right? It's possible for both of them to be -1... Like 
sometimes we are in the middle of electing a new controller.






[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

2022-12-12 Thread GitBox


cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1046452882


##
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##
@@ -60,7 +61,8 @@ trait ZkFinalizedFeatureCache {
  *  A cache for the state (e.g., current leader) of each partition. This cache 
is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same 
cache, asynchronously.
  */
-class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFeatures: BrokerFeatures)
+class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion,

Review Comment:
   can we do an
   ```
   abc(
   a
   b
   c
   ) {
   }
   ```
   thing here? Sorry, these long scala declarations are ugly otherwise...






[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

2022-12-12 Thread GitBox


cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1046452203


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -544,6 +548,7 @@ class KafkaServer(
   private def controlledShutdown(): Unit = {
 val socketTimeoutMs = config.controllerSocketTimeoutMs
 
+// TODO (KAFKA-14447): Handle controlled shutdown for zkBroker when we 
have KRaft controller.

Review Comment:
   Can we just log a message and `return` from this function early, when we are 
in this situation? Currently, it looks like we just get stuck and time out, 
which is not great






[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

2022-12-12 Thread GitBox


cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1046450807


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -262,8 +263,11 @@ class KafkaServer(
 _brokerState = BrokerState.RECOVERY
 logManager.startup(zkClient.getAllTopicsInCluster())
 
-metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion, brokerFeatures)
-val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
+if (config.migrationEnabled) {
+  kraftControllerNodes = RaftConfig.voterConnectionsToNodes(RaftConfig.parseVoterConnections(config.quorumVoters)).asScala
+}
+metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion, brokerFeatures, kraftControllerNodes)

Review Comment:
   can we put each parameter on its own line?






[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

2022-12-12 Thread GitBox


cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1046450254


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1665,6 +1665,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
 
   val migrationEnabled: Boolean = getBoolean(KafkaConfig.MigrationEnabledProp)
+  def enableZkApiForwarding: Boolean = migrationEnabled && 
interBrokerProtocolVersion.isApiForwardingEnabled

Review Comment:
   to clarify, it's fine to do this static check for now but it should be done 
in `Kafka.scala` and we should comment that it's OK because IBP is static in ZK 
mode






[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

2022-12-12 Thread GitBox


cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1046449684


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1665,6 +1665,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
 
   val migrationEnabled: Boolean = getBoolean(KafkaConfig.MigrationEnabledProp)
+  def enableZkApiForwarding: Boolean = migrationEnabled && 
interBrokerProtocolVersion.isApiForwardingEnabled

Review Comment:
   please don't add a reference to `KafkaConfig.interBrokerProtocolVersion` 
here. That is going away since we are moving away from statically configuring 
IBP (or indeed calling it IBP)






[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

2022-12-12 Thread GitBox


cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1046447468


##
clients/src/main/resources/common/message/LeaderAndIsrRequest.json:
##
@@ -36,7 +36,7 @@
   "fields": [
 { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
   "about": "The current controller ID." },
-{ "name": "KRaftControllerId", "type": "int32", "versions": "7+", 
"entityType": "brokerId", "default": "-1",
+{ "name": "KRaftControllerId", "type": "int32", "versions": "7+", 
"default": "-1", "entityType": "brokerId", "default": "-1",

Review Comment:
   this already has a default value of -1. It doesn't need to be stated twice...



##
clients/src/main/resources/common/message/StopReplicaRequest.json:
##
@@ -31,7 +31,7 @@
   "fields": [
 { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
   "about": "The controller id." },
-{ "name": "KRaftControllerId", "type": "int32", "versions": "4+", 
"entityType": "brokerId", "default": "-1",
+{ "name": "KRaftControllerId", "type": "int32", "versions": "4+", 
"default": "-1", "entityType": "brokerId", "default": "-1",

Review Comment:
   this already has a default value of -1. It doesn't need to be stated twice...






[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

2022-12-12 Thread GitBox


cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1046447181


##
clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java:
##
@@ -51,7 +51,16 @@ public static class Builder extends 
AbstractControlRequest.Builder partitionStates, 
List liveBrokers,
Map topicIds) {
-super(ApiKeys.UPDATE_METADATA, version, controllerId, 
controllerEpoch, brokerEpoch);
+super(ApiKeys.UPDATE_METADATA, version, controllerId, 
controllerEpoch, brokerEpoch, false);

Review Comment:
   same, define this in terms of the other ctor






[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

2022-12-12 Thread GitBox


cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1046447035


##
clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java:
##
@@ -45,14 +45,23 @@ public static class Builder extends 
AbstractControlRequest.Builder 
topicStates) {
-super(ApiKeys.STOP_REPLICA, version, controllerId, 
controllerEpoch, brokerEpoch);
+super(ApiKeys.STOP_REPLICA, version, controllerId, 
controllerEpoch, brokerEpoch, false);

Review Comment:
   this is another case where we should define this in terms of the other 
constructor to avoid duplication
   ```
   this(version, controllerId, controllerEpoch, brokerEpoch, deletePartitions, 
topicStates, false)
   ```






[GitHub] [kafka] cmccabe commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

2022-12-12 Thread GitBox


cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1046446539


##
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java:
##
@@ -51,7 +51,16 @@ public static class Builder extends 
AbstractControlRequest.Builder partitionStates, 
Map topicIds,
Collection liveLeaders) {
-super(ApiKeys.LEADER_AND_ISR, version, controllerId, 
controllerEpoch, brokerEpoch);
+super(ApiKeys.LEADER_AND_ISR, version, controllerId, 
controllerEpoch, brokerEpoch, false);

Review Comment:
   let's define this in terms of the other constructor to avoid duplicating the 
below code. something like:
   ```
   this(version, controllerId, controllerEpoch, brokerEpoch, false)
   ```






[GitHub] [kafka] mumrah commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

2022-12-12 Thread GitBox


mumrah commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1046370590


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1317,14 +1316,23 @@ class KafkaApis(val requestChannel: RequestChannel,
 
 trace("Sending topic metadata %s and brokers %s for correlation id %d to 
client %s".format(completeTopicMetadata.mkString(","),
   brokers.mkString(","), request.header.correlationId, 
request.header.clientId))
+val controllerId = {
+  metadataCache match {
+case cache: KRaftMetadataCache => cache.getControllerId.map(_.id)
+case cache => cache.getControllerId.flatMap {

Review Comment:
   Can we add the class for this case as well?



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3332,13 +3340,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
 
 val brokers = 
metadataCache.getAliveBrokerNodes(request.context.listenerName)
-val controllerId = 
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
+val controllerId = {
+  metadataCache match {
+case cache: KRaftMetadataCache => cache.getControllerId.map(_.id)
+case cache => cache.getControllerId.flatMap {

Review Comment:
   And here



##
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##
@@ -315,7 +324,18 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: 
MetadataVersion, brokerFea
 }.getOrElse(Map.empty[Int, Node])
   }
 
-  def getControllerId: Option[Int] = metadataSnapshot.controllerId
+  def getControllerId: Option[CachedControllerId] = {
+val snapshot = metadataSnapshot
+snapshot.kraftControllerId match {
+  case Some(id) => Some(KRaftCachedControllerId(id))
+  case None => snapshot.controllerId.map(ZkCachedControllerId)
+}
+  }
+
+  def   getRandomAliveBrokerId: Option[Int] = {

Review Comment:
   errant whitespace






[jira] [Resolved] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms

2022-12-12 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino resolved KAFKA-14392.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
> --
>
> Key: KAFKA-14392
> URL: https://issues.apache.org/jira/browse/KAFKA-14392
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Minor
> Fix For: 3.5.0
>
>
> KRaft brokers maintain their liveness in the cluster by sending 
> BROKER_HEARTBEAT requests to the active controller; the active controller 
> fences a broker if it doesn't receive a heartbeat request from that broker 
> within the period defined by `broker.session.timeout.ms`.  The broker should 
> use a request timeout for its BROKER_HEARTBEAT requests that is not larger 
> than the session timeout being used by the controller; doing so creates the 
> possibility that upon controller failover the broker might not cancel an 
> existing heartbeat request in time and then subsequently heartbeat to the new 
> controller to maintain an uninterrupted session in the cluster.  In other 
> words, a failure of the active controller could result in under-replicated 
> (or under-min ISR) partitions simply due to a delay in brokers heartbeating 
> to the new controller.





[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms

2022-12-12 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-14392:
--
Affects Version/s: 3.3.2

> KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
> --
>
> Key: KAFKA-14392
> URL: https://issues.apache.org/jira/browse/KAFKA-14392
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Minor
>
> KRaft brokers maintain their liveness in the cluster by sending 
> BROKER_HEARTBEAT requests to the active controller; the active controller 
> fences a broker if it doesn't receive a heartbeat request from that broker 
> within the period defined by `broker.session.timeout.ms`.  The broker should 
> use a request timeout for its BROKER_HEARTBEAT requests that is not larger 
> than the session timeout being used by the controller; doing so creates the 
> possibility that upon controller failover the broker might not cancel an 
> existing heartbeat request in time and then subsequently heartbeat to the new 
> controller to maintain an uninterrupted session in the cluster.  In other 
> words, a failure of the active controller could result in under-replicated 
> (or under-min ISR) partitions simply due to a delay in brokers heartbeating 
> to the new controller.





[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms

2022-12-12 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-14392:
--
Affects Version/s: 3.4.0

> KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
> --
>
> Key: KAFKA-14392
> URL: https://issues.apache.org/jira/browse/KAFKA-14392
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.3.0, 3.4.0, 3.3.1
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Minor
>
> KRaft brokers maintain their liveness in the cluster by sending 
> BROKER_HEARTBEAT requests to the active controller; the active controller 
> fences a broker if it doesn't receive a heartbeat request from that broker 
> within the period defined by `broker.session.timeout.ms`.  The broker should 
> use a request timeout for its BROKER_HEARTBEAT requests that is not larger 
> than the session timeout being used by the controller; doing so creates the 
> possibility that upon controller failover the broker might not cancel an 
> existing heartbeat request in time and then subsequently heartbeat to the new 
> controller to maintain an uninterrupted session in the cluster.  In other 
> words, a failure of the active controller could result in under-replicated 
> (or under-min ISR) partitions simply due to a delay in brokers heartbeating 
> to the new controller.





[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms

2022-12-12 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-14392:
--
Component/s: kraft

> KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
> --
>
> Key: KAFKA-14392
> URL: https://issues.apache.org/jira/browse/KAFKA-14392
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.3.0, 3.3.1
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Minor
>
> KRaft brokers maintain their liveness in the cluster by sending 
> BROKER_HEARTBEAT requests to the active controller; the active controller 
> fences a broker if it doesn't receive a heartbeat request from that broker 
> within the period defined by `broker.session.timeout.ms`.  The broker should 
> use a request timeout for its BROKER_HEARTBEAT requests that is not larger 
> than the session timeout being used by the controller; doing so creates the 
> possibility that upon controller failover the broker might not cancel an 
> existing heartbeat request in time and then subsequently heartbeat to the new 
> controller to maintain an uninterrupted session in the cluster.  In other 
> words, a failure of the active controller could result in under-replicated 
> (or under-min ISR) partitions simply due to a delay in brokers heartbeating 
> to the new controller.





[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms

2022-12-12 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-14392:
--
Affects Version/s: (was: 3.4.0)

> KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
> --
>
> Key: KAFKA-14392
> URL: https://issues.apache.org/jira/browse/KAFKA-14392
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.3.0, 3.3.1
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Minor
>
> KRaft brokers maintain their liveness in the cluster by sending 
> BROKER_HEARTBEAT requests to the active controller; the active controller 
> fences a broker if it doesn't receive a heartbeat request from that broker 
> within the period defined by `broker.session.timeout.ms`.  The broker should 
> use a request timeout for its BROKER_HEARTBEAT requests that is not larger 
> than the session timeout being used by the controller; doing so creates the 
> possibility that upon controller failover the broker might not cancel an 
> existing heartbeat request in time and then subsequently heartbeat to the new 
> controller to maintain an uninterrupted session in the cluster.  In other 
> words, a failure of the active controller could result in under-replicated 
> (or under-min ISR) partitions simply due to a delay in brokers heartbeating 
> to the new controller.





[jira] [Updated] (KAFKA-14392) KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms

2022-12-12 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-14392:
--
Affects Version/s: 3.3.1
   3.3.0
   3.4.0

> KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
> --
>
> Key: KAFKA-14392
> URL: https://issues.apache.org/jira/browse/KAFKA-14392
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.3.0, 3.4.0, 3.3.1
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Minor
>
> KRaft brokers maintain their liveness in the cluster by sending 
> BROKER_HEARTBEAT requests to the active controller; the active controller 
> fences a broker if it doesn't receive a heartbeat request from that broker 
> within the period defined by `broker.session.timeout.ms`.  The broker should 
> use a request timeout for its BROKER_HEARTBEAT requests that is not larger 
> than the session timeout being used by the controller; doing so creates the 
> possibility that upon controller failover the broker might not cancel an 
> existing heartbeat request in time and then subsequently heartbeat to the new 
> controller to maintain an uninterrupted session in the cluster.  In other 
> words, a failure of the active controller could result in under-replicated 
> (or under-min ISR) partitions simply due to a delay in brokers heartbeating 
> to the new controller.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12968: KAFKA-14417: Addressing the error server side

2022-12-12 Thread GitBox


hachikuji commented on code in PR #12968:
URL: https://github.com/apache/kafka/pull/12968#discussion_r1046275111


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int,
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
 val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
 if (block == null) {
-  throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next 
producer ID block")

Review Comment:
   I was just trying to figure out if we wanted the "unexpected error" message 
to be logged or not. Usually we try to catch errors in the handler.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12968: KAFKA-14417: Addressing the error server side

2022-12-12 Thread GitBox


hachikuji commented on code in PR #12968:
URL: https://github.com/apache/kafka/pull/12968#discussion_r1046262250


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int,
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
 val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
 if (block == null) {
-  throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next 
producer ID block")
+  throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out 
waiting for next producer ID block")

Review Comment:
   It would be helpful to have a comment here since it's probably not obvious 
why we use this error instead of `REQUEST_TIMED_OUT`.  



##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -236,7 +236,7 @@ class RPCProducerIdManager(brokerId: Int,
   private[transaction] def handleTimeout(): Unit = {
 warn("Timed out when requesting AllocateProducerIds from the controller.")
 requestInFlight.set(false)
-nextProducerIdBlock.put(Failure(Errors.REQUEST_TIMED_OUT.exception))
+nextProducerIdBlock.put(Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception))

Review Comment:
   I wonder why we do this. If we don't send anything back, we still get the 
timeout after `maxWaitMs`. The odd thing is that the timeout might occur in the 
middle of the wait for `maxWaitMs`, so the actual time we wait might be less 
than that. I wonder if we can just get rid of this?






[GitHub] [kafka] jsancio commented on a diff in pull request #12975: MINOR; Improve high watermark log messages

2022-12-12 Thread GitBox


jsancio commented on code in PR #12975:
URL: https://github.com/apache/kafka/pull/12975#discussion_r1046207835


##
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##
@@ -169,4 +184,26 @@ public void close() {
 fetchingSnapshot.get().close();
 }
 }
+
+private void logHighWatermarkUpdate(
+Optional oldHighWatermark,
+Optional newHighWatermark
+) {
+if (!oldHighWatermark.equals(newHighWatermark)) {
+if (oldHighWatermark.isPresent()) {
+log.trace(
+"High watermark set to {} from {} for epoch {}",
+newHighWatermark,
+oldHighWatermark.get(),
+epoch
+);
+} else {
+log.info(
+"High watermark set to {} for the firs time for epoch {}",

Review Comment:
   Fixed. Thanks for the review @showuon 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14461) StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores logic to check for active partitions seems brittle.

2022-12-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14461:

Component/s: streams
 unit tests

> StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores logic to 
> check for active partitions seems brittle.
> --
>
> Key: KAFKA-14461
> URL: https://issues.apache.org/jira/browse/KAFKA-14461
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> {noformat}
> StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores {noformat}
> has logic to figure out the active partitions:
>  
>  
> {code:java}
> final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 
> 2) == 1;{code}
>  
>  
> This is very brittle: whenever a new test is added, this check needs to be 
> flipped to `==0`. It's a hassle to change it every time a new test is added, 
> so we should improve this.
> Also, this test relies on JUnit 4 annotations; it could be migrated to JUnit 5 
> so that we can use @BeforeAll to set up and @AfterAll to shut down the cluster, 
> instead of the current approach of doing it before/after every test.
>  
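
One less brittle direction (a sketch only; the helper below is assumed to live 
inside the integration test class and is not existing code) is to compare the 
reported active host against each instance's own configured 
`application.server` endpoint instead of relying on port parity:

{code:java}
// Sketch: decide which KafkaStreams instance is active by matching the endpoint
// it advertised via application.server, rather than checking port % 2.
private static boolean isActiveInstance(final KeyQueryMetadata metadata,
                                        final Properties instanceConfig) {
    final String advertised = instanceConfig.getProperty(StreamsConfig.APPLICATION_SERVER_CONFIG);
    final String active = metadata.activeHost().host() + ":" + metadata.activeHost().port();
    return advertised.equals(active);
}
{code}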



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe merged pull request #12964: MINOR: Introduce MetadataProvenance and ImageReWriter

2022-12-12 Thread GitBox


cmccabe merged PR #12964:
URL: https://github.com/apache/kafka/pull/12964


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 opened a new pull request, #12980: KAFKA-14464: Make resource leaks for MM2 resources more difficult

2022-12-12 Thread GitBox


gharris1727 opened a new pull request, #12980:
URL: https://github.com/apache/kafka/pull/12980

   For all AutoCloseable resources created by MM2 connectors & tasks, force 
their instantiation to take place within a BackgroundResources instance, which 
can be closed when the connector/task stops.
   
   Signed-off-by: Greg Harris 
   
   This was split out of #12955 for scope reasons.
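
   A rough sketch of the pattern (an illustration of the idea only, not the 
actual class added in this PR):

```java
import org.apache.kafka.common.utils.Utils;

import java.util.ArrayDeque;
import java.util.Deque;

// Illustration: a holder that owns every closeable created during start() so
// that stop() can release them all, including ones added later by new code.
public class BackgroundResources implements AutoCloseable {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    public synchronized <T extends AutoCloseable> T add(T resource) {
        resources.push(resource);
        return resource;
    }

    @Override
    public synchronized void close() {
        // Close in reverse creation order; swallow individual failures so one
        // bad resource does not prevent the rest from being released.
        while (!resources.isEmpty()) {
            Utils.closeQuietly(resources.pop(), "background resource");
        }
    }
}
```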
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14323) KRaft broker time based snapshots

2022-12-12 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14323:
---
Fix Version/s: (was: 3.4.0)

> KRaft broker time based snapshots
> -
>
> Key: KAFKA-14323
> URL: https://issues.apache.org/jira/browse/KAFKA-14323
> Project: Kafka
>  Issue Type: New Feature
>Reporter: José Armando García Sancio
>Assignee: Colin McCabe
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14323) KRaft broker time based snapshots

2022-12-12 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14323:
---
Fix Version/s: 3.5.0

> KRaft broker time based snapshots
> -
>
> Key: KAFKA-14323
> URL: https://issues.apache.org/jira/browse/KAFKA-14323
> Project: Kafka
>  Issue Type: New Feature
>Reporter: José Armando García Sancio
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14403) Snapshot failure metrics

2022-12-12 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14403:
---
Fix Version/s: 3.5.0

> Snapshot failure metrics
> 
>
> Key: KAFKA-14403
> URL: https://issues.apache.org/jira/browse/KAFKA-14403
> Project: Kafka
>  Issue Type: New Feature
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.5.0
>
>
> Implement the following metrics:
> Controller:
> kafka.controller:type=KafkaController,name=MetadataSnapshotGenerationErrors
> Incremented anytime the controller fails to generate a snapshot. Reset to 
> zero anytime the controller restarts or a snapshot is successfully generated.
> Broker:
> kafka.server:type=broker-metadata-metrics,name=snapshot-generation-errors
> Incremented anytime the broker fails to generate a snapshot. Reset to zero 
> anytime the broker restarts or a snapshot is successfully generated.
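
A minimal sketch of the counter semantics described above (the class and method 
names are assumptions, not the final metric wiring):

{code:java}
import java.util.concurrent.atomic.AtomicLong;

// Sketch: a counter that is bumped on every failed snapshot attempt and reset
// to zero on a successful snapshot; a process restart starts from a fresh zero.
public final class SnapshotGenerationErrors {
    private final AtomicLong errors = new AtomicLong(0);

    public void recordFailure() {
        errors.incrementAndGet();
    }

    public void recordSuccess() {
        errors.set(0);
    }

    public long value() {
        return errors.get();
    }
}
{code}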



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14464) Refactor MM2 connectors to make resource leaks from code defects less likely

2022-12-12 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14464:
---

 Summary: Refactor MM2 connectors to make resource leaks from code 
defects less likely
 Key: KAFKA-14464
 URL: https://issues.apache.org/jira/browse/KAFKA-14464
 Project: Kafka
  Issue Type: Task
  Components: mirrormaker
Reporter: Greg Harris
Assignee: Greg Harris


The MirrorMaker2 connectors instantiate many closeable resources during start() 
which must be closed in the corresponding stop() method. If a new resource is 
added, a code defect can be introduced by forgetting to update the stop() method 
to close it, leaking that resource beyond the lifetime of the connector.

We should refactor these connectors in a way that makes these defects less 
likely, and explicitly manage the lifetime of these background resources rather 
than relying on developer and reviewer due diligence.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

2022-12-12 Thread GitBox


cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1046120121


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##
@@ -42,77 +40,56 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-@Mock(type = MockType.NICE)
+@Mock
 private ProcessorStateManager stateManager;
 
-@Mock(type = MockType.NICE)
+@Mock
 private StateDirectory stateDirectory;
 
-@Mock(type = MockType.NICE)
+@Mock
 private ProcessorTopology topology;
 
-@Mock(type = MockType.NICE)
+@Mock
 private InternalProcessorContext processorContext;
 
-private IMocksControl ctrl;
-
 private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
 private final TaskId taskId = new TaskId(0, 0);
 
-@Before
-public void setup() {
-ctrl = createStrictControl();
-topology = ctrl.createMock(ProcessorTopology.class);
-processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-stateManager = ctrl.createMock(ProcessorStateManager.class);
-stateDirectory = ctrl.createMock(StateDirectory.class);
-}
-
 @Test
 public void testRegisterStateStoreWhenTopologyEmpty() {
-expect(topology.stateStores()).andReturn(emptyList());
-
-ctrl.checkOrder(true);
-ctrl.replay();
+when(topology.stateStores()).thenReturn(emptyList());
 
 StateManagerUtil.registerStateStores(logger,
 "logPrefix:", topology, stateManager, stateDirectory, 
processorContext);
-
-ctrl.verify();
 }
 
 @Test
 public void testRegisterStateStoreFailToLockStateDirectory() {
-expect(topology.stateStores()).andReturn(singletonList(new 
MockKeyValueStore("store", false)));
-
-expect(stateManager.taskId()).andReturn(taskId);
+when(topology.stateStores()).thenReturn(singletonList(new 
MockKeyValueStore("store", false)));
 

Review Comment:
   ```suggestion
   ```



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##
@@ -42,77 +40,56 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-@Mock(type = MockType.NICE)
+@Mock
 private ProcessorStateManager stateManager;
 
-@Mock(type = MockType.NICE)
+@Mock
 private StateDirectory stateDirectory;
 
-@Mock(type = MockType.NICE)
+@Mock
 private ProcessorTopology topology;
 
-@Mock(type = MockType.NICE)
+@Mock
 private InternalProcessorContext processorContext;
 
-private IMocksControl ctrl;
-
 private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
 private final TaskId taskId = new TaskId(0, 0);
 
-@Before
-public void setup() {
-ctrl = createStrictControl();
-topology = ctrl.createMock(ProcessorTopology.class);
-processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-stateManager = ctrl.createMock(ProcessorStateManager.class);
-stateDirectory = ctrl.createMock(StateDirectory.class);
-}
-
 @Test
 public void testRegisterStateStoreWhenT

[jira] [Created] (KAFKA-14463) ConnectorClientConfigOverridePolicy is not closed at worker shutdown

2022-12-12 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14463:
---

 Summary: ConnectorClientConfigOverridePolicy is not closed at 
worker shutdown
 Key: KAFKA-14463
 URL: https://issues.apache.org/jira/browse/KAFKA-14463
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.3.0
Reporter: Greg Harris


The ConnectorClientConfigOverridePolicy is marked AutoCloseable, but it is never 
closed by the worker on shutdown.

This is currently not a critical issue, as all known implementations of the 
policy have a no-op close. However, an implementation that does instantiate 
background resources that must be released in close() would leak those resources 
in a test environment.
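
For illustration, a hypothetical policy implementation that would leak if the 
worker never calls close() (this class does not exist in Kafka; it only sketches 
the risk described above):

{code:java}
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example: a policy that owns a background thread. If the worker
// never invokes close(), the executor outlives the worker in embedded/test setups.
public class AuditingOverridePolicy implements ConnectorClientConfigOverridePolicy {
    private final ExecutorService auditExecutor = Executors.newSingleThreadExecutor();

    @Override
    public List<ConfigValue> validate(ConnectorClientConfigRequest request) {
        auditExecutor.submit(() -> System.out.println("override requested: " + request.clientProps()));
        return Collections.emptyList(); // sketch: no per-config validation shown here
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
        auditExecutor.shutdownNow();
    }
}
{code}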



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on a diff in pull request #12973: MINOR: Change KRaft controllership claiming algorithm

2022-12-12 Thread GitBox


mumrah commented on code in PR #12973:
URL: https://github.com/apache/kafka/pull/12973#discussion_r1046093236


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -166,60 +166,74 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
* the migration.
*
* To ensure that the KRaft controller epoch exceeds the current ZK 
controller epoch, this registration algorithm
-   * uses a conditional update on the /controller_epoch znode. If a new ZK 
controller is elected during this method,
-   * the conditional update on /controller_epoch fails which causes the whole 
multi-op transaction to fail.
+   * uses a conditional update on the /controller and /controller_epoch znodes.
+   *
+   * If a new controller is registered concurrently with this registration, 
one of the two will fail the CAS
+   * operation on /controller_epoch. For KRaft, we have an extra guard against 
the registered KRaft epoch going
+   * backwards. If a KRaft controller had previously registered, an additional 
CAS operation is done on the /controller
+   * ZNode to ensure that the KRaft epoch being registered is newer.
*
* @param kraftControllerId ID of the KRaft controller node
* @param kraftControllerEpoch Epoch of the KRaft controller node
-   * @return An optional of the new zkVersion of /controller_epoch. None if we 
could not register the KRaft controller.
+   * @return An optional of the written epoch and new zkVersion of 
/controller_epoch. None if we could not register the KRaft controller.
*/
-  def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, 
kraftControllerEpoch: Int): Option[Int] = {
+  def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, 
kraftControllerEpoch: Int): Option[(Int, Int)] = {
 val timestamp = time.milliseconds()
 val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, 
e._2.getVersion))
-val controllerOpt = getControllerId
-val controllerEpochToStore = kraftControllerEpoch + 1000 // TODO 
Remove this after KAFKA-14436
+val controllerOpt = getControllerRegistration
+
+// If we have a KRaft epoch registered in /controller, and it is not 
_older_ than the requested epoch, throw an error.
+controllerOpt.flatMap(_.kraftEpoch).foreach { kraftEpochInZk =>
+  if (kraftEpochInZk >= kraftControllerEpoch) {
+throw new ControllerMovedException(s"Cannot register KRaft controller 
$kraftControllerId with epoch $kraftControllerEpoch " +
+  s"as the current controller register in ZK has the same or newer 
epoch $kraftEpochInZk.")
+  }
+}
+
 curEpochOpt match {
   case None =>
 throw new IllegalStateException(s"Cannot register KRaft controller 
$kraftControllerId as the active controller " +
   s"since there is no ZK controller epoch present.")
   case Some((curEpoch: Int, curEpochZk: Int)) =>
-if (curEpoch >= controllerEpochToStore) {
-  // TODO KAFKA-14436 Need to ensure KRaft has a higher epoch an ZK
-  throw new IllegalStateException(s"Cannot register KRaft controller 
$kraftControllerId as the active controller " +
-s"in ZK since its epoch ${controllerEpochToStore} is not higher 
than the current ZK epoch ${curEpoch}.")
+// TODO KAFKA-14436 Increase the KRaft epoch to be higher than the ZK 
epoch
+val newControllerEpoch = if (kraftControllerEpoch >= curEpoch) {
+  kraftControllerEpoch
+} else {
+  curEpoch + 1
 }
 
-val response = if (controllerOpt.isDefined) {
-  info(s"KRaft controller $kraftControllerId overwriting 
${ControllerZNode.path} to become the active " +
-s"controller with epoch $controllerEpochToStore. The previous 
controller was ${controllerOpt.get}.")
-  retryRequestUntilConnected(
-MultiRequest(Seq(
-  SetDataOp(ControllerEpochZNode.path, 
ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
-  DeleteOp(ControllerZNode.path, ZkVersion.MatchAnyVersion),
-  CreateOp(ControllerZNode.path, 
ControllerZNode.encode(kraftControllerId, timestamp),
-defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
-  )
-} else {
-  info(s"KRaft controller $kraftControllerId creating 
${ControllerZNode.path} to become the active " +
-s"controller with epoch $controllerEpochToStore. There was no 
active controller.")
-  retryRequestUntilConnected(
-MultiRequest(Seq(
-  SetDataOp(ControllerEpochZNode.path, 
ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
-  CreateOp(ControllerZNode.path, 
ControllerZNode.encode(kraftControllerId, timestamp),
-defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
-  )
+val response = controllerOpt match {
+  case Some(controller) =

[jira] [Created] (KAFKA-14462) New Group Coordinator State Machine

2022-12-12 Thread David Jacot (Jira)
David Jacot created KAFKA-14462:
---

 Summary: New Group Coordinator State Machine
 Key: KAFKA-14462
 URL: https://issues.apache.org/jira/browse/KAFKA-14462
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac closed pull request #12969: MINOR: Small refactor in ApiVersionManager

2022-12-12 Thread GitBox


dajac closed pull request #12969: MINOR: Small refactor in ApiVersionManager
URL: https://github.com/apache/kafka/pull/12969


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned

2022-12-12 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646033#comment-17646033
 ] 

Mikael commented on KAFKA-14419:


Thanks, I'll try that out. I did see the same symptom but with a different 
sequence of log events the other day (and this was with my workaround in place 
so that was apparently not enough):
{noformat}


2022-12-11 14:58:16,429 INFO 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer 
clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer,
 groupId=messages.xms.mt] Request joining group due to: group is already 
rebalancing
2022-12-11 14:58:16,429 INFO 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer 
clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer,
 groupId=messages.xms.mt] (Re-)joining group
2022-12-11 14:58:16,760 INFO 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer 
clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer,
 groupId=messages.xms.mt] Successfully joined group with generation 
Generation{generationId=12752, 
memberId='messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer-2bacd65f-cfb5-4ed9-9705-1db3e7dbfc6c',
 protocol='stream'}
2022-12-11 14:58:16,778 INFO 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:802] [Consumer 
clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer,
 groupId=messages.xms.mt] Successfully synced group in generation 
Generation{generationId=12752, 
memberId='messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer-2bacd65f-cfb5-4ed9-9705-1db3e7dbfc6c',
 protocol='stream'}
2022-12-11 14:58:16,778 INFO 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:428] [Consumer 
clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer,
 groupId=messages.xms.mt] Updating assignment with
2022-12-11 14:58:16,778 INFO 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:300] [Consumer 
clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer,
 groupId=messages.xms.mt] Notifying assignor about the new 
Assignment(partitions=[messages.xms.mt.input-4], userDataSize=52)
2022-12-11 14:58:16,778 INFO 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
o.a.k.s.p.i.StreamsPartitionAssignor [StreamsPartitionAssignor.java:1361] 
stream-thread 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer] 
No followup rebalance was requested, resetting the rebalance schedule.
2022-12-11 14:58:16,778 INFO 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
o.a.k.s.p.i.TaskManager [TaskManager.java:273] stream-thread 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] Handle 
new assignment with:
2022-12-11 14:58:16,778 INFO 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:312] [Consumer 
clientId=messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4-consumer,
 groupId=messages.xms.mt] Adding newly assigned partitions: 
2022-12-11 14:58:16,779 INFO 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] State 
transition from RUNNING to PARTITIONS_ASSIGNED
2022-12-11 14:58:16,852 INFO 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
o.a.k.s.p.i.StreamThread [StreamThread.java:866] stream-thread 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
Restoration took 72 ms for all tasks [1_4]
2022-12-11 14:58:16,852 INFO 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] 
o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread 
[messages.xms.mt-6fb347a8-a830-4cbe-86ab-d3855fa5ca0e-StreamThread-4] State 
transition from PARTITIONS_ASSIGNED to RUNNING


{noformat}

> Same message consumed again by the same stream task after partition is lost 
> and reassigned
> --
>
> Key: KAFKA-14419
> URL: https://issues.apache.org/jira/browse/KAFKA-14419
> Project: Kafka
>  Issue Type: Bug
>  Components: s

[GitHub] [kafka] chia7712 opened a new pull request, #12979: MINOR: remove "is-future" from metrics tags after replace current log…

2022-12-12 Thread GitBox


chia7712 opened a new pull request, #12979:
URL: https://github.com/apache/kafka/pull/12979

   We don't remove the "is-future=true" tag from a future log's metrics after the 
future log becomes the "current" log. This causes two potential issues 
(illustrated in the sketch below):
   
   1. metrics monitors can't find the Log metrics unless they also track the 
"is-future=true" tag.
   2. all Log metrics for that partition get removed if the partition is moved to 
another folder again.
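
   As a rough illustration (the tag names follow the existing `kafka.log` Log 
metrics and the `is-future` tag mentioned above; the monitoring-side snippet 
itself is hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical monitoring-side view: a dashboard that looks up Log metrics by
// topic and partition only will miss the log if a stale is-future=true tag is
// still attached after the future log has been promoted to the current log.
public final class LogMetricTags {
    public static Map<String, String> tags(String topic, int partition, boolean isFuture) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("topic", topic);
        tags.put("partition", Integer.toString(partition));
        if (isFuture) {
            tags.put("is-future", "true");
        }
        return tags;
    }

    public static void main(String[] args) {
        System.out.println(tags("payments", 0, true));  // what stays registered today
        System.out.println(tags("payments", 0, false)); // what monitors typically query
    }
}
```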
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14379) consumer should refresh preferred read replica on update metadata

2022-12-12 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-14379.
-
Fix Version/s: 3.3.2
   Resolution: Fixed

> consumer should refresh preferred read replica on update metadata
> -
>
> Key: KAFKA-14379
> URL: https://issues.apache.org/jira/browse/KAFKA-14379
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Blocker
> Fix For: 3.4.0, 3.3.2
>
>
> The consumer (fetcher) refreshes the preferred read replica only on three 
> conditions:
>  # the consumer receives an OFFSET_OUT_OF_RANGE error
>  # the follower does not exist in the client's metadata (i.e., offline)
>  # after metadata.max.age.ms (5 min default)
> For other errors, it will continue to reach out to the possibly unavailable 
> follower, and only after 5 minutes will it refresh the preferred read replica 
> and go back to the leader.
> A specific example is when a partition is reassigned: the consumer will get 
> NOT_LEADER_OR_FOLLOWER, which triggers a metadata update, but the preferred 
> read replica will not be refreshed as the follower is still online. It will 
> continue to reach out to the old follower until the preferred read replica 
> expires.
> The consumer could instead refresh its preferred read replica whenever it makes 
> a metadata update request, so that when it receives e.g. 
> NOT_LEADER_OR_FOLLOWER it can find the new preferred read replica without 
> waiting for the expiration.
>  
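
A compact sketch of the refresh conditions listed above, including the proposed 
additional trigger (names are illustrative, not the actual Fetcher code):

{code:java}
// Sketch: when should the consumer discard its cached preferred read replica?
public final class PreferredReadReplicaPolicy {
    public static boolean shouldRefresh(boolean offsetOutOfRange,
                                        boolean followerMissingFromMetadata,
                                        long metadataAgeMs,
                                        long metadataMaxAgeMs,
                                        boolean metadataJustUpdated) {
        return offsetOutOfRange                    // 1. OFFSET_OUT_OF_RANGE from the follower
            || followerMissingFromMetadata         // 2. follower offline / absent from metadata
            || metadataAgeMs >= metadataMaxAgeMs   // 3. metadata.max.age.ms elapsed (5 min default)
            || metadataJustUpdated;                // proposed: also refresh on metadata updates
    }
}
{code}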



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

2022-12-12 Thread GitBox


dajac commented on PR #12956:
URL: https://github.com/apache/kafka/pull/12956#issuecomment-1346125696

   Merged to trunk, 3.4 and 3.3.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

2022-12-12 Thread GitBox


dajac merged PR #12956:
URL: https://github.com/apache/kafka/pull/12956


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

2022-12-12 Thread GitBox


dajac commented on code in PR #12956:
URL: https://github.com/apache/kafka/pull/12956#discussion_r1045547158


##
clients/src/main/java/org/apache/kafka/common/Cluster.java:
##
@@ -253,7 +253,11 @@ public Node nodeById(int id) {
 public Optional nodeIfOnline(TopicPartition partition, int id) {
 Node node = nodeById(id);
 PartitionInfo partitionInfo = partition(partition);
-if (node != null && partitionInfo != null && 
!Arrays.asList(partitionInfo.offlineReplicas()).contains(node)) {
+
+if (node != null && partitionInfo != null &&
+!Arrays.asList(partitionInfo.offlineReplicas()).contains(node) &&

Review Comment:
   Thanks for the clarification. It is actually not too bad as is. We can merge 
it like this and follow up if needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #12978: MINOR: Small refactor in KafkaApis.handleHeartbeatRequest

2022-12-12 Thread GitBox


dajac opened a new pull request, #12978:
URL: https://github.com/apache/kafka/pull/12978

   This is a small follow-up to https://github.com/apache/kafka/pull/12848. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

2022-12-12 Thread GitBox


yashmayya commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1045537542


##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##
@@ -21,41 +21,51 @@
 import org.apache.kafka.common.utils.Utils;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 /**
  * 
- * Command line utility that runs Kafka Connect as a standalone process. In 
this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a 
single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * 
- * 
- * By default, no job configs or offset data is persistent. You can make jobs 
persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In 
this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a 
single process. This is useful for for development
+ * and testing Kafka Connect on a local machine.
  * 
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli {
 private static final Logger log = 
LoggerFactory.getLogger(ConnectStandalone.class);
 
+@Override
+protected Herder createHerder(StandaloneConfig config, String workerId, 
Plugins plugins,
+  ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
+  RestServer restServer, RestClient 
restClient) {
+
+OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+offsetBackingStore.configure(config);
+
+Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, 
offsetBackingStore,
+connectorClientConfigOverridePolicy);
+
+return new StandaloneHerder(worker, config.kafkaClusterId(), 
connectorClientConfigOverridePolicy);
+}
+
+@Override
+protected StandaloneConfig createConfig(Map workerProps) {
+return new StandaloneConfig(workerProps);
+}
+
 public static void main(String[] args) {

Review Comment:
   Nice! This looks much cleaner and I've made the changes (largely the same 
with minor tweaks). The only small qualm that I have is that the integration 
test framework also uses `startConnect` and it's a bit odd as it's initializing 
a CLI class with no args (since it passes the worker properties map directly to 
`startConnect`).
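
   For readers without the full diff, a rough outline of the shape being 
discussed (signatures are inferred from the hunk above and from this thread, and 
may not match the final class exactly):

```java
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;

import java.util.Map;

// Sketch of the shared CLI skeleton: subclasses supply the config type and the
// Herder, while argument parsing and lifecycle wiring live in the base class.
public abstract class AbstractConnectCli<T extends WorkerConfig> {

    protected abstract T createConfig(Map<String, String> workerProps);

    protected abstract Herder createHerder(T config, String workerId, Plugins plugins,
                                           ConnectorClientConfigOverridePolicy overridePolicy,
                                           RestServer restServer, RestClient restClient);

    // Used both by main() and by the embedded/integration-test path that passes
    // the worker properties map directly, which is the "no args" case noted above.
    public Connect startConnect(Map<String, String> workerProps) {
        // ... build Plugins, RestServer, Herder via the abstract hooks, then start ...
        throw new UnsupportedOperationException("sketch only");
    }
}
```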



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12858: KAFKA-14367; Add `DeleteGroups` to the new `GroupCoordinator` interface

2022-12-12 Thread GitBox


dajac commented on code in PR #12858:
URL: https://github.com/apache/kafka/pull/12858#discussion_r1045524817


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -1992,6 +1992,186 @@ class KafkaApisTest {
 testListOffsetFailedGetLeaderReplica(Errors.UNKNOWN_TOPIC_OR_PARTITION)
   }
 
+  @Test
+  def testHandleDeleteGroups(): Unit = {
+val deleteGroupsRequest = new DeleteGroupsRequestData()
+  .setGroupsNames(List(
+"group-1",
+"group-2",
+"group-3"
+  ).asJava)
+
+val requestChannelRequest = buildRequest(new 
DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
+
+val expectedRequestContext = new GroupCoordinatorRequestContext(
+  ApiKeys.DELETE_GROUPS.latestVersion,
+  requestChannelRequest.context.clientId,
+  requestChannelRequest.context.clientAddress,
+  RequestLocal.NoCaching.bufferSupplier
+)
+
+val future = new 
CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
+when(newGroupCoordinator.deleteGroups(
+  ArgumentMatchers.eq(expectedRequestContext),
+  ArgumentMatchers.eq(List("group-1", "group-2", "group-3").asJava)
+)).thenReturn(future)
+
+createKafkaApis().handleDeleteGroupsRequest(
+  requestChannelRequest,
+  RequestLocal.NoCaching
+)
+
+val results = new 
DeleteGroupsResponseData.DeletableGroupResultCollection(List(

Review Comment:
   Thanks for your comment. I played a bit with your suggestion and I found 
that using a `Map` is not convenient here because the order matters. We could 
use a `LinkedHashMap`, but that makes things less readable in my opinion. I 
think I will stick to the current approach, which is not that bad. We do 
this everywhere.
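
   For what it's worth, a small self-contained illustration of the ordering 
concern (plain Java collections, not Kafka types):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public final class OrderDemo {
    public static void main(String[] args) {
        // Assertions on the response collection compare elements in order, so the
        // expected results must be declared in the same order as the request.
        Map<String, String> plain = new HashMap<>();
        Map<String, String> linked = new LinkedHashMap<>();
        for (String group : new String[]{"group-1", "group-2", "group-3"}) {
            plain.put(group, "NONE");
            linked.put(group, "NONE");
        }
        System.out.println(plain.keySet());  // iteration order not guaranteed
        System.out.println(linked.keySet()); // [group-1, group-2, group-3]
    }
}
```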



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2022-12-12 Thread GitBox


dajac commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1044655133


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -1933,11 +1943,79 @@ private Map 
topicPartitionTags(TopicPartition tp) {
 }
 }
 
+// Visible for testing
+void maybeCloseFetchSessions(final Timer timer) {
+final Cluster cluster = metadata.fetch();
+final List> requestFutures = new 
ArrayList<>();
+for (final Map.Entry entry : 
sessionHandlers.entrySet()) {
+final FetchSessionHandler sessionHandler = entry.getValue();
+// set the session handler to notify close. This will set the next 
metadata request to send close message.
+sessionHandler.notifyClose();
+
+final int sessionId = sessionHandler.sessionId();
+final Integer fetchTargetNodeId = entry.getKey();
+// FetchTargetNode may not be available as it may have 
disconnected the connection. In such cases, we will
+// skip sending the close request.
+final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+log.debug("Skip sending close session request to broker {} 
since it is not reachable", fetchTarget);
+continue;
+}
+
+final RequestFuture responseFuture = 
sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
+responseFuture.addListener(new 
RequestFutureListener() {
+@Override
+public void onSuccess(ClientResponse value) {
+log.debug("Successfully sent a close message for fetch 
session: {} to node: {}", sessionId, fetchTarget);
+}
+
+@Override
+public void onFailure(RuntimeException e) {
+log.debug("Unable to a close message for fetch session: {} 
to node: {}. " +
+"This may result in unnecessary fetch sessions at the 
broker.", sessionId, fetchTarget, e);
+}
+});
+
+requestFutures.add(responseFuture);
+}
+
+// Poll to ensure that request has been written to the socket. Wait 
until either the timer has expired or until
+// all requests have received a response.
+do {
+client.poll(timer, null, true);
+} while (timer.notExpired() && 
!requestFutures.stream().allMatch(RequestFuture::isDone));
+
+if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
+// we ran out of time before completing all futures. It is ok 
since we don't want to block the shutdown
+// here.
+log.debug("All requests couldn't be sent in the specific timeout 
period {}ms. " +
+"This may result in unnecessary fetch sessions at the broker. 
Consider increasing the timeout passed for " +
+"KafkaConsumer.close(Duration timeout)", timer.timeoutMs());
+}
+}
+
+public void close(final Timer timer) {
+if (!isClosed.compareAndSet(false, true)) {
+log.info("Fetcher {} is already closed.", this);
+return;
+}
+
+// Shared states (e.g. sessionHandlers) could be accessed by multiple 
threads (such as heartbeat thread), hence,
+// it is necessary to acquire a lock on the fetcher instance before 
modifying the states.
+synchronized (Fetcher.this) {
+// we do not need to re-enable wakeups since we are closing already
+client.disableWakeups();

Review Comment:
   Could you elaborate on why we need this here? It is something that we did 
not have before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org