[GitHub] [kafka] PrasanthV454 opened a new pull request, #13922: [MINOR] remove the currentStream.close() statement causing exit code issue

2023-06-27 Thread via GitHub


PrasanthV454 opened a new pull request, #13922:
URL: https://github.com/apache/kafka/pull/13922

   currentStream shouldn't be closed, as it is stderr or stdout; only 
tempStream should be closed (see the sketch after the checklist).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
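   
   A minimal, self-contained sketch of the redirect-and-restore pattern the 
description refers to (all names besides `currentStream`/`tempStream` are 
illustrative, not the actual Kafka code):
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.PrintStream;
   
   public class StreamRedirectExample {
       public static void main(String[] args) {
           PrintStream currentStream = System.out;            // real stdout; must stay open
           ByteArrayOutputStream buffer = new ByteArrayOutputStream();
           PrintStream tempStream = new PrintStream(buffer);
           try {
               System.setOut(tempStream);                     // redirect
               System.out.println("captured");
           } finally {
               System.setOut(currentStream);                  // restore; do NOT close currentStream
               tempStream.close();                            // only the temporary stream is closed
           }
           System.out.println("captured output was: " + buffer.toString().trim());
       }
   }
   ```
   
   Closing `currentStream` would close the process's real stdout/stderr, which 
is presumably the side effect behind the exit-code issue named in the PR title.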
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] github-actions[bot] commented on pull request #13376: KAFKA-14091: Leader proactively aborting tasks from lost workers in rebalance in EOS mode

2023-06-27 Thread via GitHub


github-actions[bot] commented on PR #13376:
URL: https://github.com/apache/kafka/pull/13376#issuecomment-1610635915

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[GitHub] [kafka] github-actions[bot] commented on pull request #13478: KAFKA-14870: Fix KerberosLogin#relogin to invoke super#login when cre…

2023-06-27 Thread via GitHub


github-actions[bot] commented on PR #13478:
URL: https://github.com/apache/kafka/pull/13478#issuecomment-1610635895

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[GitHub] [kafka] flashmouse commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-06-27 Thread via GitHub


flashmouse commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1610578143

   @kirktrue thank you for the reply!
   
   In your case, although ``isBalanced`` returns false, ``performReassignments`` still won't reassign any partition, because it only reassigns when ``currentAssignment.get(consumer).size() > currentAssignment.get(otherConsumer).size() + 1``; it just loops over every partition in the reassignable list, finds none that matches, so ``modified`` stays ``false`` and it returns.
   
   I think the problem is that ``isBalanced`` and ``performReassignments`` should apply the same predicate when judging whether to reassign. Right now ``performReassignments`` only reassigns when the partition-count gap between two consumers is at least 2, while ``isBalanced`` flags a gap of 1.
   
   What I mean is that in such a situation, even if we don't modify ``isBalanced``, the reassignment logic is still correct, just potentially very slow: ``performReassignments`` does a lot of unnecessary work, so it eventually finishes but takes a long time. You can try the unit test ``testLargeAssignmentAndGroupWithNonEqualSubscription`` without this PR's change: with ``partitionCount`` = 20 and ``consumerCount`` = 2 it completes successfully, but with ``partitionCount`` = 200 and ``consumerCount`` = 20 it may run for a very long time. I ran the test and saw no result after 14 min; it speeds up to 47 sec after applying my change.
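   
   A self-contained sketch of the predicate gap described above (illustrative 
only; the real ``AbstractStickyAssignor`` logic is more involved):
   
   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   public class PredicateGapExample {
       public static void main(String[] args) {
           Map<String, List<Integer>> currentAssignment = new HashMap<>();
           currentAssignment.put("consumerA", Arrays.asList(0, 1)); // 2 partitions
           currentAssignment.put("consumerB", Arrays.asList(2));    // 1 partition
   
           int a = currentAssignment.get("consumerA").size();
           int b = currentAssignment.get("consumerB").size();
   
           // performReassignments only moves a partition when the gap is >= 2 ...
           boolean wouldReassign = a > b + 1;
           // ... while a balance check that flags a gap of 1 disagrees:
           boolean looksUnbalanced = Math.abs(a - b) >= 1;
   
           // looksUnbalanced && !wouldReassign is the wasted-work case: the
           // assignor keeps scanning partitions without being able to move any.
           System.out.println("wouldReassign=" + wouldReassign + ", looksUnbalanced=" + looksUnbalanced);
       }
   }
   ```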





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244508147


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -423,6 +456,47 @@ public Map 
computeSubscriptionMetadata(
 return Collections.unmodifiableMap(newSubscriptionMetadata);
 }
 
+/**
+ * Updates the next metadata refresh time.
+ *
+ * @param nextTimeMs The next time in milliseconds.
+ * @param groupEpoch The associated group epoch.
+ */
+public void setNextMetadataRefreshTime(
+long nextTimeMs,
+int groupEpoch
+) {
+this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, 
groupEpoch);
+}
+
+/**
+ * Resets the next metadata refresh.
+ */
+public void resetNextMetadataRefreshTime() {
+this.nextMetadataRefreshTime = TimeAndEpoch.EMPTY;
+}
+
+/**
+ * Checks if a metadata refresh is required. A refresh is required in two 
cases:
+ * 1) The next update time is smaller than or equal to the current time;
+ * 2) The group epoch associated with the next update time is smaller than

Review Comment:
   This is also the case when we reset `nextMetadataRefreshTime`, right?
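   
   A hedged, self-contained reconstruction of the two conditions in the quoted 
javadoc (the names follow the diff; the bodies are inferred, not copied from 
the PR):
   
   ```java
   public class RefreshCheckExample {
       // resetNextMetadataRefreshTime() stores an EMPTY sentinel akin to this.
       static final long EMPTY_TIME_MS = Long.MIN_VALUE;
       static final int EMPTY_EPOCH = -1;
   
       // A refresh is needed if the deadline has passed OR the stored epoch is stale.
       static boolean refreshMetadataNeeded(long nextTimeMs, int nextEpoch,
                                            long currentTimeMs, int groupEpoch) {
           return nextTimeMs <= currentTimeMs || nextEpoch < groupEpoch;
       }
   
       public static void main(String[] args) {
           // After a reset, both conditions hold, so a refresh is forced --
           // which is the point about the reset case above.
           System.out.println(refreshMetadataNeeded(EMPTY_TIME_MS, EMPTY_EPOCH, 1_000L, 5));
       }
   }
   ```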






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244508147


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -423,6 +456,47 @@ public Map 
computeSubscriptionMetadata(
 return Collections.unmodifiableMap(newSubscriptionMetadata);
 }
 
+/**
+ * Updates the next metadata refresh time.
+ *
+ * @param nextTimeMs The next time in milliseconds.
+ * @param groupEpoch The associated group epoch.
+ */
+public void setNextMetadataRefreshTime(
+long nextTimeMs,
+int groupEpoch
+) {
+this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, 
groupEpoch);
+}
+
+/**
+ * Resets the next metadata refresh.
+ */
+public void resetNextMetadataRefreshTime() {
+this.nextMetadataRefreshTime = TimeAndEpoch.EMPTY;
+}
+
+/**
+ * Checks if a metadata refresh is required. A refresh is required in two 
cases:
+ * 1) The next update time is smaller than or equal to the current time;
+ * 2) The group epoch associated with the next update time is smaller than

Review Comment:
   This is also the case when we reset the refreshMetadata time, right?






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244507539


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -423,6 +456,47 @@ public Map 
computeSubscriptionMetadata(
 return Collections.unmodifiableMap(newSubscriptionMetadata);
 }
 
+/**
+ * Updates the next metadata refresh time.
+ *
+ * @param nextTimeMs The next time in milliseconds.
+ * @param groupEpoch The associated group epoch.
+ */
+public void setNextMetadataRefreshTime(
+long nextTimeMs,
+int groupEpoch
+) {
+this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, 
groupEpoch);
+}
+
+/**
+ * Resets the next metadata refresh.

Review Comment:
   This means we should update immediately?






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244506236


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -119,6 +131,18 @@ public String toString() {
  */
 private final TimelineHashMap> 
currentPartitionEpoch;
 
+/**
+ * The next metadata refresh time. It consists of a timestamp in 
milliseconds together with
+ * the group epoch at the time of setting it. The metadata refresh time is 
considered as a
+ * soft state (read that it is not stored in a timeline data structure). 
It is like this
+ * because it is not persisted to the log. The group epoch is here to 
ensure that the
+ * next metadata refresh time is invalidated if the group epoch does not 
correspond to
+ * the current group epoch. This can happen if the next metadata refresh 
time is updated
+ * after having refreshed the metadata but the write operation failed. In 
this case, the
+ * time is not automatically rollback.

Review Comment:
   nit: automatically rolled back.
   
   Also in this case, do we not update the refresh time until the epoch bump 
and write succeed? And we will keep refreshing the metadata in the meantime?






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244500812


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1021,34 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * A new metadata image is available.
+ *
+ * @param newImage  The new metadata image.
+ * @param delta The delta image.
+ */
+public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
+metadataImage = newImage;
+
+// Notify all the groups subscribed to the created, updated or

Review Comment:
   I guess my question then is: what is the flow for updating the groups with 
the image? Will this just happen on the next heartbeat, since we set 
metadataImage to the new image?
   
   We don't really do any notifying besides resetting the refresh timer.






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244497307


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1021,34 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * A new metadata image is available.
+ *
+ * @param newImage  The new metadata image.
+ * @param delta The delta image.
+ */
+public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
+metadataImage = newImage;
+
+// Notify all the groups subscribed to the created, updated or

Review Comment:
   As mentioned before, I got confused and thought this was actually adding 
topics. If this is just a call to update the existing groups with these topics, 
this is fine. (Since new topics won't be part of groups yet)






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244497307


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1021,34 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * A new metadata image is available.
+ *
+ * @param newImage  The new metadata image.
+ * @param delta The delta image.
+ */
+public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
+metadataImage = newImage;
+
+// Notify all the groups subscribed to the created, updated or

Review Comment:
   As mentioned before, I got confused and thought this was actually adding 
topics. If this is just a call to update the existing groups with these topics, 
this is fine.






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244497307


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1021,34 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * A new metadata image is available.
+ *
+ * @param newImage  The new metadata image.
+ * @param delta The delta image.
+ */
+public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
+metadataImage = newImage;
+
+// Notify all the groups subscribed to the created, updated or

Review Comment:
   I took a longer look at the code -- and assuming changedTopics includes the 
newly created topics, this should be fine.






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244496874


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1021,34 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * A new metadata image is available.
+ *
+ * @param newImage  The new metadata image.
+ * @param delta The delta image.
+ */
+public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
+metadataImage = newImage;
+
+// Notify all the groups subscribed to the created, updated or
+// deleted topics.
+Set<String> allGroupIds = new HashSet<>();
+delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+String topicName = topicDelta.name();
+Set<String> groupIds = groupsByTopics.get(topicName);
+if (groupIds != null) allGroupIds.addAll(groupIds);
+});
+delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
+TopicImage topicImage = delta.image().topics().getTopic(topicId);
+Set<String> groupIds = groupsByTopics.get(topicImage.name());
+if (groupIds != null) allGroupIds.addAll(groupIds);

Review Comment:
   Oh, I misunderstood -- this is to add to the list of groups to notify.






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244495117


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1021,34 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * A new metadata image is available.
+ *
+ * @param newImage  The new metadata image.
+ * @param delta The delta image.
+ */
+public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
+metadataImage = newImage;
+
+// Notify all the groups subscribed to the created, updated or

Review Comment:
   Are created topics exposed via a different method in topicsDelta? Shouldn't 
we have `createdTopicIds` and add them? Or does changedTopics account for 
that? Are there other changes besides topic creation that we can have?






[GitHub] [kafka] lihaosky commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

2023-06-27 Thread via GitHub


lihaosky commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1244450143


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config 
brokerSideTopicConfig,
 return brokerSideConfigEntry.value();
 }
 
+public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
+log.debug("Starting to describe topics {} in partition assignor.", 
topics);
+
+long currentWallClockMs = time.milliseconds();
+final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+Set<String> topicsToDescribe = new HashSet<>(topics);
+final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+
+while (!topicsToDescribe.isEmpty()) {
+final Map<String, List<TopicPartitionInfo>> existed = getTopicPartitionInfo(topicsToDescribe, null);
+topicPartitionInfo.putAll(existed);
+topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+if (!topicsToDescribe.isEmpty()) {
+currentWallClockMs = time.milliseconds();
+
+if (currentWallClockMs >= deadlineMs) {
+final String timeoutError = String.format(
+"Could not create topics within %d milliseconds. " +
+"This can happen if the Kafka cluster is 
temporarily not available.",
+retryTimeoutMs);
+log.error(timeoutError);

Review Comment:
   Is this mimicking the `makeReady` function logs?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config 
brokerSideTopicConfig,
 return brokerSideConfigEntry.value();
 }
 
+public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
+log.debug("Starting to describe topics {} in partition assignor.", 
topics);
+
+long currentWallClockMs = time.milliseconds();
+final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+Set<String> topicsToDescribe = new HashSet<>(topics);
+final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+
+while (!topicsToDescribe.isEmpty()) {
+final Map<String, List<TopicPartitionInfo>> existed = getTopicPartitionInfo(topicsToDescribe, null);
+topicPartitionInfo.putAll(existed);
+topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+if (!topicsToDescribe.isEmpty()) {
+currentWallClockMs = time.milliseconds();
+
+if (currentWallClockMs >= deadlineMs) {
+final String timeoutError = String.format(
+"Could not create topics within %d milliseconds. " +
+"This can happen if the Kafka cluster is 
temporarily not available.",
+retryTimeoutMs);
+log.error(timeoutError);
+throw new TimeoutException(timeoutError);
+}
+log.info(

Review Comment:
   Is this mimicking the `makeReady` function logs?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+private final Cluster fullMetadata;
+private final Map> partitionsForTask;
+private final Map>> processRacks;
+private final AssignmentConfigs assignmentConfigs;
+private final Map> racksForPartition;
+private final InternalTopicManager internalTopicManager;
+private Boolean canEnableForActive;
+
+public RackAwareTaskAssignor(final Cluster fullMetadata,
+ final Map> 
partitionsForTask,
+ final Map> 
tasksForTopicGroup,
+ final Map>> processRacks,
+ final InternalTopicManager 

[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244495117


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1021,34 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * A new metadata image is available.
+ *
+ * @param newImage  The new metadata image.
+ * @param delta The delta image.
+ */
+public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
+metadataImage = newImage;
+
+// Notify all the groups subscribed to the created, updated or

Review Comment:
   Are created topics exposed via a different method in topicsDelta? Shouldn't 
we have `createdTopicIds` and add them?






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244492766


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1021,34 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * A new metadata image is available.
+ *
+ * @param newImage  The new metadata image.
+ * @param delta The delta image.
+ */
+public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
+metadataImage = newImage;
+
+// Notify all the groups subscribed to the created, updated or
+// deleted topics.
+Set<String> allGroupIds = new HashSet<>();
+delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+String topicName = topicDelta.name();
+Set<String> groupIds = groupsByTopics.get(topicName);
+if (groupIds != null) allGroupIds.addAll(groupIds);
+});
+delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
+TopicImage topicImage = delta.image().topics().getTopic(topicId);
+Set<String> groupIds = groupsByTopics.get(topicImage.name());
+if (groupIds != null) allGroupIds.addAll(groupIds);

Review Comment:
   we don't want to add all the deleted topics, do we?






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244485648


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -727,6 +800,80 @@ public void replay(
 + " but did not receive 
ConsumerGroupTargetAssignmentMetadataValue tombstone.");
 }
 consumerGroup.removeMember(memberId);
+updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
consumerGroup.subscribedTopicNames());
+}
+}
+
+/**
+ * @return The set of groups subscribed to the topic.
+ */
+public Set<String> groupsSubscribedToTopic(String topicName) {
+Set<String> groups = groupsByTopics.get(topicName);
+return groups != null ? groups : Collections.emptySet();
+}
+
+/**
+ * Subscribes a group to a topic.
+ *
+ * @param groupId   The group id.
+ * @param topicName The topic name.
+ */
+private void subscribeGroupToTopic(
+String groupId,
+String topicName
+) {
+groupsByTopics
+.computeIfAbsent(topicName, __ -> new 
TimelineHashSet<>(snapshotRegistry, 1))
+.add(groupId);
+}
+
+/**
+ * Unsubscribes a group from a topic.
+ *
+ * @param groupId   The group id.
+ * @param topicName The topic name.
+ */
+private void unsubscribeGroupFromTopic(

Review Comment:
   should the return type be groupIds here?






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244485648


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -727,6 +800,80 @@ public void replay(
 + " but did not receive 
ConsumerGroupTargetAssignmentMetadataValue tombstone.");
 }
 consumerGroup.removeMember(memberId);
+updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
consumerGroup.subscribedTopicNames());
+}
+}
+
+/**
+ * @return The set of groups subscribed to the topic.
+ */
+public Set<String> groupsSubscribedToTopic(String topicName) {
+Set<String> groups = groupsByTopics.get(topicName);
+return groups != null ? groups : Collections.emptySet();
+}
+
+/**
+ * Subscribes a group to a topic.
+ *
+ * @param groupId   The group id.
+ * @param topicName The topic name.
+ */
+private void subscribeGroupToTopic(
+String groupId,
+String topicName
+) {
+groupsByTopics
+.computeIfAbsent(topicName, __ -> new 
TimelineHashSet<>(snapshotRegistry, 1))
+.add(groupId);
+}
+
+/**
+ * Unsubscribes a group from a topic.
+ *
+ * @param groupId   The group id.
+ * @param topicName The topic name.
+ */
+private void unsubscribeGroupFromTopic(

Review Comment:
   should the return be boolean here?






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244482511


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -709,14 +780,16 @@ public void replay(
 String groupId = key.groupId();
 String memberId = key.memberId();
 
+ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, 
value != null);
+Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames());
+
 if (value != null) {
-ConsumerGroup consumerGroup = 
getOrMaybeCreateConsumerGroup(groupId, true);
 ConsumerGroupMember oldMember = 
consumerGroup.getOrMaybeCreateMember(memberId, true);
 consumerGroup.updateMember(new 
ConsumerGroupMember.Builder(oldMember)
 .updateWith(value)
 .build());
+updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
consumerGroup.subscribedTopicNames());

Review Comment:
   could this and line 803 be outside the if/else? I'm just curious why we 
moved consumerGroup and oldSubscribedTopicNames out.






[GitHub] [kafka] C0urante commented on a diff in pull request #13446: KAFKA-14837, KAFKA-14842: Ignore groups that do not have offsets for filtered topics in MirrorCheckpointConnector

2023-06-27 Thread via GitHub


C0urante commented on code in PR #13446:
URL: https://github.com/apache/kafka/pull/13446#discussion_r1244478202


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -150,10 +156,31 @@ private void loadInitialConsumerGroups()
 
 List findConsumerGroups()
 throws InterruptedException, ExecutionException {
-return listConsumerGroups().stream()
+List filteredGroups = listConsumerGroups().stream()
 .map(ConsumerGroupListing::groupId)
-.filter(this::shouldReplicate)
+.filter(this::shouldReplicateByGroupFilter)
 .collect(Collectors.toList());
+
+List checkpointGroups = new LinkedList<>();
+List irrelevantGroups = new LinkedList<>();
+
+for (String group : filteredGroups) {
+Set consumedTopics = 
listConsumerGroupOffsets(group).keySet().stream()
+.map(TopicPartition::topic)
+.filter(this::shouldReplicateByTopicFilter)

Review Comment:
   I don't believe MM2 ever supported that. From 
[KIP-382](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP382:MirrorMaker2.0-InternalTopics),
 regarding checkpoints (emphasis mine):
   
   > The connector will periodically query the source cluster for all committed 
offsets from all consumer groups, **filter for those topics being replicated**, 
and emit a message to a topic
   
   IIUC, MM2 also performs offset syncing based on the contents of the offset 
syncs topic, which is only populated by the source connector (i.e., the 
connector that replicates topics).
   
   @blacktooth have you experienced otherwise?






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244477507


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -88,10 +93,12 @@ public class GroupMetadataManager {
 public static class Builder {
 private LogContext logContext = null;
 private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
 private List assignors = null;
-private TopicsImage topicsImage = null;

Review Comment:
   Did we change this to MetadataImage because it was easier to pass in that 
class? Or do we need features in the metadata image that were not in the topic 
image?






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244475836


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -506,32 +555,54 @@ private 
CoordinatorResult consumerGr
 .setClientHost(clientHost)
 .build();
 
+boolean updatedMemberSubscriptions = false;
 if (!updatedMember.equals(member)) {
 records.add(newMemberSubscriptionRecord(groupId, updatedMember));
 
 if 
(!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
 log.info("[GroupId " + groupId + "] Member " + memberId + " 
updated its subscribed topics to: " +
 updatedMember.subscribedTopicNames());
+updatedMemberSubscriptions = true;
+}
 
-subscriptionMetadata = group.computeSubscriptionMetadata(
-member,
-updatedMember,
-topicsImage
-);
-
-if 
(!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new 
subscription metadata: "
-+ subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
-}
+if 
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
updated its subscribed regex to: " +
+updatedMember.subscribedTopicRegex());
+updatedMemberSubscriptions = true;
+}
+}
 
-groupEpoch += 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+long currentTimeMs = time.milliseconds();
+boolean maybeUpdateMetadata = updatedMemberSubscriptions || 
group.refreshMetadataNeeded(currentTimeMs);
+boolean updatedSubscriptionMetadata = false;
+if (maybeUpdateMetadata) {
+subscriptionMetadata = group.computeSubscriptionMetadata(
+member,
+updatedMember,
+metadataImage.topics()
+);
 
-log.info("[GroupId " + groupId + "] Bumped group epoch to " + 
groupEpoch + ".");
+if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
++ subscriptionMetadata + ".");
+records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+updatedSubscriptionMetadata = true;
 }
 }
 
+if (updatedMemberSubscriptions || updatedSubscriptionMetadata) {
+groupEpoch += 1;
+records.add(newGroupEpochRecord(groupId, groupEpoch));
+log.info("[GroupId " + groupId + "] Bumped group epoch to " + 
groupEpoch + ".");
+}
+
+if (maybeUpdateMetadata) {

Review Comment:
   While I am able to follow the logic and understand it is probably the best 
way to avoid duplicate code, I do wonder if there is a less confusing way to 
express this without the similar booleans. It may not be possible, but maybe we 
can add some comments.






[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244469064


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -179,26 +209,45 @@ GroupMetadataManager build() {
 private final int consumerGroupHeartbeatIntervalMs;
 
 /**
- * The topics metadata (or image).
+ * The metadata refresh interval.
  */
-private TopicsImage topicsImage;
+private final int consumerGroupMetadataRefreshIntervalMs;
+
+/**
+ * The metadata image.
+ *
+ * Package private for testing.

Review Comment:
   this is just private, right?






[GitHub] [kafka] jolshan commented on pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


jolshan commented on PR #13880:
URL: https://github.com/apache/kafka/pull/13880#issuecomment-1610300192

   Looks pretty good. I think if we want to do this as part of the PR, there's 
just this left: https://github.com/apache/kafka/pull/13880/files#r1244223827





[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


jolshan commented on PR #13798:
URL: https://github.com/apache/kafka/pull/13798#issuecomment-1610267331

   I ran some tests with producer-perf. I didn't see noticeable differences, 
but the tests were not particularly long. I can run more if needed. The 
`--transaction-duration-ms` argument tells how many milliseconds pass before 
we call commit. Also note that really only one partition was produced to, so 
only that one verification per transaction will trigger the metric.
   
   ```
   bin/kafka-producer-perf-test.sh --transaction-duration-ms 1000 --record-size 1000 --throughput -1 --num-records 100 --topic test-topic --producer.config config/producer.properties

   TRUNK
   100 records sent, 215842.866393 records/sec (205.84 MB/sec), 16.21 ms avg latency, 153.00 ms max latency, 0 ms 50th, 119 ms 95th, 147 ms 99th, 152 ms 99.9th.
   100 records sent, 223513.634332 records/sec (213.16 MB/sec), 19.28 ms avg latency, 139.00 ms max latency, 1 ms 50th, 118 ms 95th, 133 ms 99th, 138 ms 99.9th.
   100 records sent, 214638.334407 records/sec (204.70 MB/sec), 19.43 ms avg latency, 134.00 ms max latency, 1 ms 50th, 123 ms 95th, 131 ms 99th, 133 ms 99.9th.

   KAFKA-15028
   100 records sent, 217485.863419 records/sec (207.41 MB/sec), 17.88 ms avg latency, 151.00 ms max latency, 0 ms 50th, 128 ms 95th, 145 ms 99th, 149 ms 99.9th.
   100 records sent, 229568.411387 records/sec (218.93 MB/sec), 17.15 ms avg latency, 137.00 ms max latency, 0 ms 50th, 118 ms 95th, 130 ms 99th, 136 ms 99.9th.
   100 records sent, 220653.133274 records/sec (210.43 MB/sec), 16.47 ms avg latency, 134.00 ms max latency, 1 ms 50th, 116 ms 95th, 128 ms 99th, 133 ms 99.9th.

   bin/kafka-producer-perf-test.sh --transaction-duration-ms 300 --record-size 1000 --throughput -1 --num-records 100 --topic test-topic --producer.config config/producer.properties

   TRUNK
   100 records sent, 213812.272824 records/sec (203.91 MB/sec), 15.79 ms avg latency, 142.00 ms max latency, 1 ms 50th, 101 ms 95th, 136 ms 99th, 142 ms 99.9th.
   100 records sent, 213174.163291 records/sec (203.30 MB/sec), 13.00 ms avg latency, 121.00 ms max latency, 1 ms 50th, 96 ms 95th, 118 ms 99th, 120 ms 99.9th.
   100 records sent, 225580.870742 records/sec (215.13 MB/sec), 12.45 ms avg latency, 128.00 ms max latency, 1 ms 50th, 101 ms 95th, 123 ms 99th, 127 ms 99.9th.

   KAFKA-15028
   100 records sent, 218531.468531 records/sec (208.41 MB/sec), 11.97 ms avg latency, 69.00 ms max latency, 1 ms 50th, 52 ms 95th, 65 ms 99th, 68 ms 99.9th.
   100 records sent, 217864.923747 records/sec (207.77 MB/sec), 13.21 ms avg latency, 119.00 ms max latency, 1 ms 50th, 103 ms 95th, 117 ms 99th, 119 ms 99.9th.
   100 records sent, 214868.929953 records/sec (204.91 MB/sec), 13.07 ms avg latency, 118.00 ms max latency, 1 ms 50th, 94 ms 95th, 115 ms 99th, 117 ms 99.9th.
   ```





[GitHub] [kafka] blacktooth commented on a diff in pull request #13446: KAFKA-14837, KAFKA-14842: Ignore groups that do not have offsets for filtered topics in MirrorCheckpointConnector

2023-06-27 Thread via GitHub


blacktooth commented on code in PR #13446:
URL: https://github.com/apache/kafka/pull/13446#discussion_r1244382978


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -150,10 +156,31 @@ private void loadInitialConsumerGroups()
 
 List findConsumerGroups()
 throws InterruptedException, ExecutionException {
-return listConsumerGroups().stream()
+List filteredGroups = listConsumerGroups().stream()
 .map(ConsumerGroupListing::groupId)
-.filter(this::shouldReplicate)
+.filter(this::shouldReplicateByGroupFilter)
 .collect(Collectors.toList());
+
+List checkpointGroups = new LinkedList<>();
+List irrelevantGroups = new LinkedList<>();
+
+for (String group : filteredGroups) {
+Set consumedTopics = 
listConsumerGroupOffsets(group).keySet().stream()
+.map(TopicPartition::topic)
+.filter(this::shouldReplicateByTopicFilter)

Review Comment:
   Is this backward compatible? Can this break use cases where a user is using 
MM2 to sync the offsets but not necessarily the topics?






[jira] [Updated] (KAFKA-15028) AddPartitionsToTxnManager metrics

2023-06-27 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15028:
-
Attachment: latency-cpu.html

> AddPartitionsToTxnManager metrics
> -
>
> Key: KAFKA-15028
> URL: https://issues.apache.org/jira/browse/KAFKA-15028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Attachments: latency-cpu.html
>
>
> KIP-890 added metrics for the AddPartitionsToTxnManager
> VerificationTimeMs – number of milliseconds from adding partition info to the 
> manager to the time the response is sent. This will include the round trip to 
> the transaction coordinator if it is called. This will also account for 
> verifications that fail before the coordinator is called.
> VerificationFailureRate – rate of verifications that returned in failure 
> either from the AddPartitionsToTxn response or through errors in the manager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


jolshan commented on PR #13798:
URL: https://github.com/apache/kafka/pull/13798#issuecomment-1610196155

   Thanks for sharing this flame graph. I see that the histogram takes up the 
majority of processCompletedSends, but compared to the total CPU usage, it's 
about 1%. I still think it is worth considering though, so I will look at 
perf and consider the debug-level metric.





[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1244331034


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -17,25 +17,37 @@
 
 package kafka.server
 
+import 
kafka.server.AddPartitionsToTxnManager.{verificationFailureRateMetricName, 
verificationTimeMsMetricName}
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
 import org.apache.kafka.common.{Node, TopicPartition}
 import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{InterBrokerSendThread, 
RequestAndCompletionHandler}
 
 import java.util
+import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 
 object AddPartitionsToTxnManager {
   type AppendCallback = Map[TopicPartition, Errors] => Unit
+
+  val verificationFailureRateMetricName = "VerificationFailureRate"

Review Comment:
   Oh hmm -- so we would have to predefine or lazily create the meters with the 
various error messages. I'm wondering if this is still necessary. We should be 
able to figure out the errors from either the API error metrics or from the 
logging.
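   
   A minimal sketch of the "lazily create" option mentioned above (a plain map 
of counters standing in for the meters; not the actual `KafkaMetricsGroup`/
Yammer meter API):
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.LongAdder;
   
   public class LazyErrorMeters {
       private final Map<String, LongAdder> metersByError = new ConcurrentHashMap<>();
   
       // Create the per-error meter on first use, then count against it.
       public void markFailure(String errorName) {
           metersByError.computeIfAbsent(errorName, name -> new LongAdder()).increment();
       }
   
       public long failures(String errorName) {
           LongAdder meter = metersByError.get(errorName);
           return meter == null ? 0 : meter.sum();
       }
   
       public static void main(String[] args) {
           LazyErrorMeters meters = new LazyErrorMeters();
           meters.markFailure("COORDINATOR_NOT_AVAILABLE");
           System.out.println(meters.failures("COORDINATOR_NOT_AVAILABLE")); // 1
       }
   }
   ```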






[GitHub] [kafka] xiaocairush commented on a diff in pull request #13884: MINOR: fix typos for client

2023-06-27 Thread via GitHub


xiaocairush commented on code in PR #13884:
URL: https://github.com/apache/kafka/pull/13884#discussion_r1244247840


##
clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java:
##
@@ -854,7 +854,7 @@ public void shouldThrowOnInvalidDateFormatOrNullTimestamp() 
{
 
 private void checkExceptionForGetDateTimeMethod(Executable executable) {
 assertTrue(assertThrows(ParseException.class, executable)
-.getMessage().contains("Unparseable date"));
+.getMessage().contains("Unparsable date"));

Review Comment:
   fixed!






[jira] [Updated] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-06-27 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14945:
-
Labels: kip  (was: )

> Add Serializer#serializeToByteBuffer() to reduce memory copying
> ---
>
> Key: KAFKA-14945
> URL: https://issues.apache.org/jira/browse/KAFKA-14945
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: LinShunkang
>Assignee: LinShunkang
>Priority: Major
>  Labels: kip
> Fix For: 3.6.0
>
>
> JIRA for KIP-872: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828]





[jira] [Updated] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-06-27 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14945:
-
Fix Version/s: 3.6.0

> Add Serializer#serializeToByteBuffer() to reduce memory copying
> ---
>
> Key: KAFKA-14945
> URL: https://issues.apache.org/jira/browse/KAFKA-14945
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: LinShunkang
>Assignee: LinShunkang
>Priority: Major
> Fix For: 3.6.0
>
>
> JIRA for KIP-872: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828]





[GitHub] [kafka] divijvaidya commented on pull request #13817: KAFKA-15062: Adding ppc64le build stage

2023-06-27 Thread via GitHub


divijvaidya commented on PR #13817:
URL: https://github.com/apache/kafka/pull/13817#issuecomment-1610089621

   Hey @Vaibhav-Nazare 
   A KIP needs at least 3 committer votes and I believe we haven't heard from 
other folks in the community on the KIP. I am waiting for others to chime in. 
Otherwise, we can start a vote in 2 weeks (and hope that the KIP gets at least 
3 votes).





[GitHub] [kafka] machi1990 commented on pull request #13903: MINOR: Bump requests from 2.24.0 to 2.31.0 in /tests

2023-06-27 Thread via GitHub


machi1990 commented on PR #13903:
URL: https://github.com/apache/kafka/pull/13903#issuecomment-1610089280

   Thank you @divijvaidya 





[GitHub] [kafka] dependabot[bot] commented on pull request #13743: Bump requests from 2.24.0 to 2.31.0 in /tests

2023-06-27 Thread via GitHub


dependabot[bot] commented on PR #13743:
URL: https://github.com/apache/kafka/pull/13743#issuecomment-1610085437

   OK, I won't notify you again about this release, but will get in touch when 
a new version is available. If you'd rather skip all updates until the next 
major or minor version, let me know by commenting `@dependabot ignore this 
major version` or `@dependabot ignore this minor version`.
   
   If you change your mind, just re-open this PR and I'll resolve any conflicts 
on it.





[GitHub] [kafka] divijvaidya closed pull request #13743: Bump requests from 2.24.0 to 2.31.0 in /tests

2023-06-27 Thread via GitHub


divijvaidya closed pull request #13743: Bump requests from 2.24.0 to 2.31.0 in 
/tests
URL: https://github.com/apache/kafka/pull/13743





[GitHub] [kafka] divijvaidya merged pull request #13903: MINOR: Bump requests from 2.24.0 to 2.31.0 in /tests

2023-06-27 Thread via GitHub


divijvaidya merged PR #13903:
URL: https://github.com/apache/kafka/pull/13903





[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1244223827


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerde implements PartitionWriter.Serializer<Record>, 
CoordinatorLoader.Deserializer<Record> {
+@Override
+public byte[] serializeKey(Record record) {
+// Record does not accept a null key.
+return MessageUtil.toVersionPrefixedBytes(
+record.key().version(),
+record.key().message()
+);
+}
+
+@Override
+public byte[] serializeValue(Record record) {
+// Tombstone is represented with a null value.
+if (record.value() == null) {
+return null;
+} else {
+return MessageUtil.toVersionPrefixedBytes(
+record.value().version(),
+record.value().message()
+);
+}
+}
+
+@Override
+public Record deserialize(
+ByteBuffer keyBuffer,
+ByteBuffer valueBuffer
+) throws RuntimeException {
+final short recordType = readVersion(keyBuffer, "key");
+final ApiMessage keyMessage = apiMessageKeyFor(recordType);
+readMessage(keyMessage, keyBuffer, recordType, "key");
+
+if (valueBuffer == null) {
+return new Record(new ApiMessageAndVersion(keyMessage, 
recordType), null);
+}
+
+final ApiMessage valueMessage = apiMessageValueFor(recordType);
+final short valueVersion = readVersion(valueBuffer, "value");

Review Comment:
   I will add a description of the format in the javadoc of the class.
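
   In the meantime, for anyone reading along, a minimal sketch of the 
version-prefixed layout (an assumption inferred from the `short` returned by 
`readVersion` above, not the final javadoc): a 2-byte version/type prefix 
followed by the serialized message bytes.
   
   ```java
   import java.nio.ByteBuffer;
   
   public final class VersionPrefixDemo {
       public static void main(String[] args) {
           byte[] payload = {0x0a, 0x0b};                  // stand-in for the ApiMessage bytes
           ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
           buf.putShort((short) 3);                        // the version/record-type prefix
           buf.put(payload);
           buf.flip();
   
           short version = buf.getShort();                 // what readVersion(...) consumes
           System.out.println("version=" + version + ", remaining=" + buf.remaining());
       }
   }
   ```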



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-27 Thread via GitHub


vcrfxia commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1239143225


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##
@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
 }
 }
 
+
+private void makeJoin(final Duration grace) {
+final KStream<Integer, String> stream;
+final KTable<Integer, String> table;
+final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = 
new MockApiProcessorSupplier<>();
+builder = new StreamsBuilder();
+
+final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+stream = builder.stream(streamTopic, consumed);
+table = builder.table("tableTopic2", consumed, Materialized.as(
+Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(5))));
+stream.join(table,
+MockValueJoiner.TOSTRING_JOINER,
+Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), 
"Grace", grace)
+).process(supplier);
+final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+driver = new TopologyTestDriver(builder.build(), props);
+inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+inputTableTopic = driver.createInputTopic("tableTopic2", new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+
+processor = supplier.theCapturedProcessor();
+}
+
+@Test
+public void shouldFailIfTableIsNotVersioned() {
+final StreamsBuilder builder = new StreamsBuilder();
+final Properties props = new Properties();
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+
+final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class,
+() -> streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one"));
+assertThat(
+exception.getMessage(),
+is("KTable must be versioned to use a grace period in a stream 
table join.")
+);
+}
+
+@Test
+public void shouldDelayJoinByGracePeriod() {
+makeJoin(Duration.ofMillis(2));
+
+// push four items to the table. this should not produce any item.
+pushToTableNonRandom(4, "Y");
+processor.checkAndClearProcessResult(EMPTY);
+
+// push all four items to the primary stream. this should produce two 
items.
+pushToStream(4, "X");
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "X0+Y0", 0),
+new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+// push all items to the table. this should not produce any item
+pushToTableNonRandom(4, "YY");
+processor.checkAndClearProcessResult(EMPTY);
+
+// push all four items to the primary stream. this should produce two 
items.
+pushToStream(4, "X");
+processor.checkAndClearProcessResult(

Review Comment:
   The reason this produces two output records is because the max timestamp 
seen so far is still 3, which means only records with timestamp 0 and 1 are 
emitted (timestamps 2 and 3 are still in the buffer). Can you add another step 
to this test which now produces a record with a larger timestamp and verifies 
that the records with timestamps 2 and 3 are emitted? There should be four of 
them, and they should be emitted in timestamp order which is different from the 
offset order that they arrived in.
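
   A rough sketch of that extra step (helper names follow this test file; the 
joined values are illustrative only, since they depend on the versioned-table 
lookups):
   
   ```java
   // Pipe a stream record with a larger timestamp so stream time passes the
   // grace period, flushing the buffered records in timestamp order.
   inputStreamTopic.pipeInput(0, "X0", 9L);
   processor.checkAndClearProcessResult(
       new KeyValueTimestamp<>(2, "X2+Y2", 2),     // flushed in timestamp order...
       new KeyValueTimestamp<>(2, "X2+YY2", 2),
       new KeyValueTimestamp<>(3, "X3+Y3", 3),
       new KeyValueTimestamp<>(3, "X3+YY3", 3),
       new KeyValueTimestamp<>(0, "X0+YY0", 9));   // ...then the record that advanced time
   ```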



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1244173231


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerde implements PartitionWriter.Serializer<Record>, 
CoordinatorLoader.Deserializer<Record> {
+@Override
+public byte[] serializeKey(Record record) {
+// Record does not accept a null key.
+return MessageUtil.toVersionPrefixedBytes(
+record.key().version(),
+record.key().message()
+);
+}
+
+@Override
+public byte[] serializeValue(Record record) {
+// Tombstone is represented with a null value.
+if (record.value() == null) {
+return null;
+} else {
+return MessageUtil.toVersionPrefixedBytes(
+record.value().version(),
+record.value().message()
+);
+}
+}
+
+@Override
+public Record deserialize(
+ByteBuffer keyBuffer,
+ByteBuffer valueBuffer
+) throws RuntimeException {
+final short recordType = readVersion(keyBuffer, "key");
+final ApiMessage keyMessage = apiMessageKeyFor(recordType);
+readMessage(keyMessage, keyBuffer, recordType, "key");
+
+if (valueBuffer == null) {
+return new Record(new ApiMessageAndVersion(keyMessage, 
recordType), null);
+}
+
+final ApiMessage valueMessage = apiMessageValueFor(recordType);
+final short valueVersion = readVersion(valueBuffer, "value");

Review Comment:
   I can see this being confusing, but I'm not sure if I have a recommendation 
to make it clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


divijvaidya commented on PR #13798:
URL: https://github.com/apache/kafka/pull/13798#issuecomment-1609922486

   > if this does affect performance too much
   
   Maybe we could use some type of recording level for Yammer metrics too? We 
already have a configuration at:
   
https://kafka.apache.org/documentation.html#brokerconfigs_metrics.recording.level.
 We can emit this metric at DEBUG level.
   
   Also, FYI, here [1] is the flamegraph where you can observe the impact of 
histogram.update() for requests. After opening this flamegraph in a browser, you 
can look at the call stack of processCompletedSends.
   
   [1] 
https://github.com/divijvaidya/flamegraph-samples/blob/main/kafka/kafka-summit-2023/100mslinger-cpu-unclean.html
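
   A minimal sketch of the gating that would imply (the `Sensor.RecordingLevel` 
plumbing into `AddPartitionsToTxnManager` is an assumption, not existing code):
   
   ```java
   import org.apache.kafka.common.metrics.Sensor;
   
   // Skip Histogram.update() entirely unless the operator opted into DEBUG metrics.
   final class GatedHistogram {
       private final Sensor.RecordingLevel level;
       private final com.yammer.metrics.core.Histogram histogram;
   
       GatedHistogram(Sensor.RecordingLevel level, com.yammer.metrics.core.Histogram histogram) {
           this.level = level;
           this.histogram = histogram;
       }
   
       void update(long valueMs) {
           if (level == Sensor.RecordingLevel.DEBUG) {
               histogram.update(valueMs);   // no reservoir update at the default INFO level
           }
       }
   }
   ```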
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1244076297


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)
+  val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs")

Review Comment:
   oh, I missed that. Sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15128) snappy-java-1.1.8.4.jar library vulnerability

2023-06-27 Thread Arushi Rai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737791#comment-17737791
 ] 

Arushi Rai commented on KAFKA-15128:


Hi [~ckamal] 
If possible, can you share the expected release version where this 
vulnerability will be resolved? 

> snappy-java-1.1.8.4.jar library vulnerability
> -
>
> Key: KAFKA-15128
> URL: https://issues.apache.org/jira/browse/KAFKA-15128
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.4.0
>Reporter: priyatama
>Priority: Major
> Attachments: Screenshot 2023-06-27 at 12.30.51 PM.png
>
>
> Hi Team,
> we found a new vulnerability introduced in the snappy-java-1.1.8.4 library, so 
> we need to get rid of it.
> !Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230!
> During analysis, we found that snappy-java is pulled in via kafka-clients;
> our application does not use the snappy-java jar directly.
> Can anyone please explain what snappy-java is used for in kafka-clients, or 
> whether we can exclude it?
> The latest kafka-clients also ships the vulnerable snappy jar; when will 
> kafka-clients release a version that contains a non-vulnerable snappy-java jar?
> cc: [Mickael 
> Maison|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=mimaison]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1244065077


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##
@@ -26,7 +27,40 @@
  *
  * @param <T> The type of the record.
  */
-public interface CoordinatorLoader<T> {
+public interface CoordinatorLoader<T> extends AutoCloseable {
+
+/**
+ * UnknownRecordTypeException is thrown when the Deserializer encounters
+ * an unknown record type.
+ */
+class UnknownRecordTypeException extends RuntimeException {
+private final short unknownType;
+
+public UnknownRecordTypeException(short unknownType) {
+super(String.format("Found an unknown record type %d", 
unknownType));
+this.unknownType = unknownType;
+}
+
+public short unknownType() {
+return unknownType;

Review Comment:
   Ah, in the log message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1244064321


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##
@@ -26,7 +27,40 @@
  *
  * @param <T> The type of the record.
  */
-public interface CoordinatorLoader<T> {
+public interface CoordinatorLoader<T> extends AutoCloseable {
+
+/**
+ * UnknownRecordTypeException is thrown when the Deserializer encounters
+ * an unknown record type.
+ */
+class UnknownRecordTypeException extends RuntimeException {

Review Comment:
   Got it -- so it was handled as its own key and not catching a runtime 
exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1244063578


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerde implements PartitionWriter.Serializer<Record>, 
CoordinatorLoader.Deserializer<Record> {
+@Override
+public byte[] serializeKey(Record record) {
+// Record does not accept a null key.
+return MessageUtil.toVersionPrefixedBytes(
+record.key().version(),
+record.key().message()
+);
+}
+
+@Override
+public byte[] serializeValue(Record record) {
+// Tombstone is represented with a null value.
+if (record.value() == null) {
+return null;
+} else {
+return MessageUtil.toVersionPrefixedBytes(
+record.value().version(),
+record.value().message()
+);
+}
+}
+
+@Override
+public Record deserialize(
+ByteBuffer keyBuffer,
+ByteBuffer valueBuffer
+) throws RuntimeException {
+final short recordType = readVersion(keyBuffer, "key");
+final ApiMessage keyMessage = apiMessageKeyFor(recordType);
+readMessage(keyMessage, keyBuffer, recordType, "key");
+
+if (valueBuffer == null) {
+return new Record(new ApiMessageAndVersion(keyMessage, 
recordType), null);
+}
+
+final ApiMessage valueMessage = apiMessageValueFor(recordType);
+final short valueVersion = readVersion(valueBuffer, "value");

Review Comment:
   Ah I got confused by the `apiMessageValueFor` method. That one uses the 
key's version, but the value uses the record version. I see now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13921: MINOR: Reduce (hopefully) flakiness of testRackAwareRangeAssignor

2023-06-27 Thread via GitHub


dajac commented on PR #13921:
URL: https://github.com/apache/kafka/pull/13921#issuecomment-1609889975

   The test still fails. There is likely something else going on... I will keep 
investigating.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


jolshan commented on PR #13798:
URL: https://github.com/apache/kafka/pull/13798#issuecomment-1609874663

   > Although one histogram calculation here should be ok, but it would be nice 
if you get some producer-perf.sh data in as well to ensure that this metric 
isn't adversely impacting latency. Thoughts?
   
   I can take a look at this. Is there an alternative metric that you suggest 
if this does affect performance too much?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1244044199


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -17,25 +17,37 @@
 
 package kafka.server
 
+import 
kafka.server.AddPartitionsToTxnManager.{verificationFailureRateMetricName, 
verificationTimeMsMetricName}
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
 import org.apache.kafka.common.{Node, TopicPartition}
 import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{InterBrokerSendThread, 
RequestAndCompletionHandler}
 
 import java.util
+import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 
 object AddPartitionsToTxnManager {
   type AppendCallback = Map[TopicPartition, Errors] => Unit
+
+  val verificationFailureRateMetricName = "VerificationFailureRate"

Review Comment:
   Error code makes sense to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1244035570


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)
+  val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs")

Review Comment:
   Histogram is biased by default. This code uses a biased histogram, just as 
the other metrics do.
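
   For context, "biased" refers to the Yammer reservoir choice. A small sketch 
using the metrics-core 2.x API directly (illustration only; the broker goes 
through `KafkaMetricsGroup` rather than calling this itself):
   
   ```java
   import com.yammer.metrics.Metrics;
   import com.yammer.metrics.core.Histogram;
   
   public final class BiasedHistogramDemo {
       public static void main(String[] args) {
           // biased = true selects an exponentially decaying sample that weights
           // recent values more heavily; false would select a uniform sample.
           Histogram h = Metrics.newHistogram(BiasedHistogramDemo.class, "VerificationTimeMs", true);
           h.update(42);
           System.out.println(h.max());
       }
   }
   ```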



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1244034254


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   What version are we tagging if we don't end up sending the request though? I 
don't think it makes sense. We will also have the version on the api error 
metrics, right? I don't think version here is necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15086) The unreasonable segment size setting of the internal topics in MM2 may cause the worker startup time to be too long

2023-06-27 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-15086:
---
Labels: kip-943  (was: )

> The unreasonable segment size setting of the internal topics in MM2 may cause 
> the worker startup time to be too long
> 
>
> Key: KAFKA-15086
> URL: https://issues.apache.org/jira/browse/KAFKA-15086
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.4.1
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>  Labels: kip-943
> Attachments: WechatIMG364.jpeg, WechatIMG365.jpeg, WechatIMG366.jpeg
>
>
> If the config 'segment.bytes' for the MM2-related internal topics (such as 
> offset.storage.topic, config.storage.topic, status.storage.topic) follows the 
> broker default or is set even larger, then when the MM2 cluster runs many 
> complicated tasks, and especially when the log volume of the topic 
> 'offset.storage.topic' is very large, it slows down the restart of the MM2 
> workers.
> After investigation, the reason is that a consumer must be started at worker 
> startup to read the data of 'offset.storage.topic'. Although this topic is 
> set to compact, if 'segment.bytes' is set to a large value, such as the 
> default of 1G, the topic may hold tens of gigabytes of data that cannot be 
> compacted and has to be read from the earliest offset (because the active 
> segment cannot be cleaned), which consumes a lot of time. In our online 
> environment, this topic stored 13G of data and it took nearly half an hour 
> to consume all of it, leaving the worker unable to start and execute tasks 
> for a long time.
> Of course, the number of consumer threads could also be adjusted, but I think 
> it is easier to reduce 'segment.bytes', for example to the default value used 
> by __consumer_offsets: 100MB.
>  
> The first picture in the attachment shows the log size stored in the internal 
> topic, the second shows the time when reading of 'offset.storage.topic' 
> started, and the third the time when reading finished. It took about 23 
> minutes in total.
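
As a stopgap, operators can shrink the segment size on an existing offsets 
topic themselves. A sketch using the Admin API (the topic name below is 
hypothetical; it depends on your offset.storage.topic setting):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public final class ShrinkSegmentBytes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // 100MB, matching the __consumer_offsets default mentioned above.
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "mm2-offsets.internal"); // hypothetical name
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("segment.bytes", String.valueOf(100 * 1024 * 1024)),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                Collections.singletonMap(topic, Collections.singletonList(op))).all().get();
        }
    }
}
{code}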



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah merged pull request #13910: KAFKA-15109 Ensure the leader epoch bump occurs for older MetadataVersions

2023-06-27 Thread via GitHub


mumrah merged PR #13910:
URL: https://github.com/apache/kafka/pull/13910


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-06-27 Thread Erik van Oosten (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737711#comment-17737711
 ] 

Erik van Oosten commented on KAFKA-14972:
-

I will complete the KIP tomorrow.

> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Assignee: Erik van Oosten
>Priority: Major
>  Labels: needs-kip
>
> KafkaConsumer contains a check that rejects nested invocations from different 
> threads (method {{{}acquire{}}}). For users that use an async runtime, this 
> is an almost impossible requirement. Examples of async runtimes that are 
> affected are Kotlin co-routines (see KAFKA-7143) and Zio.
> We propose to replace the thread-id check with an access-id that is stored on 
> a thread-local variable. Existing programs will not be affected. Developers 
> that work in an async runtime can pick up the access-id and set it on the 
> thread-local variable in a thread of their choosing.
> Every time a callback is invoked a new access-id is generated. When the 
> callback completes, the previous access-id is restored.
> This proposal does not make it impossible to use the client incorrectly. 
> However, we think it strikes a good balance between making correct usage from 
> an async runtime possible while making incorrect usage difficult.
> Alternatives considered:
>  # Configuration that switches off the check completely.
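
A rough sketch of the access-id idea as described above (all names are made up 
pending the KIP):

{code:java}
import java.util.ConcurrentModificationException;

public final class AccessGuard {
    private static final ThreadLocal<Long> ACCESS_ID = new ThreadLocal<>();
    private Long owner = null;

    // Replaces the thread-id comparison at the top of every public consumer method.
    synchronized void acquire() {
        Long id = ACCESS_ID.get();
        long current = (id != null) ? id : Thread.currentThread().getId();
        if (owner != null && owner != current) {
            throw new ConcurrentModificationException(
                "KafkaConsumer is not safe for multi-threaded access");
        }
        owner = current;
    }

    synchronized void release() {
        owner = null;
    }

    // An async runtime sets the id on whichever thread runs the next continuation.
    public static void adopt(long accessId) {
        ACCESS_ID.set(accessId);
    }
}
{code}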



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-7143) Cannot use KafkaConsumer with Kotlin coroutines due to various issues

2023-06-27 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737709#comment-17737709
 ] 

Kirk True commented on KAFKA-7143:
--

cc [~pnee] [~lianetm] 

> Cannot use KafkaConsumer with Kotlin coroutines due to various issues
> -
>
> Key: KAFKA-7143
> URL: https://issues.apache.org/jira/browse/KAFKA-7143
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 1.1.0
>Reporter: Raman Gupta
>Priority: Major
>
> I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
> [coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
> supports a style of async programming that avoids the need for callbacks (and 
> existing callback-based APIs are usually easily adapted to this style 
> with a simple wrapper). With coroutines, continuations are used instead: 
> methods with callbacks are suspended, and resumed once the call is complete. 
> With coroutines, while access to the KafkaConsumer is done in a thread-safe 
> way, it does NOT necessarily happen from a single thread -- a different 
> underlying thread may actually execute the code after the suspension point.
> However, the KafkaConsumer includes additional checks to verify not only the 
> thread safety of the client, but that the *same thread* is being used -- if 
> the same thread (by id) is not being used the consumer throws an exception 
> like:
> {code}
> Exception in thread "ForkJoinPool.commonPool-worker-25" 
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
> {code}
> I understand this check is present to protect people from themselves, but I'd 
> like the ability to disable this check so that this code can be used 
> effectively by libraries such as Kotlin coroutines.
> There is a workaround for the above: run the consumer in a coroutine with a 
> single-thread context, which isn't ideal because it dedicates a thread to the 
> consumer.
> However, further problems await -- the `commitAsync` method also cannot be 
> used with coroutines because the callback is never executed and therefore the 
> coroutine is never resumed past the suspension point. Upon investigation, it 
> seems the callback is only executed after future calls to poll, which in a 
> regular polling loop with coroutines will never happen because of the 
> suspension on `commitAsync`, so we have a deadlock. I guess the idea behind 
> this Kafka consumer API design is that consuming new messages may continue, 
> even though commits of previous offsets (which happened an arbitrarily long 
> amount of time in the past) have not necessarily been processed. However, 
> with a coroutine based API, the commitAsync can be sequential before the next 
> poll like commitSync, but happen asynchronously without tying up a client 
> application thread.
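
The single-thread workaround mentioned above, sketched for concreteness (the 
consumer construction is elided; assume a configured KafkaConsumer):

{code:java}
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class SingleThreadConsumer {
    // Funnel every consumer call through one dedicated thread so the same-thread
    // check in acquire() always passes -- at the cost of a dedicated thread.
    private final ExecutorService consumerThread = Executors.newSingleThreadExecutor();
    private final KafkaConsumer<String, String> consumer;

    public SingleThreadConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public Future<ConsumerRecords<String, String>> pollAsync() {
        return consumerThread.submit(() -> consumer.poll(Duration.ofMillis(100)));
    }
}
{code}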



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-06-27 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737710#comment-17737710
 ] 

Kirk True commented on KAFKA-14972:
---

cc [~pnee] [~lianetm] 

> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Assignee: Erik van Oosten
>Priority: Major
>  Labels: needs-kip
>
> KafkaConsumer contains a check that rejects nested invocations from different 
> threads (method {{{}acquire{}}}). For users that use an async runtime, this 
> is an almost impossible requirement. Examples of async runtimes that are 
> affected are Kotlin co-routines (see KAFKA-7143) and Zio.
> We propose to replace the thread-id check with an access-id that is stored on 
> a thread-local variable. Existing programs will not be affected. Developers 
> that work in an async runtime can pick up the access-id and set it on the 
> thread-local variable in a thread of their choosing.
> Every time a callback is invoked a new access-id is generated. When the 
> callback completes, the previous access-id is restored.
> This proposal does not make it impossible to use the client incorrectly. 
> However, we think it strikes a good balance between making correct usage from 
> an async runtime possible while making incorrect usage difficult.
> Alternatives considered:
>  # Configuration that switches off the check completely.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-06-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14972:
--
Labels: needs-kip  (was: )

> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Assignee: Erik van Oosten
>Priority: Major
>  Labels: needs-kip
>
> KafkaConsumer contains a check that rejects nested invocations from different 
> threads (method {{{}acquire{}}}). For users that use an async runtime, this 
> is an almost impossible requirement. Examples of async runtimes that are 
> affected are Kotlin co-routines (see KAFKA-7143) and Zio.
> We propose to replace the thread-id check with an access-id that is stored on 
> a thread-local variable. Existing programs will not be affected. Developers 
> that work in an async runtime can pick up the access-id and set it on the 
> thread-local variable in a thread of their choosing.
> Every time a callback is invoked a new access-id is generated. When the 
> callback completes, the previous access-id is restored.
> This proposal does not make it impossible to use the client incorrectly. 
> However, we think it strikes a good balance between making correct usage from 
> an async runtime possible while making incorrect usage difficult.
> Alternatives considered:
>  # Configuration that switches off the check completely.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jsancio commented on a diff in pull request #13910: KAFKA-15109 Ensure the leader epoch bump occurs for older MetadataVersions

2023-06-27 Thread via GitHub


jsancio commented on code in PR #13910:
URL: https://github.com/apache/kafka/pull/13910#discussion_r1243870229


##
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java:
##
@@ -207,7 +211,7 @@ public void testTriggerLeaderEpochBumpIfNeeded() {
 createFooBuilder()
 .setTargetIsrWithBrokerStates(
 
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 
3, 4)))
-.setBumpLeaderEpochOnIsrShrink(true),
+.enableBumpLeaderEpochOnIsrShrink(true),

Review Comment:
   Got it. Outside the scope of this PR but it looks like we are missing some 
validation. This is adding 4 to the ISR but 4 is not a replica for this 
partition. cc @ahuang98 



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -300,9 +288,13 @@ private void tryElection(PartitionChangeRecord record) {
  */
 void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
 if (record.leader() == NO_LEADER_CHANGE) {
+boolean bumpLeaderEpochOnIsrShrink = 
metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled;
+
 if (!Replicas.contains(targetReplicas, partition.replicas)) {
+// Reassignment
 record.setLeader(partition.leader);
-} else if (bumpLeaderEpochOnIsrShrink && 
!Replicas.contains(targetIsr, partition.isr)) {
+} else if (!Replicas.contains(targetIsr, partition.isr) && 
bumpLeaderEpochOnIsrShrink) {

Review Comment:
   We should check `bumpLeaderEpochOnIsrShrink` first as it is faster to 
compute. It avoids a search through the ISR arrays when it is false (most cases 
after MV 3.6).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13917: MINOR; Failed move should be logged at WARN

2023-06-27 Thread via GitHub


kirktrue commented on PR #13917:
URL: https://github.com/apache/kafka/pull/13917#issuecomment-1609649429

   @jsancio Another difference is that now the `outer` exception's stack trace 
will be shown via `WARN` instead of just the exception's message via `DEBUG`. I 
assume that's intentional, but just wanted to call it out. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] drawxy commented on a diff in pull request #13847: KAFKA-15082: The log retention policy doesn't take effect after altering log dir

2023-06-27 Thread via GitHub


drawxy commented on code in PR #13847:
URL: https://github.com/apache/kafka/pull/13847#discussion_r1243863992


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1808,7 +1809,10 @@ class ReplicaManager(val config: KafkaConfig,
 
   // pause cleaning for partitions that are being moved and start 
ReplicaAlterDirThread to move
   // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
+  replicaAlterLogDirsManager.getFetcher(topicPartition) match {

Review Comment:
   Recently, I deployed this change to my cluster and found that it didn't work 
in the scenario where an exception was raised. During altering, the partition 
whose log dir was being altered would be marked as failed due to exceptions, 
and its fetcher would be removed **without resuming the cleaning**. After 
receiving a LeaderAndIsr request for that partition, the altering would be 
restarted by assigning a fetcher to it, and the cleaning of that partition 
would be paused one more time because it had no assigned fetcher. Therefore, 
the cleaning is never resumed (paused twice, but resumed only once after 
altering completed).
   
   I pushed another commit: every time a fetcher is assigned to a partition, 
pause the cleaning of that partition, and every time the fetcher is removed, 
resume it, so that cleaning is resumed exactly as many times as it is paused. 
If this change makes sense to you, I will add a unit test later.
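
   A toy illustration of that invariant (hypothetical helper, not the actual 
LogCleaner code): cleaning only truly resumes once every pause has been matched 
by a resume.
   
   ```java
   final class CleaningPauseCounter {
       private int pauses = 0;
   
       synchronized void pause() {
           pauses++;                        // called whenever a fetcher is assigned
       }
   
       synchronized void resume() {
           if (pauses > 0 && --pauses == 0) {
               // actually resume log cleaning here -- reached only when balanced
           }
       }
   }
   ```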



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-06-27 Thread via GitHub


kirktrue commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1609641313

   @flashmouse I'm a little slow on the uptake, so I'm trying to come up with a 
scenario. Let's say the following is true:
   
   * Topic `foo` has three partitions
   * The consumer group has two consumers, `consumer` and `otherConsumer`
   
   In the first case, let's say that `consumer` has one partition and 
`otherConsumer` has two. In that case it's as balanced as possible, right? But 
the code in `isBalanced` as written disagrees:
   
   ```java
   if (consumerPartitionCount < otherConsumerPartitionCount) {
   return false;
   }
   ```
   
   Because `1` is less than `2`. But it's going to run into problems: by 
returning `false` from `isBalanced`, I assume we then trigger a rebalance, 
which, at best, will cause the ownership of that partition to move from 
`otherConsumer` to `consumer`. When the next balance check is run, we're in 
the same position. 
   
   But with your fix:
   
   ```java
   if (consumerPartitionCount + 1 < otherConsumerPartitionCount) {
   return false;
   }
   ```
   
   It's effectively stating that `consumer` has to be at least _two_ partitions 
behind before a rebalance is triggered, correct? Because `1 + 1` is _not less_ 
than `2`.
   
   Do I have the above (mostly) correct?
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable

2023-06-27 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737696#comment-17737696
 ] 

Josep Prat commented on KAFKA-15105:


Hi [~riedelmax], feel free to assign this issue to yourself :)
{quote}I'm still trying to understand how the build infrastructure works. Can 
someone give me a hint, how to reproduce the behavior?
{quote}
Therein lies part of the problem: many times these issues are not easily 
reproducible on your machine. Usually they only occur on CI.

The test itself is under the core module, so one way to try to reproduce it 
would be running `./gradlew core:test` or `./gradlew core:test --tests 
integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable` 
repeatedly until failure. IntelliJ (if you have it and use it) has a similar 
feature where one can run a test until failure.

Is this answering your question? Or you wanted to know some other specific 
details?

> Flaky test 
> FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
> -
>
> Key: KAFKA-15105
> URL: https://issues.apache.org/jira/browse/KAFKA-15105
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Josep Prat
>Priority: Major
>  Labels: flaky-test
>
> Test  
> integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable()
>  became flaky. An example can be found here: 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable__/]
> The error might be caused because of a previous kafka cluster used for 
> another test wasn't cleaned up properly before this one run.
>  
> h3. Error Message
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists.{code}
> h3. Stacktrace
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kirktrue commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-06-27 Thread via GitHub


kirktrue commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1609604468

   @ableegoldman you're pretty familiar with this code, IIUC. If so, could you 
take a look at this PR? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13921: MINOR: Reduce (hopefully) flakiness of testRackAwareRangeAssignor

2023-06-27 Thread via GitHub


kirktrue commented on code in PR #13921:
URL: https://github.com/apache/kafka/pull/13921#discussion_r1243822717


##
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##
@@ -249,6 +249,9 @@ class FetchFromFollowerIntegrationTest extends 
BaseFetchRequestTest {
 reassignments.put(new TopicPartition(topicWithSingleRackPartitions, 
p), util.Optional.of(newAssignment))
   }
   admin.alterPartitionReassignments(reassignments).all().get(30, 
TimeUnit.SECONDS)
+  TestUtils.waitUntilTrue(
+() => admin.listPartitionReassignments().reassignments().get().isEmpty,

Review Comment:
   Do we want to pass in a timeout to `get` in case it hangs?
   
   ```suggestion
   () => admin.listPartitionReassignments().reassignments().get(30, 
TimeUnit.SECONDS).isEmpty,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-27 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1243806647


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1072,1338 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value,
+short version
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should not be added.
+// TODO: this needs to be checked in conjunction with empty group 
offsets.
+//if (groups.containsKey(groupId)) {
+//throw new IllegalStateException("Unexpected unload of active 
group " + groupId +
+//"while loading partition " + topicPartition);
+//}
+} else {
+List<GenericGroupMember> loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = version == 0 ? member.sessionTimeout() 
: member.rebalanceTimeout();
+
+JoinGroupRequestProtocolCollection supportedProtocols = new 
JoinGroupRequestProtocolCollection();
+supportedProtocols.add(new JoinGroupRequestProtocol()
+.setName(value.protocol())
+.setMetadata(member.subscription()));
+
+GenericGroupMember loadedMember = new GenericGroupMember(
+member.memberId(),
+Optional.ofNullable(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+rebalanceTimeout,
+member.sessionTimeout(),
+value.protocolType(),
+supportedProtocols,
+member.assignment()
+);
+
+loadedMembers.add(loadedMember);
+}
+
+String protocolType = value.protocolType();
+
+GenericGroup genericGroup = new GenericGroup(
+this.logContext,
+groupId,
+loadedMembers.isEmpty() ? EMPTY : STABLE,
+time,
+value.generation(),
+protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType),
+Optional.ofNullable(value.protocol()),
+Optional.ofNullable(value.leader()),
+value.currentStateTimestamp() == -1 ? Optional.empty() : 
Optional.of(value.currentStateTimestamp())
+);
+
+loadedMembers.forEach(member -> {
+genericGroup.add(member, null);
+log.info("Loaded member {} in group {} with generation {}.",
+member.memberId(), groupId, genericGroup.generationId());
+});
+
+genericGroup.setSubscribedTopics(
+genericGroup.computeSubscribedTopics()
+);
+}
+}
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult<Void, Record> genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture<JoinGroupResponseData> responseFuture
+) {
+CoordinatorResult<Void, Record> result = 
EMPTY_RESULT;
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+try {
+group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return 

[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-27 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1243795683


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -609,7 +673,7 @@ public synchronized void handleCompletedBatch(ProducerBatch 
batch, ProduceRespon
 }
 
 public synchronized void transitionToUninitialized(RuntimeException 
exception) {
-transitionTo(State.UNINITIALIZED);
+transitionTo(State.UNINITIALIZED, exception);

Review Comment:
   Good catch. That was me being sloppy/assumptive. The `transitionTo` doesn't 
look at the incoming exception unless the target state is `FATAL_ERROR` or 
`ABORTABLE_ERROR`, so this was effectively a no-op anyway. I've removed 
the exception from the call to `transitionTo`.
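
   For readers, a paraphrase of that behavior (assumed field and enum names; 
not the actual `TransactionManager` source):
   
   ```java
   enum State { UNINITIALIZED, READY, ABORTABLE_ERROR, FATAL_ERROR }
   
   final class TransitionSketch {
       private State state = State.UNINITIALIZED;
       private RuntimeException lastError;
   
       void transitionTo(State target, RuntimeException error) {
           if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
               lastError = error;   // only error states retain the exception
           }                        // for any other target the argument is ignored
           state = target;
       }
   }
   ```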



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2023-06-27 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14133:
--
Description: 
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which, as of 2nd of August 2022, still need to be moved from 
EasyMock to Mockito, which do not have a Jira issue, and for which I am not 
aware of any open pull requests:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}
 # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
[~yash.mayya] )
 # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
 # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
 # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
 # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
 # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
 # {color:#00875a}TopologyTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
Christo)
 # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
 # {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
 # {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
 # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
 # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
 # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
 # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
 # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
 # {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
 # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew)
 # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in 
https://issues.apache.org/jira/browse/KAFKA-12947)
 # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: 
[~shekharrajak])
 # {color:#00875a}TimeOrderedWindowStoreTest{color} (owners: [~shekharrajak]) 
[https://github.com/apache/kafka/pull/12777] 

*The coverage report for the above tests after the change should be >= what 
the coverage is now.*
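
For reference, a minimal sketch (with an invented `Service` interface; none of the tests above are shown) of the kind of EasyMock-to-Mockito conversion these migrations involve:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class MigrationSketch {
    interface Service {
        String fetch(String key);
    }

    public static void main(String[] args) {
        // EasyMock style being replaced:
        //   Service service = EasyMock.createMock(Service.class);
        //   EasyMock.expect(service.fetch("k")).andReturn("v");
        //   EasyMock.replay(service);
        //   ... exercise code ...
        //   EasyMock.verify(service);

        // Mockito style: stubbing needs no replay step, and verification is explicit.
        Service service = mock(Service.class);
        when(service.fetch("k")).thenReturn("v");

        System.out.println(service.fetch("k")); // v

        verify(service).fetch("k");
    }
}
```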

  was:
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which, as of 2nd of August 2022, still need to be moved from 
EasyMock to Mockito, which do not have a Jira issue, and for which I am not 
aware of any open pull requests:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}
 # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
 # 

[GitHub] [kafka] clolov closed pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2023-06-27 Thread via GitHub


clolov closed pull request #12607: KAFKA-14133: Replace EasyMock with Mockito 
in streams tests
URL: https://github.com/apache/kafka/pull/12607





[GitHub] [kafka] mumrah closed pull request #12883: Kip 866 part 1

2023-06-27 Thread via GitHub


mumrah closed pull request #12883: Kip 866 part 1
URL: https://github.com/apache/kafka/pull/12883





[GitHub] [kafka] dajac commented on pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on PR #13880:
URL: https://github.com/apache/kafka/pull/13880#issuecomment-1609431175

   @jolshan @jeffkbkim Thanks for your review. I have addressed your comments.





[GitHub] [kafka] mumrah commented on a diff in pull request #13910: KAFKA-15109 Ensure the leader epoch bump occurs for older MetadataVersions

2023-06-27 Thread via GitHub


mumrah commented on code in PR #13910:
URL: https://github.com/apache/kafka/pull/13910#discussion_r1243667664


##
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java:
##
@@ -207,7 +211,7 @@ public void testTriggerLeaderEpochBumpIfNeeded() {
 createFooBuilder()
 .setTargetIsrWithBrokerStates(
 
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 
3, 4)))
-.setBumpLeaderEpochOnIsrShrink(true),
+.enableBumpLeaderEpochOnIsrShrink(true),

Review Comment:
   This case is an ISR expansion, so we don't expect an epoch bump (I added a 
comment)
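
   As a hedged illustration (invented helper names, not the `PartitionChangeBuilder` code), the shrink-vs-expansion distinction boils down to whether the target ISR drops a member of the current ISR:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class IsrShrinkSketch {
    // A shrink removes at least one member of the current ISR from the target ISR.
    static boolean isShrink(List<Integer> currentIsr, List<Integer> targetIsr) {
        return !new HashSet<>(targetIsr).containsAll(currentIsr);
    }

    public static void main(String[] args) {
        List<Integer> current = Arrays.asList(2, 1, 3);
        // The case in this test: adding broker 4 is an expansion, so no epoch bump is expected.
        System.out.println(isShrink(current, Arrays.asList(2, 1, 3, 4))); // false
        // Dropping broker 3 is a shrink, the case where a leader epoch bump applies.
        System.out.println(isShrink(current, Arrays.asList(2, 1)));       // true
    }
}
```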






[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243668510


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RecordSerdeTest {
+@Test
+public void testSerializeKey() {
+RecordSerde serializer = new RecordSerde();
+Record record = new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataKey().setGroupId("group"),
+(short) 1
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataValue().setEpoch(10),
+(short) 0
+)
+);
+
+assertArrayEquals(
+MessageUtil.toVersionPrefixedBytes(record.key().version(), 
record.key().message()),
+serializer.serializeKey(record)
+);
+}
+
+@Test
+public void testSerializeValue() {
+RecordSerde serializer = new RecordSerde();
+Record record = new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataKey().setGroupId("group"),
+(short) 1
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataValue().setEpoch(10),
+(short) 0
+)
+);
+
+assertArrayEquals(
+MessageUtil.toVersionPrefixedBytes(record.value().version(), 
record.value().message()),
+serializer.serializeValue(record)
+);
+}
+
+@Test
+public void testSerializeNullValue() {
+RecordSerde serializer = new RecordSerde();
+Record record = new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataKey().setGroupId("group"),
+(short) 1
+),
+null
+);
+
+assertNull(serializer.serializeValue(record));
+}
+
+@Test
+public void testDeserialize() {
+RecordSerde serDe = new RecordSerde();
+
+ApiMessageAndVersion key = new ApiMessageAndVersion(
+new 

[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243666829


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RecordSerdeTest {
+@Test
+public void testSerializeKey() {
+RecordSerde serializer = new RecordSerde();
+Record record = new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataKey().setGroupId("group"),
+(short) 1
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataValue().setEpoch(10),
+(short) 0
+)
+);
+
+assertArrayEquals(
+MessageUtil.toVersionPrefixedBytes(record.key().version(), 
record.key().message()),
+serializer.serializeKey(record)
+);
+}
+
+@Test
+public void testSerializeValue() {

Review Comment:
   see my previous answer.






[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243666154


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RecordSerdeTest {
+@Test
+public void testSerializeKey() {
+RecordSerde serializer = new RecordSerde();
+Record record = new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataKey().setGroupId("group"),
+(short) 1

Review Comment:
   The version of the key does not really matter here but let's use 3. 
Regarding your suggestion to add a test with a different value, I don't see the 
need because the value is ignored anyway here.






[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243663487


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##
@@ -26,7 +27,40 @@
  *
 * @param <T> The type of the record.
 */
-public interface CoordinatorLoader<T> {
+public interface CoordinatorLoader<T> extends AutoCloseable {
+
+/**
+ * UnknownRecordTypeException is thrown when the Deserializer encounters
+ * an unknown record type.
+ */
+class UnknownRecordTypeException extends RuntimeException {
+private final short unknownType;
+
+public UnknownRecordTypeException(short unknownType) {
+super(String.format("Found an unknown record type %d", 
unknownType));
+this.unknownType = unknownType;
+}
+
+public short unknownType() {
+return unknownType;

Review Comment:
   It is also used in CoordinatorLoaderImpl.






[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243662918


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##
@@ -26,7 +27,40 @@
  *
 * @param <T> The type of the record.
 */
-public interface CoordinatorLoader<T> {
+public interface CoordinatorLoader<T> extends AutoCloseable {
+
+/**
+ * UnknownRecordTypeException is thrown when the Deserializer encounters
+ * an unknown record type.
+ */
+class UnknownRecordTypeException extends RuntimeException {

Review Comment:
   We had something similar here: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1376.
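
   For illustration, a simplified, self-contained sketch (the `deserialize` stand-in is invented, not the actual loader code) of how the unknown-type signal can be treated as a soft error during replay, mirroring that GroupMetadataManager behaviour:

```java
public class UnknownTypeHandlingSketch {
    static class UnknownRecordTypeException extends RuntimeException {
        final short unknownType;
        UnknownRecordTypeException(short unknownType) {
            super(String.format("Found an unknown record type %d", unknownType));
            this.unknownType = unknownType;
        }
    }

    // Simplified stand-in for the real deserializer: only type 1 is known here.
    static String deserialize(short recordType, String payload) {
        if (recordType == 1) {
            return "record(" + payload + ")";
        }
        throw new UnknownRecordTypeException(recordType);
    }

    public static void main(String[] args) {
        short[] types = {1, 99, 1};
        String[] payloads = {"a", "b", "c"};
        for (int i = 0; i < types.length; i++) {
            try {
                System.out.println("replayed " + deserialize(types[i], payloads[i]));
            } catch (UnknownRecordTypeException e) {
                // Tolerate records written by a newer version: log and keep replaying.
                System.err.println("skipping record with unknown type " + e.unknownType);
            }
        }
    }
}
```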
 






[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243661641


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerde implements PartitionWriter.Serializer<Record>, CoordinatorLoader.Deserializer<Record> {
+@Override
+public byte[] serializeKey(Record record) {
+// Record does not accept a null key.
+return MessageUtil.toVersionPrefixedBytes(
+record.key().version(),
+record.key().message()
+);
+}
+
+@Override
+public byte[] serializeValue(Record record) {
+// Tombstone is represented with a null value.
+if (record.value() == null) {
+return null;
+} else {
+return MessageUtil.toVersionPrefixedBytes(
+record.value().version(),
+record.value().message()
+);
+}
+}
+
+@Override
+public Record deserialize(
+ByteBuffer keyBuffer,
+ByteBuffer valueBuffer
+) throws RuntimeException {
+final short recordType = readVersion(keyBuffer, "key");
+final ApiMessage keyMessage = apiMessageKeyFor(recordType);
+readMessage(keyMessage, keyBuffer, recordType, "key");
+
+if (valueBuffer == null) {
+return new Record(new ApiMessageAndVersion(keyMessage, 
recordType), null);
+}
+
+final ApiMessage valueMessage = apiMessageValueFor(recordType);
+final short valueVersion = readVersion(valueBuffer, "value");

Review Comment:
   Right. The record type is taken from the key whereas the version in the 
value is really the version of the message serialized in the value. This is 
weird, I agree...
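
   To make the framing concrete, here is a self-contained illustration (plain `ByteBuffer` code, no Kafka internals): the short prefixed to the key encodes the record *type*, while the short prefixed to the value encodes the *version* of the value message:

```java
import java.nio.ByteBuffer;

public class VersionPrefixSketch {
    static ByteBuffer prefixed(short prefix, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort(prefix);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        // Key prefix = record *type*, used to pick both the key and value schemas...
        ByteBuffer key = prefixed((short) 3, "group".getBytes());
        // ...while the value prefix = *version* of the message serialized in the value.
        ByteBuffer value = prefixed((short) 0, "epoch=10".getBytes());

        short recordType = key.getShort();
        short valueVersion = value.getShort();
        System.out.println("type=" + recordType + ", valueVersion=" + valueVersion);
    }
}
```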






[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243660584


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerde implements PartitionWriter.Serializer<Record>, CoordinatorLoader.Deserializer<Record> {
+@Override
+public byte[] serializeKey(Record record) {

Review Comment:
   Yeah, I was thinking about this as well but decided against it in the end. I 
still hope that we could introduce an `apiKey` field in the schemas like we did 
for the "metadata" records. 






[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243657611


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerde implements PartitionWriter.Serializer<Record>, CoordinatorLoader.Deserializer<Record> {

Review Comment:
   This does not seem necessary. The package name seems to be enough.






[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243657016


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.log.UnifiedLog
+import kafka.server.ReplicaManager
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{CompressionType, FileRecords, 
MemoryRecords, SimpleRecord}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, 
LogOffsetMetadata}
+import org.apache.kafka.test.TestUtils.assertFutureThrows
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
+import org.junit.jupiter.api.{Test, Timeout}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify, when}
+
+import java.nio.ByteBuffer
+import java.nio.charset.Charset
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+class StringKeyValueDeserializer extends 
CoordinatorLoader.Deserializer[(String, String)] {
+  override def deserialize(key: ByteBuffer, value: ByteBuffer): (String, 
String) = {
+(
+  Charset.defaultCharset().decode(key).toString,
+  Charset.defaultCharset().decode(value).toString
+)
+  }
+}
+
+@Timeout(60)
+class CoordinatorLoaderImplTest {
+  @Test
+  def testNonexistentPartition(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = mock(classOf[CoordinatorLoader.Deserializer[(String, String)]])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBufferSize = 1000
+)) { loader =>
+  when(replicaManager.getLog(tp)).thenReturn(None)
+
+  val result = loader.load(tp, coordinator)
+  assertFutureThrows(result, classOf[NotLeaderOrFollowerException])
+}
+  }
+
+  @Test
+  def testLoadingIsRejectedWhenClosed(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = mock(classOf[CoordinatorLoader.Deserializer[(String, String)]])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBufferSize = 1000
+)) { loader =>
+  loader.close()
+
+  val result = loader.load(tp, coordinator)
+  assertFutureThrows(result, classOf[RuntimeException])
+}
+  }
+
+  @Test
+  def testLoading(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = new StringKeyValueDeserializer
+val log = mock(classOf[UnifiedLog])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBufferSize = 1000
+)) { loader =>
+  when(replicaManager.getLog(tp)).thenReturn(Some(log))
+  when(log.logStartOffset).thenReturn(0L)
+  when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L))
+
+  val readResult1 = logReadResult(startOffset = 0, records = Seq(
+new SimpleRecord("k1".getBytes, "v1".getBytes),
+new SimpleRecord("k2".getBytes, "v2".getBytes)
+  ))
+
+  when(log.read(
+startOffset = 0L,
+maxLength = 1000,
+isolation = FetchIsolation.LOG_END,
+minOneMessage = true
+  )).thenReturn(readResult1)
+
+  val readResult2 = logReadResult(startOffset = 2, records = Seq(
+new SimpleRecord("k3".getBytes, "v3".getBytes),
+new 

[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243649444


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, 
UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager  The replica manager.
+ * @param deserializerThe deserializer to use.
+ * @param loadBufferSize  The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](
+  replicaManager: ReplicaManager,
+  deserializer: Deserializer[T],
+  loadBufferSize: Int
+) extends CoordinatorLoader[T] with Logging {
+  private val isRunning = new AtomicBoolean(true)
+  private val scheduler = new KafkaScheduler(1)
+  scheduler.startup()
+
+  /**
+   * Loads the coordinator by reading all the records from the TopicPartition
+   * and applying them to the Replayable object.
+   *
+   * @param tp  The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   */
+  override def load(
+tp: TopicPartition,
+coordinator: CoordinatorPlayback[T]
+): CompletableFuture[Void] = {
+val future = new CompletableFuture[Void]()
+val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
+  () => doLoad(tp, coordinator, future))
+if (result.isCancelled) {
+  future.completeExceptionally(new RuntimeException("Coordinator loader is 
closed."))
+}
+future
+  }
+
+  private def doLoad(
+tp: TopicPartition,
+coordinator: CoordinatorPlayback[T],
+future: CompletableFuture[Void]
+  ): Unit = {
+try {
+  replicaManager.getLog(tp) match {
+case None =>
+  future.completeExceptionally(new NotLeaderOrFollowerException(
+s"Could not load records from $tp because the log does not 
exist."))
+
+case Some(log) =>
+  def logEndOffset: Long = 
replicaManager.getLogEndOffset(tp).getOrElse(-1L)
+
+  // buffer may not be needed if records are read from memory
+  var buffer = ByteBuffer.allocate(0)
+  // loop breaks if leader changes at any time during the load, since 
logEndOffset is -1
+  var currOffset = log.logStartOffset
+  // loop breaks if no records have been read, since the end of the 
log has been reached
+  var readAtLeastOneRecord = true
+
+  while (currOffset < logEndOffset && readAtLeastOneRecord && 
isRunning.get) {
+val fetchDataInfo = log.read(
+  startOffset = currOffset,
+  maxLength = loadBufferSize,
+  isolation = FetchIsolation.LOG_END,
+  minOneMessage = true
+)
+
+readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
+val memoryRecords = (fetchDataInfo.records: @unchecked) match {
+  case records: MemoryRecords =>
+records
+
+  case fileRecords: FileRecords =>
+val sizeInBytes = fileRecords.sizeInBytes
+val bytesNeeded = Math.max(loadBufferSize, sizeInBytes)
+
+// minOneMessage = true in the above log.read means that the 
buffer may need to
+// be grown to ensure progress can be made.
+if (buffer.capacity < bytesNeeded) {
+  if (loadBufferSize < 

[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243647581


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, 
UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager  The replica manager.
+ * @param deserializerThe deserializer to use.
+ * @param loadBufferSize  The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](
+  replicaManager: ReplicaManager,
+  deserializer: Deserializer[T],
+  loadBufferSize: Int
+) extends CoordinatorLoader[T] with Logging {
+  private val isRunning = new AtomicBoolean(true)
+  private val scheduler = new KafkaScheduler(1)
+  scheduler.startup()
+
+  /**
+   * Loads the coordinator by reading all the records from the TopicPartition
+   * and applying them to the Replayable object.
+   *
+   * @param tp  The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   */
+  override def load(
+tp: TopicPartition,
+coordinator: CoordinatorPlayback[T]
+): CompletableFuture[Void] = {
+val future = new CompletableFuture[Void]()
+val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
+  () => doLoad(tp, coordinator, future))
+if (result.isCancelled) {

Review Comment:
   Correct.
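
   For anyone skimming, the same close-vs-schedule race can be sketched with a plain `ScheduledExecutorService` (an illustration only, not the `KafkaScheduler` API): once the executor is shut down, submission fails and the caller's future is completed exceptionally instead of hanging forever:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;

public class LoadSchedulingSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    CompletableFuture<Void> load(String partition) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        try {
            scheduler.execute(() -> {
                // ... replay all records for `partition` here, then:
                future.complete(null);
            });
        } catch (RejectedExecutionException e) {
            // Mirrors the isCancelled branch above: the loader has been closed,
            // so fail the future instead of leaving it forever incomplete.
            future.completeExceptionally(new RuntimeException("Coordinator loader is closed."));
        }
        return future;
    }

    void close() {
        scheduler.shutdown();
    }
}
```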






[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-27 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243647232


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, 
UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager  The replica manager.
+ * @param deserializerThe deserializer to use.
+ * @param loadBufferSize  The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](

Review Comment:
   Indeed, it will be created in BrokerServer when the coordinator is created.






[GitHub] [kafka] cadonna commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-27 Thread via GitHub


cadonna commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1243468362


##
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java:
##
@@ -189,7 +221,22 @@ public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) {
  */
 @Override
 public Joined<K, V, VO> withName(final String name) {
-return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
+}
+
+/**
+ * Set the grace period on the stream side of the join. Records will enter a buffer before being processed. Out-of-order records within the grace period will be processed in timestamp order. Late records, i.e. records outside the grace period, will be processed right as they come in; if a late record is past the table history retention, this could result in a join against the wrong version or a null join. Long gaps between arriving stream-side records will delay the processing of buffered records, possibly even causing them to be processed outside the grace period window.

Review Comment:
   Could you please also add a couple of line breaks to the java docs as you 
did for the javadocs of the `with()` method?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##
@@ -56,10 +77,59 @@ public void init(final ProcessorContext context) {
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
 valueGetter.init(context);
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+if (useBuffer) {
+if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+}
+
+buffer.get().setSerdesIfNull(new SerdeGetter(context));
+
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+}
 }
 
 @Override
 public void process(final Record record) {
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+updateObservedStreamTime(record.timestamp());
+if (maybeDropRecord(record)) {
+return;
+}
+
+if (!useBuffer) {
+doJoin(record);
+} else {
+if (!buffer.get().put(observedStreamTime, record, 
internalProcessorContext.recordContext())) {
+doJoin(record);
+}
+buffer.get().evictWhile(() -> true, this::emit);

Review Comment:
   I think that should be done the following way to avoid an unnecessary range query:
   ```suggestion
   if (!buffer.get().put(observedStreamTime, record, 
internalProcessorContext.recordContext())) {
   doJoin(record);
   } else {
   buffer.get().evictWhile(() -> true, this::emit);
   }
   ```
   If the record is a late record, it will not update the observed stream time. 
If the observed stream time is not updated, the range query will not return 
records that need to be evicted, since they were already evicted the last 
time `evictWhile()` was called.
   Does this make sense?
   
   If you agree, could you also please add a test for this?



##
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java:
##
@@ -203,4 +250,4 @@ public Serde<V> valueSerde() {
 public Serde<VO> otherValueSerde() {
 return otherValueSerde;
 }
-}
+}

Review Comment:
   nit: Could you remove this change? 



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##
@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
 }
 }
 
+
+private void makeJoin(final Duration grace) {
+final KStream<Integer, String> stream;
+final KTable<Integer, String> table;
+final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+builder = new StreamsBuilder();
+
+final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+stream = builder.stream(streamTopic, consumed);
+table = builder.table("tableTopic2", consumed, Materialized.as(
+Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+stream.join(table,
+MockValueJoiner.TOSTRING_JOINER,
+Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), 
"Grace", grace)
+).process(supplier);
+final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+driver = new TopologyTestDriver(builder.build(), props);
+inputStreamTopic 

[GitHub] [kafka] divijvaidya commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


divijvaidya commented on PR #13798:
URL: https://github.com/apache/kafka/pull/13798#issuecomment-1609327501

   Another point I want to call out is that the Yammer metrics histogram is 
notorious for consuming CPU (and increasing latency). It consumes ~4-5% CPU on 
the network threads for calculating per-request latency. Given that we are 
doing this on the latency-critical append path (right?), I suspect we may 
see some latency and CPU increase. One histogram calculation here should be 
OK, but it would be nice if you could also get some producer-perf.sh data to 
ensure that this metric isn't adversely impacting latency. Thoughts?
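
   As a rough starting point, a micro-benchmark sketch of the per-update cost (assuming metrics-core 2.x, the Yammer version bundled with Kafka; numbers will vary by JVM and hardware):

```java
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricsRegistry;

public class HistogramCostSketch {
    public static void main(String[] args) {
        MetricsRegistry registry = new MetricsRegistry();
        Histogram histogram = registry.newHistogram(HistogramCostSketch.class, "VerificationTimeMs");
        long iterations = 10_000_000L;

        long start = System.nanoTime();
        for (long i = 0; i < iterations; i++) {
            histogram.update(i % 500); // pretend latency sample in ms
        }
        long elapsed = System.nanoTime() - start;

        System.out.printf("%.1f ns per histogram.update%n", (double) elapsed / iterations);
        registry.shutdown();
    }
}
```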





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1243570850


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -17,25 +17,37 @@
 
 package kafka.server
 
+import 
kafka.server.AddPartitionsToTxnManager.{verificationFailureRateMetricName, 
verificationTimeMsMetricName}
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
 import org.apache.kafka.common.{Node, TopicPartition}
 import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{InterBrokerSendThread, 
RequestAndCompletionHandler}
 
 import java.util
+import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 
 object AddPartitionsToTxnManager {
   type AppendCallback = Map[TopicPartition, Errors] => Unit
+
+  val verificationFailureRateMetricName = "VerificationFailureRate"

Review Comment:
   Do we want to add a tag for the error code as well? That would let us track 
per-error-code failure rates.
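   Something along these lines, assuming the tagged `newMeter` overload on 
`KafkaMetricsGroup` (a sketch only, not this PR's code):
   ```java
   import java.util.Map;
   import java.util.concurrent.TimeUnit;

   import com.yammer.metrics.core.Meter;
   import org.apache.kafka.common.protocol.Errors;
   import org.apache.kafka.server.metrics.KafkaMetricsGroup;

   class TaggedMeterSketch {
       private final KafkaMetricsGroup metricsGroup =
               new KafkaMetricsGroup(TaggedMeterSketch.class);

       // One meter per error code, distinguished by a tag rather than by name.
       Meter failureMeterFor(final Errors error) {
           return metricsGroup.newMeter("VerificationFailureRate", "failures",
                   TimeUnit.SECONDS, Map.of("error", error.name()));
       }
   }
   ```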



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1243569174


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   When we bump the version beyond 4 in the future, say to 5 and 6, wouldn't we 
want the ability to look at failure metrics for version 5 only?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect

2023-06-27 Thread via GitHub


yashmayya commented on code in PR #13915:
URL: https://github.com/apache/kafka/pull/13915#discussion_r1243145305


##
docs/connect.html:
##
@@ -313,7 +313,13 @@ REST API
 DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
 GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued
 PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector
-GET /connectors/{name}/offsets - get the current offsets for a connector (see KIP-875 for more details: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect)
+Offsets management REST APIs (see KIP-875 for more details: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect):
+
+GET /connectors/{name}/offsets - get the current offsets for a connector
+DELETE /connectors/{name}/offsets - reset the offsets for a connector. The connector must exist and must be in the stopped state.
+PATCH /connectors/{name}/offsets - alter the offsets for a connector. The connector must exist and must be in the stopped state. The request body should be a JSON object containing a JSON array offsets field, similar to the response body of the GET /connectors/{name}/offsets REST API.

Review Comment:
   There is a link to the generated OpenAPI docs at the end of this section, so 
I don't think it's necessary to add the same link here as well. Also, I think 
an actual example might be more helpful than the generated OpenAPI spec, where 
the finest level of granularity is the 
[ConnectorOffset](https://github.com/apache/kafka/blob/c5889fceddb9a0174452ae60a57c8ff3f087a6a4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffset.java#L41)
 schema, which only describes the `partition` and `offset` keys as having JSON 
object values. I was hoping the link to the KIP would be sufficient, but I do 
see the value of including actual examples directly in the docs as well.
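   If we do inline an example, a minimal request body for `PATCH 
/connectors/{name}/offsets` could look like the following (the 
`kafka_topic`/`kafka_partition`/`kafka_offset` keys shown are the sink-connector 
ones from KIP-875; source connectors use connector-defined keys):
   ```json
   {
     "offsets": [
       {
         "partition": {
           "kafka_topic": "test-topic",
           "kafka_partition": 0
         },
         "offset": {
           "kafka_offset": 10
         }
       }
     ]
   }
   ```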



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15128) snappy-java-1.1.8.4.jar library vulnerability

2023-06-27 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737576#comment-17737576
 ] 

Kamal Chandraprakash commented on KAFKA-15128:
--

The snappy jar is used to compress records on the producer side. If you're not 
setting {{compression.type}} to {{snappy}} in your producer configuration, then 
you can safely exclude this jar from your distribution.

https://github.com/apache/kafka/blob/trunk/build.gradle#L1342
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java#L89
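
A minimal sketch to illustrate (not authoritative guidance; note that a consumer 
reading snappy-compressed topics would still need the jar):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class NoSnappyProducerSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // snappy-java is only loaded when this is set to "snappy"; with "none"
        // (the default), "gzip", "lz4" or "zstd" the producer never touches it.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // produce as usual ...
        }
    }
}
{code}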


> snappy-java-1.1.8.4.jar library vulnerability
> -
>
> Key: KAFKA-15128
> URL: https://issues.apache.org/jira/browse/KAFKA-15128
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.4.0
>Reporter: priyatama
>Priority: Major
> Attachments: Screenshot 2023-06-27 at 12.30.51 PM.png
>
>
> Hi Team,
> We found a new vulnerability introduced in the snappy-java-1.1.8.4 library, so 
> we need to get rid of it.
> !Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230!
> During analysis, we found that snappy-java comes in via kafka-clients; our 
> application is not using the snappy-java jar directly.
> Can anyone please explain what snappy-java is used for in kafka-client, or 
> whether we can exclude it?
> The latest kafka-client also ships the vulnerable snappy jar. When will 
> kafka-client release a version that includes a non-vulnerable snappy-java jar?
> cc: [Mickael 
> Maison|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=mimaison]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-27 Thread via GitHub


divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1243425245


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -90,30 +109,34 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
 topicPartitionsToError.put(new TopicPartition(topic.name, partition), 
error)
   }
 }
+verificationFailureRate.mark(topicPartitionsToError.size)
 topicPartitionsToError.toMap
   }
 
+  private def sendCallback(callback: AddPartitionsToTxnManager.AppendCallback, 
errorMap: Map[TopicPartition, Errors], startTime: Long): Unit = {

Review Comment:
   s/startTime/startTimeMs so that it is clear what the units are



##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -240,17 +240,18 @@ object RequestChannel extends Logging {
   val responseSendTimeMs = nanosToMs(endTimeNanos - 
responseDequeueTimeNanos)
   val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
   val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-  val fetchMetricNames =
+  val overrideMetricNames =
 if (header.apiKey == ApiKeys.FETCH) {
-  val isFromFollower = body[FetchRequest].isFromFollower
-  Seq(
-if (isFromFollower) RequestMetrics.followFetchMetricName
+  val specifiedMetricName =
+if (body[FetchRequest].isFromFollower) 
RequestMetrics.followFetchMetricName
 else RequestMetrics.consumerFetchMetricName
-  )
+  Seq(specifiedMetricName, header.apiKey.name)
+} else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && 
body[AddPartitionsToTxnRequest].allVerifyOnlyRequest()) {

Review Comment:
   nit
   
   The parentheses on `allVerifyOnlyRequest()` are optional in Scala.



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)
+  val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs")

Review Comment:
   Thank you for the explanation. Makes sense. 
   
   > 1\ use a biased histogram
   
   Could you also please respond to the comment around the usage of a biased 
histogram?
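   For reference, a biased registration would look roughly like this, assuming 
the `newHistogram(name, biased, tags)` overload (a sketch, not this PR's code); 
biased (exponentially decaying) sampling weights recent measurements more 
heavily, at some extra cost per update compared to a uniform sample:
   ```java
   import java.util.Map;

   import com.yammer.metrics.core.Histogram;
   import org.apache.kafka.server.metrics.KafkaMetricsGroup;

   class HistogramSketch {
       private final KafkaMetricsGroup metricsGroup =
               new KafkaMetricsGroup(HistogramSketch.class);

       // Explicitly biased sampling; passing false would use a uniform sample.
       final Histogram verificationTimeMs =
               metricsGroup.newHistogram("VerificationTimeMs", true, Map.of());
   }
   ```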



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -17,25 +17,37 @@
 
 package kafka.server
 
+import 
kafka.server.AddPartitionsToTxnManager.{verificationFailureRateMetricName, 
verificationTimeMsMetricName}
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
 import org.apache.kafka.common.{Node, TopicPartition}
 import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{InterBrokerSendThread, 
RequestAndCompletionHandler}
 
 import java.util
+import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 
 object AddPartitionsToTxnManager {
   type AppendCallback = Map[TopicPartition, Errors] => Unit
+
+  val verificationFailureRateMetricName = "VerificationFailureRate"

Review Comment:
   nit
   
   Constants start with a capital letter (Pascal case) in the Kafka code base, 
from what I have observed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] flashmouse commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-06-27 Thread via GitHub


flashmouse commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1609178487

   @divijvaidya thank you for the reply!
   The unit test 
``org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest#testLargeAssignmentAndGroupWithNonEqualSubscription``
 can take much more time than the configured Timeout(90), and this is easy to 
reproduce when ``partitionCount`` and ``consumerCount`` are large enough and 
not equal (e.g. ``partitionCount`` = 200, ``consumerCount`` = 20); in this 
situation ``isBalanced`` and ``performReassignments`` may run their whole loop 
bodies.
   
   Because both ``performReassignments`` and ``isBalanced`` in 
``AbstractStickyAssignor`` are inefficient (``performReassignments`` is worse), 
``isBalanced`` can reduce the number of ``performReassignments`` calls, so 
fixing its logic speeds up this unit test when the parameters are small enough.
   
   In my test, after this fix, 
``testLargeAssignmentAndGroupWithNonEqualSubscription`` passes with 
``partitionCount`` = 200, ``consumerCount`` = 20, but is still slower than the 
combination ``partitionCount`` = 2000, ``consumerCount`` = 2000, because when 
the two parameters are equal, ``isBalanced`` satisfies the predicate 
``min >= max - 1`` (sketched below) and can return true immediately.
   
   Maybe I should add a unit test in a new commit?
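   For reference, the early-exit predicate I rely on boils down to something 
like this (a simplified sketch, not the actual assignor code):
   ```java
   import java.util.Collection;
   import java.util.List;
   import java.util.Map;

   import org.apache.kafka.common.TopicPartition;

   class BalanceSketch {
       // "Balanced" in the early-exit sense: the smallest and largest
       // per-consumer assignment sizes differ by at most one, so no single
       // partition move can improve the distribution and the expensive
       // performReassignments pass can be skipped.
       static boolean isBalanced(final Map<String, List<TopicPartition>> assignment) {
           final Collection<List<TopicPartition>> all = assignment.values();
           final int min = all.stream().mapToInt(List::size).min().orElse(0);
           final int max = all.stream().mapToInt(List::size).max().orElse(0);
           return min >= max - 1;
       }
   }
   ```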


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tombentley commented on a diff in pull request #13862: KAFKA-15050: format the prompts in the quickstart

2023-06-27 Thread via GitHub


tombentley commented on code in PR #13862:
URL: https://github.com/apache/kafka/pull/13862#discussion_r1243442987


##
docs/quickstart.html:
##
@@ -154,9 +154,9 @@ 
 By default, each line you enter will result in a separate event 
being written to the topic.
 
 
-$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
-This is my first event
-This is my second event
+$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
+$ This is my first event
+$ This is my second event

Review Comment:
   `kafka-console-producer.sh` has its own prompt, a `>` character without a 
trailing space.
   
   ```suggestion
   >This is my first event
   >This is my second event
   ```



##
docs/quickstart.html:
##
@@ -32,7 +32,7 @@ 
 the latest Kafka release and extract it:
 
 
-$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz

Review Comment:
   The `` already has `class="language-bash"`, so I don't think we need 
it on the `` as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15128) snappy-java-1.1.8.4.jar library vulnerability

2023-06-27 Thread priyatama (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

priyatama updated KAFKA-15128:
--
Description: 
Hi Team,

We found a new vulnerability introduced in the snappy-java-1.1.8.4 library, so we 
need to get rid of it.

!Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230!

During analysis, we found that snappy-java comes in via kafka-clients; our 
application is not using the snappy-java jar directly.

Can anyone please explain what snappy-java is used for in kafka-client, or 
whether we can exclude it?

The latest kafka-client also ships the vulnerable snappy jar. When will 
kafka-client release a version that includes a non-vulnerable snappy-java jar?

cc: [Mickael 
Maison|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=mimaison]

  was:
Hi Team,

We found a new vulnerability introduced in the snappy-java-1.1.8.4 library, so we 
need to get rid of it.

!Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230!

During analysis, we found that snappy-java comes in via kafka-clients; our 
application is not using the snappy-java jar directly.

Can anyone please explain what snappy-java is used for in kafka-client, or 
whether we can exclude it?

The latest kafka-client also ships the vulnerable snappy jar. When will 
kafka-client release a version that includes a non-vulnerable snappy-java jar?


> snappy-java-1.1.8.4.jar library vulnerability
> -
>
> Key: KAFKA-15128
> URL: https://issues.apache.org/jira/browse/KAFKA-15128
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.4.0
>Reporter: priyatama
>Priority: Major
> Attachments: Screenshot 2023-06-27 at 12.30.51 PM.png
>
>
> Hi Team,
> We found a new vulnerability introduced in the snappy-java-1.1.8.4 library, so 
> we need to get rid of it.
> !Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230!
> During analysis, we found that snappy-java comes in via kafka-clients; our 
> application is not using the snappy-java jar directly.
> Can anyone please explain what snappy-java is used for in kafka-client, or 
> whether we can exclude it?
> The latest kafka-client also ships the vulnerable snappy jar. When will 
> kafka-client release a version that includes a non-vulnerable snappy-java jar?
> cc: [Mickael 
> Maison|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=mimaison]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15128) snappy-java-1.1.8.4.jar library vulnerability

2023-06-27 Thread priyatama (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

priyatama updated KAFKA-15128:
--
Priority: Major  (was: Minor)

> snappy-java-1.1.8.4.jar library vulnerability
> -
>
> Key: KAFKA-15128
> URL: https://issues.apache.org/jira/browse/KAFKA-15128
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.4.0
>Reporter: priyatama
>Priority: Major
> Attachments: Screenshot 2023-06-27 at 12.30.51 PM.png
>
>
> Hi Team,
> We found a new vulnerability introduced in the snappy-java-1.1.8.4 library, so 
> we need to get rid of it.
> !Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230!
> During analysis, we found that snappy-java comes in via kafka-clients; our 
> application is not using the snappy-java jar directly.
> Can anyone please explain what snappy-java is used for in kafka-client, or 
> whether we can exclude it?
> The latest kafka-client also ships the vulnerable snappy jar. When will 
> kafka-client release a version that includes a non-vulnerable snappy-java jar?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-06-27 Thread via GitHub


divijvaidya commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1609110377

   Thank you for your first change to Apache Kafka @flashmouse! In the JIRA, 
you mention that this can be reproduced using a unit test. Can you please add 
that unit test here, one which fails prior to this change and succeeds after it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15128) snappy-java-1.1.8.4.jar library vulnerability

2023-06-27 Thread priyatama (Jira)
priyatama created KAFKA-15128:
-

 Summary: snappy-java-1.1.8.4.jar library vulnerability
 Key: KAFKA-15128
 URL: https://issues.apache.org/jira/browse/KAFKA-15128
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.4.0
Reporter: priyatama
 Attachments: Screenshot 2023-06-27 at 12.30.51 PM.png

Hi Team,

We found a new vulnerability introduced in the snappy-java-1.1.8.4 library, so we 
need to get rid of it.

!Screenshot 2023-06-27 at 12.30.51 PM.png|width=321,height=230!

During analysis, we found that snappy-java comes in via kafka-clients; our 
application is not using the snappy-java jar directly.

Can anyone please explain what snappy-java is used for in kafka-client, or 
whether we can exclude it?

The latest kafka-client also ships the vulnerable snappy jar. When will 
kafka-client release a version that includes a non-vulnerable snappy-java jar?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.

2023-06-27 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-15127:
-

Assignee: Sagar Rao

> Allow offsets to be reset at the same time a connector is deleted.
> --
>
> Key: KAFKA-15127
> URL: https://issues.apache.org/jira/browse/KAFKA-15127
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
>
> This has been listed as [Future 
> Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors]
>  in KIP-875. Now that the delete offsets mechanism is also in place, we can 
> take this up, which will allow connector names to be reused after connector 
> deletion. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15106) AbstractStickyAssignor may stuck in 3.5

2023-06-27 Thread li xiangyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

li xiangyuan reassigned KAFKA-15106:


Assignee: li xiangyuan

> AbstractStickyAssignor may stuck in 3.5
> ---
>
> Key: KAFKA-15106
> URL: https://issues.apache.org/jira/browse/KAFKA-15106
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.0
>Reporter: li xiangyuan
>Assignee: li xiangyuan
>Priority: Major
>
> This is easy to reproduce in a unit test: in 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest#testLargeAssignmentAndGroupWithNonEqualSubscription,
> set partitionCount=200 and consumerCount=20, and you will see that isBalanced 
> returns false forever.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.

2023-06-27 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15127:
-

 Summary: Allow offsets to be reset at the same time a connector is 
deleted.
 Key: KAFKA-15127
 URL: https://issues.apache.org/jira/browse/KAFKA-15127
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Sagar Rao


This has been listed as [Future 
Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors]
 in KIP-875. Now that the delete offsets mechanism is also in place, we can 
take this up, which will allow connector names to be reused after connector 
deletion. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

