[jira] [Commented] (KAFKA-13607) Cannot use PEM certificate coding when parent defined file-based

2024-01-18 Thread Sergey Ivanov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808526#comment-17808526
 ] 

Sergey Ivanov commented on KAFKA-13607:
---

Hello,

We also faced an issue with DefaultSslEngineFactory.java

In our case we use Kafka Connect with a config provider, and the connector has the following
properties:
{code:java}
  "producer.override.ssl.truststore.type" : "${saas:topicconfig:ssl.truststore.type}",
  "producer.override.bootstrap.servers" : "${saas:topicconfig:bootstrap.servers}",
  "producer.override.ssl.truststore.certificates" : "${saas:topicconfig:ssl.truststore.certificates}",
  "producer.override.security.protocol" : "${saas:topicconfig:security.protocol}",
  "producer.override.ssl.keystore.key" : "${saas:topicconfig:ssl.keystore.key}",
  "producer.override.ssl.keystore.type" : "${saas:topicconfig:ssl.keystore.type}",
  "producer.override.sasl.jaas.config" : "${saas:topicconfig:sasl.jaas.config}",
  "producer.override.sasl.mechanism" : "${saas:topicconfig:sasl.mechanism}",
  "producer.override.ssl.keystore.certificate.chain" : "${saas:topicconfig:ssl.keystore.certificate.chain}"{code}
Based on the actual Kafka connection properties, the config provider fills in the
corresponding values. For example, for a Kafka cluster with TLS but without mTLS (server
certificate only), it provides empty values for ssl.keystore.key and
ssl.keystore.type (as you know, a config provider cannot remove a property from
the connector). But in the code here:

[https://github.com/apache/kafka/blob/3.5/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L274]

it only checks whether the key is null, not whether it is empty, so we got this error:
{code:java}
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: SSL private key can be specified only for PEM, but key store type is . {code}
It looks like the proposed PR would fix our case as well:
[https://github.com/apache/kafka/pull/11707]
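A minimal sketch, in Java, of the "empty means absent" behaviour requested here: a config value is treated as absent when it is null, empty, or whitespace-only, so an empty ssl.keystore.key coming from a config provider no longer triggers the PEM-only check. The class `SslConfigPresence` and its methods are hypothetical; values are plain strings and the exception is `IllegalArgumentException` only to keep the sketch self-contained. This is not the actual `DefaultSslEngineFactory` code or the change in PR #11707.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating "empty means absent" for SSL configs;
// not the real DefaultSslEngineFactory implementation.
final class SslConfigPresence {

    private SslConfigPresence() {
    }

    // A value counts as absent when it is null, an empty string,
    // or a string containing only whitespace.
    static boolean isAbsent(Object value) {
        if (value == null) {
            return true;
        }
        return value instanceof String && ((String) value).trim().isEmpty();
    }

    // Simplified PEM-only private key validation: a private key is only
    // allowed with a PEM keystore type, but an empty key is ignored.
    static void validatePrivateKey(Map<String, Object> configs) {
        Object key = configs.get("ssl.keystore.key");
        Object type = configs.get("ssl.keystore.type");
        if (isAbsent(key)) {
            return; // nothing to validate
        }
        if (!"PEM".equals(type)) {
            throw new IllegalArgumentException(
                "SSL private key can be specified only for PEM, but key store type is " + type + ".");
        }
    }
}
```

With this check, the TLS-only connector described in the comment (empty ssl.keystore.key and ssl.keystore.type) passes validation, while a real key paired with a non-PEM keystore type is still rejected.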

> Cannot use PEM certificate coding when parent defined file-based
> 
>
> Key: KAFKA-13607
> URL: https://issues.apache.org/jira/browse/KAFKA-13607
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, config, connect
>Affects Versions: 2.7.1, 3.0.0
>Reporter: Piotr Smolinski
>Priority: Major
>
> The problem arises when we create a Kafka client based on a prepopulated
> config. If we have only partial control over the input, we can only attempt
> to reset some values.
> KIP-651 added a useful new feature: PEM encoding of certificates as an
> alternative to file-based stores. I have observed a problem in Confluent
> Replicator. We had moved the common configuration to the worker level and
> assumed the connectors define only what is specific to them. The security
> setup is mTLS, i.e. we need both a client certificate and the trusted chain.
> Our default configuration has both in PKCS#12 files, but we had to reverse
> the replication direction and redefine the destination coordinates. We have
> certificates for these, and thanks to KIP-651 we could specify them as
> connector parameters instead of changing the worker deployment.
> It turned out that we cannot clear {*}ssl.keystore.location{*},
> {*}ssl.keystore.password{*}, etc. by overriding them, because the code
> in *DefaultSslEngineFactory* only checks whether the entry is null, and an
> override can only set the entry to an empty string, not to null.
> *DefaultSslEngineFactory* should treat these unexpected configuration entries
> as absent not only when they are {*}null{*}, but also when they are an empty
> string.
> For a workaround I have created a hacky patch that fixes the behaviour:
> [https://github.com/piotrsmolinski/kafka-ssl-fix]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


dajac commented on PR #15183:
URL: https://github.com/apache/kafka/pull/15183#issuecomment-1899919345

   For reference, I considered an alternative approach. Instead of tracking
the log offset of the record for every committed offset, we could delete all
pending transactional offsets whenever a new offset record is replayed. The
downside of this approach is that we would need to restructure how pending
transactional offsets are stored, so that all pending offsets for a given
(group, topic, partition) tuple can be found efficiently. Otherwise, replay
would become less efficient than it is today.
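The alternative above needs a way to find every pending transactional offset for a (group, topic, partition) tuple without scanning all producers. One possible shape, sketched in Java under the assumption that pending offsets are grouped by producer id, adds a reverse index from each tuple to the producer ids holding a pending offset for it. All names here are hypothetical, this is not the group coordinator's actual data structure, and updating both maps on transaction commit or abort is omitted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical store of pending transactional offsets with a reverse index.
final class PendingTxnOffsets {

    // Identifies one committed-offset slot.
    static final class GroupTopicPartition {
        final String groupId;
        final String topic;
        final int partition;

        GroupTopicPartition(String groupId, String topic, int partition) {
            this.groupId = groupId;
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof GroupTopicPartition)) return false;
            GroupTopicPartition that = (GroupTopicPartition) o;
            return partition == that.partition
                && groupId.equals(that.groupId)
                && topic.equals(that.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(groupId, topic, partition);
        }
    }

    // producerId -> (slot -> pending offset)
    private final Map<Long, Map<GroupTopicPartition, Long>> byProducer = new HashMap<>();
    // slot -> producer ids that have a pending offset for that slot
    private final Map<GroupTopicPartition, Set<Long>> bySlot = new HashMap<>();

    void addPending(long producerId, GroupTopicPartition slot, long offset) {
        byProducer.computeIfAbsent(producerId, id -> new HashMap<>()).put(slot, offset);
        bySlot.computeIfAbsent(slot, s -> new HashSet<>()).add(producerId);
    }

    // Called when a regular offset record for the slot is replayed: drops
    // every pending transactional offset for that slot, touching only the
    // producers that actually have one.
    void clearPending(GroupTopicPartition slot) {
        Set<Long> producers = bySlot.remove(slot);
        if (producers == null) return;
        for (long producerId : producers) {
            Map<GroupTopicPartition, Long> offsets = byProducer.get(producerId);
            offsets.remove(slot);
            if (offsets.isEmpty()) {
                byProducer.remove(producerId);
            }
        }
    }

    Long pendingOffset(long producerId, GroupTopicPartition slot) {
        Map<GroupTopicPartition, Long> offsets = byProducer.get(producerId);
        return offsets == null ? null : offsets.get(slot);
    }

    int producerCount() {
        return byProducer.size();
    }
}
```

The trade-off is the one described in the comment: an extra map to maintain on every write in exchange for clearing a slot without a full scan.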


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1458508459


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -712,13 +712,14 @@ public void run() {
 try {
 // Apply the records to the state machine.
 if (result.replayRecords()) {
-result.records().forEach(record ->
+for (int i = 0; i < result.records().size(); i++) {
 context.coordinator.replay(
+prevLastWrittenOffset + i,

Review Comment:
   Sure.
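The hunk above switches from `forEach` to an index loop so that each replayed record is passed its position in the log. A minimal sketch of the idea, assuming `prevLastWrittenOffset` is the offset at which the first record of the batch is written and that a batch occupies consecutive offsets; the `Replayer` interface and `replayBatch` method are hypothetical stand-ins for the coordinator's `replay` call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the coordinator state machine.
interface Replayer<R> {
    void replay(long recordOffset, R record);
}

final class BatchReplay {

    private BatchReplay() {
    }

    // Replays every record of a batch with the log offset it is assumed to
    // occupy: the first record at prevLastWrittenOffset, the next at
    // prevLastWrittenOffset + 1, and so on.
    static <R> void replayBatch(long prevLastWrittenOffset, List<R> records, Replayer<R> replayer) {
        for (int i = 0; i < records.size(); i++) {
            replayer.replay(prevLastWrittenOffset + i, records.get(i));
        }
    }
}
```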






Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1458504559


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -918,7 +920,7 @@ public void replay(
 groupId,
 topic,
 partition,
-OffsetAndMetadata.fromRecord(value)
+OffsetAndMetadata.fromRecord(offset, value)

Review Comment:
   I agree that it is a bit confusing. Let me rename `offset` to `recordOffset` 
to make it clearer.






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-18 Thread via GitHub


CalvinConfluent commented on PR #14612:
URL: https://github.com/apache/kafka/pull/14612#issuecomment-1899907260

   Verified the following tests locally:
   testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress (also fails in other PRs, see
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2588/tests/)
   testDescribeQuorumReplicationSuccessful
   This PR is mostly new code that runs on its own code path, so in theory it should not
affect other unit tests. Running the integration tests again after merging the latest master.





Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1458501802


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java:
##
@@ -92,30 +117,34 @@ public boolean equals(Object o) {
 
 OffsetAndMetadata that = (OffsetAndMetadata) o;
 
-if (offset != that.offset) return false;
+if (committedOffset != that.committedOffset) return false;
 if (commitTimestampMs != that.commitTimestampMs) return false;
-if (!leaderEpoch.equals(that.leaderEpoch)) return false;
-if (!metadata.equals(that.metadata)) return false;
-return expireTimestampMs.equals(that.expireTimestampMs);
+if (recordOffset != that.recordOffset) return false;
+if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false;

Review Comment:
   I don't think so. There is a feature to auto-generate equals and hashCode. 
It has been around for a while now.
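For reference, a null-safe `equals`/`hashCode` pair of the kind IDEs generate usually relies on `java.util.Objects`. The sketch below reuses the field names visible in the diff (`committedOffset`, `commitTimestampMs`, `recordOffset`, `leaderEpoch`, `metadata`, `expireTimestampMs`), but the class and the field types are assumptions for illustration, not the actual `OffsetAndMetadata` class.

```java
import java.util.Objects;
import java.util.OptionalInt;
import java.util.OptionalLong;

// Hypothetical, simplified value class; field types are assumptions.
final class OffsetAndMetadataSketch {
    final long committedOffset;
    final long commitTimestampMs;
    final long recordOffset;
    final OptionalInt leaderEpoch;
    final String metadata;
    final OptionalLong expireTimestampMs;

    OffsetAndMetadataSketch(long committedOffset, long commitTimestampMs, long recordOffset,
                            OptionalInt leaderEpoch, String metadata, OptionalLong expireTimestampMs) {
        this.committedOffset = committedOffset;
        this.commitTimestampMs = commitTimestampMs;
        this.recordOffset = recordOffset;
        this.leaderEpoch = leaderEpoch;
        this.metadata = metadata;
        this.expireTimestampMs = expireTimestampMs;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        OffsetAndMetadataSketch that = (OffsetAndMetadataSketch) o;
        // Primitives are compared with ==, references with the null-safe Objects.equals.
        return committedOffset == that.committedOffset
            && commitTimestampMs == that.commitTimestampMs
            && recordOffset == that.recordOffset
            && Objects.equals(leaderEpoch, that.leaderEpoch)
            && Objects.equals(metadata, that.metadata)
            && Objects.equals(expireTimestampMs, that.expireTimestampMs);
    }

    @Override
    public int hashCode() {
        // Uses the same fields as equals, so equal objects share a hash code.
        return Objects.hash(committedOffset, commitTimestampMs, recordOffset,
            leaderEpoch, metadata, expireTimestampMs);
    }
}
```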






Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]

2024-01-18 Thread via GitHub


showuon commented on code in PR #15193:
URL: https://github.com/apache/kafka/pull/15193#discussion_r1458455522


##
docs/ops.html:
##
@@ -3992,6 +3979,75 @@ Finalizing the migration
 
 # Other configs ...
 
+  Reverting to ZooKeeper mode During the Migration
+  
+While the cluster is still in migration mode, it is possible to revert to ZK mode.  The process
+to follow depends on how far the migration has progressed. In order to find out how to revert,
+select the final migration step that you have completed in this table.
+  
+  
+Note that the directions given here assume that each step was fully completed, and they were
+done in order. So, for example, we assume that if "Enabling the migration on the brokers" was completed,
+"Provisioning the KRaft controller quorum" was also fully completed previously.
+  
+  
+If you did not fully complete any step, back out whatever you have done and then follow revert
+directions for the last fully completed step.
+  
+
+  
+  
+  
+Final Migration Section Completed
+Directions for Reverting
+Notes
+  
+  
+Preparing for migration
+The preparation section does not involve leaving ZK mode. So there is nothing to do in
+the case of a revert.
+
+  
+  
+Provisioning the KRaft controller quorum
+Deprovision the KRaft controller quorum, and then you are done.
+
+  
+  
+Enabling zookeeper.metadata.migration.enable on the brokers
+Roll the broker cluster with zookeeper.metadata.migration.enable=false. Then,
+deprovision the KRaft controller quorum. Then you are done.

Review Comment:
   In the `Enabling zookeeper.metadata.migration.enable on the brokers` step,
the controller znode is taken over by the KRaft controller, so we should remove
the controller znode to allow a ZK broker to take over again.



##
docs/ops.html:
##
@@ -3992,6 +3979,75 @@ Finalizing the migration
 
 # Other configs ...
 
+  Reverting to ZooKeeper mode During the Migration
+  
+While the cluster is still in migration mode, it is possible to revert to ZK mode.  The process
+to follow depends on how far the migration has progressed. In order to find out how to revert,
+select the final migration step that you have completed in this table.
+  
+  
+Note that the directions given here assume that each step was fully completed, and they were
+done in order. So, for example, we assume that if "Enabling the migration on the brokers" was completed,
+"Provisioning the KRaft controller quorum" was also fully completed previously.
+  
+  
+If you did not fully complete any step, back out whatever you have done and then follow revert
+directions for the last fully completed step.
+  
+
+  
+  
+  
+Final Migration Section Completed
+Directions for Reverting
+Notes
+  
+  
+Preparing for migration
+The preparation section does not involve leaving ZK mode. So there is nothing to do in
+the case of a revert.
+
+  
+  
+Provisioning the KRaft controller quorum
+Deprovision the KRaft controller quorum, and then you are done.
+
+  
+  
+Enabling zookeeper.metadata.migration.enable on the brokers
+Roll the broker cluster with zookeeper.metadata.migration.enable=false. Then,
+deprovision the KRaft controller quorum. Then you are done.
+
+  
+  
+Migrating brokers to KRaft
+
+Roll the broker cluster with the process.roles configuration omitted, node.id
+replaced with broker.id, and the zookeeper.connect configuration set to a valid
+value.
+  
+   
+After this roll is fully complete, perform a second roll where you set
+zookeeper.metadata.migration.enable=false. Then,
+deprovision the KRaft controller quorum. Then you are done.

Review Comment:
   Should we also remove the controller znode here?
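The revert directions for the "Migrating brokers to KRaft" row amount to two rolling restarts with edited broker properties. The Java sketch below expresses those edits with `java.util.Properties`, assuming the broker configuration is available as a properties object; the helper names and the example `zookeeper.connect` value are hypothetical, and deleting the controller znode raised in the review comment is not covered.

```java
import java.util.Properties;

// Hypothetical helpers applying the two revert rolls described in the
// proposed "Migrating brokers to KRaft" row.
final class ZkRevertConfigs {

    private ZkRevertConfigs() {
    }

    // First roll: omit process.roles, replace node.id with broker.id,
    // and point zookeeper.connect at a valid ensemble. Other settings,
    // including the migration flag, are carried over unchanged.
    static Properties firstRoll(Properties kraftBroker, String zkConnect) {
        Properties props = new Properties();
        props.putAll(kraftBroker);
        props.remove("process.roles");
        String nodeId = (String) props.remove("node.id");
        if (nodeId != null) {
            props.setProperty("broker.id", nodeId);
        }
        props.setProperty("zookeeper.connect", zkConnect);
        return props;
    }

    // Second roll, only after the first roll has fully completed:
    // turn the migration off on the brokers.
    static Properties secondRoll(Properties firstRollProps) {
        Properties props = new Properties();
        props.putAll(firstRollProps);
        props.setProperty("zookeeper.metadata.migration.enable", "false");
        return props;
    }
}
```

Both helpers return copies, so the input configuration is left untouched between rolls.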




Re: [PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]

2024-01-18 Thread via GitHub


philipnee commented on PR #15210:
URL: https://github.com/apache/kafka/pull/15210#issuecomment-1899847833

   Hey @lucasbru, thanks for taking the time to review this PR. I've made changes
according to your suggestion. Let me know what you think.





Re: [PR] KAFKA-15717: KRaft support in LeaderEpochIntegrationTest [kafka]

2024-01-18 Thread via GitHub


appchemist commented on code in PR #15225:
URL: https://github.com/apache/kafka/pull/15225#discussion_r1458326771


##
core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala:
##
@@ -277,6 +283,22 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
 producer.close()
   }
 
+  private def createTopicWithAssignment(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): Unit = {

Review Comment:
   @ex172000 
   In `TestUtils`, how about renaming `createTopicWithAssignment` to
`createTopic`, and `createTopic` to `createTopicWithZkClient`, similar to
`createTopicWithAdmin`?






Re: [PR] KAFKA-15717: KRaft support in LeaderEpochIntegrationTest [kafka]

2024-01-18 Thread via GitHub


appchemist commented on code in PR #15225:
URL: https://github.com/apache/kafka/pull/15225#discussion_r1458315848


##
core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala:
##
@@ -277,6 +283,22 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
 producer.close()
   }
 
+  private def createTopicWithAssignment(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): Unit = {

Review Comment:
   @ex172000 Of course, I did.






Re: [PR] KAFKA-16144: skip checkQuorum for only 1 voter case [kafka]

2024-01-18 Thread via GitHub


showuon commented on PR #15235:
URL: https://github.com/apache/kafka/pull/15235#issuecomment-1899645426

   @mimaison, please help review. Thanks.





Re: [PR] KAFKA-16144: skip checkQuorum for only 1 voter case [kafka]

2024-01-18 Thread via GitHub


showuon commented on code in PR #15235:
URL: https://github.com/apache/kafka/pull/15235#discussion_r1458256739


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -101,6 +101,10 @@ protected LeaderState(
  * @return the remainingMs before the checkQuorumTimer expired
  */
 public long timeUntilCheckQuorumExpires(long currentTimeMs) {
+// if there's only 1 voter, it should never get expired.
+if (voterStates.size() == 1) {
+return Long.MAX_VALUE;
+}
 checkQuorumTimer.update(currentTimeMs);

Review Comment:
   I was thinking we don't need `checkQuorumTimer` in the single-voter case, but
since the next release is going to allow dynamically scaling voters up and down,
we should keep the flexibility here.






[PR] KAFKA-16144: skip checkQuorum for only 1 voter case [kafka]

2024-01-18 Thread via GitHub


showuon opened a new pull request, #15235:
URL: https://github.com/apache/kafka/pull/15235

   When there is only one voter, there are no fetch requests from other voters.
In this case, we should not expire the checkQuorum timer, because there is
just one voter.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15610: Fix `CoreUtils.swallow()` test gaps [kafka]

2024-01-18 Thread via GitHub


github-actions[bot] commented on PR #14583:
URL: https://github.com/apache/kafka/pull/14583#issuecomment-1899638097

   This PR is being marked as stale since it has not had any activity in 90
days. If you would like to keep this PR alive, please ask a committer for
review. If the PR has merge conflicts, please update it with the latest from
trunk (or the appropriate release branch). If this PR is no longer valid or
desired, please feel free to close it. If no activity occurs in the next 30
days, it will be automatically closed.





[jira] [Assigned] (KAFKA-16144) Controller leader checkQuorum timer should skip only 1 controller case

2024-01-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-16144:
-

Assignee: Luke Chen

> Controller leader checkQuorum timer should skip only 1 controller case
> --
>
> Key: KAFKA-16144
> URL: https://issues.apache.org/jira/browse/KAFKA-16144
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>  Labels: newbie, newbie++
>
> In KAFKA-15489, we fixed the potential "split brain" issue by adding the
> check quorum timer. This timer is updated when a follower fetch request
> arrives. If a majority of the voters do not fetch from the leader before the
> timer expires, the leader resigns its leadership.
> But in KAFKA-15489, we forgot to consider the case where there is only one
> controller node. If there is only one controller node (and no broker node),
> no fetch request ever arrives, so the timer expires every time.
> However, if there is only one node, we don't need the "check quorum" at all.
> We should skip the check in the single controller node case.
> Currently, this issue only happens when there is one controller node and no
> broker node at all (i.e. no fetch request is sent to the controller).
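The mechanism described above can be modelled with a small Java sketch: the leader counts itself toward the majority, each distinct follower voter that fetches counts once, and the timer is reset when enough followers have fetched. With a single voter no follower fetch ever arrives, so without the early return in `timeUntilCheckQuorumExpires` the timer would expire on every check. This is a simplified model with hypothetical names, not the `LeaderState` implementation.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of a leader's check-quorum timer.
final class CheckQuorumModel {
    private final int localId;
    private final Set<Integer> voters;
    private final long timeoutMs;
    private final Set<Integer> fetchedVoters = new HashSet<>();
    private long deadlineMs;

    CheckQuorumModel(int localId, Set<Integer> voters, long timeoutMs, long nowMs) {
        this.localId = localId;
        this.voters = new HashSet<>(voters);
        this.timeoutMs = timeoutMs;
        this.deadlineMs = nowMs + timeoutMs;
    }

    // Records a fetch from a follower voter and resets the timer once a
    // majority (counting the leader itself) has been heard from.
    void onFollowerFetch(int voterId, long nowMs) {
        if (voterId == localId || !voters.contains(voterId)) {
            return; // only other voters count; observers are ignored here
        }
        fetchedVoters.add(voterId);
        int majority = voters.size() / 2 + 1;
        int followersNeeded = voters.contains(localId) ? majority - 1 : majority;
        if (fetchedVoters.size() >= followersNeeded) {
            fetchedVoters.clear();
            deadlineMs = nowMs + timeoutMs;
        }
    }

    // Remaining time before the leader should resign.
    long timeUntilCheckQuorumExpires(long nowMs) {
        // A single voter is a majority by itself, so never expire.
        if (voters.size() == 1) {
            return Long.MAX_VALUE;
        }
        return Math.max(0L, deadlineMs - nowMs);
    }
}
```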





[jira] [Resolved] (KAFKA-16163) Constant resignation/reelection of controller when starting a single node in combined mode

2024-01-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16163.
---
Resolution: Duplicate

> Constant resignation/reelection of controller when starting a single node in 
> combined mode
> --
>
> Key: KAFKA-16163
> URL: https://issues.apache.org/jira/browse/KAFKA-16163
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Priority: Major
>
> When starting a single node in combined mode:
> {noformat}
> $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
> $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
> $ bin/kafka-server-start.sh config/kraft/server.properties{noformat}
>  
> it's constantly spamming the logs with:
> {noformat}
> [2024-01-18 17:37:09,065] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:11,967] INFO [RaftManager id=1] Did not receive fetch request from the majority of the voters within 3000ms. Current fetched voters are []. (org.apache.kafka.raft.LeaderState)
> [2024-01-18 17:37:11,967] INFO [RaftManager id=1] Completed transition to ResignedState(localId=1, epoch=138, voters=[1], electionTimeoutMs=1864, unackedVoters=[], preferredSuccessors=[]) from Leader(localId=1, epoch=138, epochStartOffset=829, highWatermark=Optional[LogOffsetMetadata(offset=835, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], voterStates={1=ReplicaState(nodeId=1, endOffset=Optional[LogOffsetMetadata(offset=835, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) (org.apache.kafka.raft.QuorumState)
> [2024-01-18 17:37:13,072] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,072] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,123] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,124] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,124] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,175] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,176] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,176] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,227] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,229] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,229] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,279] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread){noformat}
> This did not happen in 3.6.





[jira] [Updated] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios

2024-01-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16111:
--
Description: 
There is justified concern that the new threading model may not play well with 
"tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide some 
assurance that it will support complicated patterns.
 # Design and implement test scenarios
 # Update and document any design changes to the callback subsystem where needed
 # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by 
said design

> Implement tests for tricky rebalance callback scenarios
> ---
>
> Key: KAFKA-16111
> URL: https://issues.apache.org/jira/browse/KAFKA-16111
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> There is justified concern that the new threading model may not play well 
> with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide 
> some assurance that it will support complicated patterns.
>  # Design and implement test scenarios
>  # Update and document any design changes to the callback subsystem where
> needed
>  # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by 
> said design





Re: [PR] KAFKA-16141: Fix StreamsStandbyTask system test [kafka]

2024-01-18 Thread via GitHub


mjsax commented on PR #15217:
URL: https://github.com/apache/kafka/pull/15217#issuecomment-1899524211

   Jenkins looks good, but the previous system test run did not actually run the
test (I assume also because of the compilation error). Retriggered:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6039/





Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]

2024-01-18 Thread via GitHub


mjsax commented on PR #15228:
URL: https://github.com/apache/kafka/pull/15228#issuecomment-1899520079

   Thanks @apoorvmittal10!
   
   Merged to `trunk` and cherry-picked to `3.7` branch.





Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]

2024-01-18 Thread via GitHub


mjsax merged PR #15228:
URL: https://github.com/apache/kafka/pull/15228





[jira] [Assigned] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios

2024-01-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16111:
-

Assignee: Lucas Brutschy  (was: Kirk True)

> Implement tests for tricky rebalance callback scenarios
> ---
>
> Key: KAFKA-16111
> URL: https://issues.apache.org/jira/browse/KAFKA-16111
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458144631


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2560,41 +2562,6 @@ class ReplicaManagerTest {
 }
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
-  def testMaybeVerificationErrorConversions(error: Errors): Unit = {

Review Comment:
   We also have one for the GroupCoordinator.






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458144452


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2560,41 +2562,6 @@ class ReplicaManagerTest {
 }
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
-  def testMaybeVerificationErrorConversions(error: Errors): Unit = {

Review Comment:
   We have the test above in this file. 






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458143981


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1188,18 +1073,7 @@ class ReplicaManager(val config: KafkaConfig,
   requestLocal: RequestLocal,
   verificationErrors: Map[TopicPartition, Errors]
 ): Unit = {
-  // Map transaction coordinator errors to known errors for the response
-  val convertedErrors = verificationErrors.map { case (tp, error) =>
-error match {
-  case Errors.CONCURRENT_TRANSACTIONS |
-Errors.COORDINATOR_LOAD_IN_PROGRESS |
-Errors.COORDINATOR_NOT_AVAILABLE |
-Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
-  case _ => tp -> error
-}
-
-  }

Review Comment:
   But yes, we simply pass through concurrent txns which will be fatal to the 
client.






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458143630


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1188,18 +1073,7 @@ class ReplicaManager(val config: KafkaConfig,
   requestLocal: RequestLocal,
   verificationErrors: Map[TopicPartition, Errors]
 ): Unit = {
-  // Map transaction coordinator errors to known errors for the response
-  val convertedErrors = verificationErrors.map { case (tp, error) =>
-error match {
-  case Errors.CONCURRENT_TRANSACTIONS |
-Errors.COORDINATOR_LOAD_IN_PROGRESS |
-Errors.COORDINATOR_NOT_AVAILABLE |
-Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
-  case _ => tp -> error
-}
-
-  }

Review Comment:
   We have separate handling for produce requests and txn offset commit 
requests.
   
   for produce:
   
   ```
     case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction"))
     case Errors.CONCURRENT_TRANSACTIONS |
          Errors.COORDINATOR_LOAD_IN_PROGRESS |
          Errors.COORDINATOR_NOT_AVAILABLE |
          Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
       s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
     case _ => None
```
   
   
   for txn offset commit: 
   
   ```
   error match {
 case Errors.UNKNOWN_TOPIC_OR_PARTITION
  | Errors.NOT_ENOUGH_REPLICAS
  | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
   Errors.COORDINATOR_NOT_AVAILABLE
   
 case Errors.NOT_LEADER_OR_FOLLOWER
  | Errors.KAFKA_STORAGE_ERROR =>
   Errors.NOT_COORDINATOR
   
 case Errors.MESSAGE_TOO_LARGE
  | Errors.RECORD_LIST_TOO_LARGE
  | Errors.INVALID_FETCH_SIZE =>
   Errors.INVALID_COMMIT_OFFSET_SIZE
   
     // We may see INVALID_TXN_STATE or INVALID_PID_MAPPING here due to transaction verification.
 // They can be returned without mapping to a new error.
 case other => other
   }
   ```
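The two mappings above can be compared side by side in a small Java sketch. The `TxnError` enum and both method names are hypothetical stand-ins for Kafka's `Errors` and the real Scala code; only the error names and mapping rules come from the snippets above. The produce path is simplified: where the Scala returns `None` (no custom exception), the sketch just passes the error through.

```java
import java.util.EnumSet;
import java.util.Set;

public class VerificationErrorMapping {
    // Hypothetical subset of Kafka's Errors enum, limited to the names quoted above.
    enum TxnError {
        INVALID_TXN_STATE, INVALID_PID_MAPPING, CONCURRENT_TRANSACTIONS,
        COORDINATOR_LOAD_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR,
        NOT_ENOUGH_REPLICAS, NOT_ENOUGH_REPLICAS_AFTER_APPEND, UNKNOWN_TOPIC_OR_PARTITION,
        NOT_LEADER_OR_FOLLOWER, KAFKA_STORAGE_ERROR, MESSAGE_TOO_LARGE,
        RECORD_LIST_TOO_LARGE, INVALID_FETCH_SIZE, INVALID_COMMIT_OFFSET_SIZE
    }

    // Transaction coordinator errors that the produce path reports as NOT_ENOUGH_REPLICAS.
    private static final Set<TxnError> COORDINATOR_ERRORS = EnumSet.of(
        TxnError.CONCURRENT_TRANSACTIONS, TxnError.COORDINATOR_LOAD_IN_PROGRESS,
        TxnError.COORDINATOR_NOT_AVAILABLE, TxnError.NOT_COORDINATOR);

    // Produce path: coordinator errors become NOT_ENOUGH_REPLICAS; everything else passes through.
    static TxnError mapForProduce(TxnError error) {
        return COORDINATOR_ERRORS.contains(error) ? TxnError.NOT_ENOUGH_REPLICAS : error;
    }

    // Txn offset commit path: storage-level errors are translated to coordinator-level ones.
    static TxnError mapForTxnOffsetCommit(TxnError error) {
        switch (error) {
            case UNKNOWN_TOPIC_OR_PARTITION:
            case NOT_ENOUGH_REPLICAS:
            case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
                return TxnError.COORDINATOR_NOT_AVAILABLE;
            case NOT_LEADER_OR_FOLLOWER:
            case KAFKA_STORAGE_ERROR:
                return TxnError.NOT_COORDINATOR;
            case MESSAGE_TOO_LARGE:
            case RECORD_LIST_TOO_LARGE:
            case INVALID_FETCH_SIZE:
                return TxnError.INVALID_COMMIT_OFFSET_SIZE;
            default:
                // INVALID_TXN_STATE, INVALID_PID_MAPPING and CONCURRENT_TRANSACTIONS pass through unchanged.
                return error;
        }
    }

    public static void main(String[] args) {
        System.out.println(mapForProduce(TxnError.CONCURRENT_TRANSACTIONS));         // NOT_ENOUGH_REPLICAS
        System.out.println(mapForTxnOffsetCommit(TxnError.CONCURRENT_TRANSACTIONS)); // CONCURRENT_TRANSACTIONS
    }
}
```

The contrast in `main` matches the comment above: the offset commit path passes `CONCURRENT_TRANSACTIONS` through unchanged, while the produce path turns it into `NOT_ENOUGH_REPLICAS`.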






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-01-18 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1458141964


##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 val service = getConsumerGroupService(cgcArgs)
 
 val expectedListing = Set(
-  new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)),
-  new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE)))
+  new ConsumerGroupListing(simpleGroup, true)
+.setState(Optional.of(ConsumerGroupState.EMPTY))
+.setType(if (quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),
+  new ConsumerGroupListing(group, false)
+.setState(Optional.of(ConsumerGroupState.STABLE))
+.setType(if (quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty())
+)
 
 var foundListing = Set.empty[ConsumerGroupListing]
 TestUtils.waitUntilTrue(() => {
-  foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+  foundListing = service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet, Set.empty).toSet
   expectedListing == foundListing
 }, s"Expected to show groups $expectedListing, but found $foundListing")
 
-val expectedListingStable = Set(
-  new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE)))
+val expectedListingStable = Set.empty[ConsumerGroupListing]
 
 foundListing = Set.empty[ConsumerGroupListing]
 TestUtils.waitUntilTrue(() => {
-  foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+  foundListing = service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE), Set.empty).toSet
   expectedListingStable == foundListing
 }, s"Expected to show groups $expectedListingStable, but found $foundListing")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testConsumerGroupStatesFromString(quorum: String): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String): Unit = {
+val simpleGroup = "simple-group"
+addSimpleGroupExecutor(group = simpleGroup)
+addConsumerGroupExecutor(numConsumers = 1)
+
+val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type")
+val service = getConsumerGroupService(cgcArgs)
+
+val expectedListingStable = Set.empty[ConsumerGroupListing]
+
+val expectedListing = Set(
+  new ConsumerGroupListing(simpleGroup, true)
+.setState(Optional.of(ConsumerGroupState.EMPTY))
+.setType(if(quorum.contains("kip848")) Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),

Review Comment:
   According to the way I originally wrote the code, we were gonna return empty for any group type while using the old GC. This will change with my new commit; I was waiting to rebase so I can test.






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-01-18 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1458133607


##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 val service = getConsumerGroupService(cgcArgs)
 
 val expectedListing = Set(
-  new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)),
-  new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE)))
+  new ConsumerGroupListing(simpleGroup, true)

Review Comment:
   Yeah, the changes were made due to the constructor changes to ConsumerGroupListing. I guess we'll change it everywhere if we decide to change it there.






Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]

2024-01-18 Thread via GitHub


kirktrue commented on code in PR #15186:
URL: https://github.com/apache/kafka/pull/15186#discussion_r1458125932


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -376,25 +376,21 @@ protected Map prepareCloseFetchSessi
 final Cluster cluster = metadata.fetch();
 Map fetchable = new HashMap<>();
 
-try {
-sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
-// set the session handler to notify close. This will set the next metadata request to send close message.
-sessionHandler.notifyClose();
+sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {

Review Comment:
   I think that's worth exploring, because I don't see that we ever remove any of the `FetchSessionHandler` entries once they're created.






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-01-18 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1458121585


##
core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala:
##
@@ -116,6 +123,18 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
 }
 
 object ConsumerGroupCommandTest {
+  // We want to test the following combinations:
+  // * ZooKeeper and the classic group protocol.
+  // * KRaft and the classic group protocol.
+  // * KRaft with the new group coordinator enabled and the classic group protocol.
+  // * KRaft with the new group coordinator enabled and the consumer group protocol.
+  def getTestQuorumAndGroupProtocolParametersAll: java.util.stream.Stream[Arguments] = {

Review Comment:
   I wasn't able to reuse it, so I added the code here, but I can double-check.
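The four combinations listed in the comment block above can be written out as plain data. This Java sketch is only illustrative. The quorum strings `zk`, `kraft` and `kraft+kip848` and the protocol names `classic` and `consumer` are assumptions modelled on the `quorum.contains("kip848")` checks in the test. The real method also returns a JUnit `Stream[Arguments]` rather than `String[]` pairs.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class QuorumProtocolMatrix {
    // Assumed quorum names; "kip848" marks the new group coordinator.
    static final String ZK = "zk";
    static final String KRAFT = "kraft";
    static final String KRAFT_KIP848 = "kraft+kip848";

    // Each entry is {quorum, groupProtocol}; the consumer protocol only runs with the new coordinator.
    static Stream<String[]> quorumAndGroupProtocolParametersAll() {
        return Stream.of(
            new String[] {ZK, "classic"},
            new String[] {KRAFT, "classic"},
            new String[] {KRAFT_KIP848, "classic"},
            new String[] {KRAFT_KIP848, "consumer"});
    }

    public static void main(String[] args) {
        List<String> labels = quorumAndGroupProtocolParametersAll()
            .map(p -> p[0] + "/" + p[1])
            .collect(Collectors.toList());
        System.out.println(labels); // [zk/classic, kraft/classic, kraft+kip848/classic, kraft+kip848/consumer]
    }
}
```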






[jira] [Created] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup

2024-01-18 Thread Kirk True (Jira)
Kirk True created KAFKA-16167:
-

 Summary: Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
 Key: KAFKA-16167
 URL: https://issues.apache.org/jira/browse/KAFKA-16167
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, unit tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0








Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-01-18 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1458105856


##
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##
@@ -102,6 +103,15 @@ object ConsumerGroupCommand extends Logging {
 parsedStates
   }
 
+  def consumerGroupTypesFromString(input: String): Set[ConsumerGroupType] = {
+val parsedStates = input.split(',').map(s => ConsumerGroupType.parse(s.trim)).toSet

Review Comment:
   omgg thanks for catching that!






Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]

2024-01-18 Thread via GitHub


lzyLuke commented on code in PR #15193:
URL: https://github.com/apache/kafka/pull/15193#discussion_r1458081035


##
docs/ops.html:
##
@@ -3992,6 +3979,75 @@ Finalizing the migration
 
 # Other configs ...
 
+  Reverting to ZooKeeper mode During the Migration
+  
+While the cluster is still in migration mode, it is possible to revert to ZK mode.  The process
+to follow depends on how far the migration has progressed. In order to find out how to revert,
+select the final migration step that you have completed in this table.
+  
+  
+Note that the directions given here assume that each step was fully completed, and they were
+done in order. So, for example, we assume that if "Enabling the migration on the brokers" was completed,
+"Provisioning the KRaft controller quorum" was also fully completed previously.
+  
+  
+If you did not fully complete any step, back out whatever you have done and then follow revert
+directions for the last fully completed step.
+  
+
+  
+  
+  
+Final Migration Section Completed
+Directions for Reverting
+Notes
+  
+  
+Preparing for migration
+The prepartion section does not involve leaving ZK mode. So there is nothing to do in
+the case of a revert.
+
+  
+  
+Provisioning the KRaft controller quorum
+Deprovision the KRaft controller quorum, and then you are done.
+
+  
+  
+Enabling zookeeper.metadata.migration.enable on the brokers
+Roll the broker cluster with zookeeper.metadata.migration.enable=false. Then,
+deprovision the KRaft controller quorum. Then you are done.
+
+  
+  
+Migrating brokers to KRaft
+
+Roll the broker cluster with the process.roles configuration omitted, node.id
+replaced with broker.id, and the zookeeper.connect configuration set to a valid
+value.

Review Comment:
   qq: what is the valid value for zookeeper.connect?



##
docs/ops.html:
##
@@ -3992,6 +3979,75 @@ Finalizing the migration
 
 # Other configs ...
 
+  Reverting to ZooKeeper mode During the Migration
+  
+While the cluster is still in migration mode, it is possible to revert to ZK mode.  The process
+to follow depends on how far the migration has progressed. In order to find out how to revert,

Review Comment:
   Should we explain in which phases we can revert to ZK mode? Or at least say that it is possible after the first roll?






Re: [PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]

2024-01-18 Thread via GitHub


dongnuo123 commented on code in PR #15221:
URL: https://github.com/apache/kafka/pull/15221#discussion_r1458085202


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -856,7 +899,8 @@ public boolean cleanupExpiredOffsets(String groupId, List records) {
 });
 metrics.record(OFFSET_EXPIRED_SENSOR_NAME, expiredPartitions.size());
 
-return allOffsetsExpired.get();
+// We don't want to remove the group if there are ongoing transactions.
+return allOffsetsExpired.get() && !openTransactionsByGroup.containsKey(groupId);

Review Comment:
   Just want to understand here: if there are ongoing transactions, `hasPendingTransactionalOffsets(groupId, topic, partition)` will be true and `allOffsetsExpired` will be set to false in L888. Why is `!openTransactionsByGroup.containsKey(groupId)` needed?
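Here is one scenario where the extra check could matter, sketched in Java under an assumption: the expiration loop only visits partitions that already have committed offsets. Under that assumption, a group whose only state is an open transaction never enters the loop, so `allOffsetsExpired` stays `true` vacuously. The class, maps and method signature below are hypothetical simplifications, not the real `OffsetMetadataManager` fields.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class ExpiredGroupCheck {
    // groupId -> (partition key -> commit timestamp in ms); hypothetical simplification.
    final Map<String, Map<String, Long>> committedOffsets = new HashMap<>();
    // groupId -> partitions with pending transactional offsets; hypothetical simplification.
    final Map<String, Set<String>> openTransactionsByGroup = new HashMap<>();

    boolean cleanupExpiredOffsets(String groupId, long nowMs, long retentionMs) {
        AtomicBoolean allOffsetsExpired = new AtomicBoolean(true);
        Set<String> pending = openTransactionsByGroup.getOrDefault(groupId, Set.of());
        // The loop only visits partitions that already have a committed offset.
        committedOffsets.getOrDefault(groupId, Map.of()).forEach((partition, commitMs) -> {
            if (pending.contains(partition) || nowMs - commitMs < retentionMs) {
                allOffsetsExpired.set(false);
            }
        });
        // Without this guard, a group with an open transaction but no committed offsets would be removed.
        return allOffsetsExpired.get() && !openTransactionsByGroup.containsKey(groupId);
    }

    public static void main(String[] args) {
        ExpiredGroupCheck check = new ExpiredGroupCheck();
        // The group has an open transaction but has never committed an offset.
        check.openTransactionsByGroup.put("g1", new HashSet<>(Set.of("topic-0")));
        System.out.println(check.cleanupExpiredOffsets("g1", 10_000L, 1_000L)); // false
    }
}
```

If the return statement used only `allOffsetsExpired.get()`, the call in `main` would return `true`, because the loop body never runs for `g1`.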






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-01-18 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1458083463


##
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends 
AbstractOptions states = Collections.emptySet();
 
+private Set groupTypes = Collections.emptySet();
+
 /**
- * If states is set, only groups in these states will be returned by listConsumerGroups()
+ * If states is set, only groups in these states will be returned by listConsumerGroups().
  * Otherwise, all groups are returned.
  * This operation is supported by brokers with version 2.6.0 or later.
  */
 public ListConsumerGroupsOptions inStates(Set states) {
-this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states);
+this.states = (states == null || states.isEmpty()) ? Collections.emptySet() : states;
+return this;
+}
+
+/**
+ * If groupTypes is set, only groups of these groupTypes will be returned by listConsumerGroups().
+ * Otherwise, all groups are returned.
+ *
+ */
+public ListConsumerGroupsOptions inTypes(Set groupTypes) {

Review Comment:
   Ohh thanks, yes I'll change it. Sorry, I was just following the method names we used with states for consistency, but inTypes doesn't make sense the way inStates does.
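The Javadoc semantics above, where an empty set means "no filter", can be sketched in a few lines of Java. The class, the two enums and the `withTypes` name are hypothetical; the thread only agrees that `inTypes` reads poorly. The sketch also keeps the defensive copy that the original `new HashSet<>(states)` made. The diff stores `states` directly instead, which means later changes to the caller's set would also change the options.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;

public class GroupFilterOptions {
    // Hypothetical stand-ins for ConsumerGroupState and ConsumerGroupType.
    enum State { EMPTY, STABLE, PREPARING_REBALANCE }
    enum Type { CLASSIC, CONSUMER }

    private Set<State> states = Collections.emptySet();
    private Set<Type> types = Collections.emptySet();

    GroupFilterOptions inStates(Set<State> states) {
        // Defensive copy: later changes to the caller's set do not leak into the options.
        this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states);
        return this;
    }

    // Hypothetical name for the type filter setter.
    GroupFilterOptions withTypes(Set<Type> types) {
        this.types = (types == null) ? Collections.emptySet() : new HashSet<>(types);
        return this;
    }

    // An empty filter matches everything; otherwise the value must be in the filter.
    boolean matches(State state, Type type) {
        return (states.isEmpty() || states.contains(state))
            && (types.isEmpty() || types.contains(type));
    }

    public static void main(String[] args) {
        Set<Type> callerTypes = EnumSet.of(Type.CONSUMER);
        GroupFilterOptions options = new GroupFilterOptions().withTypes(callerTypes);
        callerTypes.add(Type.CLASSIC); // does not affect the options because of the copy
        System.out.println(options.matches(State.STABLE, Type.CLASSIC));  // false
        System.out.println(options.matches(State.STABLE, Type.CONSUMER)); // true
    }
}
```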






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-01-18 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1458079066


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2229,7 +2229,7 @@ void handleResponse(AbstractResponse abstractResponse) {
 
 String topicName = cluster.topicName(topicId);
 if (topicName == null) {
-future.completeExceptionally(new UnknownTopicIdException("TopicId " + topicId + " not found."));
+future.completeExceptionally(new InvalidTopicException("TopicId " + topicId + " not found."));

Review Comment:
   Thanks for the catch, missed this when I was splitting the PR and merging!






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-01-18 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1458077276


##
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##
@@ -21,95 +21,111 @@
 import java.util.Optional;

Review Comment:
   Ohh hmm, that makes sense, thanks for bringing this up! I do feel like the way it exists right now is kinda inefficient; that's why I changed it. Out of curiosity, if I wanted to make these changes to the file, what would we have to do?






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-01-18 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1458069279


##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -104,9 +165,27 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString("   ,   ,"))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListGroupCommand(quorum: String): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerGroupTypesFromString(quorum: String, groupProtocol: String): Unit = {
+var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer")
+assertEquals(Set(ConsumerGroupType.CONSUMER), result)
+
+result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, classic")
+assertEquals(Set(ConsumerGroupType.CONSUMER, ConsumerGroupType.CLASSIC), result)
+
+assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"))
+
+assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString("Consumer"))

Review Comment:
   Yes, it will work once we get the case-insensitive code in.
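A helper along these lines would satisfy the expectations in `testConsumerGroupTypesFromString`. It is a Java sketch, not the Scala `ConsumerGroupCommand` code. The enum, the `UNKNOWN` fallback and the `caseInsensitive` flag are assumptions. The flag shows how `"Consumer"` would go from rejected to accepted once case-insensitive parsing lands.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

public class GroupTypeParsing {
    // Hypothetical mirror of ConsumerGroupType; UNKNOWN is the fallback for unrecognized names.
    enum GroupType {
        UNKNOWN("unknown"), CONSUMER("consumer"), CLASSIC("classic");

        final String name;

        GroupType(String name) {
            this.name = name;
        }

        static GroupType parse(String input) {
            for (GroupType type : values()) {
                if (type.name.equals(input)) {
                    return type;
                }
            }
            return UNKNOWN;
        }
    }

    // Splits on commas, trims each entry, and rejects unknown names or input with no names at all.
    static Set<GroupType> typesFromString(String input, boolean caseInsensitive) {
        String normalized = caseInsensitive ? input.toLowerCase(Locale.ROOT) : input;
        Set<GroupType> parsed = EnumSet.noneOf(GroupType.class);
        Arrays.stream(normalized.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .map(GroupType::parse)
            .forEach(parsed::add);
        if (parsed.isEmpty() || parsed.contains(GroupType.UNKNOWN)) {
            throw new IllegalArgumentException("Invalid group types '" + input + "'. Valid types are: consumer, classic");
        }
        return parsed;
    }

    public static void main(String[] args) {
        System.out.println(typesFromString("consumer, classic", false)); // [CONSUMER, CLASSIC]
        System.out.println(typesFromString("Consumer", true));           // [CONSUMER]
    }
}
```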






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-01-18 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1458068041


##
clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupType {
+UNKNOWN("unknown"),
+CONSUMER("consumer"),
+CLASSIC("classic");
+
+private final static Map NAME_TO_ENUM = Arrays.stream(values())
+.collect(Collectors.toMap(type -> type.name, Function.identity()));
+
+private final String name;
+
+ConsumerGroupType(String name) {
+this.name = name;
+}
+
+/**
+ * Parse a string into a consumer group type.
+ */
+public static ConsumerGroupType parse(String name) {

Review Comment:
   Yes, I have all the changes locally. I'm waiting to rebase the API changes so I can run some tests all together.
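The `parse` method cut off above could complete the `NAME_TO_ENUM` lookup roughly as follows. This is a sketch, not the merged code, and it rests on two assumptions. First, `null` or unrecognized names fall back to `UNKNOWN`. Second, lookups are lowercased with `Locale.ROOT`, which gives the case-insensitive behaviour discussed in this review. The enum is named `GroupTypeSketch` so it cannot be mistaken for the real class.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public enum GroupTypeSketch {
    UNKNOWN("unknown"),
    CONSUMER("consumer"),
    CLASSIC("classic");

    // Built once from the lowercase names, following the shape of NAME_TO_ENUM in the diff.
    private static final Map<String, GroupTypeSketch> NAME_TO_ENUM = Arrays.stream(values())
        .collect(Collectors.toMap(type -> type.name, Function.identity()));

    private final String name;

    GroupTypeSketch(String name) {
        this.name = name;
    }

    /**
     * Parse a string into a group type, ignoring case.
     * Null or unrecognized names map to UNKNOWN (an assumption of this sketch).
     */
    public static GroupTypeSketch parse(String name) {
        if (name == null) {
            return UNKNOWN;
        }
        GroupTypeSketch type = NAME_TO_ENUM.get(name.toLowerCase(Locale.ROOT));
        return type == null ? UNKNOWN : type;
    }

    @Override
    public String toString() {
        return name;
    }
}
```

With this shape, `GroupTypeSketch.parse("Consumer")` returns `CONSUMER` and `GroupTypeSketch.parse("bad")` returns `UNKNOWN`. A caller such as a list parser can then reject `UNKNOWN` explicitly.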






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-01-18 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1458067667


##
clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupType {

Review Comment:
   Okie, I can change it. I was following precedent with ConsumerGroupState, trying to keep everything consistent.






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458066834


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -717,17 +717,16 @@ class KafkaApis(val requestChannel: RequestChannel,
   val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID
 
   // call the replica manager to append messages to the replicas
-  replicaManager.appendRecords(
+  replicaManager.handleProduceAppend(
 timeout = produceRequest.timeout.toLong,
 requiredAcks = produceRequest.acks,
 internalTopicsAllowed = internalTopicsAllowed,
 origin = AppendOrigin.CLIENT,

Review Comment:
   That's fair. I can do that.






Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15212:
URL: https://github.com/apache/kafka/pull/15212#discussion_r1457917435


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -875,30 +875,43 @@ private void maybeRemovePartitionEpoch(
 ConsumerGroupMember oldMember
 ) {
 if (oldMember != null) {
-removePartitionEpochs(oldMember.assignedPartitions());
-removePartitionEpochs(oldMember.partitionsPendingRevocation());
+removePartitionEpochs(oldMember.assignedPartitions(), oldMember.memberEpoch());
+removePartitionEpochs(oldMember.partitionsPendingRevocation(), oldMember.memberEpoch());
 }
 }
 
 /**
  * Removes the partition epochs based on the provided assignment.
  *
  * @param assignmentThe assignment.
+ * @param expectedEpoch The expected epoch.
+ * @throws IllegalStateException if the epoch does not match the expected one.
+ * package-private for testing.
  */
-private void removePartitionEpochs(
-Map> assignment
+void removePartitionEpochs(
+Map> assignment,
+int expectedEpoch
 ) {
 assignment.forEach((topicId, assignedPartitions) -> {
 currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
 if (partitionsOrNull != null) {
-assignedPartitions.forEach(partitionsOrNull::remove);
+assignedPartitions.forEach(partitionId -> {
+Integer prevValue = partitionsOrNull.remove(partitionId);
+if (prevValue != expectedEpoch) {
+throw new IllegalStateException(
+String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
+"still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
+}
+});
 if (partitionsOrNull.isEmpty()) {
 return null;
 } else {
 return partitionsOrNull;
 }
 } else {
-return null;
+throw new IllegalStateException(

Review Comment:
   What is the effect of throwing this error? Do we also block removing the 
rest of the partitions?
   Just trying to figure out the state we are left in.
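A small Java experiment can show what state is left behind. It copies the shape of `removePartitionEpochs` from the diff, with two assumptions: `String` topic ids stand in for Kafka's `Uuid`, and plain maps stand in for the group's fields. The results follow from standard `java.util` behaviour.

- `Map.compute` leaves the outer mapping unchanged when the remapping function throws.
- The inner map has already been mutated in place by `remove()`, so those removals stick.
- The outer `forEach` stops at the exception, so later topics in the assignment are never visited.

The sketch also checks `prevValue == null` explicitly, because comparing a `null` `Integer` with an `int` unboxes it and throws a `NullPointerException` rather than the intended `IllegalStateException`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class PartialRemovalDemo {
    // current: topicId -> (partitionId -> epoch); assignment: topicId -> partitions to remove.
    static void removePartitionEpochs(
        Map<String, Map<Integer, Integer>> current,
        Map<String, Set<Integer>> assignment,
        int expectedEpoch
    ) {
        assignment.forEach((topicId, assignedPartitions) ->
            current.compute(topicId, (ignored, partitionsOrNull) -> {
                if (partitionsOrNull == null) {
                    throw new IllegalStateException("No partition epochs for topic " + topicId);
                }
                assignedPartitions.forEach(partitionId -> {
                    // remove() mutates the inner map before the epoch is checked.
                    Integer prevValue = partitionsOrNull.remove(partitionId);
                    // Explicit null check: `prevValue != expectedEpoch` alone would unbox null and throw an NPE.
                    if (prevValue == null || prevValue != expectedEpoch) {
                        throw new IllegalStateException(String.format(
                            "Cannot remove the epoch %d from %s-%s because the partition is still owned at a different epoch %s",
                            expectedEpoch, topicId, partitionId, prevValue));
                    }
                });
                return partitionsOrNull.isEmpty() ? null : partitionsOrNull;
            }));
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, Integer>> current = new HashMap<>();
        current.put("a", new HashMap<>(Map.of(0, 5, 1, 5, 2, 7))); // partition a-2 is owned at epoch 7
        current.put("b", new HashMap<>(Map.of(0, 5)));

        Map<String, Set<Integer>> assignment = new LinkedHashMap<>(); // visit "a" before "b"
        assignment.put("a", new TreeSet<>(List.of(0, 1, 2)));
        assignment.put("b", new TreeSet<>(List.of(0)));

        try {
            removePartitionEpochs(current, assignment, 5);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // a-0 and a-1 are gone, a-2 was removed before the check failed, and "b" was never visited.
        System.out.println(current.get("a")); // {}
        System.out.println(current.get("b")); // {0=5}
    }
}
```

In this run the throw does block the remaining partitions and topics. It also leaves topic `a` mapped to an empty inner map, because `compute` never got to return `null` for it.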






Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15212:
URL: https://github.com/apache/kafka/pull/15212#discussion_r1457917435


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -875,30 +875,43 @@ private void maybeRemovePartitionEpoch(
 ConsumerGroupMember oldMember
 ) {
 if (oldMember != null) {
-removePartitionEpochs(oldMember.assignedPartitions());
-removePartitionEpochs(oldMember.partitionsPendingRevocation());
+removePartitionEpochs(oldMember.assignedPartitions(), oldMember.memberEpoch());
+removePartitionEpochs(oldMember.partitionsPendingRevocation(), oldMember.memberEpoch());
 }
 }
 
 /**
  * Removes the partition epochs based on the provided assignment.
  *
  * @param assignmentThe assignment.
+ * @param expectedEpoch The expected epoch.
+ * @throws IllegalStateException if the epoch does not match the expected one.
+ * package-private for testing.
  */
-private void removePartitionEpochs(
-Map> assignment
+void removePartitionEpochs(
+Map> assignment,
+int expectedEpoch
 ) {
 assignment.forEach((topicId, assignedPartitions) -> {
 currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
 if (partitionsOrNull != null) {
-assignedPartitions.forEach(partitionsOrNull::remove);
+assignedPartitions.forEach(partitionId -> {
+Integer prevValue = partitionsOrNull.remove(partitionId);
+if (prevValue != expectedEpoch) {
+throw new IllegalStateException(
+String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
+"still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
+}
+});
 if (partitionsOrNull.isEmpty()) {
 return null;
 } else {
 return partitionsOrNull;
 }
 } else {
-return null;
+throw new IllegalStateException(

Review Comment:
   What is the effect of throwing this error? Do we also block removing the 
rest of the partitions?






Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1458064289


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java:
##
@@ -92,30 +117,34 @@ public boolean equals(Object o) {
 
 OffsetAndMetadata that = (OffsetAndMetadata) o;
 
-if (offset != that.offset) return false;
+if (committedOffset != that.committedOffset) return false;
 if (commitTimestampMs != that.commitTimestampMs) return false;
-if (!leaderEpoch.equals(that.leaderEpoch)) return false;
-if (!metadata.equals(that.metadata)) return false;
-return expireTimestampMs.equals(that.expireTimestampMs);
+if (recordOffset != that.recordOffset) return false;
+if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false;

Review Comment:
   I see. Is this using AI  
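The switch from `leaderEpoch.equals(that.leaderEpoch)` to `Objects.equals(leaderEpoch, that.leaderEpoch)` makes the comparison null-safe. A minimal sketch, using a hypothetical class with a nullable `Integer` leader epoch (the real field type may differ):

```java
import java.util.Objects;

/**
 * Hypothetical value class mirroring a few fields from the diff;
 * leaderEpoch is modeled as a nullable Integer for illustration only.
 */
public final class OffsetEqualitySketch {
    final long committedOffset;
    final long commitTimestampMs;
    final long recordOffset;
    final Integer leaderEpoch; // may be null in this sketch

    OffsetEqualitySketch(long committedOffset, long commitTimestampMs,
                         long recordOffset, Integer leaderEpoch) {
        this.committedOffset = committedOffset;
        this.commitTimestampMs = commitTimestampMs;
        this.recordOffset = recordOffset;
        this.leaderEpoch = leaderEpoch;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        OffsetEqualitySketch that = (OffsetEqualitySketch) o;
        if (committedOffset != that.committedOffset) return false;
        if (commitTimestampMs != that.commitTimestampMs) return false;
        if (recordOffset != that.recordOffset) return false;
        // leaderEpoch.equals(...) would throw NullPointerException when leaderEpoch is null.
        return Objects.equals(leaderEpoch, that.leaderEpoch);
    }

    @Override
    public int hashCode() {
        // Objects.hash is also null-safe and stays consistent with equals().
        return Objects.hash(committedOffset, commitTimestampMs, recordOffset, leaderEpoch);
    }
}
```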






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-18 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1458061363


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9616,47 +9618,76 @@ public void testListGroups() {
 .build()));
 context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11));
 
+// Test list group response without a group state or group type filter.
 Map actualAllGroupMap =
-context.sendListGroups(Collections.emptyList())
-.stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+context.sendListGroups(Collections.emptyList(), Collections.emptyList()).stream()
+.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+
 Map expectAllGroupMap =
 Stream.of(
 new ListGroupsResponseData.ListedGroup()
 .setGroupId(classicGroup.groupId())
-.setProtocolType(classicGroupType)
-.setGroupState(EMPTY.toString()),
+.setProtocolType("classic")
+.setGroupState(EMPTY.toString())
+.setGroupType(Group.GroupType.CLASSIC.toString()),
 new ListGroupsResponseData.ListedGroup()
 .setGroupId(consumerGroupId)
 .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
 .setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())
+.setGroupType(Group.GroupType.CONSUMER.toString())
 ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
 
 assertEquals(expectAllGroupMap, actualAllGroupMap);
 
 context.commit();
-actualAllGroupMap = context.sendListGroups(Collections.emptyList()).stream()
+
+// Test list group response to check assigning state in the consumer group.
+actualAllGroupMap = context.sendListGroups(Collections.singletonList("assigning"), Collections.emptyList()).stream()
 .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
 expectAllGroupMap =
 Stream.of(
-new ListGroupsResponseData.ListedGroup()
-.setGroupId(classicGroup.groupId())
-.setProtocolType(classicGroupType)
-.setGroupState(EMPTY.toString()),
 new ListGroupsResponseData.ListedGroup()
 .setGroupId(consumerGroupId)
 .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
 .setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
+.setGroupType(Group.GroupType.CONSUMER.toString())
 ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
 
 assertEquals(expectAllGroupMap, actualAllGroupMap);
 
-actualAllGroupMap = context.sendListGroups(Collections.singletonList("Empty")).stream()
+// Test list group response with group state filter and no group type filter.
+actualAllGroupMap = context.sendListGroups(Collections.singletonList("Empty"), Collections.emptyList()).stream()
+.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+expectAllGroupMap = Stream.of(
+new ListGroupsResponseData.ListedGroup()
+.setGroupId(classicGroup.groupId())
+.setProtocolType("classic")
+.setGroupState(EMPTY.toString())
+.setGroupType(Group.GroupType.CLASSIC.toString())
+).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
+
+assertEquals(expectAllGroupMap, actualAllGroupMap);
+
+// Test list group response with no group state filter and with group type filter.
+actualAllGroupMap = context.sendListGroups(Collections.emptyList(), Collections.singletonList(Group.GroupType.CLASSIC.toString())).stream()

Review Comment:
   Okie, I made the case-insensitive test case use the Consumer type.
   






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-18 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1458058459


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -452,19 +453,36 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
 /**
  * Get the Group List.
  *
- * @param statesFilter The states of the groups we want to list.
- * If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to this shard
+ * @param statesFilter      The states of the groups we want to list.
+ *                          If empty, all groups are returned with their state.
+ * @param typesFilter       The types of the groups we want to list.
+ *                          If empty, all groups are returned with their type.
+ * @param committedOffset   A specified committed offset corresponding to this shard.
  *
  * @return A list containing the ListGroupsResponseData.ListedGroup
  */
+public List listGroups(
+Set statesFilter,
+Set typesFilter,
+long committedOffset
+) {
+Predicate combinedFilter = group -> {
+boolean stateCheck = statesFilter.isEmpty() || statesFilter.contains(group.stateAsString(committedOffset));
+
+// The type check is case-insensitive.
+boolean typeCheck = typesFilter.isEmpty() ||
+typesFilter.stream()
+.map(String::toLowerCase)

Review Comment:
   oya that makes sense, thanks!
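A minimal sketch of a combined state and type filter in which only the type check is case-insensitive, as the comment in the diff says. `GroupView` and `listGroupIds` are hypothetical stand-ins, not the coordinator's `Group` API; since the diff is truncated after `.map(String::toLowerCase)`, this is one possible way to finish the predicate, not the PR's code.

```java
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical sketch of combined state/type filtering; GroupView is a stand-in type. */
public final class ListGroupsFilterSketch {

    record GroupView(String groupId, String state, String type) {}

    static Predicate<GroupView> combinedFilter(Set<String> statesFilter, Set<String> typesFilter) {
        // Normalize the requested types once rather than on every group.
        Set<String> lowerTypes = typesFilter.stream()
            .map(t -> t.toLowerCase(Locale.ROOT))
            .collect(Collectors.toSet());
        return group -> {
            // The state check is an exact match.
            boolean stateCheck = statesFilter.isEmpty() || statesFilter.contains(group.state());
            // The type check is case-insensitive.
            boolean typeCheck = lowerTypes.isEmpty()
                || lowerTypes.contains(group.type().toLowerCase(Locale.ROOT));
            return stateCheck && typeCheck;
        };
    }

    static List<String> listGroupIds(List<GroupView> groups, Set<String> states, Set<String> types) {
        return groups.stream()
            .filter(combinedFilter(states, types))
            .map(GroupView::groupId)
            .collect(Collectors.toList());
    }
}
```

Lower-casing the requested types once, outside the predicate, avoids repeating that work for every group, and `Locale.ROOT` keeps the result independent of the JVM's default locale.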






Re: [PR] KAFKA-15811: Enhance request context with client socket port information (KIP-714) [kafka]

2024-01-18 Thread via GitHub


apoorvmittal10 commented on PR #15190:
URL: https://github.com/apache/kafka/pull/15190#issuecomment-1899343398

   > @apoorvmittal10 : Thanks for the PR. A couple of minor comments.
   
   Thanks for reviewing @junrao. I have addressed and replied to the comments.





Re: [PR] KAFKA-15811: Enhance request context with client socket port information (KIP-714) [kafka]

2024-01-18 Thread via GitHub


apoorvmittal10 commented on code in PR #15190:
URL: https://github.com/apache/kafka/pull/15190#discussion_r1458055273


##
server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java:
##
@@ -59,6 +60,7 @@ public static RequestContext requestContext() throws UnknownHostException {
 new RequestHeader(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS, (short) 0, "producer-1", 0),
 "1",
 InetAddress.getLocalHost(),
+Optional.of(56078),

Review Comment:
   Done.






[PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]

2024-01-18 Thread via GitHub


apoorvmittal10 opened a new pull request, #15234:
URL: https://github.com/apache/kafka/pull/15234

   KIP-714 requires a client instance cache in the broker, which should also have a time-based eviction policy: client instances that are not actively sending metrics should be evicted. The KIP states: `This client instance specific state is maintained in broker memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds`
   
   The PR adds support for evicting such instances from the cache. There are several possible approaches to eviction:
   1. Periodically iterating over all entries in the cache, since every instance can have a different TTL (based on its configured push interval)
   2. Using a heap of instance eviction deadlines, i.e. delay queues
   3. Using a hashed wheel timer, which updates and evicts entries in O(1) time (with the minor overhead of bucketed delay queues)
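Approach 2 can be sketched with a `PriorityQueue` of deadlines and lazy deletion, using the TTL formula quoted from KIP-714. This is a hypothetical illustration with an explicit clock; it is not the PR's implementation, which may instead build on the `SystemTimer` wrapper mentioned below.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/**
 * Hypothetical sketch of eviction approach 2 (a heap ordered by deadline).
 * Time is passed in explicitly; this is not the broker's client instance cache.
 */
public final class ClientInstanceCacheSketch {

    /** TTL quoted from KIP-714: MAX(60 * 1000, PushIntervalMs * 3) milliseconds. */
    static long ttlMs(long pushIntervalMs) {
        return Math.max(60_000L, pushIntervalMs * 3);
    }

    private record Deadline(long expiresAtMs, String instanceId) {}

    // Latest deadline per instance; the heap may also hold older, stale deadlines.
    private final Map<String, Long> deadlines = new HashMap<>();
    private final PriorityQueue<Deadline> heap =
        new PriorityQueue<>(Comparator.comparingLong(Deadline::expiresAtMs));

    /** Called on every metrics push: moves the instance's deadline forward. O(log n). */
    public void touch(String instanceId, long pushIntervalMs, long nowMs) {
        long expiresAtMs = nowMs + ttlMs(pushIntervalMs);
        deadlines.put(instanceId, expiresAtMs);
        heap.add(new Deadline(expiresAtMs, instanceId));
    }

    /** Evicts instances whose latest deadline is at or before nowMs; returns how many. */
    public int evictExpired(long nowMs) {
        int evicted = 0;
        while (!heap.isEmpty() && heap.peek().expiresAtMs() <= nowMs) {
            Deadline d = heap.poll();
            Long latest = deadlines.get(d.instanceId());
            // Skip stale heap entries superseded by a later touch().
            if (latest != null && latest == d.expiresAtMs()) {
                deadlines.remove(d.instanceId());
                evicted++;
            }
        }
        return evicted;
    }

    public boolean contains(String instanceId) {
        return deadlines.containsKey(instanceId);
    }
}
```

Each push costs O(log n) here, and refreshed instances leave stale heap entries behind until their old deadline passes; the PR lists O(1) updates as the advantage of the hashed wheel timer in approach 3.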
   
   I have also moved a wrapper class for SystemTimer from `group-coordinator` to `utils`.
   
   cc: @AndrewJSchofield @junrao
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-18 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1458050364


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -140,6 +142,73 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
 }
   }
 
+  /**
+   * Return topic partition metadata for the given topic, listener and index range. Also, return the next partition
+   * index that is not included in the result.
+   *
+   * @param image         The metadata image
+   * @param topicName     The name of the topic.
+   * @param listenerName  The listener name.
+   * @param startIndex    The smallest index of the partitions to be included in the result.
+   * @param upperIndex    The upper limit of the index of the partitions to be included in the result.
+   *                      Note that, the upper index can be larger than the largest partition index in
+   *                      this topic.
+   * @return              A collection of topic partition metadata and next partition index (-1 means
+   *                      no next partition).
+   */
+  private def getPartitionMetadataForDescribeTopicResponse(
+image: MetadataImage,
+topicName: String,
+listenerName: ListenerName,
+startIndex: Int,
+maxCount: Int
+  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
+Option(image.topics().getTopic(topicName)) match {
+  case None => (None, -1)
+  case Some(topic) => {
+val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
+val partitions = topic.partitions().keySet()
+val upperIndex = topic.partitions().size().min(startIndex + maxCount)
+val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1
+for (partitionId <- startIndex until upperIndex) {
+  topic.partitions().get(partitionId) match {
+case partition : PartitionRegistration => {
+  val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas,
+listenerName, false)
+  val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false)
+  val offlineReplicas = getOfflineReplicas(image, partition, listenerName)
+  val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName)
+  maybeLeader match {
+case None =>
+  result.append(new DescribeTopicPartitionsResponsePartition()
+.setPartitionIndex(partitionId)
+.setLeaderId(MetadataResponse.NO_LEADER_ID)
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+.setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+case Some(leader) =>
+  result.append(new DescribeTopicPartitionsResponsePartition()
+.setPartitionIndex(partitionId)
+.setLeaderId(leader.id())
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+.setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+  }
+}
+case _ =>

Review Comment:
   Done.
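The paging arithmetic in `getPartitionMetadataForDescribeTopicResponse` can be checked in isolation. A minimal Java sketch, assuming partition IDs are contiguous from 0 (the Scala code iterates `startIndex until upperIndex` and looks each ID up); `Page` and `page` are hypothetical names:

```java
import java.util.stream.IntStream;

/** Hypothetical sketch of the DescribeTopicPartitions paging arithmetic. */
public final class PartitionPageSketch {

    /** Half-open range [start, end) of partition IDs plus the next start index (-1 if none). */
    public record Page(int start, int end, int nextIndex) {
        public int[] partitionIds() {
            return IntStream.range(start, end).toArray(); // empty when end <= start
        }
    }

    public static Page page(int partitionCount, int startIndex, int maxCount) {
        // Same clamp as `topic.partitions().size().min(startIndex + maxCount)`; the long
        // arithmetic is an extra guard against int overflow for a very large maxCount.
        int end = (int) Math.min((long) partitionCount, (long) startIndex + maxCount);
        int nextIndex = end < partitionCount ? end : -1;
        return new Page(startIndex, end, nextIndex);
    }
}
```

Note that the Scala method takes `maxCount`, while its scaladoc documents an `upperIndex` parameter; the sketch follows the signature.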






[jira] [Commented] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7

2024-01-18 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808378#comment-17808378
 ] 

Proven Provenzano commented on KAFKA-16162:
---

I have a PR [GitHub Pull Request #15232|https://github.com/apache/kafka/pull/15232]
that should fix the issue. I have not created tests for it yet, though, and it
should also add a condition so that a broker that sends a heartbeat indicating
it has no online directories is fenced.

> New created topics are unavailable after upgrading to 3.7
> -
>
> Key: KAFKA-16162
> URL: https://issues.apache.org/jira/browse/KAFKA-16162
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Blocker
>
> In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration 
> request now includes the `LogDirs` field with a UUID for each log dir on each 
> broker. The controller stores this info and uses it to determine whether a 
> log dir is known and online when handling AssignReplicasToDirsRequest 
> [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093].
>  
> When upgrading from an older version, the Kafka cluster first runs the 3.7 
> binary with the old metadata version, and the metadata version is then 
> upgraded using kafka-features.sh. That means that when brokers start up and 
> send the brokerRegistration request, they use the older metadata version, 
> which does not include the `LogDirs` field. As a result, the controller has 
> no log dir info for any broker. Later, after the upgrade, when a new topic is 
> created, the flow is:
> 1. The controller assigns replicas and adds them to the metadata log.
> 2. Brokers fetch the metadata and apply it.
> 3. ReplicaManager#maybeUpdateTopicAssignment updates the topic assignment.
> 4. After a broker sends ASSIGN_REPLICAS_TO_DIRS to the controller with the 
> replica assignment, the controller considers the replica's log dir offline, 
> triggers the offline handler, and reassigns leadership to another replica, 
> which is then also treated as offline, until there are no replicas left, so 
> the leader is set to -1 (i.e. no leader).
> As a result, newly created topics are unavailable (they have no leader) 
> because the controller thinks all log dirs are offline.
> {code:java}
> lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic quickstart-events3 --bootstrap-server localhost:9092
> 
> Topic: quickstart-events3  TopicId: s8s6tEQyRvmjKI6ctNTgPg  PartitionCount: 3  ReplicationFactor: 3  Configs: segment.bytes=1073741824
>   Topic: quickstart-events3  Partition: 0  Leader: none  Replicas: 7,2,6  Isr: 6
>   Topic: quickstart-events3  Partition: 1  Leader: none  Replicas: 2,6,7  Isr: 6
>   Topic: quickstart-events3  Partition: 2  Leader: none  Replicas: 6,7,2  Isr: 6
> {code}
> The log snippet in the controller :
> {code:java}
> # handling 1st assignReplicaToDirs request
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] 
> offline-dir-assignment: changing partition(s): quickstart-events3-0, 
> quickstart-events3-2, quickstart-events3-1 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
> quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
> [AA, AA, AA] -> 
> [7K5JBERyyqFFxIXSXYluJA, AA, AA], 
> partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition 
> change PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, 
> isr=null, leader=-2, replicas=null, removingReplicas=null, 
> addingReplicas=null, leaderRecoveryState=-1, 
> directories=[7K5JBERyyqFFxIXSXYluJA, AA, 
> AA], eligibleLeaderReplicas=null, lastKnownELR=null) for 
> topic quickstart-events3 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG 
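The failure mode in step 4 can be illustrated with a deliberately simplified model: if the controller treats a dir as online only when the broker registered it, then for a broker registered under an old metadata version (no `LogDirs`) every dir looks offline. All names here are hypothetical, and a single election stands in for the controller's step-by-step leader moves.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified model of the KAFKA-16162 failure: the controller only
 * trusts log dirs learned at broker registration. Not ReplicationControlManager code.
 */
public final class OfflineDirSketch {

    // brokerId -> log dir IDs from broker registration (empty under an old metadata version)
    private final Map<Integer, Set<String>> registeredDirs = new HashMap<>();

    public void register(int brokerId, Set<String> logDirs) {
        registeredDirs.put(brokerId, logDirs);
    }

    /** A reported dir counts as online only if the broker registered it. */
    public boolean isDirOnline(int brokerId, String dirId) {
        // Guard against null: immutable Set.of(...) throws on contains(null).
        return dirId != null && registeredDirs.getOrDefault(brokerId, Set.of()).contains(dirId);
    }

    /**
     * Picks the first replica whose reported dir is online; replicas on "offline" dirs
     * are skipped, so if every dir looks offline the result is -1 (no leader).
     */
    public int electLeader(List<Integer> replicas, Map<Integer, String> reportedDirs) {
        for (int brokerId : replicas) {
            if (isDirOnline(brokerId, reportedDirs.get(brokerId))) {
                return brokerId;
            }
        }
        return -1;
    }
}
```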

[jira] [Updated] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7

2024-01-18 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano updated KAFKA-16162:
--
Affects Version/s: 3.7.0

> New created topics are unavailable after upgrading to 3.7
> -
>
> Key: KAFKA-16162
> URL: https://issues.apache.org/jira/browse/KAFKA-16162
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Blocker
>
> In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration 
> request now includes the `LogDirs` field with a UUID for each log dir on each 
> broker. The controller stores this info and uses it to determine whether a 
> log dir is known and online when handling AssignReplicasToDirsRequest 
> [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093].
>  
> When upgrading from an older version, the Kafka cluster first runs the 3.7 
> binary with the old metadata version, and the metadata version is then 
> upgraded using kafka-features.sh. That means that when brokers start up and 
> send the brokerRegistration request, they use the older metadata version, 
> which does not include the `LogDirs` field. As a result, the controller has 
> no log dir info for any broker. Later, after the upgrade, when a new topic is 
> created, the flow is:
> 1. The controller assigns replicas and adds them to the metadata log.
> 2. Brokers fetch the metadata and apply it.
> 3. ReplicaManager#maybeUpdateTopicAssignment updates the topic assignment.
> 4. After a broker sends ASSIGN_REPLICAS_TO_DIRS to the controller with the 
> replica assignment, the controller considers the replica's log dir offline, 
> triggers the offline handler, and reassigns leadership to another replica, 
> which is then also treated as offline, until there are no replicas left, so 
> the leader is set to -1 (i.e. no leader).
> As a result, newly created topics are unavailable (they have no leader) 
> because the controller thinks all log dirs are offline.
> {code:java}
> lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic quickstart-events3 --bootstrap-server localhost:9092
> 
> Topic: quickstart-events3  TopicId: s8s6tEQyRvmjKI6ctNTgPg  PartitionCount: 3  ReplicationFactor: 3  Configs: segment.bytes=1073741824
>   Topic: quickstart-events3  Partition: 0  Leader: none  Replicas: 7,2,6  Isr: 6
>   Topic: quickstart-events3  Partition: 1  Leader: none  Replicas: 2,6,7  Isr: 6
>   Topic: quickstart-events3  Partition: 2  Leader: none  Replicas: 6,7,2  Isr: 6
> {code}
> The log snippet in the controller :
> {code:java}
> # handling 1st assignReplicaToDirs request
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] 
> offline-dir-assignment: changing partition(s): quickstart-events3-0, 
> quickstart-events3-2, quickstart-events3-1 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
> quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
> [AA, AA, AA] -> 
> [7K5JBERyyqFFxIXSXYluJA, AA, AA], 
> partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition 
> change PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, 
> isr=null, leader=-2, replicas=null, removingReplicas=null, 
> addingReplicas=null, leaderRecoveryState=-1, 
> directories=[7K5JBERyyqFFxIXSXYluJA, AA, 
> AA], eligibleLeaderReplicas=null, lastKnownELR=null) for 
> topic quickstart-events3 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
> quickstart-events3-2 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
> [AA, AA, AA] -> 
> [AA, 7K5JBERyyqFFxIXSXYluJA, AA], 
> partitionEpoch: 0 -> 1 

[jira] [Updated] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7

2024-01-18 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano updated KAFKA-16162:
--
Priority: Blocker  (was: Major)


[PR] KAFKA-16166: Generify RetryWithToleranceOperator and ErrorReporter classes [kafka]

2024-01-18 Thread via GitHub


gharris1727 opened a new pull request, #15233:
URL: https://github.com/apache/kafka/pull/15233

   This is a follow-up to #15154 that propagates the generic type of ProcessingContext upward through all of its call sites.
   
   I decided to avoid generifying the WorkerTask and instead split the common retryWithToleranceOperator field into two fields with different types, in WorkerSinkTask and AbstractWorkerSourceTask, because the only operation on it in WorkerTask was the retryWithToleranceOperator.triggerStop() call.
   
   The type system now enforces that the DeadLetterQueueReporter only works for sink tasks, and that the LogReporter is specialized as either LogReporter.Sink or LogReporter.Source.
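The effect of generifying the reporters can be sketched with simplified, hypothetical types. None of them are the Connect runtime classes, and `SinkLogReporter`/`SourceLogReporter` only play the role of `LogReporter.Sink`/`LogReporter.Source`:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of type-parameterized error reporting.
 * None of these types are the Connect runtime classes.
 */
public final class GenericReporterSketch {

    /** Per-record context; T is the type of the original record being processed. */
    static final class ProcessingContext<T> {
        final T original;
        final Throwable error;

        ProcessingContext(T original, Throwable error) {
            this.original = original;
            this.error = error;
        }
    }

    interface ErrorReporter<T> {
        void report(ProcessingContext<T> context);
    }

    /** Stand-ins for the records a sink task consumes and a source task polls. */
    record ConsumedRecord(String topic, String value) {}
    record PolledRecord(String sourcePartition, String value) {}

    /** Typed for sink records only: a PolledRecord context is rejected at compile time. */
    static final class DeadLetterQueueReporter implements ErrorReporter<ConsumedRecord> {
        final List<String> written = new ArrayList<>();

        @Override
        public void report(ProcessingContext<ConsumedRecord> context) {
            written.add(context.original.topic() + ": " + context.error.getMessage());
        }
    }

    /** One log reporter, specialized per task type instead of using instanceof checks. */
    static class LogReporter<T> implements ErrorReporter<T> {
        final List<String> lines = new ArrayList<>();

        @Override
        public void report(ProcessingContext<T> context) {
            lines.add("Error processing " + context.original + ": " + context.error.getMessage());
        }
    }

    static final class SinkLogReporter extends LogReporter<ConsumedRecord> {}
    static final class SourceLogReporter extends LogReporter<PolledRecord> {}
}
```

With the type parameter fixed per reporter, a call such as `dlq.report(new ProcessingContext<>(new PolledRecord("p0", "v"), err))` does not compile, so no runtime instanceof check is needed to keep the DLQ reporter away from source records.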
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] KAFKA-16162: [WIP] [kafka]

2024-01-18 Thread via GitHub


pprovenzano opened a new pull request, #15232:
URL: https://github.com/apache/kafka/pull/15232

   (no comment)





Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]

2024-01-18 Thread via GitHub


cmccabe commented on PR #15193:
URL: https://github.com/apache/kafka/pull/15193#issuecomment-1899239682

   I put this into a table in hopes that it will make it clearer. cc @mumrah 





[jira] [Created] (KAFKA-16166) Generify RetryWithToleranceOperator and ErrorReporter

2024-01-18 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16166:
---

 Summary: Generify RetryWithToleranceOperator and ErrorReporter
 Key: KAFKA-16166
 URL: https://issues.apache.org/jira/browse/KAFKA-16166
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Reporter: Greg Harris
Assignee: Greg Harris


The RetryWithToleranceOperator and ErrorReporter instances in Connect are only 
ever used with a single type of ProcessingContext 
(ProcessingContext for sources, 
ProcessingContext> for sinks) and currently 
dynamically decide between these with instanceof checks.

Instead, these classes should be generic, and have their implementations accept 
consistent ProcessingContext objects.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16087) Tasks dropping incorrect records when errors.tolerance=all and errors reported asynchronously due to data race

2024-01-18 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-16087.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Tasks dropping incorrect records when errors.tolerance=all and errors 
> reported asynchronously due to data race
> --
>
> Key: KAFKA-16087
> URL: https://issues.apache.org/jira/browse/KAFKA-16087
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.0, 3.2.0, 3.7.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.8.0
>
>
> The ErrantRecordReporter introduced in KIP-610 (2.6.0) allows sink connectors 
> to push records to the connector DLQ topic. The implementation of this 
> reporter interacts with the ProcessingContext within the per-task 
> RetryWithToleranceOperator. The ProcessingContext stores mutable state about 
> the current operation, such as what error has occurred or what record is 
> being operated on.
> The ProcessingContext and RetryWithToleranceOperator are also used by the 
> converter and transformation pipeline of the connector for similar reasons. 
> When the ErrantRecordReporter#report function is called from SinkTask#put, 
> there is no contention over the mutable state, as the thread used for 
> SinkTask#put is also responsible for converting and transforming the record. 
> However, if ErrantRecordReporter#report is called by an extra thread within 
> the SinkTask, there is thread contention on the single mutable 
> ProcessingContext.
> This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602 and the 
> synchronized keyword was added to all methods of RetryWithToleranceOperator 
> which interact with the ProcessingContext. However, this solution still 
> allows the RWTO methods to interleave, and produce unintended data races. 
> Consider the following interleaving:
> 1. Thread 1 converts and transforms record A successfully.
> 2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
> 3. Thread 1 queues some other thread 2 with some delay to call 
> ErrantRecordReporter#report(A).
> 4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
> 5. Thread 1 calls RWTO#execute for a converter or transformation operation. 
> For example, [converting 
> headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539]
> 6. The operation succeeds, and the ProcessingContext is left with error == 
> null, or equivalently failed() == false.
> 7. Thread 2 has its delay expire, and it calls ErrantRecordReporter#report.
> 8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls 
> [RWTO 
> executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109]
>  and returns.
> 9. The operation leaves ProcessingContext with error != null, or equivalently 
> failed() == true.
> 10. Thread 1 then resumes execution, and calls [RWTO 
> failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541]
>  which evaluates to true.
> 11. Thread 1 then drops Record B, even though the header conversion succeeded 
> without error.
> 12. Record B is never delivered to the Sink Task, and never delivered to the 
> error reporter for processing, despite having produced no error during 
> processing.
> This per-method synchronization for returning nulls and errors separately is 
> insufficient, and either the data sharing should be avoided or a different 
> locking mechanism should be used.
> A similar flaw exists in source connectors and asynchronous errors reported 
> by the producer, and was introduced in KIP-779 (3.2.0).
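
A deterministic Java sketch of why per-method `synchronized` is not enough here. Each call is atomic, but the write in step 6 and the check in step 10 are separate calls, so thread 2's write in steps 7-9 can land between them. The steps are replayed in order on one thread to make the outcome repeatable; `SharedContext` and `RaceDemo` are hypothetical. The per-record variant illustrates the "avoid the data sharing" option from the description and is not necessarily how PR #15154 fixed it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of the shared per-task context.
// Every method is synchronized, mirroring the per-method approach from KAFKA-10602.
final class SharedContext {
    private Throwable error; // null means "the last operation succeeded"

    synchronized void succeed() { error = null; }
    synchronized void fail(Throwable t) { error = t; }
    synchronized boolean failed() { return error != null; }
}

final class RaceDemo {
    // Replays steps 5-11 in order, with one context shared by both threads.
    static List<String> sharedContext() {
        SharedContext ctx = new SharedContext();
        List<String> delivered = new ArrayList<>();
        ctx.succeed();                              // steps 5-6: thread 1 converts B's headers
        ctx.fail(new RuntimeException("late A"));   // steps 7-9: thread 2 reports A
        if (!ctx.failed()) {                        // step 10: thread 1 checks the shared flag
            delivered.add("B");
        }
        return delivered;                           // steps 11-12: B is dropped
    }

    // Same order of events, but each record owns its context.
    static List<String> perRecordContext() {
        SharedContext ctxA = new SharedContext();
        SharedContext ctxB = new SharedContext();
        List<String> delivered = new ArrayList<>();
        ctxB.succeed();
        ctxA.fail(new RuntimeException("late A"));
        if (!ctxB.failed()) {
            delivered.add("B");
        }
        return delivered;
    }
}
```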





Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-18 Thread via GitHub


gharris1727 merged PR #15154:
URL: https://github.com/apache/kafka/pull/15154


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-18 Thread via GitHub


gharris1727 commented on PR #15154:
URL: https://github.com/apache/kafka/pull/15154#issuecomment-1899225769

   Test failures appear unrelated, and connect:runtime passes for me locally.





Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]

2024-01-18 Thread via GitHub


apoorvmittal10 commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457965643


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -362,7 +362,7 @@ public long timeToNextUpdate(long requestTimeoutMs) {
 throw new IllegalStateException("Unknown telemetry state: 
" + localState);
 }
 
-log.debug("For telemetry state {}, returning the value {} ms; {}", 
localState, timeMs, msg);
+log.trace("For telemetry state {}, returning the value {} ms; {}", 
localState, timeMs, msg);

Review Comment:
   I don't think it will make much difference, seems fine to me. Writing the 
suggestion will involve changing `msg` in each block. I think it's fine the way 
it is :)






Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]

2024-01-18 Thread via GitHub


apoorvmittal10 commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457964560


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -379,6 +379,7 @@ public Optional> createRequest() 
{
 lock.readLock().unlock();
 }
 
+log.debug("Creating telemetry request. Telemetry state: {}", 
localState);

Review Comment:
   Hmmm, I don't have an opinion there, either seems fine to me. Your wish is 
my command :). Done.






Re: [PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]

2024-01-18 Thread via GitHub


jeffkbkim commented on code in PR #15221:
URL: https://github.com/apache/kafka/pull/15221#discussion_r1457909168


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -685,6 +712,20 @@ private boolean hasPendingTransactionalOffsets(
 return false;
 }
 
+/**
+ * @return true iff there is a committed offset in the main offset store 
for the
+ * given group, topic and partition.
+ *
+ * Package private for testing.
+ */
+boolean hadCommittedOffset(

Review Comment:
   nit: should this be hasCommittedOffset?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -982,12 +1040,15 @@ public void replayEndTransactionMarker(
 log.debug("Committed transaction offset commit for 
producer id {} in group {} " +
 "with topic {}, partition {}, and offset {}.",
 producerId, groupId, topicName, partitionId, 
offsetAndMetadata);
-offsets.put(
+OffsetAndMetadata previousValue = offsets.put(
 groupId,
 topicName,
 partitionId,
 offsetAndMetadata
 );
+if (previousValue == null) {
+metrics.incrementNumOffsets();
+}

Review Comment:
   thanks for adding this!
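
   The `previousValue == null` check works because `Map.put` returns the value previously associated with the key (or null), so only a brand-new offset bumps the counter. A minimal Java sketch with plain nested `HashMap`s, a hypothetical stand-in for the coordinator's timeline maps and metrics; the decrement on delete is an assumed mirror of the same idea, not code from this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical offset store keyed by group -> topic -> partition; not the coordinator's real classes.
final class OffsetStoreSketch {
    private final Map<String, Map<String, Map<Integer, Long>>> offsets = new HashMap<>();
    private long numOffsets = 0; // stands in for the gauge behind metrics.incrementNumOffsets()

    // Map.put returns the previous value, so a null result means a brand-new offset.
    void put(String group, String topic, int partition, long offset) {
        Long previous = offsets
            .computeIfAbsent(group, g -> new HashMap<>())
            .computeIfAbsent(topic, t -> new HashMap<>())
            .put(partition, offset);
        if (previous == null) {
            numOffsets++;
        }
    }

    // Assumed symmetric delete path: only decrement when an offset was actually removed.
    void remove(String group, String topic, int partition) {
        Map<String, Map<Integer, Long>> byTopic = offsets.get(group);
        if (byTopic == null) return;
        Map<Integer, Long> byPartition = byTopic.get(topic);
        if (byPartition == null) return;
        if (byPartition.remove(partition) != null) {
            numOffsets--;
        }
    }

    long numOffsets() { return numOffsets; }
}
```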



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -604,22 +602,23 @@ public CoordinatorResult deleteOffsets(
 )
 );
 } else {
-final TimelineHashMap 
offsetsByPartition = offsetsByTopic == null ?
-null : offsetsByTopic.get(topic.name());
-if (offsetsByPartition != null) {
-topic.partitions().forEach(partition -> {
-if 
(offsetsByPartition.containsKey(partition.partitionIndex())) {
-responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
-.setPartitionIndex(partition.partitionIndex())
-);
-
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
-request.groupId(),
-topic.name(),
-partition.partitionIndex()
-));
-}
-});
-}
+topic.partitions().forEach(partition -> {
+// We always add the partition to the response.
+responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+.setPartitionIndex(partition.partitionIndex())
+);
+
+// A tombstone is written if an offset is present is the 
main storage or

Review Comment:
   nit: "is present in"






[jira] [Updated] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions

2024-01-18 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15843:
---
Description: 
Legacy coordinator triggers onPartitionsAssigned with empty assignment (which 
is not the case when triggering onPartitionsRevoked or Lost). This is the 
behaviour of the legacy coordinator, and the new consumer implementation 
maintains the same principle. We should review this to fully understand if it 
is really needed to call onPartitionsAssigned with empty assignment (or if it 
should behave consistently with the onRevoke/Lost).

Note that the consumer integration tests rely on this call to 
onPartitionsAssigned to #awaitRebalance (AbstractConsumerTest.scala)

  was:Legacy coordinator triggers onPartitionsAssigned with empty assignment 
(which is not the case when triggering onPartitionsRevoked or Lost). This is 
the behaviour of the legacy coordinator, and the new consumer implementation 
maintains the same principle. We should review this to fully understand if it 
is really needed to call onPartitionsAssigned with empty assignment (or if it 
should behave consistently with the onRevoke/Lost)


> Review consumer onPartitionsAssigned called with empty partitions
> -
>
> Key: KAFKA-15843
> URL: https://issues.apache.org/jira/browse/KAFKA-15843
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> Legacy coordinator triggers onPartitionsAssigned with empty assignment (which 
> is not the case when triggering onPartitionsRevoked or Lost). This is the 
> behaviour of the legacy coordinator, and the new consumer implementation 
> maintains the same principle. We should review this to fully understand if it 
> is really needed to call onPartitionsAssigned with empty assignment (or if it 
> should behave consistently with the onRevoke/Lost).
> Note that the consumer integration tests rely on this call to 
> onPartitionsAssigned to #awaitRebalance (AbstractConsumerTest.scala)
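
A Java sketch of a listener that copes with the empty-assignment callback described above. To stay self-contained it uses a hypothetical stand-in interface and plain strings such as "t-0" instead of `org.apache.kafka.clients.consumer.ConsumerRebalanceListener` and `TopicPartition`. Counting every `onPartitionsAssigned` call, empty or not, is roughly the kind of signal a test helper waiting for a rebalance could rely on; it is not how `awaitRebalance` is implemented.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for ConsumerRebalanceListener, using "topic-partition" strings.
interface RebalanceListenerSketch {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsAssigned(Collection<String> partitions);
}

final class CountingListener implements RebalanceListenerSketch {
    int rebalancesCompleted = 0;
    final List<String> owned = new ArrayList<>();

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        owned.removeAll(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        rebalancesCompleted++;      // counted even when the collection is empty
        if (partitions.isEmpty()) {
            return;                 // nothing to initialise for this member
        }
        owned.addAll(partitions);
    }
}
```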





Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]

2024-01-18 Thread via GitHub


philipnee commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457945378


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -362,7 +362,7 @@ public long timeToNextUpdate(long requestTimeoutMs) {
 throw new IllegalStateException("Unknown telemetry state: 
" + localState);
 }
 
-log.debug("For telemetry state {}, returning the value {} ms; {}", 
localState, timeMs, msg);
+log.trace("For telemetry state {}, returning the value {} ms; {}", 
localState, timeMs, msg);

Review Comment:
   so that we can remove the "returning the value 224678 ms" - it was a bit 
hard to understand what it actually meant.






Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]

2024-01-18 Thread via GitHub


philipnee commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457944304


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -362,7 +362,7 @@ public long timeToNextUpdate(long requestTimeoutMs) {
 throw new IllegalStateException("Unknown telemetry state: 
" + localState);
 }
 
-log.debug("For telemetry state {}, returning the value {} ms; {}", 
localState, timeMs, msg);
+log.trace("For telemetry state {}, returning the value {} ms; {}", 
localState, timeMs, msg);

Review Comment:
   Do you think it makes sense to rephrase the log? For the `msg` can we 
explicitly say:
   
   ...client will wait for {}ms before submitting the next...






Re: [PR] KAFKA-16159: MINOR - Correcting logs to debug telemetry reporter [kafka]

2024-01-18 Thread via GitHub


philipnee commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457941137


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -379,6 +379,7 @@ public Optional> createRequest() 
{
 lock.readLock().unlock();
 }
 
+log.debug("Creating telemetry request. Telemetry state: {}", 
localState);

Review Comment:
   It would be useful to know the type of request the client is sending: either 
a subscription request or a push request. I guess we can derive it from the 
current state, but the state is rather internal, so the person reading the log 
might not know what it means. My suggestion is to be explicit: "we are sending a 
push request" or "we are sending a subscription request".
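
   A Java sketch of the suggestion: translate the internal state into the request being sent before logging. The enum is a hypothetical, reduced set of states rather than the reporter's real state machine; GetTelemetrySubscriptions and PushTelemetry are the two KIP-714 RPCs, and `String.format` stands in for the SLF4J `{}` placeholders used in the diff.

```java
// Hypothetical, reduced set of telemetry states.
enum TelemetryStateSketch {
    SUBSCRIPTION_NEEDED, PUSH_NEEDED, TERMINATING_PUSH_NEEDED
}

final class TelemetryLogSketch {
    // Map the internal state to the request the client is about to send.
    static String requestKind(TelemetryStateSketch state) {
        switch (state) {
            case SUBSCRIPTION_NEEDED:
                return "GetTelemetrySubscriptions request";
            case PUSH_NEEDED:
            case TERMINATING_PUSH_NEEDED:
                return "PushTelemetry request";
            default:
                throw new IllegalStateException("Unexpected telemetry state: " + state);
        }
    }

    // Log line that names the request explicitly and keeps the state for debugging.
    static String creatingRequestMessage(TelemetryStateSketch state) {
        return String.format("Creating telemetry request: sending a %s (telemetry state: %s)",
            requestKind(state), state);
    }
}
```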






Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]

2024-01-18 Thread via GitHub


mjsax commented on code in PR #15219:
URL: https://github.com/apache/kafka/pull/15219#discussion_r1457939699


##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##
@@ -146,7 +146,7 @@ public static  QueryResult handleBasicQueries(
 "Handled in " + store.getClass() + " in " + (System.nanoTime() 
- start) + "ns"
 );
 }
-result.setPosition(position);
+result.setPosition(position.copy());

Review Comment:
   Thanks. I was hoping it would be thread-safe already -- maybe not. Let me 
double check the code and figure it out.






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-18 Thread via GitHub


artemlivshits commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1457923250


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -140,6 +142,73 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 }
   }
 
+  /**
+   * Return topic partition metadata for the given topic, listener and index 
range. Also, return the next partition
+   * index that is not included in the result.
+   *
+   * @param image   The metadata image
+   * @param topicName   The name of the topic.
+   * @param listenerNameThe listener name.
+   * @param startIndex  The smallest index of the partitions 
to be included in the result.
+   * @param upperIndex  The upper limit of the index of the 
partitions to be included in the result.
+   *Note that, the upper index can be 
larger than the largest partition index in
+   *this topic.
+   * @returnA collection of topic partition 
metadata and next partition index (-1 means
+   *no next partition).
+   */
+  private def getPartitionMetadataForDescribeTopicResponse(
+image: MetadataImage,
+topicName: String,
+listenerName: ListenerName,
+startIndex: Int,
+maxCount: Int
+  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
+Option(image.topics().getTopic(topicName)) match {
+  case None => (None, -1)
+  case Some(topic) => {
+val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
+val partitions = topic.partitions().keySet()
+val upperIndex = topic.partitions().size().min(startIndex + maxCount)
+val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1
+for (partitionId <- startIndex until upperIndex) {
+  topic.partitions().get(partitionId) match {
+case partition : PartitionRegistration => {
+  val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
+listenerName, false)
+  val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName, false)
+  val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
+  val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
+  maybeLeader match {
+case None =>
+  result.append(new DescribeTopicPartitionsResponsePartition()
+.setPartitionIndex(partitionId)
+.setLeaderId(MetadataResponse.NO_LEADER_ID)
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+.setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+case Some(leader) =>
+  result.append(new DescribeTopicPartitionsResponsePartition()
+.setPartitionIndex(partitionId)
+.setLeaderId(leader.id())
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+.setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+  }
+}
+case _ =>

Review Comment:
   Definitely should log an error.






Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15212:
URL: https://github.com/apache/kafka/pull/15212#discussion_r1457917435


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -875,30 +875,43 @@ private void maybeRemovePartitionEpoch(
 ConsumerGroupMember oldMember
 ) {
 if (oldMember != null) {
-removePartitionEpochs(oldMember.assignedPartitions());
-removePartitionEpochs(oldMember.partitionsPendingRevocation());
+removePartitionEpochs(oldMember.assignedPartitions(), 
oldMember.memberEpoch());
+removePartitionEpochs(oldMember.partitionsPendingRevocation(), 
oldMember.memberEpoch());
 }
 }
 
 /**
  * Removes the partition epochs based on the provided assignment.
  *
  * @param assignmentThe assignment.
+ * @param expectedEpoch The expected epoch.
+ * @throws IllegalStateException if the epoch does not match the expected 
one.
+ * package-private for testing.
  */
-private void removePartitionEpochs(
-Map> assignment
+void removePartitionEpochs(
+Map> assignment,
+int expectedEpoch
 ) {
 assignment.forEach((topicId, assignedPartitions) -> {
 currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
 if (partitionsOrNull != null) {
-assignedPartitions.forEach(partitionsOrNull::remove);
+assignedPartitions.forEach(partitionId -> {
+Integer prevValue = 
partitionsOrNull.remove(partitionId);
+if (prevValue != expectedEpoch) {
+throw new IllegalStateException(
+String.format("Cannot remove the epoch %d from 
%s-%s because the partition is " +
+"still owned at a different epoch %d", 
expectedEpoch, topicId, partitionId, prevValue));
+}
+});
 if (partitionsOrNull.isEmpty()) {
 return null;
 } else {
 return partitionsOrNull;
 }
 } else {
-return null;
+throw new IllegalStateException(

Review Comment:
   What is the effect of throwing this error? Do we also block removing the 
rest of the partitions?
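
   One way to reason about this question, sketched in Java with plain `HashMap`s instead of the coordinator's timeline maps (so it does not model any rollback the coordinator may perform). Per the `Map.compute` contract, if the remapping function throws, the exception is rethrown and the outer mapping is left unchanged, but the inner map has already been mutated in place. Partitions visited before the mismatch, and the mismatched one itself, are therefore removed, while the rest stay. The sketch also checks for a missing entry explicitly, since comparing a null `Integer` with an `int` via `!=` unboxes it and throws `NullPointerException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class PartitionEpochSketch {
    // topic -> (partition -> epoch); plain HashMaps stand in for the timeline maps.
    final Map<String, Map<Integer, Integer>> currentPartitionEpoch = new HashMap<>();

    void removePartitionEpochs(Map<String, Set<Integer>> assignment, int expectedEpoch) {
        assignment.forEach((topic, partitions) ->
            currentPartitionEpoch.compute(topic, (key, partitionsOrNull) -> {
                if (partitionsOrNull == null) {
                    throw new IllegalStateException("No epochs recorded for topic " + topic);
                }
                for (int partitionId : partitions) {
                    // Mutates the inner map in place before the check runs.
                    Integer prevValue = partitionsOrNull.remove(partitionId);
                    if (prevValue == null || prevValue != expectedEpoch) {
                        throw new IllegalStateException(String.format(
                            "Cannot remove epoch %d from %s-%d; found %s",
                            expectedEpoch, topic, partitionId, prevValue));
                    }
                }
                return partitionsOrNull.isEmpty() ? null : partitionsOrNull;
            }));
    }
}
```

   For example, with epochs `{0=5, 1=7, 2=5, 3=5}` and an expected epoch of 5, iterating in ascending order removes 0, then removes 1 and throws, leaving `{2=5, 3=5}` under the same topic key.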






[PR] KAFKA-16164: Pre-Vote RPCs [part 1] [kafka]

2024-01-18 Thread via GitHub


ahuang98 opened a new pull request, #15231:
URL: https://github.com/apache/kafka/pull/15231

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15212:
URL: https://github.com/apache/kafka/pull/15212#discussion_r1457913561


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -861,19 +861,9 @@ private void maybeUpdatePartitionEpoch(
 ConsumerGroupMember oldMember,
 ConsumerGroupMember newMember
 ) {
-if (oldMember == null) {
-addPartitionEpochs(newMember.assignedPartitions(), 
newMember.memberEpoch());
-addPartitionEpochs(newMember.partitionsPendingRevocation(), 
newMember.memberEpoch());
-} else {
-if 
(!oldMember.assignedPartitions().equals(newMember.assignedPartitions())) {
-removePartitionEpochs(oldMember.assignedPartitions());
-addPartitionEpochs(newMember.assignedPartitions(), 
newMember.memberEpoch());
-}
-if 
(!oldMember.partitionsPendingRevocation().equals(newMember.partitionsPendingRevocation()))
 {
-removePartitionEpochs(oldMember.partitionsPendingRevocation());
-addPartitionEpochs(newMember.partitionsPendingRevocation(), 
newMember.memberEpoch());
-}
-}
+maybeRemovePartitionEpoch(oldMember);

Review Comment:
   That was the only thing I could think of as well.






Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]

2024-01-18 Thread via GitHub


divijvaidya commented on PR #15133:
URL: https://github.com/apache/kafka/pull/15133#issuecomment-1899108113

   > @divijvaidya , since Satish is busy, could you help review this PR? We'd 
like to get it into v3.7.0 for the completion of KIP-963. Thanks.
   
   Sorry I have been busy with work lately. Will look at this first thing 
tomorrow.





Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-18 Thread via GitHub


mumrah commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1457908869


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -140,6 +142,73 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 }
   }
 
+  /**
+   * Return topic partition metadata for the given topic, listener and index 
range. Also, return the next partition
+   * index that is not included in the result.
+   *
+   * @param image   The metadata image
+   * @param topicName   The name of the topic.
+   * @param listenerNameThe listener name.
+   * @param startIndex  The smallest index of the partitions 
to be included in the result.
+   * @param upperIndex  The upper limit of the index of the 
partitions to be included in the result.
+   *Note that, the upper index can be 
larger than the largest partition index in
+   *this topic.
+   * @returnA collection of topic partition 
metadata and next partition index (-1 means
+   *no next partition).
+   */
+  private def getPartitionMetadataForDescribeTopicResponse(
+image: MetadataImage,
+topicName: String,
+listenerName: ListenerName,
+startIndex: Int,
+maxCount: Int
+  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
+Option(image.topics().getTopic(topicName)) match {
+  case None => (None, -1)
+  case Some(topic) => {
+val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
+val partitions = topic.partitions().keySet()
+val upperIndex = topic.partitions().size().min(startIndex + maxCount)
+val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1
+for (partitionId <- startIndex until upperIndex) {
+  topic.partitions().get(partitionId) match {
+case partition : PartitionRegistration => {
+  val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
+listenerName, false)
+  val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName, false)
+  val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
+  val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
+  maybeLeader match {
+case None =>
+  result.append(new DescribeTopicPartitionsResponsePartition()
+.setPartitionIndex(partitionId)
+.setLeaderId(MetadataResponse.NO_LEADER_ID)
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+.setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+case Some(leader) =>
+  result.append(new DescribeTopicPartitionsResponsePartition()
+.setPartitionIndex(partitionId)
+.setLeaderId(leader.id())
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+.setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+  }
+}
+case _ =>

Review Comment:
   Should we throw an ISE here rather than silently continue? Maybe we could 
just log an error
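
   A Java sketch of the paging arithmetic in `getPartitionMetadataForDescribeTopicResponse`, with the missing-partition case logged instead of silently skipped, as suggested here. `Map<Integer, Integer>` (partition id to leader) is a hypothetical stand-in for the topic's partition registrations and `java.util.logging` stands in for the broker's logger; widening the sum to `long` is an extra guard in this sketch, not something from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

final class DescribeTopicPagingSketch {
    private static final Logger LOG = Logger.getLogger(DescribeTopicPagingSketch.class.getName());

    // One page of results: the partition ids returned and the next index to ask for (-1 = done).
    static final class Page {
        final List<Integer> partitionIds;
        final int nextIndex;

        Page(List<Integer> partitionIds, int nextIndex) {
            this.partitionIds = partitionIds;
            this.nextIndex = nextIndex;
        }
    }

    static Page page(Map<Integer, Integer> partitions, int startIndex, int maxCount) {
        int size = partitions.size();
        // Widen to long so a very large maxCount cannot overflow startIndex + maxCount.
        int upperIndex = (int) Math.min((long) startIndex + maxCount, size);
        int nextIndex = upperIndex < size ? upperIndex : -1;
        List<Integer> ids = new ArrayList<>();
        for (int partitionId = startIndex; partitionId < upperIndex; partitionId++) {
            if (!partitions.containsKey(partitionId)) {
                // Log instead of silently skipping, as suggested in the review.
                LOG.severe("No registration for partition " + partitionId + "; skipping it");
                continue;
            }
            ids.add(partitionId);
        }
        return new Page(ids, nextIndex);
    }
}
```

   For a 5-partition topic, `page(partitions, 0, 2)` returns partitions `[0, 1]` with `nextIndex` 2, and `page(partitions, 4, 2)` returns `[4]` with `nextIndex` -1.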






[jira] [Resolved] (KAFKA-15807) Add support for compression/decompression of metrics

2024-01-18 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal resolved KAFKA-15807.
---
Resolution: Done

> Add support for compression/decompression of metrics
> 
>
> Key: KAFKA-15807
> URL: https://issues.apache.org/jira/browse/KAFKA-15807
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>






[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()

2024-01-18 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-5863:
---
Fix Version/s: 3.8.0

> Potential null dereference in DistributedHerder#reconfigureConnector()
> --
>
> Key: KAFKA-5863
> URL: https://issues.apache.org/jira/browse/KAFKA-5863
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Ted Yu
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 3.8.0
>
>
> Here is the call chain:
> {code}
> RestServer.httpRequest(reconfigUrl, "POST", 
> taskProps, null);
> {code}
> In httpRequest():
> {code}
> } else if (responseCode >= 200 && responseCode < 300) {
> InputStream is = connection.getInputStream();
> T result = JSON_SERDE.readValue(is, responseFormat);
> {code}
> For readValue():
> {code}
> public  T readValue(InputStream src, TypeReference valueTypeRef)
> throws IOException, JsonParseException, JsonMappingException
> {
> return (T) _readMapAndClose(_jsonFactory.createParser(src), 
> _typeFactory.constructType(valueTypeRef));
> {code}
> Then there would be NPE in constructType():
> {code}
> public JavaType constructType(TypeReference typeRef)
> {
> // 19-Oct-2015, tatu: Simpler variant like so should work
> return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
> {code}
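
A Java sketch of the guard implied by the fix title: when the caller passes no response type, skip deserialization instead of handing a null type to the parser. `RestResponseSketch.readBody` and its `Function<String, T>` parser are hypothetical stand-ins for the `httpRequest` / `readValue` pair in the call chain above, not the actual RestClient change.

```java
import java.util.function.Function;

final class RestResponseSketch {
    // `parser == null` plays the role of the null TypeReference passed by the reconfiguration call.
    static <T> T readBody(int responseCode, String body, Function<String, T> parser) {
        if (responseCode == 204 || parser == null) {
            // Caller expects no content: ignore any body rather than dereferencing a null parser.
            return null;
        }
        if (responseCode >= 200 && responseCode < 300) {
            return parser.apply(body);
        }
        throw new IllegalStateException("Unexpected response code " + responseCode);
    }
}
```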





Re: [PR] KAFKA-5863: Avoid NPE when RestClient calls expecting no-content receive content. [kafka]

2024-01-18 Thread via GitHub


gharris1727 merged PR #13294:
URL: https://github.com/apache/kafka/pull/13294





Re: [PR] KAFKA-15811: Enhance request context with client socket port information (KIP-714) [kafka]

2024-01-18 Thread via GitHub


apoorvmittal10 commented on code in PR #15190:
URL: https://github.com/apache/kafka/pull/15190#discussion_r145796


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1140,9 +1141,9 @@ private[kafka] class Processor(
 expiredConnectionsKilledCount.record(null, 1, 0)
   } else {
 val connectionId = receive.source
-val context = new RequestContext(header, connectionId, channel.socketAddress,
-  channel.principal, listenerName, securityProtocol,
-  channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde)
+val context = new RequestContext(header, connectionId, channel.socketAddress, Optional.of(channel.socketPort()),

Review Comment:
   I found that RequestContext is created either in SocketServer or when forwarding 
requests. We currently don't need the client port information outside KIP-714, 
and the push telemetry request is not marked as forwardable, so wiring the client 
port information through elsewhere doesn't seem useful at this point. Hence I 
made it Optional.
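The following is a minimal sketch of why an `Optional` port fits the two creation paths described above. It assumes a heavily simplified, hypothetical context class, not Kafka's actual `RequestContext` constructor. A context built from a live socket carries the port, and a context built for a forwarded request leaves it empty.

```java
import java.util.Optional;

// Hypothetical, simplified request context: only the fields needed to show
// why the client port is modelled as Optional.
public class RequestContextSketch {
    public final String connectionId;
    public final Optional<Integer> clientPort;

    public RequestContextSketch(String connectionId, Optional<Integer> clientPort) {
        this.connectionId = connectionId;
        this.clientPort = clientPort;
    }

    // Created by the socket server: the client socket's port is known.
    public static RequestContextSketch fromSocket(String connectionId, int socketPort) {
        return new RequestContextSketch(connectionId, Optional.of(socketPort));
    }

    // Created when forwarding a request: the original client port is not wired through.
    public static RequestContextSketch forForwardedRequest(String connectionId) {
        return new RequestContextSketch(connectionId, Optional.empty());
    }

    public String describePort() {
        return clientPort.map(p -> "client port " + p).orElse("client port unknown");
    }
}
```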






Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]

2024-01-18 Thread via GitHub


vvcephei commented on code in PR #15219:
URL: https://github.com/apache/kafka/pull/15219#discussion_r1457894664


##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##
@@ -146,7 +146,7 @@ public static <R> QueryResult<R> handleBasicQueries(
 "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns"
 );
 }
-result.setPosition(position);
+result.setPosition(position.copy());

Review Comment:
   Better yet, we could make the copy at the beginning of this method or even 
on the caller side.
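The review suggests taking the copy once, up front. The following is a minimal sketch of that idea using a hypothetical, simplified `Position` (topic to partition to offset) rather than the real Kafka Streams class. Because the snapshot is taken before any result is built, later updates to the store's live position do not leak into results already handed out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; Position here is a simplified stand-in, not Kafka Streams' class.
public class PositionCopySketch {

    public static final class Position {
        private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();

        public Position withComponent(String topic, int partition, long offset) {
            offsets.computeIfAbsent(topic, t -> new HashMap<>()).put(partition, offset);
            return this;
        }

        public Long get(String topic, int partition) {
            Map<Integer, Long> byPartition = offsets.get(topic);
            return byPartition == null ? null : byPartition.get(partition);
        }

        // Deep copy: the returned object shares no maps with this one.
        public Position copy() {
            Position copy = new Position();
            offsets.forEach((topic, byPartition) ->
                byPartition.forEach((partition, offset) -> copy.withComponent(topic, partition, offset)));
            return copy;
        }
    }

    // Copy once at the start of query handling so every result built from it
    // sees the same independent snapshot.
    public static Position snapshotForQuery(Position storePosition) {
        return storePosition.copy();
    }
}
```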






Re: [PR] KAFKA-5863: Avoid NPE when RestClient calls expecting no-content receive content. [kafka]

2024-01-18 Thread via GitHub


gharris1727 commented on PR #13294:
URL: https://github.com/apache/kafka/pull/13294#issuecomment-1899079016

   Test failures appear unrelated, and the connect and mirror tests pass 
locally for me.





Re: [PR] KAFKA-16159: MINOR - Removed debug log [kafka]

2024-01-18 Thread via GitHub


apoorvmittal10 commented on PR #15228:
URL: https://github.com/apache/kafka/pull/15228#issuecomment-1899066505

   > Thanks @apoorvmittal10 - Could you elaborate on the purpose of this log 
line? how important is it for the user to know about the "next update time"? I 
wonder if we could just log it when update is happening.
   
   @philipnee The purpose of the log line was to check whether the reporter is 
actually working and how much time remains until the next update while testing 
(it was helpful during development). Rather than removing the line completely, I 
have moved it to trace so there is still some way to debug later.
   
   I have added another debug log line that is only logged when a telemetry 
request is created, in accordance with the push interval, so it will be minimal 
and will still give developers an idea of whether telemetry is working and in 
which state it is (get subscription or push telemetry).
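The following is a minimal sketch of the logging split described above: frequent timing calls log at TRACE, and request creation, which happens roughly once per push interval, logs at DEBUG. It uses the JDK's `System.Logger` instead of Kafka's SLF4J loggers. The state strings are hypothetical placeholders. The request names follow the KIP-714 RPC names `GetTelemetrySubscriptions` and `PushTelemetry`.

```java
import java.lang.System.Logger;
import java.lang.System.Logger.Level;

// Hypothetical sketch; state names are illustrative, not ClientTelemetryReporter's.
public class TelemetryLoggingSketch {
    private static final Logger LOG = System.getLogger(TelemetryLoggingSketch.class.getName());

    // Called on every poll: log only at TRACE so it stays silent by default.
    public static long timeToNextUpdate(String state, long timeMs) {
        LOG.log(Level.TRACE, "For telemetry state {0}, returning the value {1} ms", state, timeMs);
        return timeMs;
    }

    // Called only when a request is actually built: log at DEBUG.
    public static String createRequest(String state) {
        String request = "SUBSCRIPTION_NEEDED".equals(state) ? "GetTelemetrySubscriptions" : "PushTelemetry";
        LOG.log(Level.DEBUG, "Creating telemetry request {0} in state {1}", request, state);
        return request;
    }
}
```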





[jira] [Created] (KAFKA-16165) Consumer invalid transition on expired poll interval

2024-01-18 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16165:
--

 Summary: Consumer invalid transition on expired poll interval
 Key: KAFKA-16165
 URL: https://issues.apache.org/jira/browse/KAFKA-16165
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


Running system tests with the new async consumer revealed an invalid state 
transition related to the consumer not being polled within the poll interval in 
some scenario (possibly related to consumer close, as the transition is LEAVING -> STALE).

Log trace:

[2024-01-17 19:45:07,379] WARN [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
[2024-01-17 19:45:07,379] ERROR [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] Unexpected error caught in consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
    at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
    at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
    at org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)
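The failure above is a state machine rejecting an edge that is not in its allowed-transition table. The following is a minimal sketch of that pattern, with one possible guard that skips the move to STALE when the poll timer expires while the member is already leaving. The states and transition table are hypothetical and reduced. They are not `MembershipManagerImpl`'s real table, and the guard is only an illustration, not the fix adopted for this ticket.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, reduced member state machine; illustrative only.
public class MemberStateSketch {
    public enum State { STABLE, LEAVING, UNSUBSCRIBED, STALE }

    private static final Map<State, Set<State>> VALID = new EnumMap<>(State.class);
    static {
        VALID.put(State.STABLE, EnumSet.of(State.LEAVING, State.STALE));
        VALID.put(State.LEAVING, EnumSet.of(State.UNSUBSCRIBED));
        VALID.put(State.UNSUBSCRIBED, EnumSet.of(State.STABLE));
        VALID.put(State.STALE, EnumSet.of(State.STABLE));
    }

    private State state = State.STABLE;

    public State state() {
        return state;
    }

    public void transitionTo(State next) {
        if (!VALID.get(state).contains(next)) {
            throw new IllegalStateException("Invalid state transition from " + state + " to " + next);
        }
        state = next;
    }

    // One possible guard: if the member is already leaving, there is nothing
    // to mark stale, so skip the transition instead of throwing.
    public boolean onPollTimerExpired() {
        if (state == State.LEAVING || state == State.UNSUBSCRIBED) {
            return false;
        }
        transitionTo(State.STALE);
        return true;
    }
}
```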





Re: [PR] KAFKA-16159: MINOR - Removed debug log [kafka]

2024-01-18 Thread via GitHub


apoorvmittal10 commented on code in PR #15228:
URL: https://github.com/apache/kafka/pull/15228#discussion_r1457875345


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -362,7 +362,6 @@ public long timeToNextUpdate(long requestTimeoutMs) {
 throw new IllegalStateException("Unknown telemetry state: " + localState);
 }
 
-log.debug("For telemetry state {}, returning the value {} ms; {}", localState, timeMs, msg);

Review Comment:
   I think emitting metrics would be helpful if we wanted to derive meaningful 
information about the running reporter, but here the log was useful to check 
whether the reporter works correctly while testing/debugging. Rather than 
removing it, I moved it to trace so there is still a way to see the reporter's 
state if we need to debug an application.






[PR] KAFKA-14616: Fix stray replica of recreated topics in KRaft mode [kafka]

2024-01-18 Thread via GitHub


cmccabe opened a new pull request, #15230:
URL: https://github.com/apache/kafka/pull/15230

   When a broker is down, and a topic is deleted, this will result in that 
broker seeing "stray replicas" the next time it starts up. These replicas 
contain data that used to be important, but which now needs to be deleted. 
Stray replica deletion is handled during the initial metadata publishing step 
on the broker.
   
   Previously, we deleted these stray replicas after starting up BOTH 
LogManager and ReplicaManager. However, this wasn't quite correct. The presence 
of the stray replicas confused ReplicaManager. Instead, we should delete the 
stray replicas BEFORE starting ReplicaManager.
   
   This bug was triggered when a topic was deleted and re-created while a broker 
was down, and some of the replicas of the re-created topic landed on that 
broker. The impact was that the stray replicas were deleted, but the new 
replicas for the next iteration of the topic were never created. This, in turn, 
led to persistent under-replication until the next time the broker was 
restarted.
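The following is a minimal sketch of how a stray replica can be recognised when a topic was deleted, or deleted and re-created, while the broker was down. It assumes that a local replica is stray when the current metadata has no topic with the same name and the same topic ID. The types are hypothetical simplifications, not `LogManager`'s API. Under the ordering described above, this detection and the deletion of its result would both run before ReplicaManager starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: stray = local replica whose (topic name, topic ID)
// no longer matches the metadata (topic deleted, or deleted and re-created).
public class StrayReplicaSketch {

    public static final class LocalReplica {
        public final String topic;
        public final int partition;
        public final String topicId;

        public LocalReplica(String topic, int partition, String topicId) {
            this.topic = topic;
            this.partition = partition;
            this.topicId = topicId;
        }

        @Override
        public String toString() {
            return topic + "-" + partition + " (" + topicId + ")";
        }
    }

    // metadataTopicIds: topic name -> current topic ID from the metadata image.
    public static List<LocalReplica> findStrays(List<LocalReplica> local, Map<String, String> metadataTopicIds) {
        List<LocalReplica> strays = new ArrayList<>();
        for (LocalReplica replica : local) {
            String currentId = metadataTopicIds.get(replica.topic);
            // Missing topic (null) or a different ID both mean the local data is stale.
            if (!Objects.equals(currentId, replica.topicId)) {
                strays.add(replica);
            }
        }
        return strays;
    }
}
```

Comparing IDs rather than names is what distinguishes the "re-created" case. A name-only check would treat the old data as belonging to the new topic.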





Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-18 Thread via GitHub


OmniaGM commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1457856799


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public class TransactionLogConfig {

Review Comment:
   It would be a very small one, which I don't believe is worth it.






Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1457832081


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -712,13 +712,14 @@ public void run() {
 try {
 // Apply the records to the state machine.
 if (result.replayRecords()) {
-result.records().forEach(record ->
+for (int i = 0; i < result.records().size(); i++) {
 context.coordinator.replay(
+prevLastWrittenOffset + i,

Review Comment:
   could we maybe leave a comment about that?
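The following is a minimal sketch of the offset arithmetic the reviewer asks to have commented. It assumes that `prevLastWrittenOffset` is the offset at which the batch's first record is written, so record `i` lands at `prevLastWrittenOffset + i`. The `Coordinator` interface here is hypothetical and much simpler than the real coordinator runtime.

```java
import java.util.List;

// Hypothetical sketch: records of one batch occupy consecutive log offsets.
public class ReplayOffsetsSketch {

    public interface Coordinator<R> {
        void replay(long offset, R value);
    }

    public static <R> void replayBatch(long prevLastWrittenOffset, List<R> records, Coordinator<R> coordinator) {
        for (int i = 0; i < records.size(); i++) {
            // Assumption: the first record of the batch is written at
            // prevLastWrittenOffset, so record i is at prevLastWrittenOffset + i.
            coordinator.replay(prevLastWrittenOffset + i, records.get(i));
        }
    }
}
```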






Re: [PR] KAFKA-14505; [7/N] Always materialize the most recent committed offset [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1457831330


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -918,7 +920,7 @@ public void replay(
 groupId,
 topic,
 partition,
-OffsetAndMetadata.fromRecord(value)
+OffsetAndMetadata.fromRecord(offset, value)

Review Comment:
   I wonder whether this is easy to confuse, and whether there is a way to make it 
easier to know what the parameters mean.






Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]

2024-01-18 Thread via GitHub


mjsax commented on PR #15151:
URL: https://github.com/apache/kafka/pull/15151#issuecomment-1898982326

   `trunk` was broken -- rebased. Also re-triggered system tests just to be 
sure: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6038/





[jira] [Commented] (KAFKA-16163) Constant resignation/reelection of controller when starting a single node in combined mode

2024-01-18 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808336#comment-17808336
 ] 

Mickael Maison commented on KAFKA-16163:


It looks like this behavior was introduced in 
https://github.com/apache/kafka/commit/37416e1aebae33d01d5059ba906ec8e0e1107284

> Constant resignation/reelection of controller when starting a single node in 
> combined mode
> --
>
> Key: KAFKA-16163
> URL: https://issues.apache.org/jira/browse/KAFKA-16163
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Priority: Major
>
> When starting a single node in combined mode:
> {noformat}
> $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
> $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c 
> config/kraft/server.properties
> $ bin/kafka-server-start.sh config/kraft/server.properties{noformat}
>  
> it's constantly spamming the logs with:
> {noformat}
> [2024-01-18 17:37:09,065] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:11,967] INFO [RaftManager id=1] Did not receive fetch 
> request from the majority of the voters within 3000ms. Current fetched voters 
> are []. (org.apache.kafka.raft.LeaderState)
> [2024-01-18 17:37:11,967] INFO [RaftManager id=1] Completed transition to 
> ResignedState(localId=1, epoch=138, voters=[1], electionTimeoutMs=1864, 
> unackedVoters=[], preferredSuccessors=[]) from Leader(localId=1, epoch=138, 
> epochStartOffset=829, highWatermark=Optional[LogOffsetMetadata(offset=835, 
> metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], 
> voterStates={1=ReplicaState(nodeId=1, 
> endOffset=Optional[LogOffsetMetadata(offset=835, 
> metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], 
> lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, 
> hasAcknowledgedLeader=true)}) (org.apache.kafka.raft.QuorumState)
> [2024-01-18 17:37:13,072] INFO [NodeToControllerChannelManager id=1 
> name=heartbeat] Client requested disconnect from node 1 
> (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,072] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,123] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,124] INFO [NodeToControllerChannelManager id=1 
> name=heartbeat] Client requested disconnect from node 1 
> (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,124] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,175] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,176] INFO [NodeToControllerChannelManager id=1 
> name=heartbeat] Client requested disconnect from node 1 
> (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,176] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,227] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,229] INFO [NodeToControllerChannelManager id=1 
> name=heartbeat] Client requested disconnect from node 1 
> (org.apache.kafka.clients.NetworkClient)
> [2024-01-18 17:37:13,229] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2024-01-18 17:37:13,279] INFO 
> [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
> from now on will use node localhost:9093 (id: 1 rack: null) 
> (kafka.server.NodeToControllerRequestThread){noformat}
> This did not happen in 3.6.





[jira] [Created] (KAFKA-16164) Pre-Vote

2024-01-18 Thread Alyssa Huang (Jira)
Alyssa Huang created KAFKA-16164:


 Summary: Pre-Vote
 Key: KAFKA-16164
 URL: https://issues.apache.org/jira/browse/KAFKA-16164
 Project: Kafka
  Issue Type: Improvement
Reporter: Alyssa Huang


Implementing pre-vote as described in 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-996%3A+Pre-Vote
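The following is a minimal sketch of a Raft-style pre-vote decision. The log comparison (higher last epoch wins, otherwise the longer log wins) and the "reject if the leader was heard from recently" check follow the general pre-vote idea from the Raft literature. Whether KIP-996 uses exactly these rules is an assumption, and all names are hypothetical. A pre-vote changes no persistent state: no epoch bump and no recorded vote.

```java
// Hypothetical sketch of a pre-vote decision; KIP-996 defines the real rules.
public class PreVoteSketch {

    public static final class LogEnd {
        public final int lastEpoch;
        public final long endOffset;

        public LogEnd(int lastEpoch, long endOffset) {
            this.lastEpoch = lastEpoch;
            this.endOffset = endOffset;
        }

        // At least as up to date: higher last epoch, or same epoch and not shorter.
        public boolean atLeastAsUpToDateAs(LogEnd other) {
            if (lastEpoch != other.lastEpoch) {
                return lastEpoch > other.lastEpoch;
            }
            return endOffset >= other.endOffset;
        }
    }

    // Answers "would I vote for you?" without incrementing any epoch.
    public static boolean grantPreVote(LogEnd candidate, LogEnd local, boolean recentlyHeardFromLeader) {
        if (recentlyHeardFromLeader) {
            return false; // the current leader still looks healthy from this voter
        }
        return candidate.atLeastAsUpToDateAs(local);
    }
}
```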





[jira] [Created] (KAFKA-16163) Constant resignation/reelection of controller when starting a single node in combined mode

2024-01-18 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-16163:
--

 Summary: Constant resignation/reelection of controller when 
starting a single node in combined mode
 Key: KAFKA-16163
 URL: https://issues.apache.org/jira/browse/KAFKA-16163
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Mickael Maison


When starting a single node in combined mode:
{noformat}
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c 
config/kraft/server.properties
$ bin/kafka-server-start.sh config/kraft/server.properties{noformat}
 

it's constantly spamming the logs with:
{noformat}
[2024-01-18 17:37:09,065] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:11,967] INFO [RaftManager id=1] Did not receive fetch request 
from the majority of the voters within 3000ms. Current fetched voters are []. 
(org.apache.kafka.raft.LeaderState)
[2024-01-18 17:37:11,967] INFO [RaftManager id=1] Completed transition to 
ResignedState(localId=1, epoch=138, voters=[1], electionTimeoutMs=1864, 
unackedVoters=[], preferredSuccessors=[]) from Leader(localId=1, epoch=138, 
epochStartOffset=829, highWatermark=Optional[LogOffsetMetadata(offset=835, 
metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], 
voterStates={1=ReplicaState(nodeId=1, 
endOffset=Optional[LogOffsetMetadata(offset=835, 
metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=62788)])], 
lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) 
(org.apache.kafka.raft.QuorumState)
[2024-01-18 17:37:13,072] INFO [NodeToControllerChannelManager id=1 
name=heartbeat] Client requested disconnect from node 1 
(org.apache.kafka.clients.NetworkClient)
[2024-01-18 17:37:13,072] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,123] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,124] INFO [NodeToControllerChannelManager id=1 
name=heartbeat] Client requested disconnect from node 1 
(org.apache.kafka.clients.NetworkClient)
[2024-01-18 17:37:13,124] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,175] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,176] INFO [NodeToControllerChannelManager id=1 
name=heartbeat] Client requested disconnect from node 1 
(org.apache.kafka.clients.NetworkClient)
[2024-01-18 17:37:13,176] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,227] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,229] INFO [NodeToControllerChannelManager id=1 
name=heartbeat] Client requested disconnect from node 1 
(org.apache.kafka.clients.NetworkClient)
[2024-01-18 17:37:13,229] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
[2024-01-18 17:37:13,279] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, 
from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread){noformat}
This did not happen in 3.6.
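The log shows a leader with `voters=[1]` resigning because "Current fetched voters are []". The following is a minimal sketch of a check-quorum count that is consistent with that symptom. Whether this is the actual cause is an assumption. If the leader counts only fetches from other voters, a single-voter quorum can never reach a majority. If the leader counts itself, it can. The method names are hypothetical, not `LeaderState`'s API.

```java
import java.util.Set;

// Hypothetical sketch of a leader's "did a majority fetch recently?" check.
public class CheckQuorumSketch {

    // The leader counts itself as having fetched.
    public static boolean hasMajority(int localId, Set<Integer> voters, Set<Integer> votersFetchedRecently) {
        int acked = 0;
        for (int voter : voters) {
            if (voter == localId || votersFetchedRecently.contains(voter)) {
                acked++;
            }
        }
        return acked > voters.size() / 2;
    }

    // Only remote fetches are counted: with voters=[1] this is always false.
    public static boolean hasMajorityIgnoringSelf(Set<Integer> voters, Set<Integer> votersFetchedRecently) {
        int acked = 0;
        for (int voter : voters) {
            if (votersFetchedRecently.contains(voter)) {
                acked++;
            }
        }
        return acked > voters.size() / 2;
    }
}
```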





Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]

2024-01-18 Thread via GitHub


mjsax commented on PR #15197:
URL: https://github.com/apache/kafka/pull/15197#issuecomment-1898958611

   Thanks. The fix was pushed after I left my comment. Glad it's resolved.
   
   Yeah, I did suspect that two overlapping PRs got merged simultaneously.





[jira] [Updated] (KAFKA-16092) Queues for Kafka

2024-01-18 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield updated KAFKA-16092:
-
Labels: queues-for-kafka  (was: )

> Queues for Kafka
> 
>
> Key: KAFKA-16092
> URL: https://issues.apache.org/jira/browse/KAFKA-16092
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: queues-for-kafka
>
> This Jira tracks the development of KIP-932: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka





Re: [PR] KAFKA-15727: Added KRaft support in AlterUserScramCredentialsRequestNotAuthorizedTest [kafka]

2024-01-18 Thread via GitHub


adixitconfluent commented on PR #15224:
URL: https://github.com/apache/kafka/pull/15224#issuecomment-1898856832

   As shown by the CLI screenshot in the PR description, the changed tests 
pass. However, the build is failing. I'm adding a screenshot showing no new test 
failures from the build. I can confirm that the two tests I changed, 
`testAlterNothingNotAuthorized` and `testAlterSomethingNotAuthorized`, are not 
among the failing tests in the build. Test failures: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15224/1/tests





Re: [PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]

2024-01-18 Thread via GitHub


philipnee commented on code in PR #15210:
URL: https://github.com/apache/kafka/pull/15210#discussion_r1457711259


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1547,6 +1553,29 @@ public String toString() {
 }
 }
 
+private class ConsumerCoordinatorMetrics {

Review Comment:
   I split the implementation into two classes because there's no need to pass 
a reference to this entire object to the request manager just for the 
commitSensor (see the addCommitSensor method). Instead, I think it would be much 
easier to pass the Metrics object to the manager and let it create its own 
sensors (essentially, these metrics objects just hold a bunch of sensors 
referenced from Metrics).
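The following is a minimal sketch of the design described above: a component receives only the shared registry and creates its own sensors from it, instead of receiving the whole coordinator. The registry here is a hypothetical stand-in. Kafka's real `Metrics.sensor(String)` is also get-or-create by name, but real sensors additionally take `MetricName`s and stats.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal stand-in for a shared metrics registry.
public class SensorRegistrySketch {

    public static final class Sensor {
        private double total;
        private long count;

        public void record(double value) {
            total += value;
            count++;
        }

        public long count() {
            return count;
        }

        public double total() {
            return total;
        }
    }

    private final Map<String, Sensor> sensors = new HashMap<>();

    // Get or create a sensor by name, so components can share one registry.
    public Sensor sensor(String name) {
        return sensors.computeIfAbsent(name, n -> new Sensor());
    }

    // Hypothetical request manager: receives only the registry and builds its own sensor.
    public static final class CommitRequestManager {
        private final Sensor commitSensor;

        public CommitRequestManager(SensorRegistrySketch metrics) {
            this.commitSensor = metrics.sensor("commit-latency");
        }

        public void onCommitCompleted(long latencyMs) {
            commitSensor.record(latencyMs);
        }
    }
}
```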






Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-18 Thread via GitHub


nizhikov commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1457709089


##
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public class TransactionLogConfig {

Review Comment:
   Can we introduce the new module and config in a separate PR?







Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-18 Thread via GitHub


nizhikov commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1457704867


##
checkstyle/import-control.xml:
##
@@ -261,6 +261,10 @@
 
   
 
+
+  

Review Comment:
   Do we really need this empty block?






Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-18 Thread via GitHub


nizhikov commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1457704365


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {
+

Review Comment:
   Do we really need this empty line?






Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-18 Thread via GitHub


C0urante commented on PR #15218:
URL: https://github.com/apache/kafka/pull/15218#issuecomment-1898819047

   @anurag-harness This looks like an accidentally-opened PR and it copies the 
title from https://github.com/apache/kafka/pull/15215, which may lead to some 
confusion. I've closed it for now; please feel free to reopen if there are 
legitimate changes to the code base you'd like to propose.





Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-18 Thread via GitHub


C0urante closed pull request #15218: KAFKA-16107: Stop fetching while 
onPartitionsAssign completes 
URL: https://github.com/apache/kafka/pull/15218





Re: [PR] DO NOT MERGE: Isolate Connect OffsetsApiIntegrationTest [kafka]

2024-01-18 Thread via GitHub


C0urante commented on PR #15226:
URL: https://github.com/apache/kafka/pull/15226#issuecomment-1898810463

   Looks like isolating a single test suite removes the conditions that lead to 
flakiness. Closing in favor of https://github.com/apache/kafka/pull/15229




