Re: [PR] KAFKA-15541: Add num-open-iterators metric [kafka]

2024-05-20 Thread via GitHub


mjsax commented on code in PR #15975:
URL: https://github.com/apache/kafka/pull/15975#discussion_r1607621599


##
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java:
##
@@ -144,6 +145,10 @@ private StateStoreMetrics() {}
 private static final String SUPPRESSION_BUFFER_SIZE_MAX_DESCRIPTION =
 MAX_DESCRIPTION_PREFIX + SUPPRESSION_BUFFER_SIZE_DESCRIPTION;
 
+private static final String NUM_OPEN_ITERATORS = "num-open-iterators";
+private static final String NUM_OPEN_ITERATORS_DESCRIPTION =
+"The current number of Iterators on the store that have been 
created, but not yet closed";

Review Comment:
   ```suggestion
   "The current number of iterators on the store that have been 
created, but not yet closed";
   ```



##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##
@@ -162,6 +165,8 @@ private void registerMetrics() {
 flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
 deleteSensor = StateStoreMetrics.deleteSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
 e2eLatencySensor = 
StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
+StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,

Review Comment:
   Would it be better to add a `Sensor` that allows us to track the different 
metrics in one go?
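   For context, a minimal sketch of the two approaches being weighed, written against the plain `org.apache.kafka.common.metrics` API rather than the Streams-internal `StateStoreMetrics` helpers (class, group, and metric names here are illustrative assumptions, not the PR's actual code):
   
   ```java
   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.common.metrics.Gauge;
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.Value;
   
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class OpenIteratorsMetricSketch {
       public static void main(String[] args) {
           Metrics metrics = new Metrics();
           AtomicInteger openIterators = new AtomicInteger();
   
           // Approach 1: a gauge that reads the live counter whenever the metric is sampled.
           MetricName gaugeName = metrics.metricName("num-open-iterators", "state-store-metrics",
               "The current number of iterators on the store that have been created, but not yet closed");
           metrics.addMetric(gaugeName, (Gauge<Integer>) (config, now) -> openIterators.get());
   
           // Approach 2: a Sensor, which can fan one record() call out to several stats at once.
           Sensor iteratorSensor = metrics.sensor("open-iterators");
           iteratorSensor.add(metrics.metricName("open-iterators-last", "state-store-metrics",
               "Last recorded number of open iterators"), new Value());
   
           openIterators.incrementAndGet();            // an iterator was opened
           iteratorSensor.record(openIterators.get());
           openIterators.decrementAndGet();            // an iterator was closed
           metrics.close();
       }
   }
   ```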



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]

2024-05-20 Thread via GitHub


rreddy-22 commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1607621132


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -516,16 +522,78 @@ public Assignment targetAssignment(String memberId) {
 return targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
 }
 
+/**
+ * @return An immutable map containing all the topic partitions
+ * with their current member assignments.
+ */
+public Map<Uuid, Map<Integer, String>> partitionAssignments() {
+return Collections.unmodifiableMap(partitionAssignments);
+}
+
 /**
  * Updates target assignment of a member.
  *
  * @param memberId  The member id.
  * @param newTargetAssignment   The new target assignment.
  */
 public void updateTargetAssignment(String memberId, Assignment 
newTargetAssignment) {
+updatePartitionAssignments(
+memberId,
+targetAssignment.getOrDefault(memberId, new 
Assignment(Collections.emptyMap())),
+newTargetAssignment
+);
 targetAssignment.put(memberId, newTargetAssignment);
 }
 
+/**
+ * Updates partition assignments of the topics.
+ *
+ * @param memberId  The member Id.
+ * @param oldTargetAssignment   The old target assignment.
+ * @param newTargetAssignment   The new target assignment.
+ *
+ * Package private for testing.
+ */
+void updatePartitionAssignments(
+String memberId,
+Assignment oldTargetAssignment,
+Assignment newTargetAssignment
+) {
+// Combine keys from both old and new assignments.
+Set<Uuid> allTopicIds = new HashSet<>();
+allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+for (Uuid topicId : allTopicIds) {
+Set<Integer> oldPartitions = 
oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+Set<Integer> newPartitions = 
newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+TimelineHashMap<Integer, String> topicPartitionAssignment = 
partitionAssignments.computeIfAbsent(
+topicId, k -> new TimelineHashMap<>(snapshotRegistry, 
Math.max(oldPartitions.size(), newPartitions.size()))
+);
+
+// Remove partitions that aren't present in the new assignment.
+for (Integer partition : oldPartitions) {
+if (!newPartitions.contains(partition) && 
memberId.equals(topicPartitionAssignment.get(partition))) {

Review Comment:
   Imagine there are two members, A and B. We remove a partition (P0) from A and 
assign it to B. If B heartbeats first and we see that its new target assignment 
now has P0, we update the map entry for that partition to B. This way, when A 
later heartbeats, we don't remove the entry from the map unless it is still 
currently assigned to A.
   In the case where we just use a byte array with each index as a partition 
number, it was possible that we would set the partition's entry to true when B 
heartbeats and then unset it to false when A heartbeats, even though it is 
currently assigned to B. We need to make sure we only remove assignments if the 
current member still has ownership.
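   A minimal, self-contained sketch of that ownership guard (plain `HashMap` stand-in; the real code uses `TimelineHashMap` and snapshots, and the class and method names below are illustrative only):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   
   public class OwnershipGuardSketch {
       // partition -> member id that currently owns it
       private final Map<Integer, String> owners = new HashMap<>();
   
       // Remove partitions that left the member's assignment, but only while the
       // member still owns them; another member may already have claimed them.
       void removeRevokedPartitions(String memberId, Set<Integer> oldPartitions, Set<Integer> newPartitions) {
           for (Integer partition : oldPartitions) {
               if (!newPartitions.contains(partition) && memberId.equals(owners.get(partition))) {
                   owners.remove(partition);
               }
           }
       }
   
       public static void main(String[] args) {
           OwnershipGuardSketch sketch = new OwnershipGuardSketch();
           sketch.owners.put(0, "B");                                  // B heartbeats first and claims P0
           sketch.removeRevokedPartitions("A", Set.of(0), Set.of());   // A's revocation arrives later
           System.out.println(sketch.owners);                          // {0=B}: still owned by B, not removed
       }
   }
   ```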



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]

2024-05-20 Thread via GitHub


rreddy-22 commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1607621132


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -516,16 +522,78 @@ public Assignment targetAssignment(String memberId) {
 return targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
 }
 
+/**
+ * @return An immutable map containing all the topic partitions
+ * with their current member assignments.
+ */
+public Map<Uuid, Map<Integer, String>> partitionAssignments() {
+return Collections.unmodifiableMap(partitionAssignments);
+}
+
 /**
  * Updates target assignment of a member.
  *
  * @param memberId  The member id.
  * @param newTargetAssignment   The new target assignment.
  */
 public void updateTargetAssignment(String memberId, Assignment 
newTargetAssignment) {
+updatePartitionAssignments(
+memberId,
+targetAssignment.getOrDefault(memberId, new 
Assignment(Collections.emptyMap())),
+newTargetAssignment
+);
 targetAssignment.put(memberId, newTargetAssignment);
 }
 
+/**
+ * Updates partition assignments of the topics.
+ *
+ * @param memberId  The member Id.
+ * @param oldTargetAssignment   The old target assignment.
+ * @param newTargetAssignment   The new target assignment.
+ *
+ * Package private for testing.
+ */
+void updatePartitionAssignments(
+String memberId,
+Assignment oldTargetAssignment,
+Assignment newTargetAssignment
+) {
+// Combine keys from both old and new assignments.
+Set<Uuid> allTopicIds = new HashSet<>();
+allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+for (Uuid topicId : allTopicIds) {
+Set<Integer> oldPartitions = 
oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+Set<Integer> newPartitions = 
newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+TimelineHashMap<Integer, String> topicPartitionAssignment = 
partitionAssignments.computeIfAbsent(
+topicId, k -> new TimelineHashMap<>(snapshotRegistry, 
Math.max(oldPartitions.size(), newPartitions.size()))
+);
+
+// Remove partitions that aren't present in the new assignment.
+for (Integer partition : oldPartitions) {
+if (!newPartitions.contains(partition) && 
memberId.equals(topicPartitionAssignment.get(partition))) {

Review Comment:
   Imagine there are two members, A and B. We remove a partition (P0) from A and 
assign it to B. If B heartbeats first and we see that its new target assignment 
now has P0, we update the map entry for that partition to B. This way, when A 
heartbeats, we don't remove the entry from the map unless it is still currently 
assigned to A.
   In the case where we just use a byte array, it was possible that we would set 
the entry to true when B heartbeats and then unset it when A heartbeats, even 
though it is currently assigned to B now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-20 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2121704898

   Test failures are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14934: KafkaClusterTestKit makes FaultHandler accessible [kafka]

2024-05-20 Thread via GitHub


ahmedryasser opened a new pull request, #16012:
URL: https://github.com/apache/kafka/pull/16012

   We wrote functions that provide a way to access the MockFaultHandler 
instances for both fatal and non-fatal faults in the KafkaClusterTestKit class.
   
   We create a KafkaClusterTestKit instance with one broker node and one 
controller node, then call the two new functions and assert that they don't 
return null, which confirms the handlers are exposed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16619: add log for de-novo KRaft cluster [kafka]

2024-05-20 Thread via GitHub


jayim243 opened a new pull request, #16011:
URL: https://github.com/apache/kafka/pull/16011

   Add a log for de-novo KRaft cluster when 
zookeeper.metadata.migration.enable=false
   
   Modified test cases for this change.
   
   This contribution is my original work and I license the work to the project 
under the project's open source license.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-05-20 Thread via GitHub


github-actions[bot] commented on PR #15067:
URL: https://github.com/apache/kafka/pull/15067#issuecomment-2121659770

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error

2024-05-20 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16760.
---
Resolution: Not A Problem

> alterReplicaLogDirs failed even if responded with none error
> 
>
> Key: KAFKA-16760
> URL: https://issues.apache.org/jira/browse/KAFKA-16760
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
>
> When firing an alterLogDirRequest, it returns a NONE error result. But actually, 
> the alterLogDir never happened, and these errors were logged:
> {code:java}
> [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 has an older epoch (0) than the current leader. Will await the new 
> LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaAlterLogDirsThread:66)
> [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70)
> {code}
> Note: It's under KRaft mode. So the log with LeaderAndIsr is wrong. 
> This can be reproduced in this 
> [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running 
> this test:
> {code:java}
> ./gradlew cleanTest storage:test --tests 
> org.apache.kafka.tiered.storage.integration.AlterLogDirTest
> {code}
> The complete logs can be found here: 
> https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error

2024-05-20 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848032#comment-17848032
 ] 

Luke Chen commented on KAFKA-16760:
---

OK, so it looks like this is the expected behavior. I'll close this ticket 
then. Thank you.

> alterReplicaLogDirs failed even if responded with none error
> 
>
> Key: KAFKA-16760
> URL: https://issues.apache.org/jira/browse/KAFKA-16760
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
>
> When firing an alterLogDirRequest, it returns a NONE error result. But actually, 
> the alterLogDir never happened, and these errors were logged:
> {code:java}
> [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 has an older epoch (0) than the current leader. Will await the new 
> LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaAlterLogDirsThread:66)
> [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70)
> {code}
> Note: It's under KRaft mode. So the log with LeaderAndIsr is wrong. 
> This can be reproduced in this 
> [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running 
> this test:
> {code:java}
> ./gradlew cleanTest storage:test --tests 
> org.apache.kafka.tiered.storage.integration.AlterLogDirTest
> {code}
> The complete logs can be found here: 
> https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-05-20 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848031#comment-17848031
 ] 

Luke Chen commented on KAFKA-16414:
---

I agree with [~jeqo]. I think the goal we'd like to reach is consistency for 
retention.ms/bytes. So, if we think creating many small segments is a big 
drawback of including the active segment, should we exclude the active segment 
for both the ms and bytes configs? Do you see any side effect if we choose this 
option?

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expire active segment when max segment timestamp 
> matches the condition.
> - retention.bytes only expire active segment when retention.bytes is 
> configured to zero.
> The behavior should be either rotate active segments for both retention 
> configurations or none at all.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16711: make sure update highestOffsetInRemoteStorage after log dir change [kafka]

2024-05-20 Thread via GitHub


showuon commented on PR #15947:
URL: https://github.com/apache/kafka/pull/15947#issuecomment-2121598733

   @satishd , call for review. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-20 Thread via GitHub


showuon commented on PR #15951:
URL: https://github.com/apache/kafka/pull/15951#issuecomment-2121598551

   @chia7712 , I've updated the PR. Please take a look again when available. 
Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16666) Migrate `TransactionLogMessageFormatter`, GroupMetadataMessageFormatter` and `OffsetsMessageFormatter`to tools module

2024-05-20 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-16666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848027#comment-17848027
 ] 

黃竣陽 commented on KAFKA-16666:
-

I'm interested in this issue. Please assign it to me, thank you.

> Migrate `TransactionLogMessageFormatter`, GroupMetadataMessageFormatter` and 
> `OffsetsMessageFormatter`to tools module
> -
>
> Key: KAFKA-16666
> URL: https://issues.apache.org/jira/browse/KAFKA-16666
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: need-kip
>
> `GroupMetadataMessageFormatter`[0], `OffsetsMessageFormatter`[1], and 
> `TransactionLogMessageFormatter`[2] are used by ConsoleConsumer to parse data 
> of internal topics. Following the migration plan, we should move them to 
> tools-api module. Also, we need to keep command-line compatibility. That is 
> to say, `ConsoleConsumer` should accept the previous package name and then 
> use the (Java) implementation to parse and produce the same output.
> [0] 
> https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1269
> [1] 
> https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1248
> [2] 
> https://github.com/apache/kafka/blob/9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala#L145
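For illustration, this is how one of these formatters is typically invoked today via its current (Scala) package name; the broker address and options are assumptions, and only the package/class name would change after the migration:

{noformat}
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic __consumer_offsets --from-beginning \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
{noformat}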



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16801) Streams upgrade :test target doesn't find any junit tests

2024-05-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848022#comment-17848022
 ] 

Matthias J. Sax commented on KAFKA-16801:
-

These packages contain code for system tests. We put the code under 
`src/test/java/...`; there is no `src/main/java/...`, and the code is not unit 
test code either.

What would be the right way to address this?
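One possible direction (an assumption for illustration, not a decided fix): since these modules only hold sources for the external system-test harness, their Gradle `:test` tasks could simply be disabled in build.gradle so Gradle 9 no longer complains about "no test executed":

{code:groovy}
// build.gradle sketch; module-path prefix taken from the task list in the ticket below
configure(subprojects.findAll { it.path.startsWith(':streams:upgrade-system-tests-') }) {
    tasks.matching { it.name == 'test' }.configureEach {
        enabled = false
    }
}
{code}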

 

> Streams upgrade :test target doesn't find any junit tests
> -
>
> Key: KAFKA-16801
> URL: https://issues.apache.org/jira/browse/KAFKA-16801
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Greg Harris
>Priority: Major
>  Labels: newbie
>
> No test executed. This behavior has been deprecated.    
> This will fail with an error in Gradle 9.0.    
> There are test sources present but no test was executed. Please check your 
> test configuration.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_task_fail_on_no_test_executed]
>     
> 23 usages
>  
> Task::streams:upgrade-system-tests-0100:test    
> Task::streams:upgrade-system-tests-0101:test    
> Task::streams:upgrade-system-tests-0102:test    
> Task::streams:upgrade-system-tests-0110:test    
> Task::streams:upgrade-system-tests-10:test    
> Task::streams:upgrade-system-tests-11:test    
> Task::streams:upgrade-system-tests-20:test    
> Task::streams:upgrade-system-tests-21:test    
> Task::streams:upgrade-system-tests-22:test    
> Task::streams:upgrade-system-tests-23:test    
> Task::streams:upgrade-system-tests-24:test    
> Task::streams:upgrade-system-tests-25:test    
> Task::streams:upgrade-system-tests-26:test    
> Task::streams:upgrade-system-tests-27:test    
> Task::streams:upgrade-system-tests-28:test    
> Task::streams:upgrade-system-tests-30:test    
> Task::streams:upgrade-system-tests-31:test    
> Task::streams:upgrade-system-tests-32:test    
> Task::streams:upgrade-system-tests-33:test    
> Task::streams:upgrade-system-tests-34:test    
> Task::streams:upgrade-system-tests-35:test    
> Task::streams:upgrade-system-tests-36:test    
> Task::streams:upgrade-system-tests-37:test



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16794) Can't open videos in streams documentation

2024-05-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16794:

Component/s: streams

> Can't open videos in streams documentation
> --
>
> Key: KAFKA-16794
> URL: https://issues.apache.org/jira/browse/KAFKA-16794
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: Kuan Po Tseng
>Priority: Minor
> Attachments: IMG_4445.png, image.png
>
>
> Can't open videos in page [https://kafka.apache.org/documentation/streams/]
> Open console in chrome browser and it shows error message:
> {{Refused to frame 'https://www.youtube.com/' because it violates the 
> following Content Security Policy directive: "frame-src 'self'".}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15143) MockFixedKeyProcessorContext is missing from test-utils

2024-05-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848021#comment-17848021
 ] 

Matthias J. Sax commented on KAFKA-15143:
-

As pointed out in the KAFKA-15242 comments, we also need to do something about 
`FixedKeyRecord`, because it does not have (and should not have) a public 
constructor. I'll point this out on the DISCUSS thread of the KIP, too.

> MockFixedKeyProcessorContext is missing from test-utils
> ---
>
> Key: KAFKA-15143
> URL: https://issues.apache.org/jira/browse/KAFKA-15143
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 3.5.0
>Reporter: Tomasz Kaszuba
>Assignee: Shashwat Pandey
>Priority: Major
>  Labels: needs-kip
>
> I am trying to test a ContextualFixedKeyProcessor but it is not possible to 
> call the init method from within a unit test since the MockProcessorContext 
> doesn't implement  
> {code:java}
> FixedKeyProcessorContext {code}
> but only
> {code:java}
> ProcessorContext
> {code}
> Shouldn't there also be a *MockFixedKeyProcessorContext* in the test utils?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848020#comment-17848020
 ] 

Matthias J. Sax commented on KAFKA-15242:
-

I don't think that `TestRecord` has anything to do with it, because 
`TestRecord` is not used in combination with `MockProcessorContext`, but only 
in combination with the `TopologyTestDriver` (and corresponding 
`TestInputTopic` and `TestOutputTopic`).

I agree though, that we need some more helper class, because `FixedKeyRecord` 
objects cannot be instantiated directly (no public constructor). Thanks for the 
call out – the KIP needs to be extended accordingly – we would have missed 
this...

This ticket did not have this dependency in its description either though. I 
think we can still close it as duplicate, and add anything missing to the other 
ticket?
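For reference, a minimal `FixedKeyProcessor` of the kind these tickets want to unit-test (illustrative class only). Its `init()` takes a `FixedKeyProcessorContext`, which the test-utils `MockProcessorContext` does not implement, and `FixedKeyRecord` has no public constructor, which is exactly the gap discussed above:

{code:java}
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

public class UppercaseValueProcessor implements FixedKeyProcessor<String, String, String> {
    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(final FixedKeyProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final FixedKeyRecord<String, String> record) {
        // Forwarding preserves the key; unit-testing this requires constructing a
        // FixedKeyRecord, which has no public constructor.
        context.forward(record.withValue(record.value().toUpperCase()));
    }
}
{code}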

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Assignee: Alexander Aghili
>Priority: Major
>
> Using mock processor context to get the forwarded message doesn't work.
> Also there is not a well documented way for testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Prototype: Inherit the maxParallelForks from Jenkins CPU count [kafka]

2024-05-20 Thread via GitHub


gharris1727 opened a new pull request, #16009:
URL: https://github.com/apache/kafka/pull/16009

   The maxParallelForks is usually inherited from the CPU count on developer 
machines, and hardcoded to 2 in Jenkins.
   This is an experiment to try increasing the parallelism in Jenkins.
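   For illustration, the usual Gradle idiom for deriving the fork count from the machine (a sketch, not necessarily the exact change in this PR):
   
   ```groovy
   // build.gradle: let test parallelism follow the number of available processors
   tasks.withType(Test).configureEach {
       maxParallelForks = Runtime.runtime.availableProcessors()
   }
   ```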
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16197) Connect Worker poll timeout prints Consumer poll timeout specific warnings.

2024-05-20 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-16197.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Connect Worker poll timeout prints Consumer poll timeout specific warnings.
> ---
>
> Key: KAFKA-16197
> URL: https://issues.apache.org/jira/browse/KAFKA-16197
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.8.0
>
>
> When a Connect worker's poll timeout expires in Connect, the log lines that 
> we see are:
> {noformat}
> consumer poll timeout has expired. This means the time between subsequent 
> calls to poll() was longer than the configured max.poll.interval.ms, which 
> typically implies that the poll loop is spending too much time processing 
> messages. You can address this either by increasing max.poll.interval.ms or 
> by reducing the maximum size of batches returned in poll() with 
> max.poll.records.
> {noformat}
> and the reason for leaving the group is 
> {noformat}
> Member XX sending LeaveGroup request to coordinator XX due to consumer poll 
> timeout has expired.
> {noformat}
> which is specific to Consumers and not to Connect workers. The log line above 
> is especially misleading because the config `max.poll.interval.ms` is not 
> configurable for a Connect worker and could make someone believe that the 
> logs are being written for Sink Connectors and not for the Connect worker. 
> Ideally, we should print something specific to Connect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-20 Thread via GitHub


gharris1727 merged PR #15305:
URL: https://github.com/apache/kafka/pull/15305


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-20 Thread via GitHub


gharris1727 commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2121455690

   Test failures appear unrelated, and the tests pass for me locally. LGTM, and 
thanks @vamossagar12 for the fix!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15494) Remove deprecated calls in build.gradle for preparing future upgrades

2024-05-20 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848008#comment-17848008
 ] 

Greg Harris commented on KAFKA-15494:
-

[~bmscomp] Do you have any progress here? I subdivided this task into multiple 
parts in KAFKA-16800 so that multiple contributors may make some progress. 
Perhaps you can review those changes, or take one of the tasks for yourself?

> Remove deprecated calls in build.gradle for preparing future upgrades
> -
>
> Key: KAFKA-15494
> URL: https://issues.apache.org/jira/browse/KAFKA-15494
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Said BOUDJELDA
>Assignee: Said BOUDJELDA
>Priority: Major
>
> In order to prepare future upgrades of the {*}Gradle wrapper{*}, we need to 
> get rid of deprecated calls to certain methods, such as task registrations. 
> This will ease the future upgrade to *Gradle 9*, where plenty of methods are 
> marked for removal; that big release will also bring support for *JDK 21*.
>  
> Running the *Gradle* build with *--warning-mode all* reveals many warnings 
> and many deprecations scheduled for removal, and we have to prepare now for 
> the coming changes. 
>  
>  
> This is an example of deprecation warning 
>  
> {code:java}
> Build file '/Users/bmscomp/codes/kafka/build.gradle': line 3116
> The org.gradle.api.plugins.Convention type has been deprecated. This is 
> scheduled to be removed in Gradle 9.0. Consult the upgrading guide for 
> further information: 
> https://docs.gradle.org/8.2.1/userguide/upgrading_version_8.html#deprecated_access_to_conventions
>         at 
> build_bpyr04xfzz0tpxxyqu97xn8xy$_run_closure58.doCall(/Users/bmscomp/codes/kafka/build.gradle:3116)
>         (Run with --stacktrace to get the full stack trace of this 
> deprecation warning.)
>         at 
> build_bpyr04xfzz0tpxxyqu97xn8xy.run(/Users/bmscomp/codes/kafka/build.gradle:3115)
>         (Run with --stacktrace to get the full stack trace of this 
> deprecation warning.) {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16801) Streams upgrade :test target doesn't find any junit tests

2024-05-20 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16801:

Labels: newbie  (was: )

> Streams upgrade :test target doesn't find any junit tests
> -
>
> Key: KAFKA-16801
> URL: https://issues.apache.org/jira/browse/KAFKA-16801
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Greg Harris
>Priority: Major
>  Labels: newbie
>
> No test executed. This behavior has been deprecated.    
> This will fail with an error in Gradle 9.0.    
> There are test sources present but no test was executed. Please check your 
> test configuration.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_task_fail_on_no_test_executed]
>     
> 23 usages
>  
> Task::streams:upgrade-system-tests-0100:test    
> Task::streams:upgrade-system-tests-0101:test    
> Task::streams:upgrade-system-tests-0102:test    
> Task::streams:upgrade-system-tests-0110:test    
> Task::streams:upgrade-system-tests-10:test    
> Task::streams:upgrade-system-tests-11:test    
> Task::streams:upgrade-system-tests-20:test    
> Task::streams:upgrade-system-tests-21:test    
> Task::streams:upgrade-system-tests-22:test    
> Task::streams:upgrade-system-tests-23:test    
> Task::streams:upgrade-system-tests-24:test    
> Task::streams:upgrade-system-tests-25:test    
> Task::streams:upgrade-system-tests-26:test    
> Task::streams:upgrade-system-tests-27:test    
> Task::streams:upgrade-system-tests-28:test    
> Task::streams:upgrade-system-tests-30:test    
> Task::streams:upgrade-system-tests-31:test    
> Task::streams:upgrade-system-tests-32:test    
> Task::streams:upgrade-system-tests-33:test    
> Task::streams:upgrade-system-tests-34:test    
> Task::streams:upgrade-system-tests-35:test    
> Task::streams:upgrade-system-tests-36:test    
> Task::streams:upgrade-system-tests-37:test



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16805) Stop using a ClosureBackedAction to configure Spotbugs reports

2024-05-20 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16805:

Labels: newbie  (was: )

> Stop using a ClosureBackedAction to configure Spotbugs reports
> --
>
> Key: KAFKA-16805
> URL: https://issues.apache.org/jira/browse/KAFKA-16805
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Priority: Major
>  Labels: newbie
>
> The org.gradle.util.ClosureBackedAction type has been deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_7.html#org_gradle_util_reports_deprecations]
>     
> 1 usage    
> [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L745-L749]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16803) Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil

2024-05-20 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16803:

Labels: newbie  (was: )

> Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil
> 
>
> Key: KAFKA-16803
> URL: https://issues.apache.org/jira/browse/KAFKA-16803
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Priority: Major
>  Labels: newbie
>
> The org.gradle.util.ConfigureUtil type has been deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#org_gradle_util_reports_deprecations]
> 2 usages    
> Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin    
> Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16806) Explicitly declare JUnit dependencies for all test modules

2024-05-20 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16806:
---

 Summary: Explicitly declare JUnit dependencies for all test modules
 Key: KAFKA-16806
 URL: https://issues.apache.org/jira/browse/KAFKA-16806
 Project: Kafka
  Issue Type: Sub-task
Reporter: Greg Harris


The automatic loading of test framework implementation dependencies has been 
deprecated.    
This is scheduled to be removed in Gradle 9.0.    
Declare the desired test framework directly on the test suite or explicitly 
declare the test framework implementation dependencies on the test's runtime 
classpath.    
[Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_framework_implementation_dependencies]
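A sketch of the explicit declaration the linked Gradle documentation recommends (versions shown are illustrative assumptions):

{code:groovy}
// build.gradle, per test module
dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
{code}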



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16805) Stop using a ClosureBackedAction to configure Spotbugs reports

2024-05-20 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16805:
---

 Summary: Stop using a ClosureBackedAction to configure Spotbugs 
reports
 Key: KAFKA-16805
 URL: https://issues.apache.org/jira/browse/KAFKA-16805
 Project: Kafka
  Issue Type: Sub-task
Reporter: Greg Harris


The org.gradle.util.ClosureBackedAction type has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
[Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_7.html#org_gradle_util_reports_deprecations]
    
1 usage    
[https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L745-L749]
 



 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16802) Move build.gradle java version information inside of a java block

2024-05-20 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16802:

Description: 
The org.gradle.api.plugins.JavaPluginConvention type has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
[Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation]
  
[https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295]

 

  was:
The org.gradle.api.plugins.JavaPluginConvention type has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
[Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation]
  
[https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295]

 

The org.gradle.api.plugins.BasePluginConvention type has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
[Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#base_convention_deprecation]
1 usage    
Script:build.gradle


> Move build.gradle java version information inside of a java block
> -
>
> Key: KAFKA-16802
> URL: https://issues.apache.org/jira/browse/KAFKA-16802
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Priority: Major
>  Labels: newbie
>
> The org.gradle.api.plugins.JavaPluginConvention type has been deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation]
>   
> [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16804) Replace gradle archivesBaseName with archivesName

2024-05-20 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16804:

Description: 
The BasePluginExtension.archivesBaseName property has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
Please use the archivesName property instead.    
[Documentation|https://docs.gradle.org/8.7/dsl/org.gradle.api.plugins.BasePluginExtension.html#org.gradle.api.plugins.BasePluginExtension:archivesName]
1 usage    
Script:build.gradle

 

The org.gradle.api.plugins.BasePluginConvention type has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
[Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#base_convention_deprecation]
1 usage    
Script:build.gradle

  was:
The BasePluginExtension.archivesBaseName property has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
Please use the archivesName property instead.    
[Documentation|https://docs.gradle.org/8.7/dsl/org.gradle.api.plugins.BasePluginExtension.html#org.gradle.api.plugins.BasePluginExtension:archivesName]
1 usage    
Script:build.gradle


> Replace gradle archivesBaseName with archivesName
> -
>
> Key: KAFKA-16804
> URL: https://issues.apache.org/jira/browse/KAFKA-16804
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Priority: Major
>  Labels: newbie
>
> The BasePluginExtension.archivesBaseName property has been deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> Please use the archivesName property instead.    
> [Documentation|https://docs.gradle.org/8.7/dsl/org.gradle.api.plugins.BasePluginExtension.html#org.gradle.api.plugins.BasePluginExtension:archivesName]
> 1 usage    
> Script:build.gradle
>  
> The org.gradle.api.plugins.BasePluginConvention type has been deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#base_convention_deprecation]
> 1 usage    
> Script:build.gradle



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16802) Move build.gradle java version information inside of a java block

2024-05-20 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16802:

Description: 
The org.gradle.api.plugins.JavaPluginConvention type has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
[Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation]
  
[https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295]

 

The org.gradle.api.plugins.BasePluginConvention type has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
[Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#base_convention_deprecation]
1 usage    
Script:build.gradle

  was:
The org.gradle.api.plugins.JavaPluginConvention type has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
[Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation]
  
[https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295]


> Move build.gradle java version information inside of a java block
> -
>
> Key: KAFKA-16802
> URL: https://issues.apache.org/jira/browse/KAFKA-16802
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Priority: Major
>  Labels: newbie
>
> The org.gradle.api.plugins.JavaPluginConvention type has been deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation]
>   
> [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295]
>  
> The org.gradle.api.plugins.BasePluginConvention type has been deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#base_convention_deprecation]
> 1 usage    
> Script:build.gradle



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-20 Thread via GitHub


ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1607404895


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##
@@ -1974,6 +1974,15 @@ public Set 
nonSourceChangelogTopics() {
 return topicConfigs;
 }
 
+/**
+ *
+ * @return the set of changelog topics, which includes both source 
changelog topics and non
+ * source changelog topics.
+ */
+public Set<String> changelogTopics() {
+return Collections.unmodifiableSet(new 
HashSet<>(stateChangelogTopics.keySet()));

Review Comment:
   I think you can skip the `new HashSet` step; it's pretty much redundant with 
the unmodifiableSet. And since we don't plan on modifying the returned set, it's 
better to just wrap the keySet directly to save a bunch of unnecessary copying.
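   A small sketch of the difference (hypothetical class, same idea as the diff above):
   
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   public class ChangelogTopicsSketch {
       private final Map<String, Object> stateChangelogTopics = new HashMap<>();
   
       // Copies every key on each call before wrapping the copy.
       public Set<String> changelogTopicsCopy() {
           return Collections.unmodifiableSet(new HashSet<>(stateChangelogTopics.keySet()));
       }
   
       // Read-only view over the live keySet: no copy, and it reflects later changes.
       public Set<String> changelogTopicsView() {
           return Collections.unmodifiableSet(stateChangelogTopics.keySet());
       }
   }
   ```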



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16804) Replace gradle archivesBaseName with archivesName

2024-05-20 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16804:
---

 Summary: Replace gradle archivesBaseName with archivesName
 Key: KAFKA-16804
 URL: https://issues.apache.org/jira/browse/KAFKA-16804
 Project: Kafka
  Issue Type: Sub-task
Reporter: Greg Harris


The BasePluginExtension.archivesBaseName property has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
Please use the archivesName property instead.    
[Documentation|https://docs.gradle.org/8.7/dsl/org.gradle.api.plugins.BasePluginExtension.html#org.gradle.api.plugins.BasePluginExtension:archivesName]
1 usage    
Script:build.gradle
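A sketch of the replacement (the artifact name is illustrative):

{code:groovy}
// build.gradle
// Before (deprecated): archivesBaseName = 'kafka-clients'
base {
    archivesName.set('kafka-clients')
}
{code}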



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16803) Upgrade to a version of ShadowJavaPlugin which doesn't use ConfigureUtil

2024-05-20 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16803:
---

 Summary: Upgrade to a version of ShadowJavaPlugin which doesn't 
use ConfigureUtil
 Key: KAFKA-16803
 URL: https://issues.apache.org/jira/browse/KAFKA-16803
 Project: Kafka
  Issue Type: Sub-task
Reporter: Greg Harris


The org.gradle.util.ConfigureUtil type has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
[Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#org_gradle_util_reports_deprecations]
2 usages    
Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin    
Plugin:com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16802) Move build.gradle java version information inside of a java block

2024-05-20 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16802:
---

 Summary: Move build.gradle java version information inside of a 
java block
 Key: KAFKA-16802
 URL: https://issues.apache.org/jira/browse/KAFKA-16802
 Project: Kafka
  Issue Type: Sub-task
Reporter: Greg Harris


The org.gradle.api.plugins.JavaPluginConvention type has been deprecated.    
This is scheduled to be removed in Gradle 9.0.    
[Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation]
  
[https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295]
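A sketch of what "inside of a java block" means (the version shown is illustrative, not necessarily what build.gradle sets):

{code:groovy}
// build.gradle
// Before (deprecated, via JavaPluginConvention):
//   sourceCompatibility = JavaVersion.VERSION_11
java {
    sourceCompatibility = JavaVersion.VERSION_11
    targetCompatibility = JavaVersion.VERSION_11
}
{code}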



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-05-20 Thread via GitHub


jolshan commented on PR #15968:
URL: https://github.com/apache/kafka/pull/15968#issuecomment-2121393282

   I would recommend taking a look at where we are passing the topic ID through 
and the checks we do. If we think it is useful to ensure we are writing to the 
right topic, we should do it, but if it is just adding complexity, we may want 
to consider changing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16799) NetworkClientDelegate is not backing off if the node is not found

2024-05-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16799:
--
Labels: consumer-threading-refactor  (was: )

> NetworkClientDelegate is not backing off if the node is not found
> -
>
> Key: KAFKA-16799
> URL: https://issues.apache.org/jira/browse/KAFKA-16799
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> When performing stress testing, I found that AsyncKafkaConsumer's network 
> client delegate isn't backing off if the node is not ready, causing a large 
> number of: 
> {code:java}
>  358 [2024-05-20 22:59:02,591] DEBUG [Consumer 
> clientId=consumer.7136899e-0c20-4ccb-8ba3-497e9e683594-0, 
> groupId=consumer-groups-test-5] Node is not ready, handle the request in the 
> next event loop: node=b4-pkc-devcmkz697.us-west-2.aws.devel.cpd     
> ev.cloud:9092 (id: 2147483643 rack: null), 
> request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='consumer-groups-test-5',
>  memberId='', memberEpoch=0, instanceId=null, rackId=null, 
> rebalanceTimeoutMs=10, subscri     
> bedTopicNames=[_kengine-565-test-topic8081], serverAssignor=null, 
> topicPartitions=[]), 
> handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@139a8761,
>  node=Optional[b4-pkc-devcmkz697.us-west-2.aws     .devel.cpdev.cloud:9092 
> (id: 2147483643 rack: null)], 
> timer=org.apache.kafka.common.utils.Timer@649fffad} 
> (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:169) {code}
> show up in the log.
> What should have happened is: 1. node is not ready 2. exponential back off 3. 
> retry



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16799) NetworkClientDelegate is not backing off if the node is not found

2024-05-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16799:
--
Component/s: clients

> NetworkClientDelegate is not backing off if the node is not found
> -
>
> Key: KAFKA-16799
> URL: https://issues.apache.org/jira/browse/KAFKA-16799
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>
> When performing stress testing, I found that AsyncKafkaConsumer's network 
> client delegate isn't backing off if the node is not ready, causing a large 
> number of: 
> {code:java}
>  358 [2024-05-20 22:59:02,591] DEBUG [Consumer 
> clientId=consumer.7136899e-0c20-4ccb-8ba3-497e9e683594-0, 
> groupId=consumer-groups-test-5] Node is not ready, handle the request in the 
> next event loop: node=b4-pkc-devcmkz697.us-west-2.aws.devel.cpd     
> ev.cloud:9092 (id: 2147483643 rack: null), 
> request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='consumer-groups-test-5',
>  memberId='', memberEpoch=0, instanceId=null, rackId=null, 
> rebalanceTimeoutMs=10, subscri     
> bedTopicNames=[_kengine-565-test-topic8081], serverAssignor=null, 
> topicPartitions=[]), 
> handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@139a8761,
>  node=Optional[b4-pkc-devcmkz697.us-west-2.aws     .devel.cpdev.cloud:9092 
> (id: 2147483643 rack: null)], 
> timer=org.apache.kafka.common.utils.Timer@649fffad} 
> (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:169) {code}
> show up in the log.
> What should have happened is: 1. node is not ready 2. exponential back off 3. 
> retry



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16801) Streams upgrade :test target doesn't find any junit tests

2024-05-20 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16801:
---

 Summary: Streams upgrade :test target doesn't find any junit tests
 Key: KAFKA-16801
 URL: https://issues.apache.org/jira/browse/KAFKA-16801
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Greg Harris


No test executed. This behavior has been deprecated.    
This will fail with an error in Gradle 9.0.    
There are test sources present but no test was executed. Please check your test 
configuration.    
[Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_task_fail_on_no_test_executed]
    
23 usages

 
Task::streams:upgrade-system-tests-0100:test    
Task::streams:upgrade-system-tests-0101:test    
Task::streams:upgrade-system-tests-0102:test    
Task::streams:upgrade-system-tests-0110:test    
Task::streams:upgrade-system-tests-10:test    
Task::streams:upgrade-system-tests-11:test    
Task::streams:upgrade-system-tests-20:test    
Task::streams:upgrade-system-tests-21:test    
Task::streams:upgrade-system-tests-22:test    
Task::streams:upgrade-system-tests-23:test    
Task::streams:upgrade-system-tests-24:test    
Task::streams:upgrade-system-tests-25:test    
Task::streams:upgrade-system-tests-26:test    
Task::streams:upgrade-system-tests-27:test    
Task::streams:upgrade-system-tests-28:test    
Task::streams:upgrade-system-tests-30:test    
Task::streams:upgrade-system-tests-31:test    
Task::streams:upgrade-system-tests-32:test    
Task::streams:upgrade-system-tests-33:test    
Task::streams:upgrade-system-tests-34:test    
Task::streams:upgrade-system-tests-35:test    
Task::streams:upgrade-system-tests-36:test    
Task::streams:upgrade-system-tests-37:test



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-20 Thread via GitHub


cmccabe opened a new pull request, #16008:
URL: https://github.com/apache/kafka/pull/16008

   Fix the code in the RaftControllerNodeProvider to query RaftManager to find 
Node information, rather than consulting a static map.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16800) Resolve Gradle 9.0 deprecations

2024-05-20 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16800:
---

 Summary: Resolve Gradle 9.0 deprecations
 Key: KAFKA-16800
 URL: https://issues.apache.org/jira/browse/KAFKA-16800
 Project: Kafka
  Issue Type: Task
Reporter: Greg Harris


Gradle prints the following warning in our build:
{noformat}
Deprecated Gradle features were used in this build, making it incompatible with 
Gradle 9.0.{noformat}
We should try to resolve these build warnings to prepare for the future release 
of Gradle 9.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16799) NetworkClientDelegate is not backing off if the node is not found

2024-05-20 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16799:
--

 Summary: NetworkClientDelegate is not backing off if the node is 
not found
 Key: KAFKA-16799
 URL: https://issues.apache.org/jira/browse/KAFKA-16799
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


When performing stress testing, I found that AsyncKafkaConsumer's network 
client delegate isn't backing off if the node is not ready, causing a large 
number of: 
{code:java}
 358 [2024-05-20 22:59:02,591] DEBUG [Consumer clientId=consumer.7136899e-0c20-4ccb-8ba3-497e9e683594-0, groupId=consumer-groups-test-5] Node is not ready, handle the request in the next event loop: node=b4-pkc-devcmkz697.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483643 rack: null), request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='consumer-groups-test-5', memberId='', memberEpoch=0, instanceId=null, rackId=null, rebalanceTimeoutMs=10, subscribedTopicNames=[_kengine-565-test-topic8081], serverAssignor=null, topicPartitions=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@139a8761, node=Optional[b4-pkc-devcmkz697.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483643 rack: null)], timer=org.apache.kafka.common.utils.Timer@649fffad} (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:169) {code}
show up in the log.

What should have happened is: 1. the node is not ready, 2. exponential backoff, 3. retry.
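
As a rough illustration only (this is not the actual NetworkClientDelegate code, and every name below is hypothetical), the expected behaviour amounts to counting failed attempts and waiting an exponentially growing, jittered delay before the next retry:

{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper: tracks attempts against a node that is not ready and
// computes an exponential backoff with jitter for the next retry.
final class BackoffState {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private int attempts = 0;

    BackoffState(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    void onNodeNotReady() {
        attempts++; // step 1: the node is not ready
    }

    // Step 2: initial * 2^(attempts - 1), capped at maxBackoffMs, with +/-20% jitter.
    long nextRetryDelayMs() {
        if (attempts == 0) {
            return 0L;
        }
        double base = Math.min((double) maxBackoffMs, initialBackoffMs * Math.pow(2, attempts - 1.0));
        double jitter = 0.8 + 0.4 * ThreadLocalRandom.current().nextDouble();
        return (long) (base * jitter); // step 3: retry only after this delay has elapsed
    }
}
{code}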



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-05-20 Thread via GitHub


jolshan commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1607383820


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1361,10 +1366,10 @@ class ReplicaManager(val config: KafkaConfig,
*/
   private def appendToLocalLog(internalTopicsAllowed: Boolean,
origin: AppendOrigin,
-   entriesPerPartition: Map[TopicPartition, 
MemoryRecords],
+   entriesPerPartition: Map[TopicIdPartition, 
MemoryRecords],

Review Comment:
   is there a reason to pass this data structure here if we are not using the 
ID to check the append at the log level?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-05-20 Thread via GitHub


jolshan commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1607379861


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -604,40 +604,53 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 }
 
-val unauthorizedTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
-val nonExistingTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
-val invalidRequestResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
-val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
+val unauthorizedTopicResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+val nonExistingTopicResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+val invalidRequestResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+val authorizedRequestInfo = mutable.Map[TopicIdPartition, MemoryRecords]()
+val topicIdToPartitionData = new mutable.ArrayBuffer[(TopicIdPartition, 
ProduceRequestData.PartitionProduceData)]
+
+produceRequest.data.topicData.forEach { topic =>
+  topic.partitionData.forEach { partition =>
+val topicIdIsMissing = topic.topicId == null || topic.topicId == 
Uuid.ZERO_UUID

Review Comment:
   should we be deciding which fields to grab from based on the request 
version? And do we ever expect a null topic id? I wouldn't think so.
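
   As a purely hypothetical sketch of that suggestion (not the code in this PR), the topic name could be resolved from either field depending on the request version. Here the map stands in for the broker's metadata cache, and 12 stands in for the first ProduceRequest version that carries topic ids:

```java
import java.util.Map;
import org.apache.kafka.common.Uuid;

final class TopicKeyResolution {
    // Returns the topic name to use for this partition data, or null when an
    // id-only request references a topic id this broker does not know about
    // (which the caller would map to an UNKNOWN_TOPIC_ID style error).
    static String resolveTopicName(short version, String name, Uuid id,
                                   Map<Uuid, String> topicIdToName) {
        if (version >= 12) {
            return topicIdToName.get(id); // the id is the source of truth on new versions
        }
        return name; // older versions only carry the name
    }
}
```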



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-05-20 Thread via GitHub


jolshan commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1607379861


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -604,40 +604,53 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 }
 
-val unauthorizedTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
-val nonExistingTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
-val invalidRequestResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
-val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
+val unauthorizedTopicResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+val nonExistingTopicResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+val invalidRequestResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+val authorizedRequestInfo = mutable.Map[TopicIdPartition, MemoryRecords]()
+val topicIdToPartitionData = new mutable.ArrayBuffer[(TopicIdPartition, 
ProduceRequestData.PartitionProduceData)]
+
+produceRequest.data.topicData.forEach { topic =>
+  topic.partitionData.forEach { partition =>
+val topicIdIsMissing = topic.topicId == null || topic.topicId == 
Uuid.ZERO_UUID

Review Comment:
   should we be deciding which fields to grab from based on the request version?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Bump requests from 2.24.0 to 2.32.0 in /tests [kafka]

2024-05-20 Thread via GitHub


dependabot[bot] opened a new pull request, #16007:
URL: https://github.com/apache/kafka/pull/16007

   Bumps [requests](https://github.com/psf/requests) from 2.24.0 to 2.32.0.
   
   Release notes
   Sourced from requests's releases (https://github.com/psf/requests/releases).
   
   v2.32.0
   2.32.0 (2024-05-20) - PYCON US 2024 EDITION
   
   Security
   - Fixed an issue where setting verify=False on the first request from a Session will cause subsequent requests to the same origin to also ignore cert verification, regardless of the value of verify. (https://github.com/psf/requests/security/advisories/GHSA-9wx4-h78v-vm56)
   
   Improvements
   - verify=True now reuses a global SSLContext which should improve request time variance between first and subsequent requests. It should also minimize certificate load time on Windows systems when using a Python version built with OpenSSL 3.x. (#6667, https://redirect.github.com/psf/requests/issues/6667)
   - Requests now supports optional use of character detection (chardet or charset_normalizer) when repackaged or vendored. This enables pip and other projects to minimize their vendoring surface area. The Response.text() and apparent_encoding APIs will default to utf-8 if neither library is present. (#6702, https://redirect.github.com/psf/requests/issues/6702)
   
   Bugfixes
   - Fixed bug in length detection where emoji length was incorrectly calculated in the request content-length. (#6589, https://redirect.github.com/psf/requests/issues/6589)
   - Fixed deserialization bug in JSONDecodeError. (#6629, https://redirect.github.com/psf/requests/issues/6629)
   - Fixed bug where an extra leading / (path separator) could lead urllib3 to unnecessarily reparse the request URI. (#6644, https://redirect.github.com/psf/requests/issues/6644)
   
   Deprecations
   - Requests has officially added support for CPython 3.12 (#6503, https://redirect.github.com/psf/requests/issues/6503)
   - Requests has officially added support for PyPy 3.9 and 3.10 (#6641, https://redirect.github.com/psf/requests/issues/6641)
   - Requests has officially dropped support for CPython 3.7 (#6642, https://redirect.github.com/psf/requests/issues/6642)
   - Requests has officially dropped support for PyPy 3.7 and 3.8 (#6641, https://redirect.github.com/psf/requests/issues/6641)
   
   Documentation
   - Various typo fixes and doc improvements.
   
   Packaging
   - Requests has started adopting some modern packaging practices. The source files for the projects (formerly requests) is now located in src/requests in the Requests sdist. (#6506, https://redirect.github.com/psf/requests/issues/6506)
   - Starting in Requests 2.33.0, Requests will migrate to a PEP 517 build system using hatchling. This should not impact the average user, but extremely old versions of packaging utilities may have issues with the new packaging format.
   
   New Contributors
   - @matthewarmand (https://github.com/matthewarmand) made their first contribution in psf/requests#6258 (https://redirect.github.com/psf/requests/pull/6258)
   - @cpzt (https://github.com/cpzt) made their first contribution in psf/requests#6456 (https://redirect.github.com/psf/requests/pull/6456)
   
   ... (truncated)
   
   Changelog
   Sourced from requests's changelog (https://github.com/psf/requests/blob/main/HISTORY.md).
   
   2.32.0 (2024-05-20)
   
   Security
   - Fixed an issue where setting verify=False on the first request from a Session will cause subsequent requests to the same origin to also ignore cert verification, regardless of the value of verify. (https://github.com/psf/requests/security/advisories/GHSA-9wx4-h78v-vm56)
   
   Improvements
   - verify=True now reuses a global SSLContext which should improve request time variance between first and subsequent requests. It should also minimize certificate load time on Windows systems when using a Python version built with OpenSSL 3.x. (#6667, https://redirect.github.com/psf/requests/issues/6667)
   - Requests now supports optional use of character detection (chardet or charset_normalizer) when repackaged or vendored. This enables pip and other projects to minimize their vendoring surface area. The Response.text() and apparent_encoding APIs will default to utf-8 if neither library is present. (#6702, https://redirect.github.com/psf/requests/issues/6702)
   
   Bugfixes
   - Fixed bug in length detection where emoji length was incorrectly calculated in the request content-length. (#6589, https://redirect.github.com/psf/requests/issues/6589)
   - Fixed deserialization bug in JSONDecodeError. (#6629, https://redirect.github.com/psf/requests/issues/6629)
   - Fixed bug where an extra leading / (path separator) could lead urllib3 to 

[jira] [Assigned] (KAFKA-16515) Fix the ZK Metadata cache use of voter static configuration

2024-05-20 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-16515:


Assignee: Colin McCabe  (was: José Armando García Sancio)

> Fix the ZK Metadata cache use of voter static configuration
> ---
>
> Key: KAFKA-16515
> URL: https://issues.apache.org/jira/browse/KAFKA-16515
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: José Armando García Sancio
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.8.0
>
>
> Looks like because of ZK migration to KRaft the ZK Metadata cache was changed 
> to read the voter static configuration. This needs to change to use the voter 
> nodes reported by  the raft manager or the kraft client.
> The injection code is in KafkaServer where it constructs 
> MetadataCache.zkMetadata.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and controllers [kafka]

2024-05-20 Thread via GitHub


cmccabe opened a new pull request, #16006:
URL: https://github.com/apache/kafka/pull/16006

   ZkMetadataCache could theoretically return KRaft controller information from 
a call to ZkMetadataCache.getAliveBrokerNode, which doesn't make sense. KRaft 
controllers are not part of the set of brokers.
   
   In practice, this wasn't a concern since all the use-cases for 
ZkMetadataCache.getAliveBrokerNode center around finding coordinators, which 
will never be kraft controllers anyway. Still, cleaning up this code reduces 
confusion and is helpful for removing places where static controller 
configurations are used, as part of KIP-853.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-05-20 Thread via GitHub


jolshan commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1607367455


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -891,11 +897,15 @@ private void sendProduceRequest(long now, int 
destination, short acks, int timeo
 // which is supporting the new magic version to one which doesn't, 
then we will need to convert.
 if (!records.hasMatchingMagic(minUsedMagic))
 records = batch.records().downConvert(minUsedMagic, 0, 
time).records();
-ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
-if (tpData == null) {
-tpData = new 
ProduceRequestData.TopicProduceData().setName(tp.topic());
-tpd.add(tpData);
+Optional topicProduceData = 
canUseTopicId ?
+Optional.ofNullable(tpd.find(tp.topic(), 
topicIds.get(tp.topic( :
+tpd.stream().filter(data -> 
data.name().equals(tp.topic())).findFirst();
+
+ProduceRequestData.TopicProduceData tpData = 
topicProduceData.orElse(new 
ProduceRequestData.TopicProduceData().setName(tp.topic()));
+if (canUseTopicId) {
+tpData.setTopicId(topicIds.get(tp.topic()));
 }
+tpd.add(tpData);

Review Comment:
   Hmm -- we don't need to add if we already added this right? I also wonder if 
we can cache by topic name so we don't have to findFirst
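
   One possible shape of that caching idea, sketched as a standalone helper rather than as the actual Sender change (the types come from the generated ProduceRequestData message classes):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.message.ProduceRequestData;

final class TopicProduceDataCache {
    // Keyed by topic name so repeated batches for the same topic avoid a linear
    // scan, and each TopicProduceData is added to the request data exactly once.
    private final Map<String, ProduceRequestData.TopicProduceData> byName = new HashMap<>();

    ProduceRequestData.TopicProduceData getOrCreate(
            ProduceRequestData.TopicProduceDataCollection tpd, String topic) {
        return byName.computeIfAbsent(topic, t -> {
            ProduceRequestData.TopicProduceData created =
                    new ProduceRequestData.TopicProduceData().setName(t);
            tpd.add(created); // only added the first time the topic is seen
            return created;
        });
    }
}
```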



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-05-20 Thread via GitHub


jolshan commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1607363954


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, 
Map 
partitionsWithUpdatedLeaderInfo = new HashMap<>();
 produceResponse.data().responses().forEach(r -> 
r.partitionResponses().forEach(p -> {
-TopicPartition tp = new TopicPartition(r.name(), 
p.index());
+// Version 12 drop topic name and add support to topic id. 
However, metadata can be used to map topic id to topic name.
+String topicName = (r.name() == null || 
r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name();

Review Comment:
   What do we do if metadata has refreshed and is no longer in the metadata? 
   For fetch it is a bit different since we have the session logic, and can 
handle missing topics.
   
   I would recommend writing through a few cases where the server and client 
have/don't have the topic ID to reason about the upgrade case/downgrade 
case/deletions/reassignments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16645) CVEs in 3.7.0 docker image

2024-05-20 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez resolved KAFKA-16645.
-
Resolution: Resolved

> CVEs in 3.7.0 docker image
> --
>
> Key: KAFKA-16645
> URL: https://issues.apache.org/jira/browse/KAFKA-16645
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Assignee: Igor Soarez
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Our [Docker Image CVE 
> Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub 
> action reports 2 high CVEs in our base image:
> apache/kafka:3.7.0 (alpine 3.19.1)
> ==
> Total: 2 (HIGH: 2, CRITICAL: 0)
> | Library  | Vulnerability  | Severity | Status | Installed Version | Fixed Version | Title |
> |----------|----------------|----------|--------|-------------------|---------------|-------|
> | libexpat | CVE-2023-52425 | HIGH     | fixed  | 2.5.0-r2          | 2.6.0-r0      | expat: parsing large tokens can trigger a denial of service (https://avd.aquasec.com/nvd/cve-2023-52425) |
> | libexpat | CVE-2024-28757 | HIGH     | fixed  | 2.5.0-r2          | 2.6.2-r0      | expat: XML Entity Expansion (https://avd.aquasec.com/nvd/cve-2024-28757) |
> Looking at the 
> [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?]
>  that introduced the docker images, it seems we should release a bugfix when 
> high CVEs are detected. It would be good to investigate and assess whether 
> Kafka is impacted or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-16645) CVEs in 3.7.0 docker image

2024-05-20 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reopened KAFKA-16645:
-

Need to re-open to change the resolution, release_notes.py doesn't like the one 
I picked

> CVEs in 3.7.0 docker image
> --
>
> Key: KAFKA-16645
> URL: https://issues.apache.org/jira/browse/KAFKA-16645
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Assignee: Igor Soarez
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Our [Docker Image CVE 
> Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub 
> action reports 2 high CVEs in our base image:
> apache/kafka:3.7.0 (alpine 3.19.1)
> ==
> Total: 2 (HIGH: 2, CRITICAL: 0)
> | Library  | Vulnerability  | Severity | Status | Installed Version | Fixed Version | Title |
> |----------|----------------|----------|--------|-------------------|---------------|-------|
> | libexpat | CVE-2023-52425 | HIGH     | fixed  | 2.5.0-r2          | 2.6.0-r0      | expat: parsing large tokens can trigger a denial of service (https://avd.aquasec.com/nvd/cve-2023-52425) |
> | libexpat | CVE-2024-28757 | HIGH     | fixed  | 2.5.0-r2          | 2.6.2-r0      | expat: XML Entity Expansion (https://avd.aquasec.com/nvd/cve-2024-28757) |
> Looking at the 
> [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?]
>  that introduced the docker images, it seems we should release a bugfix when 
> high CVEs are detected. It would be good to investigate and assess whether 
> Kafka is impacted or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-05-20 Thread via GitHub


jolshan commented on PR #15968:
URL: https://github.com/apache/kafka/pull/15968#issuecomment-2121321915

   > Topic name and Topic Id will become optional following the footstep of 
FetchRequest/FetchResponse
   My understanding is that all requests going forward will use ID and not name 
similar to fetch request. I believe that is what is in the PR, but the comment 
suggests otherwise.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add demo template for transactional client [kafka]

2024-05-20 Thread via GitHub


jolshan commented on code in PR #15913:
URL: https://github.com/apache/kafka/pull/15913#discussion_r1607348108


##
examples/src/main/java/kafka/examples/TransactionalClientDemo.java:
##
@@ -0,0 +1,153 @@
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

Review Comment:
   nit: whenever we want to merge this, we can't use wildcard imports. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add demo template for transactional client [kafka]

2024-05-20 Thread via GitHub


jolshan commented on code in PR #15913:
URL: https://github.com/apache/kafka/pull/15913#discussion_r1607347641


##
examples/src/main/java/kafka/examples/TransactionalClientDemo.java:
##
@@ -0,0 +1,153 @@
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
+/**
+ * This class demonstrates a transactional Kafka client application that 
consumes messages from an input topic,
+ * processes them to generate word count statistics, and produces the results 
to an output topic.
+ * It utilizes Kafka's transactional capabilities to ensure exactly-once 
processing semantics.
+ *
+ * The application continuously polls for records from the input topic, 
processes them, and commits the offsets
+ * in a transactional manner. In case of exceptions or errors, it handles them 
appropriately, either aborting the
+ * transaction and resetting to the last committed positions, or restarting 
the application.
+ *
+ */
+public class TransactionalClientDemo {
+
+private static final String CONSUMER_GROUP_ID = "my-group-id";
+private static final String OUTPUT_TOPIC = "output";
+private static final String INPUT_TOPIC = "input";
+private static KafkaConsumer consumer;
+private static KafkaProducer producer;
+
+public static void main(String[] args) {
+initializeApplication();
+
+boolean isRunning = true;
+// Continuously poll for records
+while(isRunning) {
+try {
+try {
+// Poll records from Kafka for a timeout of 60 seconds
+ConsumerRecords records = 
consumer.poll(ofSeconds(60));
+
+// Process records to generate word count map
+Map wordCountMap = new HashMap<>();
+
+for (ConsumerRecord record : records) {
+String[] words = record.value().split(" ");
+for (String word : words) {
+wordCountMap.merge(word, 1, Integer::sum);
+}
+}
+
+// Begin transaction
+producer.beginTransaction();
+
+// Produce word count results to output topic
+wordCountMap.forEach((key, value) ->
+producer.send(new ProducerRecord<>(OUTPUT_TOPIC, 
key, value.toString(;
+
+// Determine offsets to commit
+Map offsetsToCommit = 
new HashMap<>();
+for (TopicPartition partition : records.partitions()) {
+List> 
partitionedRecords = records.records(partition);
+long offset = 
partitionedRecords.get(partitionedRecords.size() - 1).offset();
+offsetsToCommit.put(partition, new 
OffsetAndMetadata(offset + 1));
+}
+
+// Send offsets to transaction for atomic commit
+producer.sendOffsetsToTransaction(offsetsToCommit, 
CONSUMER_GROUP_ID);
+
+// Commit transaction
+producer.commitTransaction();
+} catch (AbortableTransactionException e) {
+// Abortable Exception: Handle Kafka exception by aborting 
transaction. AbortTransaction path never throws abortable exception.

Review Comment:
   More of an implementation discussion, but are we saying that 
producer.abortTransaction() should never throw such an exception? Or that we 
don't ever try to catch such an exception from abortTransaction?
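
   For comparison, the error-handling pattern documented for the current producer API (which has no AbortableTransactionException) separates fatal exceptions from ones that can be handled by aborting. A minimal sketch, assuming the send/offset-commit work is passed in as a Runnable:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

final class TransactionErrorHandling {
    // 'doWork' stands in for the poll/process/send/sendOffsetsToTransaction portion of the loop.
    static void runOneTransaction(KafkaProducer<String, String> producer, Runnable doWork) {
        try {
            producer.beginTransaction();
            doWork.run();
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal: the producer cannot continue; aborting would not help.
            producer.close();
        } catch (KafkaException e) {
            // Abortable: abort, then rewind the consumer to its last committed positions and retry.
            producer.abortTransaction();
        }
    }
}
```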



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add demo template for transactional client [kafka]

2024-05-20 Thread via GitHub


jolshan commented on code in PR #15913:
URL: https://github.com/apache/kafka/pull/15913#discussion_r1607340714


##
examples/src/main/java/kafka/examples/TransactionalClientDemo.java:
##
@@ -0,0 +1,153 @@
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
+/**
+ * This class demonstrates a transactional Kafka client application that 
consumes messages from an input topic,
+ * processes them to generate word count statistics, and produces the results 
to an output topic.
+ * It utilizes Kafka's transactional capabilities to ensure exactly-once 
processing semantics.
+ *
+ * The application continuously polls for records from the input topic, 
processes them, and commits the offsets
+ * in a transactional manner. In case of exceptions or errors, it handles them 
appropriately, either aborting the
+ * transaction and resetting to the last committed positions, or restarting 
the application.
+ *
+ */
+public class TransactionalClientDemo {
+
+private static final String CONSUMER_GROUP_ID = "my-group-id";
+private static final String OUTPUT_TOPIC = "output";
+private static final String INPUT_TOPIC = "input";
+private static KafkaConsumer consumer;
+private static KafkaProducer producer;
+
+public static void main(String[] args) {
+initializeApplication();
+
+boolean isRunning = true;
+// Continuously poll for records
+while(isRunning) {

Review Comment:
   nit: `while (isRunning)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16779) Kafka retains logs past specified retention

2024-05-20 Thread Nicholas Feinberg (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847997#comment-17847997
 ] 

Nicholas Feinberg commented on KAFKA-16779:
---

When we explicitly set topics' retention to 4d (345600000 ms), our brokers 
immediately expired the surprisingly old logs.

I've confirmed that the same setting is present in the brokers' 
`server.properties` file - that is, they have `log.retention.hours=96`. I've 
also checked and confirmed that topics do not have an explicitly set retention 
that would override this.

> Kafka retains logs past specified retention
> ---
>
> Key: KAFKA-16779
> URL: https://issues.apache.org/jira/browse/KAFKA-16779
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Nicholas Feinberg
>Priority: Major
>  Labels: expiration, retention
> Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, 
> kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, 
> state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz
>
>
> In a Kafka cluster with all topics set to four days of retention or longer 
> (345600000 ms), most brokers seem to be retaining six days of data.
> This is true even for topics which have high throughput (500MB/s, 50k msgs/s) 
> and thus are regularly rolling new log segments. We observe this unexpectedly 
> high retention both via disk usage statistics and by requesting the oldest 
> available messages from Kafka.
> Some of these brokers crashed with an 'mmap failed' error (attached). When 
> those brokers started up again, they returned to the expected four days of 
> retention.
> Manually restarting brokers also seems to cause them to return to four days 
> of retention. Demoting and promoting brokers only has this effect on a small 
> part of the data hosted on a broker.
> These hosts had ~170GiB of free memory available. We saw no signs of pressure 
> on either system or JVM heap memory before or after they reported this error. 
> Committed memory seems to be around 10%, so this doesn't seem to be an 
> overcommit issue.
> This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). 
> Prior to the upgrade, it was running on Kafka 2.4.
> We last reduced retention for ops on May 7th, after which we restored 
> retention to our default of four days. This was the second time we've 
> temporarily reduced and restored retention since the upgrade. This problem 
> did not manifest the previous time we did so, nor did it manifest on our 
> other Kafka 3.7 clusters.
> We are running on AWS 
> [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We 
> have 23 brokers, each with 24 disks. We're running in a JBOD configuration 
> (i.e. unraided).
> Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, 
> we're still using Zookeeper.
> Sample broker logs are attached. The 05-12 and 05-14 logs are from separate 
> hosts. Please let me know if I can provide any further information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]

2024-05-20 Thread via GitHub


cmccabe commented on code in PR #15945:
URL: https://github.com/apache/kafka/pull/15945#discussion_r1607334284


##
metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Tracks the registration of a specific broker, and executes a callback if it 
should be refreshed.
+ *
+ * This tracker handles cases where we might want to re-register the broker. 
The only such case
+ * right now is during the transition from non-JBOD mode, to JBOD mode. In 
other words, the
+ * transition from a MetadataVersion less than 3.7-IV2, to one greater than or 
equal to 3.7-IV2.
+ * In this case, the broker registration will start out containing no 
directories, and we need to
+ * resend the BrokerRegistrationRequest to fix that.
+ *
+ * As much as possible, the goal here is to keep things simple. We just 
compare the desired state
+ * with the actual state, and try to make changes only if necessary.
+ */
+public class BrokerRegistrationTracker implements MetadataPublisher {
+private final Logger log;
+private final int id;
+private final Runnable refreshRegistrationCallback;
+
+/**
+ * Create the tracker.
+ *
+ * @param idThe ID of this broker.
+ * @param targetDirectories The directories managed by this 
broker.
+ * @param refreshRegistrationCallback   Callback to run if we need to 
refresh the registration.
+ */
+public BrokerRegistrationTracker(
+int id,
+List targetDirectories,
+Runnable refreshRegistrationCallback
+) {
+this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] 
").
+logger(BrokerRegistrationTracker.class);
+this.id = id;
+this.refreshRegistrationCallback = refreshRegistrationCallback;
+}
+
+@Override
+public String name() {
+return "BrokerRegistrationTracker(id=" + id + ")";
+}
+
+@Override
+public void onMetadataUpdate(
+MetadataDelta delta,
+MetadataImage newImage,
+LoaderManifest manifest
+) {
+boolean checkBrokerRegistration = false;
+if (delta.featuresDelta() != null) {
+if (delta.metadataVersionChanged().isPresent()) {

Review Comment:
   If a metadata version change happens, we may be in a situation where we need 
to re-register. That's what this clause reflects.
   
   I agree it would be good to have a test that hits this specifically. Looks 
like your test below will do so -- I have added it.



##
metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import 

Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]

2024-05-20 Thread via GitHub


cmccabe commented on code in PR #15945:
URL: https://github.com/apache/kafka/pull/15945#discussion_r1607334284


##
metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Tracks the registration of a specific broker, and executes a callback if it 
should be refreshed.
+ *
+ * This tracker handles cases where we might want to re-register the broker. 
The only such case
+ * right now is during the transition from non-JBOD mode, to JBOD mode. In 
other words, the
+ * transition from a MetadataVersion less than 3.7-IV2, to one greater than or 
equal to 3.7-IV2.
+ * In this case, the broker registration will start out containing no 
directories, and we need to
+ * resend the BrokerRegistrationRequest to fix that.
+ *
+ * As much as possible, the goal here is to keep things simple. We just 
compare the desired state
+ * with the actual state, and try to make changes only if necessary.
+ */
+public class BrokerRegistrationTracker implements MetadataPublisher {
+private final Logger log;
+private final int id;
+private final Runnable refreshRegistrationCallback;
+
+/**
+ * Create the tracker.
+ *
+ * @param idThe ID of this broker.
+ * @param targetDirectories The directories managed by this 
broker.
+ * @param refreshRegistrationCallback   Callback to run if we need to 
refresh the registration.
+ */
+public BrokerRegistrationTracker(
+int id,
+List targetDirectories,
+Runnable refreshRegistrationCallback
+) {
+this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] 
").
+logger(BrokerRegistrationTracker.class);
+this.id = id;
+this.refreshRegistrationCallback = refreshRegistrationCallback;
+}
+
+@Override
+public String name() {
+return "BrokerRegistrationTracker(id=" + id + ")";
+}
+
+@Override
+public void onMetadataUpdate(
+MetadataDelta delta,
+MetadataImage newImage,
+LoaderManifest manifest
+) {
+boolean checkBrokerRegistration = false;
+if (delta.featuresDelta() != null) {
+if (delta.metadataVersionChanged().isPresent()) {

Review Comment:
   If a metadata version change happens, we may be in a situation where we need 
to re-register. That's what this clause reflects.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-20 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1607136651


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -420,12 +420,11 @@ public CompletableFuture heartbeat(
 );
 }
 
-// Using a read operation is okay here as we ignore the last committed 
offset in the snapshot registry.
-// This means we will read whatever is in the latest snapshot, which 
is how the old coordinator behaves.
-return runtime.scheduleReadOperation(
+return runtime.scheduleWriteOperation(
 "classic-group-heartbeat",
 topicPartitionFor(request.groupId()),
-(coordinator, __) -> coordinator.classicGroupHeartbeat(context, 
request)
+Duration.ofMillis(config.offsetCommitTimeoutMs),

Review Comment:
   not necessarily a comment for this PR but i wonder if we should change the 
name of this config since it's being used for all writes.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4209,31 +4241,67 @@ private void removePendingSyncMember(
  * @param contextThe request context.
  * @param requestThe actual Heartbeat request.
  *
- * @return The Heartbeat response.
+ * @return The coordinator result that contains the heartbeat response.
  */
-public HeartbeatResponseData classicGroupHeartbeat(
+public CoordinatorResult 
classicGroupHeartbeat(

Review Comment:
   maybe i'm missing something but i don't see where we actually initialize 
CoordinatorResult with records to write to the log



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4274,6 +4342,77 @@ private void validateClassicGroupHeartbeat(
 }
 }
 
+/**
+ * Handle a classic group HeartbeatRequest to a consumer group. A response 
with
+ * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller 
than the
+ * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member 
is in
+ * UNRELEASED_PARTITIONS and all its partitions pending assignment are 
free.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual Heartbeat request.
+ *
+ * @return The coordinator result that contains the heartbeat response.
+ */
+private CoordinatorResult 
classicGroupHeartbeatToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+HeartbeatRequestData request
+) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+ConsumerGroupMember member = validateConsumerGroupMember(group, 
memberId, instanceId);
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), 
request.generationId());
+
+scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+Errors error = Errors.NONE;
+if (member.memberEpoch() < group.groupEpoch() ||
+member.state() == MemberState.UNREVOKED_PARTITIONS ||
+(member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.hasUnreleasedPartitions(member))) {
+error = Errors.REBALANCE_IN_PROGRESS;
+scheduleConsumerGroupJoinTimeout(groupId, memberId, 
member.rebalanceTimeoutMs());

Review Comment:
   we are saying that we cancel the join timeout when we first convert to 
consumer group, then when we have a group epoch bump we tell the classic group 
member we're rebalancing and they should send a join request. is my 
understanding correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16790) Calls to RemoteLogManager are made before it is configured

2024-05-20 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847989#comment-17847989
 ] 

Muralidhar Basani commented on KAFKA-16790:
---

[~christo_lolov] I have a PR open. 

> Calls to RemoteLogManager are made before it is configured
> --
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.8.0
>Reporter: Christo Lolov
>Assignee: Muralidhar Basani
>Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) 
> which in turn calls RemoteLogManager#onLeadershipChange (2), however, the 
> RemoteLogManager is configured after the BrokerMetadataPublisher starts 
> running (3, 4). This is incorrect, we either need to initialise the 
> RemoteLogManager before we start the BrokerMetadataPublisher or we need to 
> skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The way to reproduce the problem is by looking at the following changes
> {code:java}
>  config/kraft/broker.properties                         | 10 ++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +-
>  3 files changed, 22 insertions(+), 2 deletions(-)diff --git 
> a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=30
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set partitionsBecomeLeader,
>                                     Set partitionsBecomeFollower,
>                                     Map topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {
> +            LOGGER.error("NullPointerException");
> +            return;
> +        }
> +        LOGGER.error("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
>  
>          Map leaderPartitionsWithLeaderEpoch = 
> filterPartitions(partitionsBecomeLeader)
>                  .collect(Collectors.toMap(
> diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
> b/core/src/main/scala/kafka/server/ReplicaManager.scala
> index 35499430d6..bd3f41c3d6 100644
> --- a/core/src/main/scala/kafka/server/ReplicaManager.scala
> +++ 

Re: [PR] Update RemoteLogManager configuration in broker server - KAFKA-16790 [kafka]

2024-05-20 Thread via GitHub


muralibasani commented on PR #16005:
URL: https://github.com/apache/kafka/pull/16005#issuecomment-2121170470

   @clolov would you like to take a look? I have taken the first approach of 
configuring rlm before initializing the publishers, instead of skipping the calls 
when it is not yet configured. There can be multiple places where rlm 
configuration is required, so this seemed better.
   
   Haven't added any explicit tests here, but do you suggest any ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Update RemoteLogManager configuration in broker server - KAFKA-16790 [kafka]

2024-05-20 Thread via GitHub


muralibasani opened a new pull request, #16005:
URL: https://github.com/apache/kafka/pull/16005

   Details of the issue are described in 
https://issues.apache.org/jira/browse/KAFKA-16790
   
   In BrokerServer.scala, brokerMetadataPublishers are configured first, so when 
metadata updates arrive, remoteLogManager has not been configured yet. 
   For example, remoteLogManager.foreach(rlm => 
rlm.onLeadershipChange(partitionsBecomeLeader.asJava, 
partitionsBecomeFollower.asJava, topicIds)) in ReplicaManager is invoked after 
the publishers are instantiated, and at that point rlm must already have its 
managers configured.
   
   This change makes sure rlm is configured before the brokerMetadataPublishers 
initialization.
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [X] Verify design and implementation 
   - [X] Verify test coverage and CI build status
   - [X] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16692: InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka [kafka]

2024-05-20 Thread via GitHub


soarez commented on PR #15971:
URL: https://github.com/apache/kafka/pull/15971#issuecomment-2121128256

   > Not sure if we want to amend the commit name to fix it.
   
   No, that could be very disruptive, not worth the trouble. It wasn't hard to 
find the JIRA despite the typo.
   
   > I've picked to 3.6 and 3.7.
   
   Nice. Thanks for closing the JIRA too.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: ensure KafkaServerTestHarness::tearDown is always invoked [kafka]

2024-05-20 Thread via GitHub


chia7712 merged PR #15996:
URL: https://github.com/apache/kafka/pull/15996


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-05-20 Thread via GitHub


chia7712 commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1607152637


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -112,12 +115,15 @@ public void start(Map props) {
 
 @Override
 public void commit() {
-// Offset syncs which were not emitted immediately due to their offset 
spacing should be sent periodically
-// This ensures that low-volume topics aren't left with persistent lag 
at the end of the topic
-promoteDelayedOffsetSyncs();
-// Publish any offset syncs that we've queued up, but have not yet 
been able to publish
-// (likely because we previously reached our limit for number of 
outstanding syncs)
-firePendingOffsetSyncs();
+// Handle delayed and pending offset syncs only when 
emit.offset-syncs.enabled set to true
+if (emitOffsetSyncEnabled) {
+// Offset syncs which were not emitted immediately due to their 
offset spacing should be sent periodically
+// This ensures that low-volume topics aren't left with persistent 
lag at the end of the topic
+promoteDelayedOffsetSyncs();
+// Publish any offset syncs that we've queued up, but have not yet 
been able to publish
+// (likely because we previously reached our limit for number of 
outstanding syncs)
+firePendingOffsetSyncs();

Review Comment:
   We don't need to create `offsetProducer` if `emitOffsetSyncEnabled` is false
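
   A minimal sketch of that point (the names are illustrative, not the actual MirrorSourceTask fields): the offset-syncs producer is only built when the feature is enabled, so a disabled task never opens the extra producer at all.

```java
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

final class OffsetSyncProducerHolder {
    // Returns null when offset syncs are disabled; commit() should then also
    // skip promoteDelayedOffsetSyncs()/firePendingOffsetSyncs() entirely.
    static KafkaProducer<byte[], byte[]> maybeCreate(boolean emitOffsetSyncsEnabled,
                                                     Map<String, Object> producerProps) {
        if (!emitOffsetSyncsEnabled) {
            return null;
        }
        return new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
    }
}
```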



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-05-20 Thread via GitHub


gharris1727 commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1607071132


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -1039,21 +1039,42 @@ public static List> 
reverseTransform(String connName,
 return result;
 }
 
-public boolean taskConfigsChanged(ClusterConfigState configState, String 
connName, List> taskProps) {
+public boolean taskConfigsChanged(
+ClusterConfigState configState,
+String connName,
+List> taskProps,
+int connectorConfigHash
+) {
 int currentNumTasks = configState.taskCount(connName);
 boolean result = false;
 if (taskProps.size() != currentNumTasks) {
 log.debug("Connector {} task count changed from {} to {}", 
connName, currentNumTasks, taskProps.size());
 result = true;
 } else {
-for (int index = 0; index < currentNumTasks; index++) {
+for (int index = 0; index < currentNumTasks && !result; index++) {

Review Comment:
   this has the effect of hiding the later debug logs for other tasks, is that 
intentional? I don't know if anyone is relying on that information, but this is 
taking away debug information that might be useful.



##
checkstyle/import-control.xml:
##
@@ -574,6 +574,7 @@
   
   
   
+  

Review Comment:
   nit: duplicate



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -1039,21 +1039,42 @@ public static List> 
reverseTransform(String connName,
 return result;
 }
 
-public boolean taskConfigsChanged(ClusterConfigState configState, String 
connName, List> taskProps) {
+public boolean taskConfigsChanged(
+ClusterConfigState configState,
+String connName,
+List> taskProps,
+int connectorConfigHash
+) {
 int currentNumTasks = configState.taskCount(connName);
 boolean result = false;
 if (taskProps.size() != currentNumTasks) {
 log.debug("Connector {} task count changed from {} to {}", 
connName, currentNumTasks, taskProps.size());
 result = true;
 } else {
-for (int index = 0; index < currentNumTasks; index++) {
+for (int index = 0; index < currentNumTasks && !result; index++) {
 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
 if 
(!taskProps.get(index).equals(configState.taskConfig(taskId))) {
 log.debug("Connector {} has change in configuration for 
task {}-{}", connName, connName, index);
 result = true;
 }
 }
+// Do a final check to see if runtime-controlled properties that 
affect tasks but may
+// not be included in the connector-generated configs for them 
(such as converter overrides)
+// have changed
+if (!result) {

Review Comment:
   None of this looks expensive to compute, WDYT about moving this outside the 
`else` branch and always comparing the hash?
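
   Something like this sketch of the hoisted check (the log message is assumed; the accessor names follow the diff):

   ```java
   // Compare the stored connector config hash unconditionally, not only when the
   // task count and the individual task configs are unchanged.
   Integer storedHash = configState.taskConfigHash(connName);
   if (storedHash != null && storedHash != connectorConfigHash) {
       log.debug("Connector {} configuration hash changed", connName);
       result = true;
   }
   ```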



##
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java:
##
@@ -243,4 +249,29 @@ public static Map patchConfig(
 });
 return result;
 }
+
+/**
+ * Generate a deterministic hash of the supplied config. For configurations
+ * with identical key-value pairs, this hash will always be the same.
+ * @param config the config to hash; may be null
+ * @return a hash of the config
+ */
+public static int configHash(Map<String, String> config) {
+if (config == null)
+return 0;
+
+Map<String, String> toHash = new TreeMap<>(config);
+
+byte[] serialized;
+try {
+serialized = OBJECT_MAPPER.writeValueAsBytes(toHash);

Review Comment:
   AbstractMap (superclass of TreeMap) has a hashCode implementation which 
depends on the keys and values in the map. Did you consider using that method, 
and reject it? It looks like it could have fewer memory allocations and copying.
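
   For reference, a minimal sketch of that alternative (whether its collision behaviour is acceptable for persisting the hash is a separate question):

   ```java
   import java.util.Map;

   // Map#hashCode is specified as the sum of its entries' hash codes, so it is
   // deterministic and order-independent without JSON serialization or a TreeMap copy.
   public static int configHash(Map<String, String> config) {
       return config == null ? 0 : config.hashCode();
   }
   ```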



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java:
##
@@ -187,6 +193,17 @@ public Map rawTaskConfig(ConnectorTaskId 
task) {
 return taskConfigs.get(task);
 }
 
+/**
+ * Get the hash of the connector config that was used to generate the
+ * latest set of task configs for the connector
+ * @param connectorName name of the connector
+ * @return the config hash, or null if the connector does not exist or
+ * no config hash for its latest set of tasks has been stored
+ */
+public Integer taskConfigHash(String connectorName) {

Review Comment:
   I think this should be called connectorConfigHash, since it's not specific 
to any task.



##

Re: [PR] KAFKA-16692: InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka [kafka]

2024-05-20 Thread via GitHub


jolshan commented on PR #15971:
URL: https://github.com/apache/kafka/pull/15971#issuecomment-2120961394

   Yes. I just noticed this too. Not sure if we want to amend the commit name to fix it.
   
   I've picked to 3.6 and 3.7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-20 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-16692.

Fix Version/s: 3.6.3
   Resolution: Fixed

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0, 3.6.1, 3.8
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.7.1, 3.6.3, 3.8
>
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
> that has _discoverBrokerVersions_ set to {_}false{_}. 
> I was hoping I could get some assistance debugging this issue. Happy to 
> provide any additional information needed.
>  
>  
>  



--
This message was sent by Atlassian 

Re: [PR] MINOR: remove extra import from transactions tests [kafka]

2024-05-20 Thread via GitHub


jolshan merged PR #16000:
URL: https://github.com/apache/kafka/pull/16000


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]

2024-05-20 Thread via GitHub


jeffkbkim commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1607031651


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##
@@ -33,13 +35,21 @@ public class AssignmentSpec {
  */
 private final SubscriptionType subscriptionType;
 
-public AssignmentSpec(
+/**
+ * Reverse lookup map representing partitions per topic and
+ * their member assignment.
+ */
+Map> partitionAssignments;

Review Comment:
   should this also be private final, and should the user be able to modify the 
contents of the map?
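
   A sketch of what that could look like (generic types are assumed from the surrounding code, since they were stripped in this view):

   ```java
   // Keep the reverse-lookup map private and final, and hand out a read-only view.
   private final Map<Uuid, Map<Integer, String>> partitionAssignments;

   public Map<Uuid, Map<Integer, String>> partitionAssignments() {
       // Note: the nested maps would still need wrapping to be fully immutable.
       return Collections.unmodifiableMap(partitionAssignments);
   }
   ```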



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -516,16 +522,78 @@ public Assignment targetAssignment(String memberId) {
 return targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
 }
 
+/**
+ * @return An immutable map containing all the topic partitions
+ * with their current member assignments.
+ */
+public Map> partitionAssignments() {
+return Collections.unmodifiableMap(partitionAssignments);
+}
+
 /**
  * Updates target assignment of a member.
  *
  * @param memberId  The member id.
  * @param newTargetAssignment   The new target assignment.
  */
 public void updateTargetAssignment(String memberId, Assignment 
newTargetAssignment) {
+updatePartitionAssignments(
+memberId,
+targetAssignment.getOrDefault(memberId, new 
Assignment(Collections.emptyMap())),
+newTargetAssignment
+);
 targetAssignment.put(memberId, newTargetAssignment);
 }
 
+/**
+ * Updates partition assignments of the topics.
+ *
+ * @param memberId  The member Id.
+ * @param oldTargetAssignment   The old target assignment.
+ * @param newTargetAssignment   The new target assignment.
+ *
+ * Package private for testing.
+ */
+void updatePartitionAssignments(
+String memberId,
+Assignment oldTargetAssignment,
+Assignment newTargetAssignment
+) {
+// Combine keys from both old and new assignments.
+Set allTopicIds = new HashSet<>();
+allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+for (Uuid topicId : allTopicIds) {
+Set oldPartitions = 
oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+Set newPartitions = 
newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+TimelineHashMap topicPartitionAssignment = 
partitionAssignments.computeIfAbsent(
+topicId, k -> new TimelineHashMap<>(snapshotRegistry, 
Math.max(oldPartitions.size(), newPartitions.size()))
+);
+
+// Remove partitions that aren't present in the new assignment.
+for (Integer partition : oldPartitions) {
+if (!newPartitions.contains(partition) && 
memberId.equals(topicPartitionAssignment.get(partition))) {

Review Comment:
   on `memberId.equals(topicPartitionAssignment.get(partition))`: will this 
ever be false? it seems we would always assign a partition to a member only 
when it's been removed by the previous member, right?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config

2024-05-20 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847953#comment-17847953
 ] 

Kirk True commented on KAFKA-16272:
---

[~sagarrao]—both of the linked PRs are merged. Can this be marked as Resolved? 
Thanks!

> Update connect_distributed_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16272
> URL: https://issues.apache.org/jira/browse/KAFKA-16272
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{connect_distributed_test.py}} 
> to support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KIP 989 b [kafka]

2024-05-20 Thread via GitHub


nicktelford closed pull request #16003: KIP 989 b
URL: https://github.com/apache/kafka/pull/16003


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15045: (KIP-924 pt. 7) Simplify requirements for rack aware graphs [kafka]

2024-05-20 Thread via GitHub


apourchet opened a new pull request, #16004:
URL: https://github.com/apache/kafka/pull/16004

   Rack aware graphs don't actually need any topology information about the 
system, but rather require a simple ordered (not sorted) grouping of tasks.
   
   This PR changes the internal constructors and some interface signatures of 
RackAwareGraphConstructor and its implementations to allow reuse by future 
components that may not have access to the actual subtopology information.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KIP 989 b [kafka]

2024-05-20 Thread via GitHub


nicktelford opened a new pull request, #16003:
URL: https://github.com/apache/kafka/pull/16003

   - **KAFKA-15541: Add num-open-iterators metric**
   - **Add missing try-finally block**
   - **KAFKA-15541: Add iterator-duration metrics**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add a constructor to accept cause in ConfigException [kafka]

2024-05-20 Thread via GitHub


gharris1727 commented on PR #15994:
URL: https://github.com/apache/kafka/pull/15994#issuecomment-2120906740

   Hi @gaurav-narula good to hear that this isn't necessary then. You are free 
to add these debug logs to your own local copy to aid in debugging, and if you 
find particular log statements that are very helpful or outright necessary to 
debug a problem, you can raise a PR to upstream them. If you're having 
difficulty debugging something in a unit/integration test, then a user could 
have the same problem and benefit from those log messages.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-20 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1605384701


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE;
+
+/**
+ * Reads once the Kafka log for checkpoints and populates a map of
+ * checkpoints per consumer group.
+ *
+ * The Kafka log is closed after the initial load and only the in memory map is
+ * used after start.
+ */
+class CheckpointsStore implements AutoCloseable {

Review Comment:
   optional nit: This class can be public, along with the methods that are 
intended to be used by MirrorCheckpointTask, because this isn't a 
publicly-documented package (like clients, or connect-api, etc.)
   
   
   Outside of those publicly-documented packages, the general practice is 
public for external callers, even if the current callers are in the same 
package. We only use package-local for things that would be protected/private, 
but need to be accessed in tests (and so come with the visibility comment.)



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java:
##
@@ -271,4 +284,102 @@ private Map 
assertCheckpointForTopic(
 assertEquals(truth, checkpoints.containsKey(remoteTp), "should" + 
(truth ? "" : " not") + " emit offset sync");
 return checkpoints;
 }
+
+@Test
+public void testCheckpointsTaskRestartUsesExistingCheckpoints() {

Review Comment:
   I think using "real checkpoints" generated by the first MirrorCheckpointTask 
to test the second MirrorCheckpointTask is not necessary, and you can use 
simulated checkpoints instead.
   
   Reassigning variables and copy-pasting sections in tests is typo-prone and I 
think we can avoid it here.
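
   For example, a hedged sketch of seeding the second task with hand-built checkpoints instead of re-running the first one (assuming `Checkpoint`'s (group, topic-partition, upstream offset, downstream offset, metadata) constructor; how the map is wired into the task is omitted):

   ```java
   // Hand-built checkpoint: group, remote partition, upstream/downstream offsets, metadata.
   Checkpoint checkpoint = new Checkpoint("group1", new TopicPartition("topic1", 0),
           10L, 8L, "metadata");
   Map<TopicPartition, Checkpoint> simulatedCheckpoints =
           Collections.singletonMap(checkpoint.topicPartition(), checkpoint);
   ```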



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -155,6 +154,58 @@ public void testPastOffsetTranslation() {
 }
 }
 
+// this test has been written knowing the exact offset syncs stored
+@Test
+public void testPastOffsetTranslationWithoutInitializationReadToEnd() {
+final int maxOffsetLag = 10;
+
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
+@Override
+void backingStoreStart() {
+for (int offset = 0; offset <= 1000; offset += maxOffsetLag) {
+sync(tp, offset, offset);
+assertSparseSyncInvariant(this, tp);
+}
+}
+};
+
+store.start(false);
+
+// After starting but before seeing new offsets
+assertTranslationsNearby(store, 400, 480, 0);
+assertTranslationsNearby(store, 500, 720, 480);
+assertTranslationsNearby(store, 1000, 1000, 990);
+
+for (int offset = 1000; offset <= 1; offset += maxOffsetLag) {
+store.sync(tp, offset, offset);
+assertSparseSyncInvariant(store, tp);
+}
+
+// After seeing new offsets, 1000 was kicked out of the store, so
+// 1000 can only be translated to 1, the only previously stored offset is 0
+assertTranslationsNearby(store, 1000, 3840, 0);
+
+// We can translate offsets between the latest startup offset and the 
latest 

[jira] [Updated] (KAFKA-16707) Kafka Kraft : adding Principal Type in StandardACL for matching with KafkaPrincipal of connected client in order to defined ACL with a notion of group

2024-05-20 Thread Franck LEDAY (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Franck LEDAY updated KAFKA-16707:
-
Description: 
Default StandardAuthorizer in KRaft mode defines a KafkaPrincipal as type=User plus a name, with a special wildcard also possible.

The difficulty with this approach is that ACLs cannot be defined for a group of KafkaPrincipals.

There is currently a way to do so by defining a RULE to rewrite the KafkaPrincipal name field, BUT introducing the notion of a group this way means setting rules that make you lose the unique part of the connected client's KafkaPrincipal name.

The concept here, in the StandardAuthorizer of Kafka KRaft, is to add the management of the following KafkaPrincipal types:
 * Regex
 * StartsWith
 * EndsWith
 * Contains
 * (User is still available and keep working as before to avoid any 
regression/issue with current configurations)

This would be done in the StandardAcl class of metadata/authorizer, and the findResult method of StandardAuthorizerData would delegate the match to the StandardAcl class (for performance reasons, see the explanation below).

This way, you can still use RULEs to rewrite the KafkaPrincipal name of a connected client (say you want to transform the DN of an SSL certificate: cn=myCN,ou=myOU,c=FR becomes myCN@myOU), and then define a new ACL with a principal like 'Regex:^.*@my[oO]U$' that will match all connected clients with a certificate bound to ou=myOU. Note that in this particular case the same can be done with 'EndsWith:@myOU'; the type 'Contains' also works, but I imagine it being used more for matching a multigroup definition in a KafkaPrincipal.

 

Note about the performance reason: for the moment, I have this implemented in a fork of StandardAuthorizer/StandardAuthorizerData/StandardAcl, selected via the property authorizer.class.name, in a KRaft cluster with SSL authentication required, and it tested fine. But this way, every time an ACL is checked against a KafkaPrincipal, I do a string comparison on the ACL's KafkaPrincipal type to determine which matching method to apply. By implementing it in the StandardAcl class, and delegating the matching from StandardAuthorizerData to the StandardAcl class, the KafkaPrincipal matching type can be analysed and stored as an enum, with the KafkaPrincipal name kept separately, so the work is not redone each time a match has to be checked.
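
As a rough illustration of the idea (names below are illustrative only, not the actual patch):

{code:java}
// The ACL stores its matching type as an enum (parsed once at load time)
// together with the raw name to match against.
enum PrincipalMatchType { USER, REGEX, STARTS_WITH, ENDS_WITH, CONTAINS }

static boolean matches(PrincipalMatchType type, String aclName, String principalName) {
    switch (type) {
        case USER:        return "*".equals(aclName) || aclName.equals(principalName);
        case REGEX:       return principalName.matches(aclName); // a cached Pattern would avoid re-parsing
        case STARTS_WITH: return principalName.startsWith(aclName);
        case ENDS_WITH:   return principalName.endsWith(aclName);
        case CONTAINS:    return principalName.contains(aclName);
        default:          return false;
    }
}
{code}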

 

Here is the status of the implementation:
 * I have this solution ('performance reason') implemented in a fork (then branch) of the 3.7.0 GitHub repo,
 * I added a few unit tests, and gradlew metadata:test passes on all tests except one (which also fails on branch 3.7.0 without my changes),
 * I added a few lines about it in security.html.

 

I'm opening this issue to discuss it with you, because I would like to create a PR on GitHub for the next version.

  was:
Default StandardAuthorizer in Kraft mode is defining a KafkaPrincipal as 
type=User and a name, and a special wildcard eventually.

The difficulty with this solution is that we can't define ACL by group of 
KafkaPrincipal.

There is a way for the moment to do so by defining RULE to rewrite the 
KafkaPrincipal name field, BUT, to introduce this way the notion of group, you 
have to set rules which will make you lose the unique part of the KafkaPrincipal 
name of the connected client.

The concept here, in the StandardAuthorizer of Kafka Kraft, is to add  the 
management of KafkaPrincipal type:
 * Regex
 * StartsWith
 * EndsWith
 * Contains
 * (User is still available and keep working as before to avoid any 
regression/issue with current configurations)

This would be done in the StandardAcl class of metadata/authorizer, and the 
findresult method of StandardAuthorizerData will delegate the match to the 
StandardAcl class (for performance reason, see below explanation).

By this way, you can still use RULEs to rewrite KafkaPrincipal name of 
connected client (say you want to transform a DN of SSL certificate : 
cn=myCN,ou=myOU,c=FR becomes myCN@myOU), and then, you can define a new ACL 
with principal like: 'Regex:^.*@my[oO]U$' that will match all connected client 
with a certificate bind to ou=myOU . Note in this particular case, the same can 
be done with 'EndsWith:@myOU', and the type 'Contains' can work, but I imagine 
more the usage of this type for matching in a multigroup definition in a 
KafkaPrincipal.

 

Note about performance reason : for the moment, I have it implemented in a fork 
of StandardAuthorizer/StandardAuthorizerData/StandardAcl defined by the 
property authorizer.class.name in a cluster of Kraft with SSL authentication 
required and tested fine. But, by this way, every time that an ACL is checked 
against a KafkaPrincipal, I do a strcmp of the KafkaPrincipal type of the ACL 
to determine the matching method to be done. By implementing it in 

[PR] KAFKA-15045: (KIP-924 pt. 6) Post process new assignment structure [kafka]

2024-05-20 Thread via GitHub


apourchet opened a new pull request, #16002:
URL: https://github.com/apache/kafka/pull/16002

   This PR creates the required methods to post-process the result of 
TaskAssignor.assign into the required ClientMetadata map. This allows most of 
the internal logic to remain intact after the user's assignment code runs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add a constructor to accept cause in ConfigException [kafka]

2024-05-20 Thread via GitHub


gaurav-narula commented on PR #15994:
URL: https://github.com/apache/kafka/pull/15994#issuecomment-2120832178

   Thanks for the feedback @gharris1727.
   
   I dug a bit and it turns out the flaky test 
`DynamicBrokerReconfigurationTest::testTrustStoreAlter`  that motivated this PR 
won't benefit from either cause or suppressed `Throwable`.
   
   This is because the test invokes `incrementalAlterConfigs` as a client and 
ConfigExceptions in the controller are converted to `ApiError` at 
https://github.com/apache/kafka/blob/81e609802187ac2bcbd0ac169fa10e8c02c237f5/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java#L299
 and we lose the cause/suppressed throwable at that point (they aren't sent over 
the wire) :(
   
   I think the easiest change to debug the flakiness is to increase the logging 
level at 
https://github.com/apache/kafka/blob/81e609802187ac2bcbd0ac169fa10e8c02c237f5/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java#L187
 to `error` instead of `debug`.
   
   Alternatively, we may want to have that error log before 
https://github.com/apache/kafka/blob/81e609802187ac2bcbd0ac169fa10e8c02c237f5/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java#L299
 so that we ensure all reconfiguration errors are logged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16777:
--
Fix Version/s: 3.8.0

> New consumer should throw NoOffsetForPartitionException on continuous poll 
> zero if no reset strategy
> 
>
> Key: KAFKA-16777
> URL: https://issues.apache.org/jira/browse/KAFKA-16777
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If the consumer does not define an offset reset strategy, a call to poll 
> should fail with NoOffsetForPartitionException. That works as expected on the 
> new consumer when polling with a timeout > 0 (existing integration test 
> [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
>  but fails when polling continuously with ZERO timeout.
> This can be easily reproduced with a new integration test like this (passes 
> for the legacy consumer but fails for the new consumer). We should add it as 
> part of the fix, for better coverage:
> {code:java}
>   @ParameterizedTest(name = 
> TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
>   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
>   def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
> groupProtocol: String): Unit = {
> this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
> "none")
> val consumer = createConsumer(configOverrides = this.consumerConfig)
> consumer.assign(List(tp).asJava)
> // continuous poll should eventually fail because there is no offset 
> reset strategy set (fail only when resetting positions after coordinator is 
> known)
> TestUtils.tryUntilNoAssertionError() {
>   assertThrows(classOf[NoOffsetForPartitionException], () => 
> consumer.poll(Duration.ZERO))
> }
>   }
> {code}
> Also this is covered in the unit test 
> [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
>  that is currently enabled only for the LegacyConsumer. After fixing this 
> issue we should be able to enable it for the new consumer too.
> The issue seems to be around calling poll with ZERO timeout, that even when 
> called continuously, the consumer is not able to 
> initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it 
> to 
> [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
>  where the exception is thrown.
>  
> There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, 
> but filing this one to provide more context and point out the test failures 
> and suggested new tests,. All fail even with the current patch in KAFKA-16637 
> so needs investigation. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16758) Extend Consumer#close with option to leave the group or not

2024-05-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16758:
--
Fix Version/s: 4.0.0

> Extend Consumer#close with option to leave the group or not
> ---
>
> Key: KAFKA-16758
> URL: https://issues.apache.org/jira/browse/KAFKA-16758
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: needs-kip
> Fix For: 4.0.0
>
>
> See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the 
> full context.
> Essentially we would get rid of the "internal.leave.group.on.close" config 
> that is used as a backdoor by Kafka Streams right now to prevent closed 
> consumers from leaving the group, thus reducing unnecessary task movements 
> after a simple bounce. 
> This would be replaced by an actual public API that would allow the caller to 
> opt in or out to the LeaveGroup when close is called. This would be similar 
> to the KafkaStreams#close(CloseOptions) API, and in fact would be how that 
> API will be implemented (since it only works for static groups at the moment 
> as noted in KAFKA-16514 )
> This has several benefits over the current situation:
>  # It allows plain consumer apps to opt-out of leaving the group when closed, 
> which is currently not possible through any public API (only an internal 
> backdoor config)
>  # It enables the caller to dynamically select the appropriate action 
> depending on why the client is being closed – for example, you would not want 
> the consumer to leave the group during a simple restart, but would want it to 
> leave the group when shutting down the app or if scaling down the node. This 
> is not possible today, even with the internal config, since configs are 
> immutable
>  # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so 
> that the user's choice to leave the group during close will be respected for 
> non-static members



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-20 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1607019116


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##


Review Comment:
   I added a number of constructors here instead of altering the existing ones because they're public and I assume they're part of the client library contract.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-20 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847936#comment-17847936
 ] 

Igor Soarez commented on KAFKA-16692:
-

[~jolshan] done

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0, 3.6.1, 3.8
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.7.1, 3.8
>
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
> that has _discoverBrokerVersions_ set to {_}false{_}. 
> I was hoping I could get some assistance debugging this issue. Happy to 
> provide any additional information needed.
>  
>  
>  



--
This message was sent by Atlassian Jira

[jira] [Updated] (KAFKA-16741) Add ShareGroupHeartbeat API support in GroupCoordinator

2024-05-20 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield updated KAFKA-16741:
-
Summary: Add ShareGroupHeartbeat API support in GroupCoordinator  (was: Add 
SharGroupHeartbeat API support in GroupCoordinator)

> Add ShareGroupHeartbeat API support in GroupCoordinator
> ---
>
> Key: KAFKA-16741
> URL: https://issues.apache.org/jira/browse/KAFKA-16741
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: fix flaky testRecordThreadIdleRatio [kafka]

2024-05-20 Thread via GitHub


jeffkbkim commented on code in PR #15987:
URL: https://github.com/apache/kafka/pull/15987#discussion_r1607007949


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -61,8 +61,11 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
 
 @Override
 public CoordinatorEvent take() {
-time.sleep(takeDelayMs);
-return super.take();
+CoordinatorEvent event = super.take();

Review Comment:
   @gaurav-narula thanks for the feedback. that makes sense, i have 
incorporated your suggestion



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-05-20 Thread via GitHub


C0urante commented on PR #16001:
URL: https://github.com/apache/kafka/pull/16001#issuecomment-2120798453

   @gharris1727 If you have a moment, could you take a look? I filed this in 
anticipation of 
[KIP-891](https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+Connector+plugins)
 since it seems that the KIP might make this bug slightly more common, and 
wanted to prevent users from running into it if possible.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-05-20 Thread via GitHub


C0urante opened a new pull request, #16001:
URL: https://github.com/apache/kafka/pull/16001

   [Jira](https://issues.apache.org/jira/browse/KAFKA-9228)
   
   Currently, if a connector generates the same set of task configs as the set 
that already exists in the config topic, then this new set of task configs is 
not written to the config topic, and the connector's tasks are not 
(immediately*) restarted.
   
   This is intentional behavior to prevent infinite rebalance loops when using 
eager rebalancing, and most of the time it comes with no drawbacks. However, if 
the runtime-controlled properties for a connector (such as its key/value/header 
converters, or Kafka client overrides) are modified, then this behavior can 
cause the updates to not be applied (immediately*) to the connector's tasks.
   
   In order to address this, we forcibly rewrite task configs to the config 
topic when we detect a change in the connector config, even if they are 
identical to the existing task configs. Changes to the connector config are 
tracked by taking a hash of the connector config and including it in the config 
topic when writing task configs.
   
   If no hash has been written to the config topic yet, then we do not compare 
hashes. This is done in order to prevent upgrades to newer workers from causing 
all tasks on the cluster to be immediately restarted.
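
   As a rough sketch of the comparison described above (method names follow this PR's diff and review discussion; the exact wiring in the herder is omitted):

   ```java
   // Hash recorded alongside the last set of task configs, or null on older clusters.
   Integer storedHash = configState.taskConfigHash(connName);
   // Hash of the connector config as it stands now.
   int currentHash = ConnectUtils.configHash(connectorConfig);
   // Only force a rewrite of task configs when a hash was previously recorded and it changed.
   boolean forceTaskConfigRewrite = storedHash != null && storedHash != currentHash;
   ```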
   
   As a final note, this bug should not be very prevalent in the Kafka Connect 
ecosystem, since most connectors will unintentionally propagate changes in 
runtime-controlled properties to their tasks. This is because the classic idiom 
for connectors is to track the properties provided in 
[Connector::start](https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/Connector.html#start(java.util.Map))
 and use either an identical clone or a slightly-modified copy of those 
properties in the return value of 
[Connector::taskConfigs](https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/Connector.html#taskConfigs(int)).
   
   \* - Tasks can be restarted later on as a result of workers joining/leaving 
the cluster, users manually triggering restarts via the REST API, or other 
causes.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-20 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2120792114

   Hi @gharris1727 if you have the time, can you please have a look again? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message

2024-05-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16766:
--
Component/s: clients

> New consumer offsetsForTimes timeout exception does not have the proper 
> message
> ---
>
> Key: KAFKA-16766
> URL: https://issues.apache.org/jira/browse/KAFKA-16766
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
> will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
> with the following as message: "java.util.concurrent.TimeoutException". 
> We should provide a clearer message, and I would even say we keep the same 
> message that the LegacyConsumer shows in this case, ex: "Failed to get 
> offsets by times in 6ms".
> To fix this we should consider catching the timeout exception in the consumer 
> when offsetsForTimes result times out 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
>  and propagate it with the message specific to offsetsForTimes.
> Same situation exists for beginningOffsets and endOffsets. All 3 funcs show 
> the same timeout message in the LegacyConsumer (defined 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
>  but do not have a clear message in the Async, so we should fix them all 3.
> With the fix, we should write tests for each func, like the ones defined for 
> the Legacy Consumer 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]).
>  Note that we would need different tests, added to AsyncKafkaConsumerTest, 
> given that the async consumer issues a FindCoordinator request in this case, 
> but the AsyncConsumer does, so it does not account for that when matching 
> requests/responses in the current tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-20 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reopened KAFKA-16692:
-

Re-opening as 3.6 backport is still missing

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0, 3.6.1, 3.8
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.7.1, 3.8
>
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
> that has _discoverBrokerVersions_ set to {_}false{_}. 
> I was hoping I could get some assistance debugging this issue. Happy to 
> provide any additional information needed.
>  
>  
>  



--
This message was sent by Atlassian Jira

[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16764:
--
Component/s: clients

> New consumer should throw InvalidTopicException on poll when invalid topic in 
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: appchemist
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> A call to consumer.poll should throw InvalidTopicException if an invalid 
> topic is discovered in metadata. This can be easily reproduced by calling 
> subscribe("invalid topic") and then poll, for example.The new consumer does 
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer 
> does. 
> The legacy consumer achieves this by checking for metadata exceptions on 
> every iteration of the ConsumerNetworkClient (see 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])
> This is probably what makes that 
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
>  fails for the new consumer. Once this bug is fixed, we should be able to 
> enable that test for the new consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-20 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847921#comment-17847921
 ] 

Justine Olshan commented on KAFKA-16692:


[~soarez] do you mind taking a quick look at this small change to run the new 
tests in 3.7? https://github.com/apache/kafka/pull/16000

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0, 3.6.1, 3.8
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.7.1, 3.8
>
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers, which are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId, potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
> that has _discoverBrokerVersions_ set to {_}false{_}. 
> I was hoping I could get some assistance debugging 

[PR] MINOR: remove extra import from transactions tests [kafka]

2024-05-20 Thread via GitHub


jolshan opened a new pull request, #16000:
URL: https://github.com/apache/kafka/pull/16000

   I missed that these two did not run for the cherry-pick. Re-running now.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-20 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2120776989

   @chia7712, please review. Thanks for your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]

2024-05-20 Thread via GitHub


m1a2st commented on PR #15840:
URL: https://github.com/apache/kafka/pull/15840#issuecomment-2120776240

   @chia7712, please review. Thanks for your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-05-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-9228:


Assignee: Chris Egerton  (was: Greg Harris)

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.
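
To make the behaviour described above concrete, here is a simplified sketch of the publish decision (illustrative only, not the actual DistributedHerder code; the class and method names are assumptions):
{code:java}
import java.util.List;
import java.util.Map;

// Simplified sketch: task configs are only rewritten to the config topic when
// they differ from what is already stored, and converter/client overrides are
// not part of the compared task configs.
public class TaskConfigPublishSketch {

    static boolean shouldWriteTaskConfigs(List<Map<String, String>> storedTaskConfigs,
                                          List<Map<String, String>> newTaskConfigs) {
        // Changing only converters or client overrides leaves newTaskConfigs
        // equal to storedTaskConfigs, so nothing is written and the tasks never
        // see the updated converter/client settings.
        return !newTaskConfigs.equals(storedTaskConfigs);
    }
}
{code}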



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-20 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847920#comment-17847920
 ] 

Justine Olshan commented on KAFKA-16692:


Hey, I still need to do 3.6. I will update the ticket when I do so.

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0, 3.6.1, 3.8
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.7.1, 3.8
>
>
> We have a Kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers, which are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId, potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
> that has _discoverBrokerVersions_ set to {_}false{_}. 
> I was hoping I could get some assistance debugging this issue. Happy to 
> provide any additional information needed.
>  

[jira] [Commented] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error

2024-05-20 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847911#comment-17847911
 ] 

Igor Soarez commented on KAFKA-16760:
-

After another look, {{alterReplicaLogDirs}} is already being tested in KRaft in 
{{testAlterReplicaLogDirs}} and {{testAlterLogDirsAfterDeleteRecords}} in 
{{kafka.api.PlaintextAdminIntegrationTest}}, and this condition isn't hit:
Partition  has an older epoch (0) than the current leader. 
Will await the new LeaderAndIsr state before resuming fetching.

> alterReplicaLogDirs failed even if responded with none error
> 
>
> Key: KAFKA-16760
> URL: https://issues.apache.org/jira/browse/KAFKA-16760
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
>
> When firing an alterLogDirRequest, it gets an error NONE result. But actually, the 
> alterLogDir never happened, and these errors appear:
> {code:java}
> [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 has an older epoch (0) than the current leader. Will await the new 
> LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaAlterLogDirsThread:66)
> [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70)
> {code}
> Note: it's under KRaft mode, so the log message mentioning LeaderAndIsr is wrong. 
> This can be reproduced in this 
> [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running 
> this test:
> {code:java}
> ./gradlew cleanTest storage:test --tests 
> org.apache.kafka.tiered.storage.integration.AlterLogDirTest
> {code}
> The complete logs can be found here: 
> https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923
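
For reference, a hedged sketch of the kind of Admin call involved (the broker id, topic, and target log dir below are made-up placeholders, not taken from the test):
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartitionReplica;

// Hedged example of issuing alterReplicaLogDirs; per the report above, the
// future may complete without error even though the replica movement never
// actually happens on the broker.
public class AlterLogDirExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Move the replica of topicB-0 on broker 1 to another log dir (placeholders).
            TopicPartitionReplica replica = new TopicPartitionReplica("topicB", 0, 1);
            admin.alterReplicaLogDirs(
                    Collections.singletonMap(replica, "/tmp/kafka-logs-2"))
                 .all()
                 .get();
        }
    }
}
{code}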



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error

2024-05-20 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847911#comment-17847911
 ] 

Igor Soarez edited comment on KAFKA-16760 at 5/20/24 3:51 PM:
--

After another look, {{alterReplicaLogDirs}} is already being tested in KRaft in 
{{testAlterReplicaLogDirs}} and {{testAlterLogDirsAfterDeleteRecords}} in 
{{kafka.api.PlaintextAdminIntegrationTest}}, and this condition isn't hit:
{code:java}
Partition  has an older epoch (0) than the current leader. 
Will await the new LeaderAndIsr state before resuming fetching.{code}


was (Author: soarez):
After another look, {{alterReplicaLogDirs}} is already being tested in KRaft in 
{{testAlterReplicaLogDirs}} and {{testAlterLogDirsAfterDeleteRecords}} and 
{{kafka.api.PlaintextAdminIntegrationTest}} and this condition isn't hit:
Partition  has an older epoch (0) than the current leader. 
Will await the new LeaderAndIsr state before resuming fetching.

> alterReplicaLogDirs failed even if responded with none error
> 
>
> Key: KAFKA-16760
> URL: https://issues.apache.org/jira/browse/KAFKA-16760
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
>
> When firing an alterLogDirRequest, it gets an error NONE result. But actually, the 
> alterLogDir never happened, and these errors appear:
> {code:java}
> [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 has an older epoch (0) than the current leader. Will await the new 
> LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaAlterLogDirsThread:66)
> [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70)
> {code}
> Note: it's under KRaft mode, so the log message mentioning LeaderAndIsr is wrong. 
> This can be reproduced in this 
> [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running 
> this test:
> {code:java}
> ./gradlew cleanTest storage:test --tests 
> org.apache.kafka.tiered.storage.integration.AlterLogDirTest
> {code}
> The complete logs can be found here: 
> https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error

2024-05-20 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846705#comment-17846705
 ] 

Igor Soarez edited comment on KAFKA-16760 at 5/20/24 3:48 PM:
--

[~showuon] I spent a few hours looking into this but couldn't yet figure out why 
the test is failing. 

Regarding the logs you first mentioned:
Will await the new LeaderAndIsr state before resuming fetching.
I'm not sure this is normal. -I've also not found an integration test covering 
alterLogDir for KRaft, so we need to add that!-


was (Author: soarez):
[~showuon] I a few hours looking into this but couldn't yet figure out why the 
test is failing. 

Regarding the logs you first mentioned:
Will await the new LeaderAndIsr state before resuming fetching.
I'm not sure this is normal. I've also not found an integration test covering 
alterLogDir for KRaft, so we need to add that!

> alterReplicaLogDirs failed even if responded with none error
> 
>
> Key: KAFKA-16760
> URL: https://issues.apache.org/jira/browse/KAFKA-16760
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
>
> When firing an alterLogDirRequest, it gets an error NONE result. But actually, the 
> alterLogDir never happened, and these errors appear:
> {code:java}
> [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 has an older epoch (0) than the current leader. Will await the new 
> LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaAlterLogDirsThread:66)
> [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70)
> {code}
> Note: it's under KRaft mode, so the log message mentioning LeaderAndIsr is wrong. 
> This can be reproduced in this 
> [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running 
> this test:
> {code:java}
> ./gradlew cleanTest storage:test --tests 
> org.apache.kafka.tiered.storage.integration.AlterLogDirTest
> {code}
> The complete logs can be found here: 
> https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

