[GitHub] [kafka] chia7712 merged pull request #9861: MINOR: Modify unnecessary access specifiers

2021-04-19 Thread GitBox


chia7712 merged pull request #9861:
URL: https://github.com/apache/kafka/pull/9861


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #10402: MINOR: Remove unthrown exceptions, fix typo, etc.

2021-04-19 Thread GitBox


dongjinleekr commented on pull request #10402:
URL: https://github.com/apache/kafka/pull/10402#issuecomment-822992332


   @chia7712 This PR seems already approved but not merged yet. Could you have 
a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2021-04-19 Thread GitBox


dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-822991690


   > I know @vvcephei was quite swamped in the past months with pretty heavy 
release management duties.
   
   Totally agree. It's also why I did not press him. :smile: I rebased the PR 
into the latest trunk and checked it passes all tests!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-19 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r616349554



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 if (log.isDebugEnabled()) {
 log.debug("final assignment: " + assignment);
 }
-
+
 return assignment;
 }
 
-private SortedSet getTopicPartitions(Map 
partitionsPerTopic) {
-SortedSet allPartitions =
-new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-for (Entry entry: partitionsPerTopic.entrySet()) {
-String topic = entry.getKey();
-for (int i = 0; i < entry.getValue(); ++i) {
+/**
+ * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+ * and sortedToBeRemovedPartitions. We use two pointers technique here:
+ *
+ * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted all partitions
+ * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+ * @return the partitions don't assign to any current consumers
+ */
+private List getUnassignedPartitions(List 
sortedPartitions,

Review comment:
   adopt the refactor 2 from 
https://github.com/apache/kafka/pull/10552#discussion_r615228600.
   
   We used to have an SortedSet of unassignedPartitions, with all partitions 
(ex: 1 million partitions), and loop through current assignment, to remove 
already assigned partitions, ex: 999,000 of them, so we'll only have 1000 
partitions left. However, SortedSet element removing need some time because it 
needs to find element first, and then, do some tree node movement to maintain 
balanced. This situation should happen a lot since each rebalance, we should 
only have small set of changes (ex: 1 consumer dropped), so this is an 
important improvement.
   
   To refactor it, I used two pointer technique to loop through 2 sorted list: 
sortedPartitions and sortedToBeRemovedPartitions. And only add the difference 
set of the 2 lists. The looping and element adding is very fast in ArrayList. 
So, it improves a lot.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-19 Thread GitBox


showuon edited a comment on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-822975764


   @ableegoldman (cc. @guozhangwang)
   After completing the sticky general assignor improvement 
(https://github.com/apache/kafka/pull/10552), I started to think... why don't I 
re-use the same techniques to the constrained assignor. In this commit: 
https://github.com/apache/kafka/pull/10509/commits/cd68d10b5030ecf9c1fd40b518322c0649a33ee4,
 I adopted the refactor 2 and 4 in https://github.com/apache/kafka/pull/10552 
to make the constrained assignor faster!
   
   After the PR: the `testLargeAssignmentAndGroupWithUniformSubscription` (1 
million partitions) will run from **~2600 ms
   down to ~1400 ms**, improves **46%** of performance, almost 2x faster!!
   
   Let's what the result is in jenkins trunk build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-19 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r616350866



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -234,23 +245,28 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 List consumerAssignment = 
assignment.get(consumer);
 
 int expectedAssignedCount = numMaxCapacityMembers < 
numExpectedMaxCapacityMembers ? maxQuota : minQuota;
+int currentAssignedCount = consumerAssignment.size();
 if (unassignedPartitionsIter.hasNext()) {
 TopicPartition tp = unassignedPartitionsIter.next();
 consumerAssignment.add(tp);
-unassignedPartitionsIter.remove();

Review comment:
   Don't do the `unassignedPartitions` list element removal as mentioned, 
it's slow. Instead, we make sure the `unfilledMembers` will be empty in the end 
inside the while loop. After all, the `unassignedPartitions` size will always 
>= `unfilledMembers` size. By only removing `unfilledMembers` makes it faster, 
and also keeps the logic correctness.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-19 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r616350866



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -234,23 +245,28 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 List consumerAssignment = 
assignment.get(consumer);
 
 int expectedAssignedCount = numMaxCapacityMembers < 
numExpectedMaxCapacityMembers ? maxQuota : minQuota;
+int currentAssignedCount = consumerAssignment.size();
 if (unassignedPartitionsIter.hasNext()) {
 TopicPartition tp = unassignedPartitionsIter.next();
 consumerAssignment.add(tp);
-unassignedPartitionsIter.remove();

Review comment:
   Don't do the `unassignedPartitions` list element removal as mentioned, 
it's slow. Instead, we make sure the `unfilledMembers` will be empty in the end 
inside the while loop. After all, the `unassignedPartitions` size will always 
>= `unfilledMembers` size. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-19 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r616349554



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 if (log.isDebugEnabled()) {
 log.debug("final assignment: " + assignment);
 }
-
+
 return assignment;
 }
 
-private SortedSet getTopicPartitions(Map 
partitionsPerTopic) {
-SortedSet allPartitions =
-new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-for (Entry entry: partitionsPerTopic.entrySet()) {
-String topic = entry.getKey();
-for (int i = 0; i < entry.getValue(); ++i) {
+/**
+ * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+ * and sortedToBeRemovedPartitions. We use two pointers technique here:
+ *
+ * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted all partitions
+ * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+ * @return the partitions don't assign to any current consumers
+ */
+private List getUnassignedPartitions(List 
sortedPartitions,

Review comment:
   adopt the refactor 2 from 
https://github.com/apache/kafka/pull/10552#discussion_r615228600.
   
   We used to have an SortedSet of unassignedPartitions, with all partitions 
(ex: 1 million partitions), and loop through current assignment, to remove 
already assigned partitions, ex: 999,000 of them, so we'll only have 1000 
partitions left. However, SortedSet element removing need some time because it 
needs to find element first, and then, do some tree node movement to maintain 
sorted. This situation should happen a lot since each rebalance, we should only 
have small set of changes (ex: 1 consumer dropped), so this is an important 
improvement.
   
   To refactor it, I used two pointer technique to loop through 2 sorted list: 
sortedPartitions and sortedToBeRemovedPartitions. And only add the difference 
set of the 2 lists. The looping and element adding is very fast in ArrayList. 
So, it improves a lot.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-19 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615229332



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -527,12 +615,23 @@ private int getBalanceScore(Map> assignment) {
  * Sort valid partitions so they are processed in the potential 
reassignment phase in the proper order
  * that causes minimal partition movement among consumers (hence honoring 
maximal stickiness)
  *
- * @param partition2AllPotentialConsumers a mapping of partitions to their 
potential consumers
+ * @param topic2AllPotentialConsumers a mapping of partitions to their 
potential consumers
+ * @param partitionsPerTopic The number of partitions for each subscribed 
topic
  * @return  an ascending sorted list of topic partitions based on how many 
consumers can potentially use them
  */
-private List sortPartitions(Map> partition2AllPotentialConsumers) {
-List sortedPartitions = new 
ArrayList<>(partition2AllPotentialConsumers.keySet());
-Collections.sort(sortedPartitions, new 
PartitionComparator(partition2AllPotentialConsumers));
+private List sortPartitions(Map> 
topic2AllPotentialConsumers,
+Map 
partitionsPerTopic) {
+List sortedPartitions = new ArrayList<>();
+List allTopics = new 
ArrayList<>(topic2AllPotentialConsumers.keySet());
+Collections.sort(allTopics, new 
TopicComparator(topic2AllPotentialConsumers));
+
+// since allTopics are sorted, we can loop through allTopics to create 
the sortedPartitions

Review comment:
   refactor 4: To have `sortPartitions` list, we used to sort all of the 
partitions. To improve it, I sort all topics first(only 500 topics to sort, 
compared to the original 1 million partitions to sort), and then add the 
partitions by looping all sorted topics.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-19 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r616347985



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 if (log.isDebugEnabled()) {
 log.debug("final assignment: " + assignment);
 }
-
+
 return assignment;
 }
 
-private SortedSet getTopicPartitions(Map 
partitionsPerTopic) {
-SortedSet allPartitions =
-new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-for (Entry entry: partitionsPerTopic.entrySet()) {
-String topic = entry.getKey();
-for (int i = 0; i < entry.getValue(); ++i) {
+/**
+ * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+ * and sortedToBeRemovedPartitions. We use two pointers technique here:
+ *
+ * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted all partitions
+ * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+ * @return the partitions don't assign to any current consumers
+ */
+private List getUnassignedPartitions(List 
sortedPartitions,
+ List 
sortedToBeRemovedPartitions) {
+List unassignedPartitions = new ArrayList<>(
+sortedPartitions.size() - sortedToBeRemovedPartitions.size());
+
+int index = 0;
+boolean shouldAddDirectly = false;
+int sizeToBeRemovedPartitions = sortedToBeRemovedPartitions.size();
+TopicPartition nextPartition = sortedToBeRemovedPartitions.get(index);
+for (TopicPartition topicPartition : sortedPartitions) {
+if (shouldAddDirectly || !nextPartition.equals(topicPartition)) {
+unassignedPartitions.add(topicPartition);
+} else {
+// equal case, don't add to unassignedPartitions, just get 
next partition
+if (index < sizeToBeRemovedPartitions - 1) {
+nextPartition = sortedToBeRemovedPartitions.get(++index);
+} else {
+// add the remaining directly since there is no more 
toBeRemovedPartitions
+shouldAddDirectly = true;
+}
+}
+}
+return unassignedPartitions;
+}
+
+
+private List getTopicPartitions(Map 
partitionsPerTopic) {
+List allPartitions = new ArrayList<>(
+partitionsPerTopic.values().stream().reduce(0, Integer::sum));
+
+List allTopics = new ArrayList<>(partitionsPerTopic.keySet());
+// sort all topics first, then we can have sorted all topic partitions 
by adding partitions starting from 0
+Collections.sort(allTopics);

Review comment:
   adopt the technique of refactor 4 from 
https://github.com/apache/kafka/pull/10552#discussion_r615229332
   We used to maintain a SortedSet of the all topic partitions. It takes some 
time to build the set while adding the partitions. 
   
   Improve it by using ArrayList, and sorting all topics first(only 500 topics 
to sort, compared to the original 1 million partitions to sort), and then add 
the partitions by looping all sorted topics. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-19 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615229332



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -527,12 +615,23 @@ private int getBalanceScore(Map> assignment) {
  * Sort valid partitions so they are processed in the potential 
reassignment phase in the proper order
  * that causes minimal partition movement among consumers (hence honoring 
maximal stickiness)
  *
- * @param partition2AllPotentialConsumers a mapping of partitions to their 
potential consumers
+ * @param topic2AllPotentialConsumers a mapping of partitions to their 
potential consumers
+ * @param partitionsPerTopic The number of partitions for each subscribed 
topic
  * @return  an ascending sorted list of topic partitions based on how many 
consumers can potentially use them
  */
-private List sortPartitions(Map> partition2AllPotentialConsumers) {
-List sortedPartitions = new 
ArrayList<>(partition2AllPotentialConsumers.keySet());
-Collections.sort(sortedPartitions, new 
PartitionComparator(partition2AllPotentialConsumers));
+private List sortPartitions(Map> 
topic2AllPotentialConsumers,
+Map 
partitionsPerTopic) {
+List sortedPartitions = new ArrayList<>();
+List allTopics = new 
ArrayList<>(topic2AllPotentialConsumers.keySet());
+Collections.sort(allTopics, new 
TopicComparator(topic2AllPotentialConsumers));
+
+// since allTopics are sorted, we can loop through allTopics to create 
the sortedPartitions

Review comment:
   refactor 4: To have `sortPartitions` list, we used to sort all of the 
partitions. To improve it, I sort all topics first(only 500 topics to sort, 
compared to the original 1 million partitions to sort), and then add the 
partitions by looping all sorted topics. (small improvement)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-19 Thread GitBox


showuon edited a comment on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-822975764


   @ableegoldman (cc. @guozhangwang)
   After completing the sticky general assignor improvement 
(https://github.com/apache/kafka/pull/10552), I started to think... why don't I 
re-use the same techniques to the constrained assignor. In this commit: 
https://github.com/apache/kafka/pull/10509/commits/cd68d10b5030ecf9c1fd40b518322c0649a33ee4,
 I adopted the refactor 2 and 4 in https://github.com/apache/kafka/pull/10552 
to make the constrained assignor faster!
   
   After the PR: the `testLargeAssignmentAndGroupWithUniformSubscription` (1 
million partitions) will run from **~2600 ms
   down to ~1400 ms**, improves **46%** of performance!!
   
   Let's what the result is in jenkins trunk build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-19 Thread GitBox


showuon commented on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-822975764


   @ableegoldman (cc. @guozhangwang)
   After completing the sticky general assignor improvement 
(https://github.com/apache/kafka/pull/10552), I started to think why don't I 
re-use the same techniques to the constrained assignor. In this commit: 
https://github.com/apache/kafka/pull/10509/commits/cd68d10b5030ecf9c1fd40b518322c0649a33ee4,
 I adopted the refactor 2 and 4 in https://github.com/apache/kafka/pull/10552 
to make the constrained assignor faster!
   
   After the PR: the `testLargeAssignmentAndGroupWithUniformSubscription` (1 
million partitions) will run from **~2600 ms
   down to ~1400 ms**, improves **46%** of performance!!
   
   Let's what the result is in jenkins trunk build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on pull request #10551: MINOR: Fix nonsense test line from TopicCommandTest

2021-04-19 Thread GitBox


wenbingshen commented on pull request #10551:
URL: https://github.com/apache/kafka/pull/10551#issuecomment-822973514


   Merge trunk to trigger QA again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9640: KAFKA-10283; Consolidate client-level and consumer-level assignment within ClientState

2021-04-19 Thread GitBox


guozhangwang commented on a change in pull request #9640:
URL: https://github.com/apache/kafka/pull/9640#discussion_r616343033



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##
@@ -100,40 +94,41 @@ boolean reachedCapacity() {
 }
 
 public Set activeTasks() {
-return unmodifiableSet(activeTasks);
+return unmodifiableSet(assignedActiveTasks.taskIds());
 }
 
 public int activeTaskCount() {
-return activeTasks.size();
+return assignedActiveTasks.taskIds().size();
 }
 
 double activeTaskLoad() {
 return ((double) activeTaskCount()) / capacity;
 }
 
 public void assignActiveTasks(final Collection tasks) {
-activeTasks.addAll(tasks);
+assignedActiveTasks.taskIds().addAll(tasks);
 }
 
 public void assignActiveToConsumer(final TaskId task, final String 
consumer) {
-consumerToAssignedActiveTaskIds.computeIfAbsent(consumer, k -> new 
HashSet<>()).add(task);
+assignedActiveTasks.consumerToTaskIds().computeIfAbsent(consumer, k -> 
new HashSet<>()).add(task);

Review comment:
   @highluck I think the main point here is that, inside the 
`assignActiveToConsumer` we first make sure that the `task` is already in the 
`ClientStateTask.taskIds` set, i.e. that the task is already assigned to the 
client before we assign it to the client's consumer in case any bugs caused 
inconsistent assignment: for example, we assigned the task to instance `A` but 
also at the same time assign it to consumer `2` of instance `B`: currently the 
data structure still does not guarantee that would never happen, but at least 
we can add such sanity check for now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12693) Consecutive rebalances with zombie instances may cause corrupted changelogs

2021-04-19 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12693:
-

 Summary: Consecutive rebalances with zombie instances may cause 
corrupted changelogs
 Key: KAFKA-12693
 URL: https://issues.apache.org/jira/browse/KAFKA-12693
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


When an instance (or thread within an instance) of Kafka Streams has a soft 
failure and the group coordinator triggers a rebalance, that instance would 
temporarily become a "zombie writer". That is, this instance does not know 
there's already a new rebalance and hence its partitions have been migrated 
out, until it tries to commit and then got notified of the illegal-generation 
error and realize itself is the "zombie" already. During this period until the 
commit, this zombie may still be writing data to the changelogs of the migrated 
tasks as the new owner has already taken over and also writing to the 
changelogs.

When EOS is enabled, this would not be a problem: when the zombie tries to 
commit and got notified that it's fenced, its zombie appends would be aborted. 
With EOS disabled, though, such shared writes would be interleaved on the 
changelogs where a zombie append may arrive later after the new writer's 
append, effectively overwriting that new append.

Note that such interleaving writes do not necessarily cause corrupted data: as 
long as the new producer keep appending after the old zombie stops, and all the 
corrupted keys are overwritten again by the new values, then it is fine. 
However, if there are consecutive rebalances where right after the changelogs 
are corrupted by zombie writers, and before the new writer can overwrite them 
again, the task gets migrated again and needs to be restored from changelogs, 
the old values would be restored instead of the new values, effectively causing 
data loss.

Although this should be a rare event, we should fix it asap still. One idea is 
to have producers get a PID even under ALOS: that is, we set the transactional 
id in the producer config, but did not trigger any txn APIs; when there are 
zombie producers, they would then be immediately fenced on appends and hence 
there's no interleaved appends. I think this may require a KIP still, since 
today one has to call initTxn in order to register and get the PID.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] highluck commented on pull request #9861: MINOR: Modify unnecessary access specifiers

2021-04-19 Thread GitBox


highluck commented on pull request #9861:
URL: https://github.com/apache/kafka/pull/9861#issuecomment-822967573


   @chia7712 
   Thank you for the comment Code was modified


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2021-04-19 Thread GitBox


guozhangwang commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-822965530


   @dongjinleekr just FYI, I know @vvcephei was quite swamped in the past 
months with pretty heavy release management duties -- you'd see his email about 
2.8.0 release, which was all thanks to him -- so I believe it's just he was too 
busy with 2.8.0 to review your PRs, and I'd like to thank you for your patience!
   
   @vvcephei whenever you have time please let know if the current PR is good 
or not.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10565: KAFKA-12691: Add case where task can be considered idling

2021-04-19 Thread GitBox


mjsax commented on a change in pull request #10565:
URL: https://github.com/apache/kafka/pull/10565#discussion_r616324780



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -692,6 +699,7 @@ public boolean process(final long wallClockTime) {
 return false;
 }
 }
+timeCurrentIdlingStarted = Optional.empty();

Review comment:
   Why do we need this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-04-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325430#comment-17325430
 ] 

Sagar Rao commented on KAFKA-9168:
--

Sure thanks [~ableegoldman], i have assinged this to myself. Whenever Bruno's 
done with his changes, i can start working on this one..

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-04-19 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-9168:


Assignee: Sagar Rao

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #10547: KAFKA-12284: increase request timeout to make tests reliable

2021-04-19 Thread GitBox


showuon commented on pull request #10547:
URL: https://github.com/apache/kafka/pull/10547#issuecomment-822930347


   Thanks @ning2008wisc ! 
   @mimaison , see if you have any comments. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10556: MINOR: Remove redundant code from BrokerApiVersionsCommand

2021-04-19 Thread GitBox


chia7712 commented on pull request #10556:
URL: https://github.com/apache/kafka/pull/10556#issuecomment-822928272


   > These test failures seem to have nothing to do with this pr. Apart from 
merging the backbone, can we still trigger the inspection? I do not know what 
to do.
   
   It seems to me the failure is unrelated to this PR. Thanks for your 
contribution. merge to trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #10556: MINOR: Remove redundant code from BrokerApiVersionsCommand

2021-04-19 Thread GitBox


chia7712 merged pull request #10556:
URL: https://github.com/apache/kafka/pull/10556


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot

2021-04-19 Thread Haoran Xuan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325423#comment-17325423
 ] 

Haoran Xuan commented on KAFKA-10800:
-

[~jagsancio] Thanks so much for the additional information, I will make sure to 
take care of these points you mentioned. 

> Validate the snapshot id when the state machine creates a snapshot
> --
>
> Key: KAFKA-10800
> URL: https://issues.apache.org/jira/browse/KAFKA-10800
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Haoran Xuan
>Priority: Major
>
> When the state machine attempts to create a snapshot writer we should 
> validate that the following is true:
>  # The end offset and epoch of the snapshot is less than the high-watermark.
>  # The end offset and epoch of the snapshot is valid based on the leader 
> epoch cache.
> Note that this validation should not be performed when the raft client 
> creates the snapshot writer because in that case the local log is out of date 
> and the follower should trust the snapshot id sent by the partition leader.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #9861: MINOR: Modify unnecessary access specifiers

2021-04-19 Thread GitBox


chia7712 commented on a change in pull request #9861:
URL: https://github.com/apache/kafka/pull/9861#discussion_r616298454



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##
@@ -58,7 +58,7 @@
 }
 
 
-public RawAndDeserializedValue getWithBinary(final K key) {
+RawAndDeserializedValue getWithBinary(final K key) {

Review comment:
   This change may be overkill as this is not an inner class. 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##
@@ -70,9 +70,9 @@
 }
 }
 
-public boolean putIfDifferentValues(final K key,
-final ValueAndTimestamp newValue,
-final byte[] oldSerializedValue) {
+boolean putIfDifferentValues(final K key,

Review comment:
   ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on pull request #9821: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries

2021-04-19 Thread GitBox


vitojeng commented on pull request #9821:
URL: https://github.com/apache/kafka/pull/9821#issuecomment-822913986


   @cadonna , @ableegoldman 
   
   Thanks for great discussion.
   I'll rebase trunk and update the streams upgrade guide.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-19 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615228600



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -387,58 +398,121 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 TreeSet sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
 sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
+int totalPartitionCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
 balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+partitionsPerTopic, totalPartitionCount);
+
 return currentAssignment;
 }
 
+/**
+ * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+ * and toBeRemovedPartitions. We use two pointers technique here:
+ *
+ * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the the ith element, get next element from 
toBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted all partitions
+ * @param toBeRemovedPartitions: sorted partitions, all are included in 
the sortedPartitions
+ * @return the partitions don't assign to any current consumers
+ */
+private List getUnassignedPartitions(List 
sortedPartitions,

Review comment:
   refactor 2:
   We used to have an ArrayList of `unassignedPartitions`, with all sorted 
partitions (ex: 1 million partitions), and loop through current assignment, to 
remove already assigned partitions, ex: 999,000 of them, so we'll only have 
1000 partitions left. However, the ArrayList element remove is pretty slow for 
huge size because it needs to find element first, and then, do arrayCopy for 
the removed array with size of (originalSize -1). This situation should happen 
a lot since each rebalance, we should only have small set of changes (ex: 1 
consumer dropped), so this is an important improvement.
   
   To refactor it, I used two pointer technique to loop through 2 sorted list: 
`sortedPartitions` and `sortedToBeRemovedPartitions`. And only add the 
difference set of the 2 lists. The looping and element adding is very fast in 
ArrayList. So, it improves a lot.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-19 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615228600



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -387,58 +398,121 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 TreeSet sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
 sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
+int totalPartitionCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
 balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+partitionsPerTopic, totalPartitionCount);
+
 return currentAssignment;
 }
 
+/**
+ * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+ * and toBeRemovedPartitions. We use two pointers technique here:
+ *
+ * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the the ith element, get next element from 
toBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted all partitions
+ * @param toBeRemovedPartitions: sorted partitions, all are included in 
the sortedPartitions
+ * @return the partitions don't assign to any current consumers
+ */
+private List getUnassignedPartitions(List 
sortedPartitions,

Review comment:
   refactor 2:
   We used to have an ArrayList of `unassignedPartitions`, with all sorted 
partitions (ex: 1 million partitions), and loop through current assignment, to 
remove already assigned partitions, ex: 999,000 of them, so we'll only have 
1000 partitions left. However, the ArrayList element remove is pretty slow for 
huge size because it needs to find element first, and then, do arrayCopy for 
the removed array with size of (originalSize -1). This situation should happen 
a lot since each rebalance, we should only have small set of changes (ex: 1 
consumer dropped), so this is an important improvement.
   
   To refactor it, I used two pointer technique to loop through 2 sorted list: 
`sortedPartitions` and `sortedToBeRemovedPartitions`. And only add the 
difference set of the 2 lists. The element looping and element added is very 
fast in ArrayList. So, it improves a lot.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-19 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r615229166



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -387,58 +398,121 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 TreeSet sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
 sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
+int totalPartitionCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
 balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+partitionsPerTopic, totalPartitionCount);
+
 return currentAssignment;
 }
 
+/**
+ * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+ * and toBeRemovedPartitions. We use two pointers technique here:
+ *
+ * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the the ith element, get next element from 
toBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted all partitions
+ * @param toBeRemovedPartitions: sorted partitions, all are included in 
the sortedPartitions
+ * @return the partitions don't assign to any current consumers
+ */
+private List getUnassignedPartitions(List 
sortedPartitions,
+ List 
toBeRemovedPartitions) {
+List unassignedPartitions = new ArrayList<>();
+
+int index = 0;
+boolean shouldAddDirectly = false;
+int sizeToBeRemovedPartitions = toBeRemovedPartitions.size();
+TopicPartition nextPartition = toBeRemovedPartitions.get(index);
+for (TopicPartition topicPartition : sortedPartitions) {
+if (shouldAddDirectly || !nextPartition.equals(topicPartition)) {
+unassignedPartitions.add(topicPartition);
+} else {
+// equal case, don't add to unassignedPartitions, just get 
next partition
+if (index < sizeToBeRemovedPartitions - 1) {
+nextPartition = toBeRemovedPartitions.get(++index);
+} else {
+// add the remaining directly since there is no more 
toBeRemovedPartitions
+shouldAddDirectly = true;
+}
+}
+}
+return unassignedPartitions;
+}
+
+/**
+ * update the prevAssignment with the partitions, consumer and generation 
in parameters
+ *
+ * @param partitions: The partitions to be updated the prevAssignement
+ * @param consumer: The consumer Id
+ * @param prevAssignment: The assignment contains the assignment with the 
2nd largest generation
+ * @param generation: The generation of this assignment (partitions)
+ */
+private void updatePrevAssignment(Map prevAssignment,
+  List partitions,
+  String consumer,
+  int generation) {
+for (TopicPartition partition: partitions) {
+ConsumerGenerationPair consumerGeneration = 
prevAssignment.get(partition);
+if (consumerGeneration != null) {
+// only keep the latest previous assignment
+if (generation > consumerGeneration.generation)
+prevAssignment.put(partition, new 
ConsumerGenerationPair(consumer, generation));
+} else {
+prevAssignment.put(partition, new 
ConsumerGenerationPair(consumer, generation));
+}
+}
+}
+
+/**
+ * filling in the currentAssignment and prevAssignment from the 
subscriptions.
+ *
+ * @param subscriptions: Map from the member id to their respective topic 
subscription
+ * @param currentAssignment: The assignment contains the assignments with 
the largest generation
+ * @param prevAssignment: The assignment contains the assignment with the 
2nd largest generation
+ */
 private void prepopulateCurrentAssignments(Map 
subscriptions,
Map> currentAssignment,
Map prevAssignment) {
 // we need to process subscriptions' user data with each consumer's 
reported generation in mind
 // higher generations overwrite lower generations in case of a conflict
 // note 

[GitHub] [kafka] showuon edited a comment on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-19 Thread GitBox


showuon edited a comment on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-822200511


   The performance comparison in jenkins for uniform subscription and non-equal 
subscription with the setting:
   ```
   topicCount = 500;
   partitionCount = 2000;
   consumerCount = 2000;
   ```
   
   ```
   Build / JDK 15 and Scala 2.13 / 
testLargeAssignmentAndGroupWithNonEqualSubscription() | 13 sec | Passed
   Build / JDK 11 and Scala 2.13 / 
testLargeAssignmentAndGroupWithNonEqualSubscription() | 17 sec | Passed
   Build / JDK 8 and Scala 2.12 / 
testLargeAssignmentAndGroupWithNonEqualSubscription() | 14 sec | Passed
   
   Build / JDK 8 and Scala 2.12 / 
testLargeAssignmentAndGroupWithUniformSubscription() | 3.4 sec | Passed
   Build / JDK 15 and Scala 2.13 / 
testLargeAssignmentAndGroupWithUniformSubscription() | 3.3 sec | Passed
   Build / JDK 11 and Scala 2.13 / 
testLargeAssignmentAndGroupWithUniformSubscription() | 3.9 sec | Passed
   ```
   I think after this PR, the performance is acceptable for non-equal 
subscription cases. We can have incremental improvement in the following 
stories. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7878) Connect Task already exists in this worker when failed to create consumer

2021-04-19 Thread Daniel Huang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325411#comment-17325411
 ] 

Daniel Huang commented on KAFKA-7878:
-

Also seeing a similar "Task already exists in this worker" error in 2.6.0. Have 
observed it in both sink and source connectors.

> Connect Task already exists in this worker when failed to create consumer
> -
>
> Key: KAFKA-7878
> URL: https://issues.apache.org/jira/browse/KAFKA-7878
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.1, 2.0.1
>Reporter: Loïc Monney
>Priority: Major
>
> *Assumption*
> 1. DNS is not available during a few minutes
> 2. Consumer group rebalances
> 3. Client is not able to resolve DNS entries anymore and fails
> 4. Task seems already registered, so at next rebalance the task will fail due 
> to *Task already exists in this worker* and the only way to recover is to 
> restart the connect process
> *Real log entries*
> * Distributed cluster running one connector on top of Kubernetes
> * Connect 2.0.1
> * kafka-connect-hdfs 5.0.1
> {noformat}
> [2019-01-28 13:31:25,914] WARN Removing server kafka.xxx.net:9093 from 
> bootstrap.servers as DNS resolution failed for kafka.xxx.net 
> (org.apache.kafka.clients.ClientUtils:56)
> [2019-01-28 13:31:25,915] ERROR WorkerSinkTask\{id=xxx-22} Task failed 
> initialization and will not be started. 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:142)
> org.apache.kafka.connect.errors.ConnectException: Failed to create consumer
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:476)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.initialize(WorkerSinkTask.java:139)
>  at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:452)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:873)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:111)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:888)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:884)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> consumer
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:799)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:615)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:596)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:474)
>  ... 10 more
> Caused by: org.apache.kafka.common.config.ConfigException: No resolvable 
> bootstrap urls given in bootstrap.servers
>  at 
> org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:66)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:709)
>  ... 13 more
> [2019-01-28 13:31:25,925] INFO Finished starting connectors and tasks 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)
> [2019-01-28 13:31:25,926] INFO Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1239)
> [2019-01-28 13:31:25,927] INFO Stopping task xxx-22 
> (org.apache.kafka.connect.runtime.Worker:555)
> [2019-01-28 13:31:26,021] INFO Finished stopping tasks in preparation for 
> rebalance 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1269)
> [2019-01-28 13:31:26,021] INFO [Worker clientId=connect-1, 
> groupId=xxx-cluster] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
> [2019-01-28 13:31:30,746] INFO [Worker clientId=connect-1, 
> groupId=xxx-cluster] Successfully joined group with generation 29 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:473)
> [2019-01-28 13:31:30,746] INFO Joined group and got assignment: 
> Assignment\{error=0, leader='connect-1-05961f03-52a7-4c02-acc2-0f1fb021692e', 
> leaderUrl='http://192.168.46.59:8083/', offset=32, connectorIds=[], 
> taskIds=[xxx-22]} 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1217)
> [2019-01-28 13:31:30,747] INFO Starting connectors and tasks using config 
> offset 32 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:858)
> [2019-01-28 13:31:30,747] INFO Starting task xxx-22 
> 

[GitHub] [kafka] showuon edited a comment on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-19 Thread GitBox


showuon edited a comment on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-822292217


   Failed tests are all flaky and unrelated. Thanks.
   The fix to flaky `MirrorConnectorsIntegration` tests issue is in my another 
PR: https://github.com/apache/kafka/pull/10547
   The root cause of flaky `RaftClusterTest` tests will be addressed in 
KAFKA-12677.
   
   ```
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplicationWithEmptyPartition()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testAddingWorker
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testRemovingWorker
   Build / JDK 8 and Scala 2.12 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplicationWithEmptyPartition()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStop
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector
   Build / JDK 11 and Scala 2.13 / 
kafka.server.ControllerMutationQuotaTest.testStrictDeleteTopicsRequest()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.ReplicationQuotasTest.shouldThrottleOldSegments()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   Build / JDK 15 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication()
   Build / JDK 15 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition()
   Build / JDK 15 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
   Build / JDK 15 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
   Build / JDK 15 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplicationWithEmptyPartition()
   Build / JDK 15 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
   Build / JDK 15 and Scala 2.13 / 
org.apache.kafka.connect.integration.ConnectorClientPolicyIntegrationTest.testCreateWithNotAllowedOverridesForPrincipalPolicy
   Build / JDK 15 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   Build / JDK 15 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   Build / JDK 15 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   Build / JDK 15 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-04-19 Thread GitBox


showuon commented on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-822906608


   @guozhangwang @ableegoldman , PR is ready for review. Thank you. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12207) Do not maintain list of latest producer append information

2021-04-19 Thread Yi Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325405#comment-17325405
 ] 

Yi Ding commented on KAFKA-12207:
-

We can remove the restriction on `max.in.flight.requests.per.connection` and 
document that if the offset is required in `RecordMetadata`, then the user must 
set this to 1

=> will there be a performance difference between 1 inflight request and 5 
inflight requests?

> Do not maintain list of latest producer append information 
> ---
>
> Key: KAFKA-12207
> URL: https://issues.apache.org/jira/browse/KAFKA-12207
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
>
> For each producerId writing to each partition, we maintain a list of the 5 
> most recent appended sequence numbers and the corresponding offsets in the 
> log. If a producer fails to receive a successful response and retries the 
> Produce request, then we can still return the offset of the successful 
> append, which is returned to the user inside `RecordMetadata`. (Note that the 
> limit of 5 most recent appends is where we derive the limit on the max number 
> of inflight requests that the producer is allowed when idempotence is 
> enabled.)
> This is only a "best-effort" attempt to return the offset of the append. For 
> example, we do not populate the full list of recent appends when the log is 
> reloaded. Only the latest sequence/offset are reloaded from the snapshot. If 
> we receive a duplicate and we do not have the offset, then the broker 
> currently handles this by returning OUT_OF_ORDER_SEQUENCE.
> In fact, we have a separate error DUPLICATE_SEQUENCE_NUMBER which was 
> intended to handle this case and the producer already checks for it. If the 
> producer sees this error in the response, then the `send` is considered 
> successful, but the producer returns -1 as both the offset and timestamp 
> inside `RecordMetadata`.
> The reason we never implemented this on the broker is probably because we 
> allow the sequence numbers of the producer to wrap around after reaching 
> Int.MaxValue. What we considered in the past is fixing a number like 1000 and 
> requiring that the sequence be within that range to be considered a 
> duplicate. A better solution going forward is to let the producer bump the 
> epoch when the sequence hits Int.MaxValue, but we still have to allow 
> sequence numbers to wrap for compatibility.
> Given the loose guarantees that we already have here, I'm considering whether 
> the additional bookkeeping and the required memory are worth preserving. As 
> an alternative, we could consider the following:
> 1. The broker will only maintain the latest sequence/offset for each 
> producerId
> 2. We will return DUPLICATE_SEQUENCE_NUMBER for any sequence that is within 
> 1000 of the latest sequence (accounting for overflow). 
> 3. Instead of wrapping around sequence numbers, the producer will bump the 
> epoch if possible. It's worth noting that the idempotent producer can freely 
> bump the epoch, so the only time we should ever need to wrap the sequence is 
> for the transactional producer when it is used on a broker which does not 
> support the `InitProducerId` version which allows epoch bumps.
> 4. We can remove the restriction on `max.in.flight.requests.per.connection` 
> and document that if the offset is required in `RecordMetadata`, then the 
> user must set this to 1. Internally, if connecting to an old broker which 
> does not support epoch bumps, then we can restrict the number of inflight 
> requests to 5.
> The benefit in the end is that we can reduce the memory usage for producer 
> state and the complexity to manage that state. It also gives us a path to 
> removing the annoying config restriction and a better policy for sequence 
> overflow.
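
For illustration, here is a minimal sketch of the range check from point 2 above, assuming sequence numbers wrap back to 0 after Integer.MAX_VALUE. The class and method names are hypothetical, not actual broker code:

{code:java}
// Hypothetical sketch of "within 1000 of the latest sequence", accounting
// for sequence overflow at Integer.MAX_VALUE.
final class DuplicateSequenceCheck {
    private static final int DUPLICATE_RANGE = 1000;

    static boolean isDuplicate(final int latestSeq, final int incomingSeq) {
        // Distance walking backwards from latestSeq to incomingSeq, modulo
        // the wrap point (sequences are assumed to wrap from MAX_VALUE to 0).
        long distance = (long) latestSeq - incomingSeq;
        if (distance < 0) {
            distance += (long) Integer.MAX_VALUE + 1;
        }
        // distance == 0 is a retry of the very latest append; the broker still
        // caches that offset, so it can answer with the real RecordMetadata.
        return distance > 0 && distance <= DUPLICATE_RANGE;
    }

    public static void main(final String[] args) {
        System.out.println(isDuplicate(5000, 4500));                // true
        System.out.println(isDuplicate(5000, 3000));                // false
        System.out.println(isDuplicate(10, Integer.MAX_VALUE - 5)); // true, across the wrap
    }
}
{code}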



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-19 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616269613



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
 }
 
 boolean needOuterJoin = outer;
+boolean joinFound = false;
 
 final long inputRecordTimestamp = context().timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+maxObservedStreamTime.advance(inputRecordTimestamp);
+
 try (final WindowStoreIterator iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
 while (iter.hasNext()) {
 needOuterJoin = false;
+joinFound = true;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
+
+// Emit expired records before the joined record to keep 
time ordering
+emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+}
+
+// Emit all expired records before adding a new non-joined 
record to the store. Otherwise,
+// the put() call will advance the stream time, which causes 
records out of the retention
+// period to be deleted, thus not being emitted later.
+if (!joinFound && inputRecordTimestamp == 
maxObservedStreamTime.get()) {
+emitExpiredNonJoinedOuterRecords();
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in 
both sides of the join.
+// Having access to the time observed in the other join 
side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record with time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to 
the outer-join store
+//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to 
the outer-join store
+//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and 
T2 should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+//
+// the condition below allows us to process the late 
record without the need
+// to hold it in the temporary outer store
+if (!outerJoinWindowStore.isPresent() || timeTo < 
maxObservedStreamTime.get()) {

Review comment:
   Well, while we should have a check like this, it seems it should go to 
the top of this method, next to the key/value `null` check? We should also add 
a corresponding `lateRecordDropSensor` (cf `KStreamWindowAggregate.java`).
   
   We can also `return` from `process()` early, as if we have a late record, we 
know that stream-time does not advance and thus we don't need to emit anything 
downstream.
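
   For illustration, an early return along these lines could look like the sketch 
below, reusing the `timeTo` condition from the diff; `lateRecordDropSensor` is 
assumed wiring here (cf. `KStreamWindowAggregate.java`), not code from this PR:

   ```java
   // Hypothetical sketch: drop late records at the top of process(), right
   // after the key/value null checks.
   final long inputRecordTimestamp = context().timestamp();
   final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);

   maxObservedStreamTime.advance(inputRecordTimestamp);

   if (timeTo < maxObservedStreamTime.get()) {
       // The record's whole join window is behind stream time: it cannot join,
       // cannot advance stream time, and thus cannot expire anything new.
       lateRecordDropSensor.record();
       return;
   }
   ```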




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-19 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616268402



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
 }
 
 boolean needOuterJoin = outer;
+boolean joinFound = false;
 
 final long inputRecordTimestamp = context().timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+maxObservedStreamTime.advance(inputRecordTimestamp);
+
 try (final WindowStoreIterator iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
 while (iter.hasNext()) {
 needOuterJoin = false;
+joinFound = true;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;

Review comment:
   Seems this comment was not address yet. Or do you not want to add this 
additional fix into this PR?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-19 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616266276



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination of {@link KeyAndJoinSide}. The {@link 
KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in 
the left side of a
+ * join or on the right side. This {@link LeftOrRightValue} object contains 
either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right 
topic.
+ */
+public class LeftOrRightValue<V1, V2> {
+private final V1 leftValue;
+private final V2 rightValue;
+
+private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
+if (leftValue != null && rightValue != null) {

Review comment:
   We do filter both `null` keys and `null` values. For `null` keys, 
because we cannot really compute the join, and for `null` value because a 
`null` would be a delete in RocksDB and thus we cannot store `null` value to 
begin with.
   
   Thus, it's not possible that both values are `null`.
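
   As a rough illustration of that invariant (exactly one side set), the class 
could expose one factory per side. This is a sketch only, since the diff shows 
just the constructor, and the factory names are assumptions:

   ```java
   // Hypothetical per-side factories enforcing the one-side-only invariant.
   public static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) {
       return new LeftOrRightValue<>(Objects.requireNonNull(leftValue), null);
   }

   public static <V1, V2> LeftOrRightValue<V1, V2> makeRightValue(final V2 rightValue) {
       return new LeftOrRightValue<>(null, Objects.requireNonNull(rightValue));
   }
   ```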




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10478: KAFKA-12553: Refactor recovery logic to introduce LogLoader

2021-04-19 Thread GitBox


satishd commented on a change in pull request #10478:
URL: https://github.com/apache/kafka/pull/10478#discussion_r616265276



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -255,19 +261,21 @@ case object SnapshotGenerated extends 
LogStartOffsetIncrementReason {
 @threadsafe
 class Log(@volatile private var _dir: File,
   @volatile var config: LogConfig,
+  val segments: LogSegments,

Review comment:
   Sure, that looks reasonable to me. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-19 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616264850



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +142,47 @@
 final ProcessorGraphNode otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {
+final String outerJoinSuffix = leftOuter ? 
"-shared-left-outer-join" : "-shared-outer-join";

Review comment:
   That was the intent of my comment, but if you look into the newly added 
tests in `TopologyTest.java` it might not matter too much, as we also have some 
"weird" naming in existing code -- and to stay backward compatible, we cannot 
really change the naming:
   
   ```
   inner-join: (store names)
- KSTREAM-JOINTHIS-04-store
- KSTREAM-JOINOTHER-05-store
   
   left-join: (store names)
- KSTREAM-JOINTHIS-04-store
- KSTREAM-OUTEROTHER-05-store
   
   (Ideally we should have named both KSTREAM-LEFTTHIS-04-store and 
KSTREAM-LEFTOTHER-05-store...)
   
   outer-join: (store names)
- KSTREAM-OUTERTHIS-04-store
- KSTREAM-OUTEROTHER-05-store
```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10559: MINOR: disable RaftClusterTest first

2021-04-19 Thread GitBox


showuon commented on pull request #10559:
URL: https://github.com/apache/kafka/pull/10559#issuecomment-822883918


   @jsancio, good question. I just checked the 10 most recent trunk builds and 
found that only 3 tests are flaky, so I updated the PR to disable only those 3 
tests. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10564: MINOR: clean up some replication code

2021-04-19 Thread GitBox


junrao commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r616258845



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1119,6 +1039,77 @@ void validateManualPartitionAssignment(List 
assignment,
 }
 }
 
+void generateLeaderAndIsrUpdates(String context,
+ boolean excludeCurrentLeaderFromIsr,
+ List records,
+ Iterator iterator) {
+while (iterator.hasNext()) {
+TopicIdPartition topicIdPart = iterator.next();
+TopicControlInfo topic = topics.get(topicIdPart.topicId());
+if (topic == null) {
+throw new RuntimeException("Topic ID " + topicIdPart.topicId() 
+ " existed in " +
+"isrMembers, but not in the topics map.");
+}
+PartitionControlInfo partition = 
topic.parts.get(topicIdPart.partitionId());
+if (partition == null) {
+throw new RuntimeException("Partition " + topicIdPart +
+" existed in isrMembers, but not in the partitions map.");
+}
+int[] newIsr = partition.isr;
+if (excludeCurrentLeaderFromIsr) {
+newIsr = Replicas.copyWithout(partition.isr, partition.leader);
+}
+int newLeader = bestLeader(partition.replicas, newIsr, false);
+boolean unclean = newLeader != NO_LEADER && 
!Replicas.contains(newIsr, newLeader);
+if (unclean) {
+// After an unclean leader election, the ISR is reset to just 
the new leader.
+newIsr = new int[] {newLeader};
+} else if (newIsr.length == 0) {
+// We never want to shrink the ISR to size 0.
+newIsr = partition.isr;
+}
+PartitionChangeRecord record = new PartitionChangeRecord();
+if (newLeader != partition.leader) {
+record.setLeader(newLeader);
+}
+if (!Arrays.equals(newIsr, partition.isr)) {
+record.setIsr(Replicas.toList(newIsr));
+}
+if (record.leader() != NO_LEADER_CHANGE || record.isr() != null) {
+
record.setPartitionId(topicIdPart.partitionId()).setTopicId(topic.id);
+if (unclean) {
+log.info("{}: UNCLEANLY {}", context,
+leaderAndIsrUpdateLogMessage(topicIdPart, partition, 
record));
+} else if (log.isDebugEnabled()) {
+log.debug("{}: {}", context,
+leaderAndIsrUpdateLogMessage(topicIdPart, partition, 
record));
+}
+records.add(new ApiMessageAndVersion(record, (short) 0));
+}
+}
+}
+
+// VisibleForTesting
+String leaderAndIsrUpdateLogMessage(TopicIdPartition topicIdPart,

Review comment:
   Ideally, we want to log this when the record is reflected through replay 
in the controller. That's also when we could log the new leaderEpoch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-19 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17324358#comment-17324358
 ] 

Travis Bischel edited comment on KAFKA-12675 at 4/19/21, 11:58 PM:
---

Great insight on getting rid of partition2AllPotentialConsumers, as well as 
keeping some more things sorted! I was able to translate that into my own code 
and dropped the large imbalance from 9.5s to 0.5s, as well as from 8.5G memory 
util to 0.5G :)

I'll take a look at the code more in depth soon.

Edit: after further improvements I was able to get the large imbalance in my 
client down to 220ms and 150MB.


was (Author: twmb):
Great insight on getting rid of partition2AllPotentialConsumers, as well as 
keeping some more things sorted! I was able to translate that into my own code 
and dropped the large imbalance from 9.5s to 0.5s, as well as from 8.5G memory 
util to 0.5G :)

I'll take a look at the code more in depth soon.

Edit: after further improvements I was able to get the large imbalance down to 
220ms and 171MB.

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have "general assignor" for non-equal subscription case and 
> "constrained assignor" for all equal subscription case. There's a performance 
> test for constrained assignor with:
> topicCount = {color:#ff}500{color};
>  partitionCount = {color:#ff}2000{color}; 
>  consumerCount = {color:#ff}2000{color};
> in _testLargeAssignmentAndGroupWithUniformSubscription,_ total 1 million 
> partitions and we can complete the assignment within 2 second in my machine.
> However, if we let 1 of the consumer subscribe to only 1 topic, it'll use 
> "general assignor", and the result with the same setting as above is: 
> *OutOfMemory,* 
>  Even we down the count to:
> topicCount = {color:#ff}50{color};
>  partitionCount = 1{color:#ff}000{color}; 
>  consumerCount = 1{color:#ff}000{color};
> We still got *OutOfMemory*.
> With this setting:
> topicCount = {color:#ff}50{color};
>  partitionCount = 8{color:#ff}00{color}; 
>  consumerCount = 8{color:#ff}00{color};
> We can complete in 10 seconds in my machine, which is still slow.
>  
> Since we are going to set default assignment strategy to 
> "CooperativeStickyAssignor" soon,  we should improve the scalability and 
> performance for sticky general assignor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-19 Thread GitBox


guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616218643



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +83,44 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+private final Predicate>> 
recordWindowHasClosed =

Review comment:
   nit: I think this function can just be inlined now?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +142,47 @@
 final ProcessorGraphNode otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {
+final String outerJoinSuffix = leftOuter ? 
"-shared-left-outer-join" : "-shared-outer-join";

Review comment:
   I think @mjsax 's comment about naming is just to align with the join 
type, and note that "leftOuter" would always be true once we passed `if 
(leftOuter || rightOuter)` since "leftOuter == false && rightOuter == true" is 
not a case. Also to align with the terms "-outer-this-join" / "-this-join", I 
think it would be:
   
   rightOuter ? "-outer-shared-join" : "-left-shared-join";

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination of {@link KeyAndJoinSide}. The {@link 
KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in 
the left side of a
+ * join or on the right side. This {@link LeftOrRightValue} object contains 
either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right 
topic.
+ */
+public class LeftOrRightValue<V1, V2> {
+private final V1 leftValue;
+private final V2 rightValue;
+
+private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
+if (leftValue != null && rightValue != null) {

Review comment:
   Hmm.. I thought it could be possible that both sides are null? e.g. for 
a left-value, where the value itself is `null` (we do not filter null values in 
a stream at the moment right? @mjsax ).

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
 }
 
 boolean needOuterJoin = outer;
+boolean joinFound = false;
 
 final long inputRecordTimestamp = context().timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+maxObservedStreamTime.advance(inputRecordTimestamp);
+
 try (final WindowStoreIterator iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
 while (iter.hasNext()) {
 needOuterJoin = false;
+joinFound = true;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
+
+// Emit expired records before the joined record to keep 
time ordering
+emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+}
+
+// Emit all expired records before adding a new non-joined 
record to the 

[GitHub] [kafka] wcarlson5 commented on pull request #10565: KAFKA-12691: Add case where task can be considered idling

2021-04-19 Thread GitBox


wcarlson5 commented on pull request #10565:
URL: https://github.com/apache/kafka/pull/10565#issuecomment-822850426


   @rodesai @ableegoldman @abbccdda @mjsax
   Turns out there is another case where we should consider a task to be 
idling. If you can give this a review soon that would be great. I would like to 
get this in before Wednesday.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 opened a new pull request #10565: KAFKA-12691: Add case where task can be considered idling

2021-04-19 Thread GitBox


wcarlson5 opened a new pull request #10565:
URL: https://github.com/apache/kafka/pull/10565


   Currently the task reports the time it started idling as the time it was 
suspended, but it should also take enforced non-processing into account.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-19 Thread GitBox


junrao commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r616081872



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerdes.java
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The default implementation of {@link 
org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager} stores all
+ * the remote log metadata in an internal topic. Those messages can be {@link 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata},
+ * {@link 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate}, or 
{@link 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata}.
+ * These messages are written in Kafka's protocol message format as mentioned 
in
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-MessageFormat">KIP-405</a>
+ * 
+ * This interface is about serializing and deserializing these messages that 
are stored in remote log metadata internal
+ * topic. There are respective implementations for the mentioned message types.
+ * 
+ * @param <T> metadata type to be serialized/deserialized.
+ *
+ * @see RemoteLogSegmentMetadataSerdes
+ * @see RemoteLogSegmentMetadataUpdateSerdes
+ * @see RemotePartitionDeleteMetadataSerdes
+ */
+public interface RemoteLogMetadataSerdes<T> {
+
+/**
+ * Returns the message serialized for the given {@code metadata} object.

Review comment:
   We are no longer returning the message.

##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataContextSerdes.java
##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import 
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides serialization and deserialization for {@link 
RemoteLogMetadataContext}. This is the root serdes
+ * for the messages that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataContextSerdes implements Serde<RemoteLogMetadataContext> {
+
+public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new 
RemoteLogSegmentMetadataRecord().apiKey();
+public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = 
(byte) new RemoteLogSegmentMetadataUpdateRecord().apiKey();
+public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new 
RemotePartitionDeleteMetadataRecord().apiKey();
+
+private static final Map KEY_TO_SERDES = 
createInternalSerde();
+
+private final Deserializer 

[jira] [Updated] (KAFKA-12692) Enforce config types in ConfigurationControlManager

2021-04-19 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-12692:
--
Priority: Major  (was: Minor)

> Enforce config types in ConfigurationControlManager
> ---
>
> Key: KAFKA-12692
> URL: https://issues.apache.org/jira/browse/KAFKA-12692
> Project: Kafka
>  Issue Type: Task
>  Components: config
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Major
>
> Enforce config types when using the new alterConfigs and 
> incrementalAlterConfigs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12502) Quorum controller should return topic configs in CreateTopic response

2021-04-19 Thread Ryan Dielhenn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325345#comment-17325345
 ] 

Ryan Dielhenn commented on KAFKA-12502:
---

Added KAFKA-12692 as a prerequisite.

> Quorum controller should return topic configs in CreateTopic response
> -
>
> Key: KAFKA-12502
> URL: https://issues.apache.org/jira/browse/KAFKA-12502
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Ryan Dielhenn
>Priority: Major
>  Labels: kip-500
>
> Configs were added to the response in version 5. 
> {code}
>   { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": 
> "5+", "nullableVersions": "5+", "ignorable": true,
> "about": "Configuration of the topic.", "fields": [
> { "name": "Name", "type": "string", "versions": "5+",
>   "about": "The configuration name." },
> { "name": "Value", "type": "string", "versions": "5+", 
> "nullableVersions": "5+",
>   "about": "The configuration value." },
> { "name": "ReadOnly", "type": "bool", "versions": "5+",
>   "about": "True if the configuration is read-only." },
> { "name": "ConfigSource", "type": "int8", "versions": "5+", 
> "default": "-1", "ignorable": true,
>   "about": "The configuration source." },
> { "name": "IsSensitive", "type": "bool", "versions": "5+",
>   "about": "True if this configuration is sensitive." }
>   ]}
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

2021-04-19 Thread GitBox


kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616208904



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -196,8 +214,10 @@ private static PluginClassLoader newPluginClassLoader(
 pluginLoaders.put(pluginClassName, inner);
 // TODO: once versioning is enabled this line should be moved 
outside this if branch
 log.info("Added plugin '{}'", pluginClassName);
+allAddedPlugins.put(pluginClassName, new ArrayList<>());
 }
 inner.put(plugin, loader);
+allAddedPlugins.get(pluginClassName).add(plugin);

Review comment:
   reused the same pattern for pluginLoader also.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12692) Enforce config types in ConfigurationControlManager

2021-04-19 Thread Ryan Dielhenn (Jira)
Ryan Dielhenn created KAFKA-12692:
-

 Summary: Enforce config types in ConfigurationControlManager
 Key: KAFKA-12692
 URL: https://issues.apache.org/jira/browse/KAFKA-12692
 Project: Kafka
  Issue Type: Task
  Components: config
Reporter: Ryan Dielhenn
Assignee: Ryan Dielhenn


Enforce config types when using the new alterConfigs and 
incrementalAlterConfigs.
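
For illustration, one way to enforce the declared type is to parse the submitted value with ConfigDef.parseType, which throws ConfigException on a mismatch. A rough sketch, not the actual controller code:

{code:java}
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

final class ConfigTypeCheck {
    // Sketch: reject an altered config value that does not parse as its declared type.
    static boolean isValidValue(final String name, final String value, final ConfigDef.Type type) {
        try {
            ConfigDef.parseType(name, value, type); // throws ConfigException on bad input
            return true;
        } catch (final ConfigException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println(isValidValue("retention.ms", "86400000", ConfigDef.Type.LONG)); // true
        System.out.println(isValidValue("retention.ms", "one-day", ConfigDef.Type.LONG));  // false
    }
}
{code}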



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe opened a new pull request #10564: MINOR: clean up some replication code

2021-04-19 Thread GitBox


cmccabe opened a new pull request #10564:
URL: https://github.com/apache/kafka/pull/10564


   Centralize leader and ISR changes in generateLeaderAndIsrUpdates.
   Consolidate handleNodeDeactivated and handleNodeActivated into this
   function.
   
   Rename BrokersToIsrs#noLeaderIterator to 
BrokersToIsrs#partitionsWithNoLeader.
   Create BrokersToIsrs#partitionsLedByBroker, 
BrokersToIsrs#partitionsWithBrokerInIsr
   
   In ReplicationControlManagerTest, createTestTopic should be a member
   function of ReplicationControlTestContext.  It should invoke
   ReplicationControlTestContext#replay so that records are applied to all
   parts of the test context.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12691) TaskMetadata timeSinceIdlingStarted not reporting correctly

2021-04-19 Thread Walker Carlson (Jira)
Walker Carlson created KAFKA-12691:
--

 Summary: TaskMetadata timeSinceIdlingStarted not reporting 
correctly
 Key: KAFKA-12691
 URL: https://issues.apache.org/jira/browse/KAFKA-12691
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Walker Carlson
Assignee: Walker Carlson


TaskMetadata timeSinceIdlingStarted is not reported correctly: it takes 
suspension into account but not the processability check. To fix this we need 
to record the first time the task is found not processable.
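
For illustration, a minimal sketch of that bookkeeping; the class, field, and method names are hypothetical:

{code:java}
// Hypothetical sketch: remember only the FIRST moment the task stopped being
// processable, and reset as soon as it becomes processable again.
class IdleTracker {
    private long idleStartTimeMs = -1L; // -1 means "not idling"

    void update(final boolean processable, final long nowMs) {
        if (!processable) {
            if (idleStartTimeMs == -1L) {
                idleStartTimeMs = nowMs; // record only the first un-processable moment
            }
        } else {
            idleStartTimeMs = -1L; // processing again, reset
        }
    }

    long timeSinceIdlingStarted(final long nowMs) {
        return idleStartTimeMs == -1L ? -1L : nowMs - idleStartTimeMs;
    }
}
{code}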



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante commented on pull request #10563: KAFKA-12487: Add support for cooperative consumer protocol with sink connectors

2021-04-19 Thread GitBox


C0urante commented on pull request #10563:
URL: https://github.com/apache/kafka/pull/10563#issuecomment-822792962


   @ncliang @gharris1727 do either of you have time to take a look?
   
   @ableegoldman @showuon @guozhangwang FYI, this should be the only change to 
Connect necessary for KIP-726.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12690) Remove deprecated Producer#sendOffsetsToTransaction

2021-04-19 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12690:
---
Description: In 
[KIP-732|https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2]
 we deprecated the EXACTLY_ONCE and EXACTLY_ONCE_BETA configs in StreamsConfig, 
to be removed in 4.0

> Remove deprecated Producer#sendOffsetsToTransaction
> ---
>
> Key: KAFKA-12690
> URL: https://issues.apache.org/jira/browse/KAFKA-12690
> Project: Kafka
>  Issue Type: Task
>  Components: producer 
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 4.0.0
>
>
> In 
> [KIP-732|https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2]
>  we deprecated the EXACTLY_ONCE and EXACTLY_ONCE_BETA configs in 
> StreamsConfig, to be removed in 4.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12690) Remove deprecated Producer#sendOffsetsToTransaction

2021-04-19 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12690:
--

 Summary: Remove deprecated Producer#sendOffsetsToTransaction
 Key: KAFKA-12690
 URL: https://issues.apache.org/jira/browse/KAFKA-12690
 Project: Kafka
  Issue Type: Task
  Components: producer 
Reporter: A. Sophie Blee-Goldman
 Fix For: 4.0.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

2021-04-19 Thread GitBox


kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616169919



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -208,6 +228,22 @@ protected void initLoaders() {
 // Finally add parent/system loader.
 initPluginLoader(CLASSPATH_NAME);
 addAllAliases();
+reportPluginConflicts();
+}
+
+//visible for testing
+Set reportPluginConflicts() {
+Set conflictPluginClasses = new HashSet<>();
+for (Map.Entry>> entry : 
allAddedPlugins.entrySet()) {
+String pluginClassName = entry.getKey();
+List> pluginDescriptors = entry.getValue();
+if (pluginDescriptors.size() > 1) {
+PluginDesc pluginDescInUse = 
pluginDescInUse(pluginClassName);
+log.error("For plugin '{}', detected multiple copies '{}', 
this copy '{}' will be used.", pluginClassName, pluginDescriptors, 
pluginDescInUse);
+conflictPluginClasses.add(pluginClassName);
+}
+}
+return conflictPluginClasses;

Review comment:
   Applied all the changes. @rhauch please check now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12689) Remove deprecated EOS configs

2021-04-19 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12689:
--

 Summary: Remove deprecated EOS configs
 Key: KAFKA-12689
 URL: https://issues.apache.org/jira/browse/KAFKA-12689
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 4.0.0


In 
[KIP-732|https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2]
 we deprecated the EXACTLY_ONCE and EXACTLY_ONCE_BETA configs in StreamsConfig, 
to be removed in 4.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

2021-04-19 Thread GitBox


rhauch commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616119723



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -196,8 +214,10 @@ private static PluginClassLoader newPluginClassLoader(
 pluginLoaders.put(pluginClassName, inner);
 // TODO: once versioning is enabled this line should be moved 
outside this if branch
 log.info("Added plugin '{}'", pluginClassName);
+allAddedPlugins.put(pluginClassName, new ArrayList<>());
 }
 inner.put(plugin, loader);
+allAddedPlugins.get(pluginClassName).add(plugin);

Review comment:
   We can use `computeIfAbsent(...)` to eliminate the prior newly-added 
line:
   ```suggestion
   allAddedPlugins.computeIfAbsent(pluginClassName, n -> new 
ArrayList<>()).add(plugin);
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -153,13 +157,26 @@ public PluginClassLoader pluginClassLoader(String name) {
 if (inner == null) {
 return null;
 }
-ClassLoader pluginLoader = inner.get(inner.lastKey());
+ClassLoader pluginLoader = inner.get(pluginDescInUse(inner));
 return pluginLoader instanceof PluginClassLoader
? (PluginClassLoader) pluginLoader
: null;
 }
 
-public ClassLoader connectorLoader(Connector connector) {
+//visible for testing
+PluginDesc pluginDescInUse(String name) {

Review comment:
   I realize that `pluginDesc` in this name is just the equivalent of 
`PluginDesc`, but generally we try to avoid abbreviations, which is probably 
more true in this case because the name seems even more mangled:
   ```suggestion
   PluginDesc usedPluginDesc(String name) {
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -153,13 +157,26 @@ public PluginClassLoader pluginClassLoader(String name) {
 if (inner == null) {
 return null;
 }
-ClassLoader pluginLoader = inner.get(inner.lastKey());
+ClassLoader pluginLoader = inner.get(pluginDescInUse(inner));
 return pluginLoader instanceof PluginClassLoader
? (PluginClassLoader) pluginLoader
: null;
 }
 
-public ClassLoader connectorLoader(Connector connector) {
+//visible for testing
+PluginDesc pluginDescInUse(String name) {
+SortedMap, ClassLoader> inner = pluginLoaders.get(name);
+if (inner == null) {
+return null;
+}
+return pluginDescInUse(inner);
+}
+
+private PluginDesc pluginDescInUse(SortedMap, 
ClassLoader> inner) {
+return inner.lastKey();
+}
+
+public ClassLoader connectorLoader(Connector connector) {

Review comment:
   ```suggestion
   public ClassLoader connectorLoader(Connector connector) {
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -208,6 +228,22 @@ protected void initLoaders() {
 // Finally add parent/system loader.
 initPluginLoader(CLASSPATH_NAME);
 addAllAliases();
+reportPluginConflicts();
+}
+
+//visible for testing
+Set reportPluginConflicts() {
+Set conflictPluginClasses = new HashSet<>();
+for (Map.Entry>> entry : 
allAddedPlugins.entrySet()) {
+String pluginClassName = entry.getKey();
+List> pluginDescriptors = entry.getValue();
+if (pluginDescriptors.size() > 1) {
+PluginDesc pluginDescInUse = 
pluginDescInUse(pluginClassName);
+log.error("For plugin '{}', detected multiple copies '{}', 
this copy '{}' will be used.", pluginClassName, pluginDescriptors, 
pluginDescInUse);
+conflictPluginClasses.add(pluginClassName);
+}
+}
+return conflictPluginClasses;

Review comment:
   This could be rewritten a bit more compactly and a bit more functionally:
   ```suggestion
   return allAddedPlugins.entrySet().stream().filter(e -> 
e.getValue().size() > 1).map(e -> {
   String pluginClassName = e.getKey();
   PluginDesc pluginDescInUse = pluginDescInUse(pluginClassName);
   List> ignoredPlugins = new 
ArrayList<>(e.getValue());
   ignoredPlugins.remove(pluginDescInUse);
   log.error("Detected multiple plugins contain '{}'; using {} and 
ignoring {}", pluginClassName, pluginDescInUse, ignoredPlugins);
   return pluginClassName;
   }).collect(Collectors.toSet());
   ```
   My suggestion also includes a reworded error message to put the more 

[GitHub] [kafka] cmccabe commented on a change in pull request #10561: KAFKA-12686 AlterIsr and LeaderAndIsr race condition

2021-04-19 Thread GitBox


cmccabe commented on a change in pull request #10561:
URL: https://github.com/apache/kafka/pull/10561#discussion_r616115604



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1341,13 +1335,15 @@ class Partition(val topicPartition: TopicPartition,
 isrState = proposedIsrState
 
 if (!alterIsrManager.submit(alterIsrItem)) {
-  // If the ISR manager did not accept our update, we need to revert back 
to previous state
+  // If the ISR manager did not accept our update, we need to revert back 
to previous state.

Review comment:
   "previous state" seems like the wrong description of what we're 
reverting to here.  After all, we're NOT reverting back to the state that 
existed before the ISR was updated by whatever change invalidated our 
AlterIsrRequest.  "current state" seems more accurate.
   
   Also, this comment seems unnecessarily ZK-specific.  Can we mention 
"ChangePartitionRecord or LeaderAndIsrRequest" as candidates for altering the 
ISR?








[GitHub] [kafka] cmccabe edited a comment on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-04-19 Thread GitBox


cmccabe edited a comment on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-822694203


   Thanks for tackling this, @rondagostino !
   
   We have to be a bit careful about the structure here.  This is a plugin 
architecture, which means that the ZK-based authorizer shouldn't get any 
special treatment, nor should its setup be done outside its own code.  The code 
and logic for accessing ZooKeeper for ACLs needs to be contained just in the 
AclAuthorizer itself.
   
   For example, KafkaRaftServer does not need to know anything about ZooKeeper 
or AclAuthorizer specifically.  Maybe KafkaRaftServer's constructor needs to 
take an Authorizer object as an argument.  (Although it's not even clear to me 
that that is true, since it seems like all authorization happens in KafkaApis 
and ControllerApis.)  But certainly KafkaRaftServer does not need to be looking 
at `config.zkConnect` or messing around with ZkClient.
   
   Whatever setup the zookeeper authorizer needs to do should be done in its 
`start` function.  So creating the relevant znodes, etc.  We probably need to 
continue doing that setup in KafkaServer.scala as well, since not all users 
will be using KafkaServer + AclAuthorizer (again, plugin architecture, they 
could be using one but not the other.)
   
   For example, someone might be using ZK mode (KafkaServer.scala) but using a 
non-ZK authorizer such as Apache Ranger, or one of Confluent's authorizers.  In 
that case it's clear that KafkaServer.scala needs to set up whatever znodes it 
needs, and not rely on AclAuthorizer to do that.  It might be helpful to move 
the znode setup code into `KafkaZkClient` so that it doesn't need to be 
duplicated between AclAuthorizer and KafkaServer.scala.  But it's not a lot of 
code in any case, as far as I can see.
   
   A separate issue is that we need to start forwarding the ACL operations to 
the controller.  You did one half of that work here (adding "controller" to the 
message JSONs) but not the other (supporting these calls on the 
controller-side).  If it's easier, we could probably do this in a follow-up 
JIRA.  However, we do need to do it before the bridge release.
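
   To make "setup contained in the plugin's own `start()`" concrete, here is a minimal Java sketch against the public `org.apache.kafka.server.authorizer.Authorizer` interface. The class name and all ZooKeeper details are illustrative assumptions, not the real `AclAuthorizer` code, and the ACL methods are stubbed out:
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionStage;
   import java.util.stream.Collectors;

   import org.apache.kafka.common.Endpoint;
   import org.apache.kafka.common.acl.AclBinding;
   import org.apache.kafka.common.acl.AclBindingFilter;
   import org.apache.kafka.server.authorizer.AclCreateResult;
   import org.apache.kafka.server.authorizer.AclDeleteResult;
   import org.apache.kafka.server.authorizer.Action;
   import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
   import org.apache.kafka.server.authorizer.AuthorizationResult;
   import org.apache.kafka.server.authorizer.Authorizer;
   import org.apache.kafka.server.authorizer.AuthorizerServerInfo;

   public class SketchZkAuthorizer implements Authorizer {

       @Override
       public void configure(Map<String, ?> configs) {
           // Read the plugin's own ZK connection config here (hypothetical key), so
           // the server never has to look at zkConnect on the authorizer's behalf.
       }

       @Override
       public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
           // All ZooKeeper setup (creating ACL znodes, registering change listeners)
           // belongs here, inside the plugin, not in KafkaServer or KafkaRaftServer.
           CompletableFuture<Void> ready = CompletableFuture.completedFuture(null);
           return serverInfo.endpoints().stream()
                   .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> ready));
       }

       @Override
       public List<AuthorizationResult> authorize(AuthorizableRequestContext context, List<Action> actions) {
           // Real code would consult the ZK-backed ACL cache; DENIED is just a stub.
           return actions.stream().map(action -> AuthorizationResult.DENIED).collect(Collectors.toList());
       }

       @Override
       public List<? extends CompletionStage<AclCreateResult>> createAcls(
               AuthorizableRequestContext context, List<AclBinding> bindings) {
           throw new UnsupportedOperationException("omitted in this sketch");
       }

       @Override
       public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(
               AuthorizableRequestContext context, List<AclBindingFilter> filters) {
           throw new UnsupportedOperationException("omitted in this sketch");
       }

       @Override
       public Iterable<AclBinding> acls(AclBindingFilter filter) {
           throw new UnsupportedOperationException("omitted in this sketch");
       }

       @Override
       public void close() {
           // Shut down the ZK client owned by this plugin.
       }
   }
   ```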






[GitHub] [kafka] cmccabe edited a comment on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-04-19 Thread GitBox


cmccabe edited a comment on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-822694203


   Thanks for tackling this, @rondagostino !
   
   We have to be a bit careful about the structure here.  This is a plugin 
architecture, which means that the ZK-based authorizer shouldn't get any 
special treatment, nor should its setup be done outside its own code.  The code 
and logic for accessing ZooKeeper for ACLs needs to be contained just in the 
AclAuthorizer itself.
   
   For example, KafkaRaftServer does not need to know anything about ZooKeeper 
or AclAuthorizer specifically.  Maybe KafkaRaftServer's constructor needs to 
take an Authorizer object as an argument.  (Although it's not even clear to me 
that that is true, since it seems like all authorization happens in KafkaApis 
and ControllerApis.)  But certainly KafkaRaftServer does not need to be looking 
at `config.zkConnect` or messing around with ZkClient.
   
   Whatever setup the zookeeper authorizer needs to do should be done in its 
`start` function.  So creating the relevant znodes, etc.  We probably need to 
continue doing that setup in KafkaServer.scala as well, since not all users 
will be using KafkaServer + AclAuthorizer (again, plugin architecture, they 
could be using one but not the other.)  It might be helpful to move this into
`KafkaZkClient` so that it doesn't need to be duplicated.  But it's not a lot 
of code in any case, as far as I can see.
   
   A separate issue is that we need to start forwarding the ACL operations to 
the controller.  You did one half of that work here (adding "controller" to the 
message JSONs) but not the other (supporting these calls on the 
controller-side).  If it's easier, we could probably do this in a follow-up 
JIRA.  However, we do need to do it before the bridge release.






[GitHub] [kafka] cmccabe edited a comment on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-04-19 Thread GitBox


cmccabe edited a comment on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-822694203


   Thanks for tackling this, @rondagostino !
   
   We have to be a bit careful about the structure here.  This is a plugin 
architecture, which means that the ZK-based authorizer shouldn't get any 
special treatment, nor should its setup be done outside its own code.  The code 
and logic for accessing ZooKeeper for ACLs needs to be contained just in the 
AclAuthorizer itself.
   
   For example, KafkaRaftServer does not need to know anything about ZooKeeper 
or AclAuthorizer specifically.  Maybe KafkaRaftServer's constructor needs to 
take an Authorizer object as an argument.  (Although it's not even clear to me 
that that is true, since it seems like all authorization happens in KafkaApis 
and ControllerApis.)  But certainly KafkaRaftServer does not need to be looking 
at `config.zkConnect` or messing around with ZkClient.
   
   Whatever setup the zookeeper authorizer needs to do should be done in its 
`start` function.  So creating the relevant znodes, etc.  We probably need to 
continue doing that setup in KafkaServer.scala as well, since not all users 
will be using KafkaServer + AclAuthorizer (again, plugin architecture, they 
could be using one but not the other.)
   
   A separate issue is that we need to start forwarding the ACL operations to 
the controller.  You did one half of that work here (adding "controller" to the 
message JSONs) but not the other (supporting these calls on the 
controller-side).  If it's easier, we could probably do this in a follow-up 
JIRA.  However, we do need to do it before the bridge release.






[GitHub] [kafka] cmccabe commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-04-19 Thread GitBox


cmccabe commented on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-822694203


   Thanks for tackling this.
   
   Please keep in mind that this is a plugin architecture.  The ZK-based 
authorizer shouldn't get any special treatment, nor should its setup be done 
outside its own code.  The code and logic for accessing ZooKeeper for ACLs 
needs to be contained just in the AclAuthorizer, not everywhere in the code.
   
   For example, KafkaRaftServer does not need to know anything about ZooKeeper 
or AclAuthorizer specifically.  Maybe KafkaRaftServer's constructor needs to 
take an Authorizer object as an argument.  (Although it's not even clear to me 
that that is true, since it seems like all authorization happens in KafkaApis 
and ControllerApis.)  But certainly KafkaRaftServer does not need to be looking 
at `config.zkConnect` or messing around with ZkClient.
   
   Whatever setup the zookeeper authorizer needs to do should be done in its 
`start` function.  So creating the relevant znodes, etc.  We probably need to 
continue doing that setup in KafkaServer.scala as well, since not all users 
will be using KafkaServer + AclAuthorizer (again, plugin architecture, they 
could be using one but not the other.)
   
   A separate issue is that we need to start forwarding the ACL operations to 
the controller.  You did one half of that work here (adding "controller" to the 
message JSONs) but not the other (supporting these calls on the 
controller-side).  If it's easier, we could probably do this in a follow-up 
JIRA.  However, we do need to do it before the bridge release.






[jira] [Created] (KAFKA-12688) Sources and maven artifact in runtime classpath

2021-04-19 Thread Cosmin Giurgiu (Jira)
Cosmin Giurgiu created KAFKA-12688:
--

 Summary: Sources and maven artifact in runtime classpath
 Key: KAFKA-12688
 URL: https://issues.apache.org/jira/browse/KAFKA-12688
 Project: Kafka
  Issue Type: Bug
  Components: build, config
Affects Versions: 2.8.0
Reporter: Cosmin Giurgiu


I started a new kafka-2.8.0 (Scala 2.13) broker and, while checking the runtime, I found the
 * kafka_2.13-2.8.0-sources.jar
 * maven-artifact-3.6.3.jar

in kafka's classpath:
{code:java}
java [...] -cp 
[...]/home/kafka/kafka/bin/../libs/kafka_2.13-2.8.0-sources.jar:[...]:/home/kafka/kafka/bin/../libs/maven-artifact-3.6.3.jar:[...]
 kafka.Kafka /home/kafka/kafka/config/server.properties{code}
Is there a reason for these jars to be there?

I believe they should not be included in the classpath at all.





[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

2021-04-19 Thread GitBox


kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616074220



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
##
@@ -129,4 +133,32 @@ public void testLoadingMixOfValidAndInvalidPlugins() 
throws Exception {
 assertNotNull(classLoader.pluginClassLoader(pluginClassName));
 }
 }
+
+@Test
+public void testAddMultiplePluginsWithSameClass() {

Review comment:
   @C0urante I reworked the tests; could you please check now?








[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

2021-04-19 Thread GitBox


kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616073878



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -187,17 +192,24 @@ private static PluginClassLoader newPluginClassLoader(
 );
 }
 
-private  void addPlugins(Collection> plugins, ClassLoader 
loader) {
+//visible for testing
+ void addPlugins(Collection> plugins, ClassLoader loader) 
{
 for (PluginDesc plugin : plugins) {
 String pluginClassName = plugin.className();
 SortedMap, ClassLoader> inner = 
pluginLoaders.get(pluginClassName);
+boolean pluginConflict = false;
 if (inner == null) {
 inner = new TreeMap<>();
 pluginLoaders.put(pluginClassName, inner);
 // TODO: once versioning is enabled this line should be moved 
outside this if branch
 log.info("Added plugin '{}'", pluginClassName);
+} else {
+pluginConflict = true;
 }
 inner.put(plugin, loader);
+if (pluginConflict) {
+log.error("Detected multiple copies of plugin '{}', one of 
these will be used '{}'", pluginClassName, inner.keySet());
+}

Review comment:
   @C0urante you are right. I reworked the PR after your comments; could you please check now?








[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-19 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325234#comment-17325234
 ] 

Guozhang Wang commented on KAFKA-12675:
---

Wow this is super! Thanks [~showuon] [~twmb], please ping me as well as 
[~ableegoldman] when you think https://github.com/apache/kafka/pull/10552 is 
ready to review.

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and a
> "constrained assignor" for the all-equal subscription case. There's a performance
> test for the constrained assignor with:
> topicCount = 500;
>  partitionCount = 2000;
>  consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_: 1 million partitions in
> total, and we can complete the assignment within 2 seconds on my machine.
> However, if we let 1 of the consumers subscribe to only 1 topic, it'll use the
> "general assignor", and the result with the same settings as above is:
> *OutOfMemory*.
>  Even when we lower the counts to:
> topicCount = 50;
>  partitionCount = 1000;
>  consumerCount = 1000;
> We still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
>  partitionCount = 800;
>  consumerCount = 800;
> We can complete in 10 seconds on my machine, which is still slow.
>  
> Since we are going to set the default assignment strategy to
> "CooperativeStickyAssignor" soon, we should improve the scalability and
> performance of the sticky general assignor.





[GitHub] [kafka] C0urante opened a new pull request #10563: KAFKA-12487: Add support for cooperative consumer protocol with sink connectors

2021-04-19 Thread GitBox


C0urante opened a new pull request #10563:
URL: https://github.com/apache/kafka/pull/10563


   [Jira](https://issues.apache.org/jira/browse/KAFKA-12487)
   
   Currently, the `WorkerSinkTask`'s consumer rebalance listener (and related 
logic) is hardcoded to assume eager rebalancing, which means that all 
partitions are revoked any time a rebalance occurs and then the set of 
partitions included in `onPartitionsAssigned` is assumed to be the complete 
assignment for the task. Not only does this cause failures when a cooperative 
consumer protocol is used, it fails to take advantage of the benefits provided 
by cooperative protocols.
   
   These changes alter framework logic to not only not break when a cooperative 
consumer protocol is used for a sink connector, but to reap the benefits of it 
as well, by not revoking partitions unnecessarily from tasks just to reopen 
them immediately after the rebalance has completed.
   
   This change will be necessary in order to support 
[KIP-726](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248),
 which currently proposes that the default consumer partition assignor be 
changed to the `CooperativeStickyAssignor`.
   
   Two integration tests are added to verify sink task behavior with both eager and cooperative consumer protocols, and new and existing unit tests are adapted as well.
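
   As an illustration of the protocol-agnostic handling described above, a minimal sketch of a rebalance listener that treats `onPartitionsAssigned` as an incremental add rather than a complete assignment. The per-partition "resource" is a hypothetical stand-in; this is not the actual `WorkerSinkTask` code:
   ```java
   import java.util.Collection;
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
   import org.apache.kafka.common.TopicPartition;

   class IncrementalRebalanceListener implements ConsumerRebalanceListener {
       // Hypothetical per-partition resource (a writer, a buffer, etc.).
       private final Map<TopicPartition, Object> openResources = new HashMap<>();

       @Override
       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
           // COOPERATIVE: only newly added partitions arrive here.
           // EAGER: this is the full assignment, but everything was revoked first,
           // so "open anything we don't already own" is correct for both protocols.
           for (TopicPartition tp : partitions) {
               openResources.computeIfAbsent(tp, p -> new Object() /* open resource */);
           }
       }

       @Override
       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
           // Close only what was actually revoked; still-owned partitions stay open.
           for (TopicPartition tp : partitions) {
               openResources.remove(tp); // close the resource here
           }
       }
   }
   ```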
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] kpatelatwork commented on pull request #10530: KAFKA-10231 fail bootstrap of Rest server if the host name in the adv…

2021-04-19 Thread GitBox


kpatelatwork commented on pull request #10530:
URL: https://github.com/apache/kafka/pull/10530#issuecomment-822663074


   @tombentley could you please review and merge this PR?
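
   For context, a minimal sketch of the kind of up-front RFC 1123 host name check this PR's title ("fail bootstrap of Rest server if the host name in the adv…") refers to. The regex is a common RFC 1123 approximation and all names here are illustrative, not the PR's actual code:
   ```java
   import java.util.regex.Pattern;

   final class HostnameCheck {
       private static final Pattern RFC1123_HOST = Pattern.compile(
           "^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?" +
           "(\\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$");

       static void requireValidHost(String host) {
           // "kafka_connect-0.dev-2" fails here because '_' is not allowed by RFC 1123.
           if (host == null || !RFC1123_HOST.matcher(host).matches()) {
               throw new IllegalArgumentException("Invalid advertised host name: " + host);
           }
       }
   }
   ```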






[GitHub] [kafka] jsancio commented on pull request #10559: MINOR: diable RaftClusterTest first

2021-04-19 Thread GitBox


jsancio commented on pull request #10559:
URL: https://github.com/apache/kafka/pull/10559#issuecomment-822658214


   @showuon Do we need to disable the entire suite of tests?
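
   For reference, a JUnit 5 shape for the alternative: disable just the flaky method instead of the whole class. All names and the reason string are illustrative placeholders, not the actual test code:
   ```java
   import org.junit.jupiter.api.Disabled;
   import org.junit.jupiter.api.Test;

   class RaftClusterTestShape {
       @Test
       @Disabled("flaky; re-enable once the underlying issue is fixed")
       void flakyCase() { /* skipped by the runner */ }

       @Test
       void healthyCase() { /* still runs */ }
   }
   ```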






[jira] [Updated] (KAFKA-10231) Broken Kafka Connect node to node communication if invalid hostname is in place

2021-04-19 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10231:
--
Description: 
As a Kafka Connect operator I would expect a more definitive error when the 
internal node to node communication can't happen.

If the hostname contains an invalid character according to the [RFC1123 section 
2.1|#page-13], the error raised by the Kafka Connect worker node look like:

 
{quote}
[2020-06-30 10:38:49,990] ERROR Uncaught exception in REST call to /connectors (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)
java.lang.IllegalArgumentException: Invalid URI host: null (authority: kafka_connect-0.dev-2:8083)
    at org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:506)
    at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:491)
    at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:449)
    at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:438)
    at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:83)
    at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:309)
    at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:138)
{quote}
 

it would be much nicer for operators that such situations are detected in 
similar, or improved, version as the JVM is doing in the [IDN class|#L291]].

 

  was:
As a Kafka Connect operator I would expect a more definitive error when the 
internal node to node communication can't happen.

If the hostname contains an invalid character according to the [RFC1123 section 
2.1|#page-13]], the error raised by the Kafka Connect worker node look like:

 
{quote}
[2020-06-30 10:38:49,990] ERROR Uncaught exception in REST call to /connectors (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)
java.lang.IllegalArgumentException: Invalid URI host: null (authority: kafka_connect-0.dev-2:8083)
    at org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:506)
    at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:491)
    at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:449)
    at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:438)
    at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:83)
    at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:309)
    at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:138)
{quote}
 

it would be much nicer for operators that such situations are detected in 
similar, or improved, version as the JVM is doing in the [IDN class|#L291]].

 


> Broken Kafka Connect node to node communication if invalid hostname is in 
> place
> ---
>
> Key: KAFKA-10231
> URL: https://issues.apache.org/jira/browse/KAFKA-10231
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Pere Urbon-Bayes
>Assignee: Kalpesh Patel
>Priority: Minor
>
> As a Kafka Connect operator I would expect a more definitive error when the 
> internal node to node communication can't happen.
> If the hostname contains an invalid character according to the [RFC1123 
> section 2.1|#page-13], the error raised by the Kafka Connect worker node look 
> like:
>  
> {quote}
> [2020-06-30 10:38:49,990] ERROR Uncaught exception in REST call to /connectors (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)
> java.lang.IllegalArgumentException: Invalid URI host: null (authority: kafka_connect-0.dev-2:8083)
>     at org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:506)
>     at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:491)
>     at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:449)
>     at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:438)
>     at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:83)
>     at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:309)
>     at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:138)
> {quote}
>  
> it would be much nicer for operators that such situations are detected in 
> similar, or 

[jira] [Updated] (KAFKA-10231) Broken Kafka Connect node to node communication if invalid hostname is in place

2021-04-19 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10231:
--
Description: 
As a Kafka Connect operator I would expect a more definitive error when the 
internal node to node communication can't happen.

If the hostname contains an invalid character according to the [RFC1123 section 
2.1|https://tools.ietf.org/html/rfc1123#page-13], the error raised by the Kafka 
Connect worker node look like:

 
{quote}
[2020-06-30 10:38:49,990] ERROR Uncaught exception in REST call to /connectors (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)
java.lang.IllegalArgumentException: Invalid URI host: null (authority: kafka_connect-0.dev-2:8083)
    at org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:506)
    at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:491)
    at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:449)
    at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:438)
    at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:83)
    at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:309)
    at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:138)
{quote}
 

it would be much nicer for operators that such situations are detected in 
similar, or improved, version as the JVM is doing in the [IDN class|#L291]].

 

  was:
As a Kafka Connect operator I would expect a more definitive error when the 
internal node to node communication can't happen.

If the hostname contains an invalid character according to the [RFC1123 section 
2.1|#page-13], the error raised by the Kafka Connect worker node look like:

 
{quote}
[2020-06-30 10:38:49,990] ERROR Uncaught exception in REST call to /connectors (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)
java.lang.IllegalArgumentException: Invalid URI host: null (authority: kafka_connect-0.dev-2:8083)
    at org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:506)
    at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:491)
    at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:449)
    at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:438)
    at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:83)
    at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:309)
    at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:138)
{quote}
 

it would be much nicer for operators that such situations are detected in 
similar, or improved, version as the JVM is doing in the [IDN class|#L291]].

 


> Broken Kafka Connect node to node communication if invalid hostname is in 
> place
> ---
>
> Key: KAFKA-10231
> URL: https://issues.apache.org/jira/browse/KAFKA-10231
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Pere Urbon-Bayes
>Assignee: Kalpesh Patel
>Priority: Minor
>
> As a Kafka Connect operator I would expect a more definitive error when the 
> internal node to node communication can't happen.
> If the hostname contains an invalid character according to the [RFC1123 
> section 2.1|https://tools.ietf.org/html/rfc1123#page-13], the error raised by 
> the Kafka Connect worker node look like:
>  
> {quote}
> [2020-06-30 10:38:49,990] ERROR Uncaught exception in REST call to /connectors (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)
> java.lang.IllegalArgumentException: Invalid URI host: null (authority: kafka_connect-0.dev-2:8083)
>     at org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:506)
>     at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:491)
>     at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:449)
>     at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:438)
>     at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:83)
>     at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:309)
>     at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:138)
> {quote}
>  
> it would be much nicer for 

[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-04-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325187#comment-17325187
 ] 

A. Sophie Blee-Goldman commented on KAFKA-9168:
---

[~sagarrao] Feel free to pick this up, but you may need to hold off on working 
on it until [~cadonna] has completed KAFKA-8897. I believe he's battling some 
weird runtime issues in RocksDB with the upgrade at the moment, but once we've 
figured that out then this should be unblocked. 

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]





[GitHub] [kafka] ableegoldman commented on pull request #9821: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries

2021-04-19 Thread GitBox


ableegoldman commented on pull request #9821:
URL: https://github.com/apache/kafka/pull/9821#issuecomment-822625233


   Fine with me. It does seem odd that we'd be inconsistent, but I don't think 
we need to solve everything with this one KIP/PR.  Let's just file a ticket for 
this so we don't forget.
   
   (FWIW I think it was neither an oversight nor an intentional design, at 
least in the case of `queryMetadataForKey`, since KIP-535 was started and
completed while this KIP was still in progress, and the author most likely 
wasn't aware that we had plans to improve the IQ exceptions.)
   
   I think in that case, this PR covers everything it needs to. @vitojeng Can 
you just add a quick note about this new exception to the streams upgrade guide 
under the section for API changes in 3.0?
   
   






[GitHub] [kafka] vamossagar12 commented on a change in pull request #10468: Kafka 12373:Improve KafkaRaftClient handling of graceful shutdown

2021-04-19 Thread GitBox


vamossagar12 commented on a change in pull request #10468:
URL: https://github.com/apache/kafka/pull/10468#discussion_r616018841



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -1673,6 +1673,69 @@ public void testLeaderGracefulShutdownTimeout() throws 
Exception {
 assertFutureThrows(shutdownFuture, TimeoutException.class);
 }
 
+@Test
+public void testLeaderGracefulShutdownOnClose() throws Exception {
+int localId = 0;
+int otherNodeId = 1;
+int lingerMs = 50;
+Set voters = Utils.mkSet(localId, otherNodeId);
+
+RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+.withAppendLingerMs(lingerMs)
+.build();
+
+context.becomeLeader();
+assertEquals(OptionalInt.of(localId), context.currentLeader());
+assertEquals(1L, context.log.endOffset().offset);
+
+int epoch = context.currentEpoch();
+assertEquals(1L, context.client.scheduleAppend(epoch, 
singletonList("a")));
+
+context.client.poll();
+assertEquals(OptionalLong.of(lingerMs), 
context.messageQueue.lastPollTimeoutMs());
+
+context.time.sleep(20);
+
+// client closed now.
+context.client.close();
+
+// Flag for accepting appends should be toggled to false.
+assertFalse(context.client.canAcceptAppends());
+
+// acceptAppends flag set to false so no writes should be accepted by 
the Leader now.
+assertNull(context.client.scheduleAppend(epoch, singletonList("b")));
+
+// The leader should trigger a flush for whatever batches are present 
in the BatchAccumulator
+assertEquals(2L, context.log.endOffset().offset);
+
+// Now shutdown
+
+// We should still be running until we have had a chance to send 
EndQuorumEpoch
+assertTrue(context.client.isShuttingDown());
+assertTrue(context.client.isRunning());
+
+// Send EndQuorumEpoch request to the other voter
+context.pollUntilRequest();
+assertTrue(context.client.isShuttingDown());
+assertTrue(context.client.isRunning());
+context.assertSentEndQuorumEpochRequest(1, otherNodeId);
+
+// We should still be able to handle vote requests during graceful 
shutdown
+// in order to help the new leader get elected
+context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, 
epoch, 1L));
+context.client.poll();
+context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.empty(), true);
+

Review comment:
   no problem @jsancio  plz review whenever you get the chance :) 








[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

2021-04-19 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r616018340



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,6 +76,11 @@
 "wait for writes to accumulate before flushing them to disk.";
 public static final int DEFAULT_QUORUM_LINGER_MS = 25;
 
+public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = 
QUORUM_PREFIX + "append.max.unflushed.bytes";

Review comment:
   hi @jsancio / @hachikuji  could you plz review the PR whenever you get 
the chance?








[GitHub] [kafka] vamossagar12 commented on pull request #10278: KAFKA-10526: leader fsync deferral on write

2021-04-19 Thread GitBox


vamossagar12 commented on pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#issuecomment-822622340


   hey @jsancio / @hachikuji  could you plz review the PR whenever you get the 
chance?






[GitHub] [kafka] vamossagar12 commented on pull request #9779: KAFKA-10767: Adding test cases for all, reverseAll and reverseRange for ThreadCache

2021-04-19 Thread GitBox


vamossagar12 commented on pull request #9779:
URL: https://github.com/apache/kafka/pull/9779#issuecomment-822621642


   hey @guozhangwang , would you be able to review this whenever you get the 
chance?






[jira] [Issue Comment Deleted] (KAFKA-12313) Consider deprecating the default.windowed.serde.inner.class configs

2021-04-19 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-12313:
--
Comment: was deleted

(was: https://github.com/apache/kafka/pull/10468#pullrequestreview-628083086)

> Consider deprecating the default.windowed.serde.inner.class configs
> ---
>
> Key: KAFKA-12313
> URL: https://issues.apache.org/jira/browse/KAFKA-12313
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> During the discussion of KIP-659 we discussed whether it made sense to have a 
> "default" class for the serdes of windowed inner classes across Streams. 
> Using these configs instead of specifying an actual Serde object can lead to 
> subtle bugs, since the WindowedDeserializer requires a windowSize in addition 
> to the inner class. If the default constructor is invoked, as it will be when 
> falling back on the config, this windowSize defaults to MAX_VALUE. 
> If the downstream program doesn't care about the window end time in the 
> output, then this can go unnoticed and technically there is no problem. But 
> if anything does depend on the end time, or the user just wants to manually 
> read the output for testing purposes, then the MAX_VALUE will result in a 
> garbage timestamp.
> We should consider whether the convenience of specifying a config instead of 
> instantiating a Serde in each operator is really worth the risk of a user 
> accidentally failing to specify a windowSize
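
A minimal sketch of the explicit alternative the ticket describes: build the windowed serde from {{TimeWindowedSerializer}}/{{TimeWindowedDeserializer}} with the real window size instead of falling back on the config (whose default-constructor path uses MAX_VALUE). The five-minute size is just an example:
{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowedSerdeExample {
    public static void main(String[] args) {
        long windowSizeMs = 5 * 60 * 1000L; // must match the upstream windowedBy() size
        Serde<Windowed<String>> keySerde = Serdes.serdeFrom(
            new TimeWindowedSerializer<>(Serdes.String().serializer()),
            new TimeWindowedDeserializer<>(Serdes.String().deserializer(), windowSizeMs));
        System.out.println(keySerde.deserializer().getClass().getSimpleName());
    }
}
{code}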





[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-04-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325182#comment-17325182
 ] 

Sagar Rao commented on KAFKA-9168:
--

hey [~guozhang], the benchmarks that I had talked about were for the prefix scan changes that I was working on back then. BTW, I can take this up if that's ok with you?

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]





[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325181#comment-17325181
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12675:


Nice! It would be great if we could get these improvements into 3.0 since, as Luke mentioned, we plan to make this the default assignor.

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and a
> "constrained assignor" for the all-equal subscription case. There's a performance
> test for the constrained assignor with:
> topicCount = 500;
>  partitionCount = 2000;
>  consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_: 1 million partitions in
> total, and we can complete the assignment within 2 seconds on my machine.
> However, if we let 1 of the consumers subscribe to only 1 topic, it'll use the
> "general assignor", and the result with the same settings as above is:
> *OutOfMemory*.
>  Even when we lower the counts to:
> topicCount = 50;
>  partitionCount = 1000;
>  consumerCount = 1000;
> We still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
>  partitionCount = 800;
>  consumerCount = 800;
> We can complete in 10 seconds on my machine, which is still slow.
>  
> Since we are going to set the default assignment strategy to
> "CooperativeStickyAssignor" soon, we should improve the scalability and
> performance of the sticky general assignor.





[GitHub] [kafka] rhauch commented on a change in pull request #9541: KAFKA-10675: Add schema name to ConnectSchema.validateValue() error message

2021-04-19 Thread GitBox


rhauch commented on a change in pull request #9541:
URL: https://github.com/apache/kafka/pull/9541#discussion_r615975188



##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
##
@@ -266,7 +267,7 @@ public static void validateValue(String name, Schema 
schema, Object value) {
 private static List expectedClassesFor(Schema schema) {
 List expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
 if (expectedClasses == null)
-expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
+expectedClasses = SCHEMA_TYPE_CLASSES.getOrDefault(schema.type(), 
Collections.emptyList());

Review comment:
   Strictly speaking, this shouldn't be necessary as `SCHEMA_TYPE_CLASSES` 
should have a `Schema` instance for all `Schema.Type` literals. And with 
`SchemaBuilder` a connector or converter cannot create a schema instance with a 
null `Schema.Type`. 
   
   However, it is possible to construct a `ConnectSchema` instance with a null 
`Type` reference (like what `FakeSchema` essentially does in the existing 
test), which of course without this change would result in this method 
returning a null list. 
   
   So +1 for this line change since it simplifies the error handling in the 
calling code.
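
   A tiny standalone illustration (not the Connect code itself) of the null-safety this change buys: `getOrDefault()` guarantees a non-null list, so callers can iterate the result unconditionally:
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class GetOrDefaultExample {
       public static void main(String[] args) {
           Map<String, List<Class<?>>> byType = new HashMap<>();
           byType.put("INT32", Collections.<Class<?>>singletonList(Integer.class));
           // Unknown key: get() would return null, getOrDefault() returns an empty list.
           List<Class<?>> expected = byType.getOrDefault("UNKNOWN", Collections.emptyList());
           for (Class<?> c : expected) {
               System.out.println(c); // never runs for unknown types, but never NPEs either
           }
       }
   }
   ```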








[GitHub] [kafka] vvcephei opened a new pull request #10562: MINOR: Update tests to include the 2.8.0 release

2021-04-19 Thread GitBox


vvcephei opened a new pull request #10562:
URL: https://github.com/apache/kafka/pull/10562


   Add the 2.8 release to our tests.
   
   Before this will work, someone with access will have to upload these 
artifacts: ( https://home.apache.org/~vvcephei/2.8.0-mirror/ ) to this bucket: 
s3://kafka-packages .
   
   Like this:
   ```
   aws s3 cp kafka_2.12-2.8.0.tgz s3://kafka-packages
   ```
   etc.
   






[GitHub] [kafka] wenbingshen closed pull request #10327: MINOR: No fetcher for partitions don't need to print remove fetcher log

2021-04-19 Thread GitBox


wenbingshen closed pull request #10327:
URL: https://github.com/apache/kafka/pull/10327


   






[GitHub] [kafka] wenbingshen commented on pull request #10556: MINOR: Remove redundant code from BrokerApiVersionsCommand

2021-04-19 Thread GitBox


wenbingshen commented on pull request #10556:
URL: https://github.com/apache/kafka/pull/10556#issuecomment-822586978


   > > Dear @chia7712 According to your instructions, I have reverted all 
imports. In your free time, please help me to review again. Thanks. :)
   






[GitHub] [kafka] wenbingshen commented on pull request #10556: MINOR: Remove redundant code from BrokerApiVersionsCommand

2021-04-19 Thread GitBox


wenbingshen commented on pull request #10556:
URL: https://github.com/apache/kafka/pull/10556#issuecomment-822587200


   @chia7712 These test failures seem to have nothing to do with this PR. Apart from merging trunk, can we still re-trigger the checks? I do not know what to do.






[GitHub] [kafka] wenbingshen closed pull request #10556: MINOR: Remove redundant code from BrokerApiVersionsCommand

2021-04-19 Thread GitBox


wenbingshen closed pull request #10556:
URL: https://github.com/apache/kafka/pull/10556


   






[GitHub] [kafka] wenbingshen commented on pull request #10556: MINOR: Remove redundant code from BrokerApiVersionsCommand

2021-04-19 Thread GitBox


wenbingshen commented on pull request #10556:
URL: https://github.com/apache/kafka/pull/10556#issuecomment-822583575


   > Dear @chia7712 According to your instructions, I have reverted all 
imports. In your free time, please help me to review again. Thanks. :)
   
   






[GitHub] [kafka] highluck commented on a change in pull request #9861: MINOR: Modify unnecessary access specifiers

2021-04-19 Thread GitBox


highluck commented on a change in pull request #9861:
URL: https://github.com/apache/kafka/pull/9861#discussion_r615971825



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##
@@ -96,7 +96,7 @@ public boolean putIfDifferentValues(final K key,
 public static class RawAndDeserializedValue {

Review comment:
   @chia7712 The code has been modified!








[GitHub] [kafka] wenbingshen commented on a change in pull request #10558: KAFKA-12684: Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand

2021-04-19 Thread GitBox


wenbingshen commented on a change in pull request #10558:
URL: https://github.com/apache/kafka/pull/10558#discussion_r615967262



##
File path: core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
##
@@ -273,6 +273,48 @@ final class LeaderElectionCommandTest extends 
ZooKeeperTestHarness {
 ))
 assertTrue(e.getCause.isInstanceOf[TimeoutException])
   }
+
+  @Test
+  def testElectionResultOutput(): Unit = {
+TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
+  val topic = "non-preferred-topic"
+  val partition0 = 0
+  val partition1 = 1
+  val assignment0 = Seq(broker2, broker3)
+  val assignment1 = Seq(broker3, broker2)
+
+  TestUtils.createTopic(zkClient, topic, Map(partition0 -> assignment0, 
partition1 -> assignment1), servers)
+
+  val topicPartition0 = new TopicPartition(topic, partition0)
+  val topicPartition1 = new TopicPartition(topic, partition1)
+
+  TestUtils.assertLeader(client, topicPartition0, broker2)
+  TestUtils.assertLeader(client, topicPartition1, broker3)
+
+  servers(broker2).shutdown()
+  TestUtils.assertLeader(client, topicPartition0, broker3)
+  servers(broker2).startup()
+  TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2))
+  TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2))
+
+  val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, 
topicPartition1))
+  val output = TestUtils.grabConsoleOutput(
+LeaderElectionCommand.main(
+  Array(
+"--bootstrap-server", bootstrapServers(servers),
+"--election-type", "preferred",
+"--path-to-json-file", topicPartitionPath.toString
+  )
+)
+  )
+
+  val electionResultOutputIter = output.split("\n").iterator
+  assertTrue(electionResultOutputIter.hasNext)
+  assertTrue(electionResultOutputIter.next().contains(s"Successfully 
completed leader election (PREFERRED) for partitions $topicPartition0"))
+  assertTrue(electionResultOutputIter.hasNext)
+  assertTrue(electionResultOutputIter.next().contains(s"Valid replica 
already elected for partitions $topicPartition1"))

Review comment:
   Dear @dajac Following your suggestion, I added a unit test to verify the 
output of noop set and succeeded set. Are you satisfied with this? :)








[jira] [Commented] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot

2021-04-19 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325140#comment-17325140
 ] 

Jose Armando Garcia Sancio commented on KAFKA-10800:


Yes. Please take it. Can you also look into the following when the snapshot gets created using {{SnapshotWriter}}:
1. Set the {{baseOffset}} to the snapshotId's {{endOffset - 1}} for every record batch.
2. Set the relative offset for every record to 0.
3. Set the epoch of every batch to the snapshotId's epoch.
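
A rough sketch of those three invariants, using hypothetical stand-in types rather than the real {{SnapshotWriter}}/record-batch classes:
{code:java}
import java.util.Arrays;

public class SnapshotBatchSketch {
    static final class Batch {
        long baseOffset;
        int partitionLeaderEpoch;
        int[] recordOffsetDeltas;
    }

    static void stamp(Batch batch, long snapshotEndOffset, int snapshotEpoch) {
        batch.baseOffset = snapshotEndOffset - 1;   // 1. baseOffset = endOffset - 1
        Arrays.fill(batch.recordOffsetDeltas, 0);   // 2. relative offset of every record is 0
        batch.partitionLeaderEpoch = snapshotEpoch; // 3. batch epoch = snapshotId's epoch
    }
}
{code}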

> Validate the snapshot id when the state machine creates a snapshot
> --
>
> Key: KAFKA-10800
> URL: https://issues.apache.org/jira/browse/KAFKA-10800
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Priority: Major
>
> When the state machine attempts to create a snapshot writer we should 
> validate that the following is true:
>  # The end offset and epoch of the snapshot is less than the high-watermark.
>  # The end offset and epoch of the snapshot is valid based on the leader 
> epoch cache.
> Note that this validation should not be performed when the raft client 
> creates the snapshot writer because in that case the local log is out of date 
> and the follower should trust the snapshot id sent by the partition leader.





[jira] [Assigned] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot

2021-04-19 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-10800:
--

Assignee: Haoran Xuan  (was: Jose Armando Garcia Sancio)

> Validate the snapshot id when the state machine creates a snapshot
> --
>
> Key: KAFKA-10800
> URL: https://issues.apache.org/jira/browse/KAFKA-10800
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Haoran Xuan
>Priority: Major
>
> When the state machine attempts to create a snapshot writer we should 
> validate that the following is true:
>  # The end offset and epoch of the snapshot is less than the high-watermark.
>  # The end offset and epoch of the snapshot is valid based on the leader 
> epoch cache.
> Note that this validation should not be performed when the raft client 
> creates the snapshot writer because in that case the local log is out of date 
> and the follower should trust the snapshot id sent by the partition leader.





[jira] [Assigned] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot

2021-04-19 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-10800:
--

Assignee: Jose Armando Garcia Sancio

> Validate the snapshot id when the state machine creates a snapshot
> --
>
> Key: KAFKA-10800
> URL: https://issues.apache.org/jira/browse/KAFKA-10800
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> When the state machine attempts to create a snapshot writer we should 
> validate that the following is true:
>  # The end offset and epoch of the snapshot is less than the high-watermark.
>  # The end offset and epoch of the snapshot is valid based on the leader 
> epoch cache.
> Note that this validation should not be performed when the raft client 
> creates the snapshot writer because in that case the local log is out of date 
> and the follower should trust the snapshot id sent by the partition leader.





[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

2021-04-19 Thread GitBox


kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r615310932



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -187,17 +192,24 @@ private static PluginClassLoader newPluginClassLoader(
 );
 }
 
-private  void addPlugins(Collection> plugins, ClassLoader 
loader) {
+//visible for testing
+ void addPlugins(Collection> plugins, ClassLoader loader) 
{
 for (PluginDesc plugin : plugins) {
 String pluginClassName = plugin.className();
 SortedMap, ClassLoader> inner = 
pluginLoaders.get(pluginClassName);
+boolean pluginConflict = false;
 if (inner == null) {
 inner = new TreeMap<>();
 pluginLoaders.put(pluginClassName, inner);
 // TODO: once versioning is enabled this line should be moved 
outside this if branch
 log.info("Added plugin '{}'", pluginClassName);
+} else {
+pluginConflict = true;
 }
 inner.put(plugin, loader);
+if (pluginConflict) {
+log.error("Detected multiple copies of plugin '{}', one of 
these will be used '{}'", pluginClassName, inner.keySet());
+}

Review comment:
   I had discussed that case with Randall, and you are right: detecting and logging only once would require inspecting the map after all plugins are loaded, which complicates the code. This is why it's ok to see the same statement twice; it would be rare for someone to have 3 copies of the same plugin.
   
   Sure, let me make the change to log which one will be used as of the time when the statement is logged.








[GitHub] [kafka] C0urante commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

2021-04-19 Thread GitBox


C0urante commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r615928396



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -187,17 +192,24 @@ private static PluginClassLoader newPluginClassLoader(
         );
     }
 
-    private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
+    // visible for testing
+    <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
         for (PluginDesc<T> plugin : plugins) {
             String pluginClassName = plugin.className();
             SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+            boolean pluginConflict = false;
             if (inner == null) {
                 inner = new TreeMap<>();
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+            } else {
+                pluginConflict = true;
             }
             inner.put(plugin, loader);
+            if (pluginConflict) {
+                log.error("Detected multiple copies of plugin '{}', one of these will be used '{}'", pluginClassName, inner.keySet());
+            }

Review comment:
   Mmmm, I'm not sure we should be making decisions here based on dynamic 
plugin loading for two reasons:
   
   1. This change can be backported to older versions of Connect, which will 
never have that feature.
   2. It's unclear exactly what the mechanism for dynamic plugin loading will 
be, and a re-scan of all known plugins after loading has taken place (either 
the initial load at startup or a subsequent dynamic load at runtime) could 
still be beneficial.
   
   Also, it's actually not that uncommon for 3+ copies of the same plugin to 
appear on the plugin path for a worker. For example, some connectors come 
packaged directly with converters; all it takes is at least two such connectors 
and a separately-installed copy of that converter to lead to that number of 
copies, without any error or misconfiguration on the part of the user.
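
   Picking up the re-scan idea from point 2 above, a minimal sketch of such a
post-scan pass over the same pluginLoaders map; the method name and log
wording are assumptions for illustration, not code from this PR:

    // Hypothetical re-scan after all loading (startup or dynamic) completes:
    // each conflicting class is reported exactly once, however many copies
    // of it are on the plugin path. Intended to live in DelegatingClassLoader.
    void reportDuplicatePlugins(Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders) {
        for (Map.Entry<String, SortedMap<PluginDesc<?>, ClassLoader>> entry : pluginLoaders.entrySet()) {
            if (entry.getValue().size() > 1) {
                log.warn("Plugin '{}' has {} copies on the plugin path: {}",
                        entry.getKey(), entry.getValue().size(), entry.getValue().keySet());
            }
        }
    }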




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah opened a new pull request #10561: KAFKA-12686 AlterIsr and LeaderAndIsr race condition

2021-04-19 Thread GitBox


mumrah opened a new pull request #10561:
URL: https://github.com/apache/kafka/pull/10561


   Copied from the JIRA:
   
   > In Partition.scala, there is a race condition between the handling of an 
AlterIsrResponse and a LeaderAndIsrRequest. This is a pretty rare scenario and 
would involve the AlterIsrResponse being delayed for some time, but it is 
possible. This was observed in a test environment when lots of ISR and 
leadership changes were happening due to broker restarts.
   > 
   > When the leader handles the LeaderAndIsr, it calls Partition#makeLeader 
which overrides the isrState variable and clears the pending ISR items via 
AlterIsrManager#clearPending(TopicPartition).
   > 
   > The bug is that AlterIsrManager does not check its inflight state before 
clearing pending items. The way AlterIsrManager is designed, it retains 
inflight items in the pending items collection until the response is processed 
(to allow for retries). The result is that an inflight item is inadvertently 
removed from this collection.
   > 
   > Since the inflight item is cleared from the collection, AlterIsrManager 
allows new AlterIsrItems to be enqueued for this partition even though it 
has an inflight AlterIsrItem. By allowing an update to be enqueued, Partition 
will transition its isrState to one of the inflight states (PendingIsrExpand, 
PendingIsrShrink, etc). Once the inflight partition's response is handled, it 
will fail to update the isrState due to detecting changes since the request was 
sent (which is by design). However, after the response callback is run, 
AlterIsrManager will clear the partitions that it saw in the response from the 
unsent items collection. This includes the newly added (and unsent) update.
   > 
   > The result is that Partition has an "inflight" isrState but AlterIsrManager 
does not have an unsent item for this partition. This prevents any further ISR 
updates on the partition until the next leader election (when isrState is 
reset).
   > 
   > If this bug is encountered, the workaround is to force a leader election 
which will reset the partition's state.
   
   
   This PR removes the clearPending call from AlterIsrManager. As seen with 
this bug, this method is not safe to call any time there is an AlterIsrRequest 
in-flight. We could add more protections around this call, but it is simpler 
(and safer) to just remove it. Clearing unsent ISR updates is not really 
necessary after a leader election since the updates will fail due to a stale 
leader epoch.
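
   A compact Java rendering of the race described above; the real
AlterIsrManager is Scala and more involved, so the class and method names
here are simplified for illustration only:

    import java.util.concurrent.ConcurrentHashMap;

    // One map serves as both the "unsent" and "inflight" tracker, so an
    // entry must survive until its response is handled, both to block new
    // submissions for the partition and to allow retries.
    final class IsrUpdateTracker {
        private final ConcurrentHashMap<String, Object> pendingIsrUpdates = new ConcurrentHashMap<>();

        // A new update is rejected while one (possibly already in flight) exists.
        boolean submit(String topicPartition, Object update) {
            return pendingIsrUpdates.putIfAbsent(topicPartition, update) == null;
        }

        // The hazard: clearing unconditionally can drop an entry whose
        // request is still in flight, so a second update can be enqueued and
        // then silently discarded when the stale response is processed.
        // Removing this call entirely, as the PR does, avoids the race.
        void clearPending(String topicPartition) {
            pendingIsrUpdates.remove(topicPartition);
        }

        // Handling the response releases the slot for future updates.
        void onResponseHandled(String topicPartition) {
            pendingIsrUpdates.remove(topicPartition);
        }
    }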
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



