[GitHub] [kafka] chia7712 opened a new pull request #9516: MINOR: make Send and Receive work with TransportLayer rather than Gat…

2020-10-27 Thread GitBox


chia7712 opened a new pull request #9516:
URL: https://github.com/apache/kafka/pull/9516


   ```BlockingChannel``` was removed by commit cc4dce94af8b19a796eeb7a9be78640739cb1a48, so it is time to refactor ```Send``` and ```NetworkReceive``` to remove their dependence on ScatteringByteChannel and GatheringByteChannel.
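
The intent can be sketched roughly as follows (a hypothetical, simplified rendering; `TransportLayerLike` and `ByteBufferSend` here are illustrative stand-ins, not the real Kafka types): `writeTo` narrows its parameter from the generic `java.nio` channel interface to the transport abstraction the network layer actually uses.

```java
import java.nio.ByteBuffer;

// Stand-in for the narrower transport abstraction the network layer uses,
// replacing the generic java.nio GatheringByteChannel parameter.
interface TransportLayerLike {
    long write(ByteBuffer[] srcs);
}

final class ByteBufferSend {
    private final ByteBuffer[] buffers;
    private long remaining;

    ByteBufferSend(ByteBuffer... buffers) {
        this.buffers = buffers;
        for (ByteBuffer b : buffers)
            remaining += b.remaining();
    }

    boolean completed() {
        return remaining == 0;
    }

    // Previously declared against GatheringByteChannel; now it only needs
    // the transport-layer type, so the channel interfaces can be dropped.
    long writeTo(TransportLayerLike transport) {
        long written = transport.write(buffers);
        remaining -= written;
        return written;
    }
}
```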
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JunHe77 commented on pull request #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm

2020-10-27 Thread GitBox


JunHe77 commented on pull request #8489:
URL: https://github.com/apache/kafka/pull/8489#issuecomment-717710128


   Really appreciate the feedback, @lizthegrey. Glad to know there are actual use cases for running and testing Kafka on Arm64; merging this should help users and developers in the community.







[GitHub] [kafka] ableegoldman commented on pull request #9515: KAFKA-10561: read offsets directly from checkpoint for uninitialized tasks

2020-10-27 Thread GitBox


ableegoldman commented on pull request #9515:
URL: https://github.com/apache/kafka/pull/9515#issuecomment-717691656


   > Although, as I'm typing this, I'm realizing the bug is probably that when 
the task is initialized, we'd report some high offsetSum, which the assignor 
interprets as a low lag, and when it's not initialized, then we report nothing, 
which the assignor interprets as a high lag (since the assignor will 
independently verify if it's a stateless task or not).
   
   Exactly. It shouldn't report different offset sums in two adjacent 
rebalances if nothing changed except that it was assigned a task. One way to 
look at it is if we had just waited slightly longer to rejoin the group, then 
the task would have been initialized with the checkpoint offsets anyway, so the 
checkpoint file is the source of truth while the task is still in CREATED.
   
   If the checkpoint file is empty (and the task uninitialized), then we _should_ report no offsets for that task because it doesn't have any actual state. Skipping the offset sum is technically how we handle stateless tasks, but it's also what happens for any stateful task we just don't happen to find on disk, which is exactly what the task would be if its checkpoint is empty.
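
The fallback being described can be modeled like this (an illustrative sketch, not the actual TaskManager code; the names, the `State` enum, and the `Optional` return are assumptions):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch: while a task is still CREATED (uninitialized), its in-memory
// changelog offsets are empty, so the checkpoint file remains the source
// of truth. If the checkpoint is also empty, report nothing, exactly as
// for a stateless task; the assignor verifies statefulness on its own.
final class OffsetSums {
    enum State { CREATED, RESTORING, RUNNING }

    static Optional<Long> offsetSum(State state,
                                    Map<String, Long> inMemoryOffsets,
                                    Map<String, Long> checkpointOffsets) {
        Map<String, Long> source =
            state == State.CREATED ? checkpointOffsets : inMemoryOffsets;
        if (source.isEmpty()) {
            return Optional.empty(); // no actual state: skip this task
        }
        return Optional.of(
            source.values().stream().mapToLong(Long::longValue).sum());
    }
}
```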







[GitHub] [kafka] vvcephei commented on pull request #9515: KAFKA-10561: read offsets directly from checkpoint for uninitialized tasks

2020-10-27 Thread GitBox


vvcephei commented on pull request #9515:
URL: https://github.com/apache/kafka/pull/9515#issuecomment-717683830


   Thanks for this quick fix, @ableegoldman!
   
   It looks like the bug before was that we would skip encoding these uninitialized tasks since they looked "apparently stateless", and now we'll just try to read the checkpoint instead. If the checkpoint file is empty, though, it seems like the outcome is the same, right? We would not encode anything, just like a stateless task?
   
   Although, as I'm typing this, I'm realizing the bug is probably that when 
the task _is_ initialized, we'd report some high offsetSum, which the assignor 
interprets as a low lag, and when it's _not_ initialized, then we report 
nothing, which the assignor interprets as a high lag (since the assignor will 
independently verify if it's a stateless task or not). In that case, when we 
legitimately have no checkpoint file, then it's ok to report nothing, because 
we legitimately have a high lag. And it won't flip-flop in any case, because 
once the task gets initialized, its lag will still be about the same.
   
   Did I get that right?







[jira] [Comment Edited] (KAFKA-10526) Explore performance impact of leader fsync deferral

2020-10-27 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221881#comment-17221881
 ] 

Jason Gustafson edited comment on KAFKA-10526 at 10/28/20, 1:23 AM:


[~sagarrao] Yes, of course. I might suggest KAFKA-10652 or KAFKA-10634 as lower-hanging fruit to get into the code a little bit.


was (Author: hachikuji):
[~sagarrao] Yes, of course. I might suggest KAFKA-10652 as lower-hanging fruit to get into the code a little bit.

> Explore performance impact of leader fsync deferral
> ---
>
> Key: KAFKA-10526
> URL: https://issues.apache.org/jira/browse/KAFKA-10526
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Priority: Major
>
> In order to commit a write, a majority of nodes must call fsync in order to 
> ensure the data has been written to disk. An interesting optimization option 
> to consider is letting the leader defer fsync until the high watermark is 
> ready to be advanced. This potentially allows us to reduce the number of 
> flushes on the leader.
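
The optimization being explored could be modeled roughly as follows (a toy sketch of the idea only; the counter-based offsets and method names are invented for illustration): the leader appends without flushing and performs the deferred fsync only when the high watermark is about to advance past unflushed data.

```java
// Toy model of fsync deferral: appends do not flush; the flush happens
// lazily when the high watermark advances, collapsing several per-append
// flushes into one.
final class DeferredFlushLog {
    private long flushedEndOffset = 0;
    private long logEndOffset = 0;
    private long highWatermark = 0;
    private int flushCount = 0;

    void append(int records) {
        logEndOffset += records; // no fsync here
    }

    // Called once a majority has replicated up to newHighWatermark.
    void maybeAdvanceHighWatermark(long newHighWatermark) {
        if (newHighWatermark > highWatermark) {
            if (newHighWatermark > flushedEndOffset) {
                flushedEndOffset = logEndOffset; // deferred fsync happens now
                flushCount++;
            }
            highWatermark = newHighWatermark;
        }
    }

    int flushCount() {
        return flushCount;
    }
}
```

In this model, three appends followed by one high-watermark advance cost a single flush instead of three.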



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10526) Explore performance impact of leader fsync deferral

2020-10-27 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221881#comment-17221881
 ] 

Jason Gustafson commented on KAFKA-10526:
-

[~sagarrao] Yes, of course. I might suggest KAFKA-10652 as lower-hanging fruit to get into the code a little bit.

> Explore performance impact of leader fsync deferral
> ---
>
> Key: KAFKA-10526
> URL: https://issues.apache.org/jira/browse/KAFKA-10526
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Priority: Major
>
> In order to commit a write, a majority of nodes must call fsync in order to 
> ensure the data has been written to disk. An interesting optimization option 
> to consider is letting the leader defer fsync until the high watermark is 
> ready to be advanced. This potentially allows us to reduce the number of 
> flushes on the leader.





[jira] [Created] (KAFKA-10652) Raft leader should flush accumulated writes after a min size is reached

2020-10-27 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10652:
---

 Summary: Raft leader should flush accumulated writes after a min 
size is reached
 Key: KAFKA-10652
 URL: https://issues.apache.org/jira/browse/KAFKA-10652
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson


In KAFKA-10601, we implemented linger semantics similar to the producer to let 
the leader accumulate a batch of writes before fsyncing them to disk. Currently 
the fsync is only based on the linger time, but it would be helpful to make it 
size-based as well. In other words, if we accumulate a configurable N bytes, 
then we should not wait for linger expiration and should just fsync immediately.
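
The proposed trigger can be sketched as a simple predicate (the parameter names below are assumptions for illustration, not real config keys): flush when either the linger time has elapsed or the accumulated bytes reach the minimum size, whichever comes first.

```java
// Illustrative flush policy: size-based OR time-based, mirroring the
// producer's linger.ms/batch.size interplay described in the ticket.
final class FlushPolicy {
    private final long lingerMs;
    private final long minFlushBytes;
    private long accumulatedBytes = 0;
    private long firstAppendMs = -1;

    FlushPolicy(long lingerMs, long minFlushBytes) {
        this.lingerMs = lingerMs;
        this.minFlushBytes = minFlushBytes;
    }

    void append(long bytes, long nowMs) {
        if (firstAppendMs < 0)
            firstAppendMs = nowMs; // start the linger clock on first append
        accumulatedBytes += bytes;
    }

    boolean shouldFlush(long nowMs) {
        if (accumulatedBytes == 0)
            return false; // nothing accumulated, nothing to fsync
        return accumulatedBytes >= minFlushBytes
            || nowMs - firstAppendMs >= lingerMs;
    }
}
```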





[GitHub] [kafka] ableegoldman commented on a change in pull request #9515: KAFKA-10561: read offsets directly from checkpoint for uninitialized tasks

2020-10-27 Thread GitBox


ableegoldman commented on a change in pull request #9515:
URL: https://github.com/apache/kafka/pull/9515#discussion_r513114972



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -2479,7 +2502,7 @@ public void shouldTransmitProducerMetrics() {
 allTasks.put(task.id(), (StateMachineTask) task);
 }
 for (final Task task : restoringTasks) {
-assertThat(task.state(), not(Task.State.RUNNING));

Review comment:
   Nothing really changed here, but we used to just leave tasks in CREATED and call them "restoring", so I had to fix this up so they really are RESTORING.









[GitHub] [kafka] ableegoldman opened a new pull request #9515: KAFKA-10561: read offsets directly from checkpoint for uninitialized tasks

2020-10-27 Thread GitBox


ableegoldman opened a new pull request #9515:
URL: https://github.com/apache/kafka/pull/9515


   Uninitialized tasks just return an empty collection in `changelogOffsets()` and are indistinguishable from genuinely stateless (or un-logged) tasks. We should skip over these tasks and read directly from the checkpoint file when computing offset sums for a JoinGroup subscription.







[jira] [Updated] (KAFKA-10651) Assignor reports offsets from uninitialized task

2020-10-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-10651:
---
Description: 
In KIP-441, the new HA assignor makes an informed decision about stateful task 
placement based on the offset sums reported by each instance. Offset sums are 
computed one of two ways: for assigned tasks (ie those in the TaskManager's 
"tasks" map), it will just sum up the tasks' changelog offsets directly. For 
tasks that are not assigned but whose directory remains on disk, it reads the 
changelog offsets from the checkpoint file. This is encoded with the 
subscription userdata sent during the JoinGroup phase of a rebalance.

The problem here is that it's possible for the instance to rejoin the group 
after having been assigned a new task, but before that task is initialized. In 
this case it would not compute the offset sum from the checkpoint file but 
instead from the uninitialized task, causing it to skip reporting any offsets 
for that task whatsoever.

This results in a particularly nefarious interaction between HA and cooperative 
rebalancing. An instance may read from the checkpoint file of a caught-up (but 
unassigned) task and report this in its subscription, leading the assignor to 
compute a small lag and place this task on the instance. After placing all 
stateful tasks in this way, it will distribute the stateless tasks across the 
group to balance the overall workload. It does this without considering the 
previous owner of the stateless tasks, so odds are good that moving the 
stateful task to this instance will result in a different assortment of 
stateless tasks in this rebalance.

Any time owned tasks are moved around, the current owner will have to revoke 
them and trigger a followup cooperative rebalance. Within the Consumer client, 
this actually happens immediately: that is, within an invocation of poll() it 
will loop inside joinGroupIfNeeded() as long as a rejoin is needed. And at the 
end of the last rebalance, if any partitions are revoked then a rejoin will 
indeed be needed. So the Consumer will send out its next JoinGroup – including 
the userdata with computed task offset sums – without first exiting from the 
current poll(). Streams never gets the chance to initialize its new tasks, and 
ends up excluding them from the offset sums it reports in the following 
rebalance.

And since it doesn't report any offsets for this task, the assignor now 
believes the instance does _not_ have any caught up state for this task, and 
assigns the task elsewhere. This causes a shuffling of stateless tasks once 
more, which in turn results in another cooperative rebalance. This time the 
task is no longer assigned so the instance reports offsets based on the 
checkpoint file again, and we're back at the beginning.

Given the deterministic assignment, once a group is caught up in this cycle it 
will be impossible to escape it without manual intervention (ie deleting some 
or all of the task directories and forcing it to restore from scratch)

  was:
In KIP-441, the new HA assignor makes an informed decision about stateful task 
placement based on the offset sums reported by each instance. Offset sums are 
computed one of two ways: for assigned tasks (ie those in the TaskManager's 
"tasks" map), it will just sum up the tasks' changelog offsets directly. For 
tasks that are not assigned but whose directory remains on disk, it reads the 
changelog offsets from the checkpoint file. This is encoded with the 
subscription userdata sent during the JoinGroup phase of a rebalance.

The problem here is that it's possible for the instance to rejoin the group 
after having been assigned a new task, but before that task is initialized. In 
this case it would not compute the offset sum from the checkpoint file but 
instead from the uninitialized task, causing it to skip reporting any offsets 
for that task whatsoever.

This results in a particularly nefarious interaction between HA and cooperative 
rebalancing. An instance may read from the checkpoint file of a caught-up (but 
unassigned) task and report this in its subscription, leading the assignor to 
compute a small lag and place this task on the instance. After placing all 
stateful tasks in this way, it will distribute the stateless tasks across the 
group to balance the overall workload. It does this without considering the 
previous owner of the stateless tasks, so odds are good that moving the 
stateful task to this instance will result in a different assortment of 
stateless tasks in this rebalance.

Any time owned tasks are moved around, the current owner will have to revoke 
them and trigger a followup cooperative rebalance. Within the Consumer client, 
this actually happens immediately: that is, within an invocation of poll() it 
will loop inside joinGroupIfNeeded() as long as a rejoin is needed. And at the 
e

[GitHub] [kafka] rodesai commented on a change in pull request #9498: MINOR: call super.close() when closing RocksDB options

2020-10-27 Thread GitBox


rodesai commented on a change in pull request #9498:
URL: https://github.com/apache/kafka/pull/9498#discussion_r513100855



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -1396,5 +1396,6 @@ public void close() {
 // ColumnFamilyOptions should be closed last
 dbOptions.close();
 columnFamilyOptions.close();
+super.close();

Review comment:
   updated the comment









[GitHub] [kafka] rodesai commented on a change in pull request #9498: MINOR: call super.close() when closing RocksDB options

2020-10-27 Thread GitBox


rodesai commented on a change in pull request #9498:
URL: https://github.com/apache/kafka/pull/9498#discussion_r513095994



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -1396,5 +1396,6 @@ public void close() {
 // ColumnFamilyOptions should be closed last
 dbOptions.close();
 columnFamilyOptions.close();
+super.close();

Review comment:
   I don't think it matters in this case as the parent's Options handle 
isn't actually used. Still, I've maintained the inverse of the initialization 
order here.
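
As a general pattern, "close in the inverse of initialization order" can be captured with a LIFO stack (a generic sketch; the names below are illustrative and unrelated to the RocksDB adapter's actual fields):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Resources registered in initialization order are closed in reverse,
// which a LIFO stack gives for free.
final class ReverseOrderCloser implements AutoCloseable {
    private final Deque<Runnable> closers = new ArrayDeque<>();
    final List<String> closed = new ArrayList<>(); // records close order

    void register(String name) {
        closers.push(() -> closed.add(name)); // push = LIFO
    }

    @Override
    public void close() {
        while (!closers.isEmpty())
            closers.pop().run(); // last-initialized closes first
    }
}
```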









[jira] [Resolved] (KAFKA-10644) Fix VotedToUnattached test error

2020-10-27 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10644.
-
Resolution: Fixed

> Fix VotedToUnattached test error
> 
>
> Key: KAFKA-10644
> URL: https://issues.apache.org/jira/browse/KAFKA-10644
> Project: Kafka
>  Issue Type: Sub-task
>  Components: unit tests
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>
> The code of `QuorumStateTest.testVotedToUnattachedHigherEpoch` is not 
> consistent with its name: the method name says VotedToUnattached, but the 
> code exercises UnattachedToUnattached:
> ```
> state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
> state.transitionToUnattached(5);
> long remainingElectionTimeMs = 
> state.unattachedStateOrThrow().remainingElectionTimeMs(time.milliseconds());
> time.sleep(1000);
> state.transitionToUnattached(6);
> ```





[GitHub] [kafka] hachikuji merged pull request #9503: KAFKA-10644: Fix VotedToUnattached test error

2020-10-27 Thread GitBox


hachikuji merged pull request #9503:
URL: https://github.com/apache/kafka/pull/9503


   







[GitHub] [kafka] mjsax commented on a change in pull request #9498: fix: call super.close when closing rocksdb options

2020-10-27 Thread GitBox


mjsax commented on a change in pull request #9498:
URL: https://github.com/apache/kafka/pull/9498#discussion_r513093711



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -1396,5 +1396,6 @@ public void close() {
 // ColumnFamilyOptions should be closed last
 dbOptions.close();
 columnFamilyOptions.close();
+super.close();

Review comment:
   Are there any ordering constraints to call the three `close` methods?









[GitHub] [kafka] bbejeck merged pull request #9514: MINOR: Add KIP-431 to upgrade.html file

2020-10-27 Thread GitBox


bbejeck merged pull request #9514:
URL: https://github.com/apache/kafka/pull/9514


   







[GitHub] [kafka] bbejeck commented on pull request #9514: MINOR: Add KIP-431 to upgrade.html file

2020-10-27 Thread GitBox


bbejeck commented on pull request #9514:
URL: https://github.com/apache/kafka/pull/9514#issuecomment-717602293


   only HTML, merging now







[GitHub] [kafka] bbejeck commented on pull request #9514: MINOR: Add KIP-431 to upgrade.html file

2020-10-27 Thread GitBox


bbejeck commented on pull request #9514:
URL: https://github.com/apache/kafka/pull/9514#issuecomment-717602172


   Thanks @mjsax 







[GitHub] [kafka] bbejeck opened a new pull request #9514: MINOR: Add KIP-431 to upgrade.html file

2020-10-27 Thread GitBox


bbejeck opened a new pull request #9514:
URL: https://github.com/apache/kafka/pull/9514


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Assigned] (KAFKA-10651) Assignor reports offsets from uninitialized task

2020-10-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-10651:
--

Assignee: A. Sophie Blee-Goldman

> Assignor reports offsets from uninitialized task
> 
>
> Key: KAFKA-10651
> URL: https://issues.apache.org/jira/browse/KAFKA-10651
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.7.0, 2.6.1
>
>
> In KIP-441, the new HA assignor makes an informed decision about stateful 
> task placement based on the offset sums reported by each instance. Offset 
> sums are computed one of two ways: for assigned tasks (ie those in the 
> TaskManager's "tasks" map), it will just sum up the tasks' changelog offsets 
> directly. For tasks that are not assigned but whose directory remains on 
> disk, it reads the changelog offsets from the checkpoint file. This is 
> encoded with the subscription userdata sent during the JoinGroup phase of a 
> rebalance.
> The problem here is that it's possible for the instance to rejoin the group 
> after having been assigned a new task, but before that task is initialized. 
> In this case it would not compute the offset sum from the checkpoint file but 
> instead from the uninitialized task, causing it to skip reporting any offsets 
> for that task whatsoever.
> This results in a particularly nefarious interaction between HA and 
> cooperative rebalancing. An instance may read from the checkpoint file of a 
> caught-up (but unassigned) task and report this in its subscription, leading 
> the assignor to compute a small lag and place this task on the instance. 
> After placing all stateful tasks in this way, it will distribute the 
> stateless tasks across the group to balance the overall workload. It does 
> this without considering the previous owner of the stateless tasks, so odds 
> are good that moving the stateful task to this instance will result in a 
> different assortment of stateless tasks in this rebalance.
> Any time owned tasks are moved around, the current owner will have to revoke 
> them and trigger a followup cooperative rebalance. Within the Consumer 
> client, this actually happens immediately: that is, within an invocation of 
> poll() it will loop inside joinGroupIfNeeded() as long as a rejoin is needed. 
> And at the end of the last rebalance, if any partitions are revoked then a 
> rejoin will indeed be needed. So the Consumer will send out its next 
> JoinGroup – including the userdata with computed task offset sums – without 
> first exiting from the current poll(). Streams never gets the chance to 
> initialize its new tasks, and ends up excluding them from the offset sums it 
> reports in the following rebalance.
> And since it doesn't report any offsets for this task, the assignor now 
> believes the instance does _not_ have any caught up state for this task, and 
> assigns the task elsewhere. This causes a shuffling of stateless tasks once 
> more, which in turn results in another cooperative rebalance. This time the 
> task is no longer assigned so the instance reports offsets based on the 
> checkpoint file again, and we're back at the beginning.
> Given the deterministic assignment, once a group is caught up in this cycle 
> it will be impossible to escape it without manual intervention.





[jira] [Created] (KAFKA-10651) Assignor reports offsets from uninitialized task

2020-10-27 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-10651:
--

 Summary: Assignor reports offsets from uninitialized task
 Key: KAFKA-10651
 URL: https://issues.apache.org/jira/browse/KAFKA-10651
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: A. Sophie Blee-Goldman
 Fix For: 2.7.0, 2.6.1


In KIP-441, the new HA assignor makes an informed decision about stateful task 
placement based on the offset sums reported by each instance. Offset sums are 
computed one of two ways: for assigned tasks (ie those in the TaskManager's 
"tasks" map), it will just sum up the tasks' changelog offsets directly. For 
tasks that are not assigned but whose directory remains on disk, it reads the 
changelog offsets from the checkpoint file. This is encoded with the 
subscription userdata sent during the JoinGroup phase of a rebalance.

The problem here is that it's possible for the instance to rejoin the group 
after having been assigned a new task, but before that task is initialized. In 
this case it would not compute the offset sum from the checkpoint file but 
instead from the uninitialized task, causing it to skip reporting any offsets 
for that task whatsoever.

This results in a particularly nefarious interaction between HA and cooperative 
rebalancing. An instance may read from the checkpoint file of a caught-up (but 
unassigned) task and report this in its subscription, leading the assignor to 
compute a small lag and place this task on the instance. After placing all 
stateful tasks in this way, it will distribute the stateless tasks across the 
group to balance the overall workload. It does this without considering the 
previous owner of the stateless tasks, so odds are good that moving the 
stateful task to this instance will result in a different assortment of 
stateless tasks in this rebalance.

Any time owned tasks are moved around, the current owner will have to revoke 
them and trigger a followup cooperative rebalance. Within the Consumer client, 
this actually happens immediately: that is, within an invocation of poll() it 
will loop inside joinGroupIfNeeded() as long as a rejoin is needed. And at the 
end of the last rebalance, if any partitions are revoked then a rejoin will 
indeed be needed. So the Consumer will send out its next JoinGroup – including 
the userdata with computed task offset sums – without first exiting from the 
current poll(). Streams never gets the chance to initialize its new tasks, and 
ends up excluding them from the offset sums it reports in the following 
rebalance.

And since it doesn't report any offsets for this task, the assignor now 
believes the instance does _not_ have any caught up state for this task, and 
assigns the task elsewhere. This causes a shuffling of stateless tasks once 
more, which in turn results in another cooperative rebalance. This time the 
task is no longer assigned so the instance reports offsets based on the 
checkpoint file again, and we're back at the beginning.

Given the deterministic assignment, once a group is caught up in this cycle it 
will be impossible to escape it without manual intervention.





[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2020-10-27 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221813#comment-17221813
 ] 

Guozhang Wang commented on KAFKA-8803:
--

[~pdeole] I was OOO and just saw your message. So far, all issues found 
related to this observed symptom are on the broker side, and in 2.5.0 they 
should all be fixed. If you see other issues, they may be related to other PRs 
not listed here. I think upgrading to 2.5.1 is worthwhile, but I cannot tell if 
it would definitely fix your issue.

> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.5.0, 2.3.2, 2.4.2
>
> Attachments: logs-20200311.txt.gz, logs-client-20200311.txt.gz, 
> logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram commented on pull request #7751: KAFKA-7987: Reinitialize ZookeeperClient after auth failures

2020-10-27 Thread GitBox


rajinisivaram commented on pull request #7751:
URL: https://github.com/apache/kafka/pull/7751#issuecomment-717541120


   @parafiend Will you be updating the PR to address the review comments from 
@junrao ? As mentioned in the JIRA, this issue also affects the ZooKeeper 
client used by authorizers, so it will be good to add a test for authorizer as 
well. Please let me know if I can be of any help.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7987) a broker's ZK session may die on transient auth failure

2020-10-27 Thread Rajini Sivaram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221756#comment-17221756
 ] 

Rajini Sivaram commented on KAFKA-7987:
---

[~junrao] Thank you! I will follow up on the PR.

> a broker's ZK session may die on transient auth failure
> ---
>
> Key: KAFKA-7987
> URL: https://issues.apache.org/jira/browse/KAFKA-7987
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Critical
>
> After a transient network issue, we saw the following log in a broker.
> {code:java}
> [23:37:02,102] ERROR SASL authentication with Zookeeper Quorum member failed: 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Server not found in Kerberos database (7))]) occurred when 
> evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client 
> will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
> [23:37:02,102] ERROR [ZooKeeperClient] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> {code}
> The network issue prevented the broker from communicating to ZK. The broker's 
> ZK session then expired, but the broker didn't know that yet since it 
> couldn't establish a connection to ZK. When the network was back, the broker 
> tried to establish a connection to ZK, but failed due to auth failure (likely 
> due to a transient KDC issue). The current logic just ignores the auth 
> failure without trying to create a new ZK session. Then the broker will be 
> permanently in a state that it's alive, but not registered in ZK.
>  
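The fix direction implied by this issue — recreating the ZooKeeper session instead of ignoring the auth failure — can be sketched with a minimal retry loop. This is an illustrative model only; the class and method names below are hypothetical and do not reflect the actual `kafka.zookeeper.ZooKeeperClient` API.

```java
public class AuthFailureRetrySketch {
    public interface SessionFactory {
        boolean createSession();
    }

    // Retry session creation until it succeeds or we give up, rather than
    // leaving the broker alive but permanently unregistered in ZK.
    public static int reinitializeUntilConnected(SessionFactory factory, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (factory.createSession()) {
                return attempt; // connected on this attempt
            }
            // A real client would sleep with backoff here before retrying,
            // since the auth failure (e.g. a KDC outage) may be transient.
        }
        return -1; // still not connected; caller may shut down or alert
    }
}
```

Under this scheme, a transient KDC failure delays registration instead of silently disabling it.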





[GitHub] [kafka] mjsax opened a new pull request #9513: MINOR: improve `null` checks for headers

2020-10-27 Thread GitBox


mjsax opened a new pull request #9513:
URL: https://github.com/apache/kafka/pull/9513


   Headers are not allowed to be `null`, and thus the code does not check for 
`null` headers. However, we don't have proper guards in place and have hit NPEs 
in the past when a header was `null`. This PR adds additional `null` checks to 
prevent users from creating corrupted headers.
   
   Cf https://issues.apache.org/jira/browse/KAFKA-8142 and 
https://issues.apache.org/jira/browse/KAFKA-10645
   
   Call for review @mimaison
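The kind of guard described above can be illustrated with a small stand-in class. `SimpleHeaders` below is hypothetical, not Kafka's actual `Headers` implementation; it only shows the pattern of rejecting `null` at `add()` time with a clear message instead of failing later with an NPE deep inside the producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class SimpleHeaders {
    public static final class Header {
        final String key;
        final byte[] value; // a null *value* is legal for Kafka headers

        public Header(String key, byte[] value) {
            // reject null keys eagerly with a descriptive error
            this.key = Objects.requireNonNull(key, "Header key cannot be null");
            this.value = value;
        }
    }

    private final List<Header> headers = new ArrayList<>();

    public SimpleHeaders add(Header header) {
        // reject null header objects eagerly as well
        headers.add(Objects.requireNonNull(header, "Header cannot be null"));
        return this;
    }

    public int size() {
        return headers.size();
    }
}
```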







[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-10-27 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10370:

Fix Version/s: (was: 2.7.0)

> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets, rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provided a way to supply the offsets from external (e.g. implementation of 
> SinkTask) to rewind the consumer. 
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first call 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not 
> empty, (2) consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply the specific offsets by +"context.offset(supplied_offsets);+" 
> in start() method, so that when the consumer does the first poll, it should 
> rewind to the specific offsets in rewind() method. However in practice, we 
> saw the following IllegalStateException when running consumer.seek(tp, 
> offsets);
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:188)
> {code}
> As suggested in 
> https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
>  the resolution (that has been initially verified) proposed in the attached 
> PR is to use *consumer.assign* with *consumer.seek* , instead of 
> 
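The proposed fix (assign before seek) can be modeled without a real `KafkaConsumer`. The sketch below uses plain collections to mirror the consumer's subscription state; it is an illustration of the ordering constraint, not Kafka code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SeekAfterAssignSketch {
    private final Set<String> assignment = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();

    // Mirrors KafkaConsumer.seek: fails if the partition is not assigned,
    // which is exactly the IllegalStateException in the stack trace above.
    public void seek(String tp, long offset) {
        if (!assignment.contains(tp)) {
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
        positions.put(tp, offset);
    }

    // The safe ordering: assign the partition first, then seek.
    public void rewind(String tp, long offset) {
        assignment.add(tp); // consumer.assign(...)
        seek(tp, offset);   // consumer.seek(...)
    }

    public Long position(String tp) {
        return positions.get(tp);
    }
}
```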

[jira] [Updated] (KAFKA-10378) issue when create producer using java

2020-10-27 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10378:

Fix Version/s: (was: 2.6.1)
   (was: 2.7.0)

> issue when create producer using java
> -
>
> Key: KAFKA-10378
> URL: https://issues.apache.org/jira/browse/KAFKA-10378
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0
> Environment: mac os
> java version "1.8.0_231"
> intellij 
>Reporter: Mohammad Abdelqader
>Assignee: Luke Chen
>Priority: Blocker
>
> I created a simple producer in Java using IntelliJ. When I run the project, 
> it returns the following issue:
> [kafka-producer-network-thread | producer-1] ERROR 
> org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread 
> 'kafka-producer-network-thread | producer-1':[kafka-producer-network-thread | 
> producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught 
> exception in thread 'kafka-producer-network-thread | 
> producer-1':java.lang.NoClassDefFoundError: 
> com/fasterxml/jackson/databind/JsonNode at 
> org.apache.kafka.common.requests.ApiVersionsRequest$Builder.(ApiVersionsRequest.java:36)
>  at 
> org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555) at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748)Caused by: 
> java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode at 
> java.net.URLClassLoader.findClass(URLClassLoader.java:382) at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:418) at 
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 6 more
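The `ClassNotFoundException` above indicates that `com.fasterxml.jackson.databind` is missing from the runtime classpath. A hedged workaround (the version shown is illustrative, not a verified pairing) is to declare the dependency explicitly in the build, for example in Gradle:

```groovy
// Illustrative Gradle fix: ensure jackson-databind is on the runtime classpath
// alongside kafka-clients. Versions shown are examples, not recommendations.
dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.6.0'
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.10.5'
}
```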





[jira] [Updated] (KAFKA-9497) Brokers start up even if SASL provider is not loaded and throw NPE when clients connect

2020-10-27 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9497:
---
Fix Version/s: (was: 2.7.0)

> Brokers start up even if SASL provider is not loaded and throw NPE when 
> clients connect
> ---
>
> Key: KAFKA-9497
> URL: https://issues.apache.org/jira/browse/KAFKA-9497
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.2, 0.11.0.3, 1.1.1, 2.4.0
>Reporter: Rajini Sivaram
>Assignee: Ron Dagostino
>Priority: Major
>
> Note: This is not a regression, this has been the behaviour since SASL was 
> first implemented in Kafka.
>  
> Sasl.createSaslServer and Sasl.createSaslClient may return null if a SASL 
> provider that works for the specified configs cannot be created. We don't 
> currently handle this case. As a result broker/client throws 
> NullPointerException if a provider has not been loaded. On the broker-side, 
> we allow brokers to start up successfully even if SASL provider for its 
> enabled mechanisms are not found. For SASL mechanisms 
> PLAIN/SCRAM-xx/OAUTHBEARER, the login module in Kafka loads the SASL 
> providers. If the login module is incorrectly configured, brokers startup and 
> then fail client connections when hitting NPE. Clients see disconnections 
> during authentication as a result. It is difficult to tell from the client or 
> broker logs why the failure occurred. We should fail during startup if SASL 
> providers are not found and provide better diagnostics for this case.
>  
>  
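The fail-fast behavior the ticket asks for can be sketched against the JDK's own SASL factory, which per its contract returns `null` when no loaded provider supports the requested mechanism. The class below is a hypothetical sketch of that check, not Kafka's actual implementation.

```java
import java.util.Collections;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;

public class SaslProviderCheckSketch {
    // Turn the null return into a descriptive startup failure instead of
    // deferring it to an NPE when the first client connects.
    public static SaslClient createOrFail(String mechanism) throws SaslException {
        SaslClient client = Sasl.createSaslClient(
                new String[]{mechanism}, null, "kafka", "localhost",
                Collections.emptyMap(), callbacks -> { });
        if (client == null) {
            throw new SaslException(
                    "Failed to create SaslClient for mechanism " + mechanism
                    + ": no SASL provider found");
        }
        return client;
    }
}
```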





[jira] [Updated] (KAFKA-10297) Don't use deprecated producer config `retries`

2020-10-27 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10297:

Fix Version/s: (was: 2.7.0)

> Don't use deprecated producer config `retries`
> --
>
> Key: KAFKA-10297
> URL: https://issues.apache.org/jira/browse/KAFKA-10297
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.7.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>
> In 2.7.0 release, producer config `retries` gets deprecated via KIP-572.
> Connect is still using this config, which needs to be fixed (cf 
> [https://github.com/apache/kafka/pull/8864/files#r439685920])
> {quote}Btw: @hachikuji raise a concern about this issue, too: 
> https://github.com/apache/kafka/pull/8864#pullrequestreview-443424531
> > I just had one question about the proposal. Using retries=0 in the producer 
> > allows the user to have "at-most-once" delivery. This allows the 
> > application to implement its own retry logic for example. Do we still have 
> > a way to do this once this configuration is gone?
> So maybe we need to do some follow up work in the `Producer` to make it work 
> for Connect. But I would defer this to the follow up work.
> My original thought was that setting `delivery.timeout.ms := request.timeout.ms` 
> might prevent internal retries. But we need to verify this. It was also brought 
> to my attention that this might not work if the network disconnects -- only 
> `retries=0` would prevent re-opening the connection, but a low timeout would 
> not prevent retries.
> In KIP-572, we proposed for Kafka Streams itself, to treat `task.timeout.ms = 
> 0` as "no retries" -- maybe we can do a similar thing for the producer?
> There is also `max.block.ms` that we should consider. Unfortunately, I am not 
> an expert on the `Producer`. But I would like to move forward with KIP-572 
> for now and are happy to help to resolve those questions.
> {quote}
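The idea floated in the quoted discussion — capping the delivery timeout at the request timeout so there is no time budget left for internal retries — would look roughly like the following producer configuration. Note the quote itself says this still needs verification, so treat these values as an unverified illustration, not a recommendation.

```properties
# Illustrative settings for approximating "at-most-once" delivery once
# `retries` is deprecated by KIP-572. Unverified: network disconnects may
# still allow the producer to re-send.
delivery.timeout.ms=30000
request.timeout.ms=30000
```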





[jira] [Commented] (KAFKA-10645) Forwarding a record from a punctuator sometimes it results in a NullPointerException

2020-10-27 Thread Filippo Machi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221709#comment-17221709
 ] 

Filippo Machi commented on KAFKA-10645:
---

My input data doesn't have headers.

I am getting this exception inside a Punctuator; my class implements the 
interface *org.apache.kafka.streams.processor.Punctuator*, and inside the method

{code:java}
   public void punctuate(long timestamp){code}

the logic retrieves data from a local store and then calls

{code:java}
context.forward(key, value) {code}

The record with the headers is built by the library; I am not setting them 
explicitly. I think the serialization logic is adding some headers, but I am 
trying to add logs to get more information.

 

> Forwarding a record from a punctuator sometimes it results in a 
> NullPointerException
> 
>
> Key: KAFKA-10645
> URL: https://issues.apache.org/jira/browse/KAFKA-10645
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Filippo Machi
>Priority: Major
>
> Hello,
>  I am working on a java kafka stream application (v. 2.5.0) running on a 
> kubernetes cluster.
> It's a Spring Boot application running with Java 8.
> With the last upgrade to version 2.5.0 I started to see into the logs some 
> NullPointerException that are happening when forwarding a record from a 
> punctuator. 
>  This is the stacktrace of the exception
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_2] Abort 
> sending since an error caught with a previous record (timestamp 
> 1603721062667) to topic reply-reminder-push-sender due to 
> java.lang.NullPointerException\tat 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:240)\tat
>  
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)\tat
>  
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)\t...
>  24 common frames omittedCaused by: java.lang.NullPointerException: null\tat 
> org.apache.kafka.common.record.DefaultRecord.sizeOf(DefaultRecord.java:613)\tat
>  
> org.apache.kafka.common.record.DefaultRecord.recordSizeUpperBound(DefaultRecord.java:633)\tat
>  
> org.apache.kafka.common.record.DefaultRecordBatch.estimateBatchSizeUpperBound(DefaultRecordBatch.java:534)\tat
>  
> org.apache.kafka.common.record.AbstractRecords.estimateSizeInBytesUpperBound(AbstractRecords.java:135)\tat
>  
> org.apache.kafka.common.record.AbstractRecords.estimateSizeInBytesUpperBound(AbstractRecords.java:125)\tat
>  
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:914)\tat
>  
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)\tat
>  
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:181)\t...
>  29 common frames omitted
> {code}
> Checking the code, it looks like it happens when calculating the size of the 
> record. There is one header that is null, but I don't think I can control 
> those headers, right?
> Thanks a lot





[GitHub] [kafka] hachikuji commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-27 Thread GitBox


hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r512370043



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1388,6 +1390,7 @@ class Log(@volatile private var _dir: File,
 var validBytesCount = 0
 var firstOffset: Option[Long] = None
 var lastOffset = -1L
+var lastLeaderEpoch: Option[Int] = None

Review comment:
   nit: not sure how much it matters, but maybe we can avoid the extra 
garbage and just use an integer until we're ready to build the result?

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -426,21 +451,34 @@ abstract class AbstractFetcherThread(name: String,
 warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): 
Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the 
provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data 
returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch 
matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: 
InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+if (currentState != null && currentState.currentLeaderEpoch == 
initialFetchState.currentLeaderEpoch) {
+  currentState
+} else if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 
0 && initialFetchState.lastFetchedEpoch.nonEmpty &&
+  (currentState == null || currentState.state == Fetching)) {
+  PartitionFetchState(initialFetchState.initOffset, None, 
initialFetchState.currentLeaderEpoch,
+  state = Fetching, initialFetchState.lastFetchedEpoch)

Review comment:
   I am wondering in what situation we would find `currentState` non-null 
here. The current logic in `ReplicaManager.makeFollowers` always calls 
`removeFetcherForPartitions` before adding the partition back. The reason I ask 
is that I wasn't sure we should be taking the last fetched epoch from the 
initial state or if we should keep the current one. It seems like the latter 
might be more current?

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -221,7 +223,15 @@ abstract class AbstractFetcherThread(name: String,
 
   val ResultWithPartitions(fetchOffsets, partitionsWithError) = 
maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
   handlePartitionsWithErrors(partitionsWithError, 
"truncateToEpochEndOffsets")
-  updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
+  updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, 
isTruncationOnFetchSupported)
+}
+  }
+
+  private def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, 
EpochEndOffset]): Unit = {
+inLock(partitionMapLock) {
+  val ResultWithPartitions(fetchOffsets, partitionsWithError) = 
maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
+  handlePartitionsWithErrors(partitionsWithError, 
"truncateOnFetchResponse")
+  updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, 
maySkipTruncation = false)

Review comment:
   It's not clear to me why we set `maySkipTruncation` to false here. If 
the truncation is not complete, wouldn't that put us in the `Truncating` state?

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -454,15 +492,23 @@ abstract class AbstractFetcherThread(name: String,
 * truncation completed if their offsetTruncationState indicates truncation 
completed
 *
 * @param fetchOffsets the partitions to update fetch offset and maybe mark 
truncation complete
+* @param maySkipTruncation true if we can stay in Fetching mode and 
perform truncation later based on
+   *   diverging epochs from fetch responses.
 */
-  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: 
Map[TopicPartition, OffsetTruncationState]): Unit = {
+  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: 
Map[TopicPartition, OffsetTruncationState],
+  
maySkipTruncation: Boolean): Unit = {
 val newStates: Map[TopicPartition, PartitionFetchState] = 
partitionStates.partitionStateMap.asScala
   .map { case (topicPartition, currentFetchState) =>
 val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
   case Some(offsetTruncationState) =>
-val state = if (offsetTruncationState.truncationCompleted) 
Fetching else Truncating
+val (state, lastFetchedEpoch) = if 
(offsetTruncationState.truncationCompleted)
+  (Fetching, latestEpoch(topicPartition))
+else if 

[jira] [Commented] (KAFKA-10645) Forwarding a record from a punctuator sometimes it results in a NullPointerException

2020-10-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221678#comment-17221678
 ] 

Matthias J. Sax commented on KAFKA-10645:
-

Well, I would have assumed that your input data would have headers?

IIRC, Kafka Streams uses headers for some internal topic if you use a FK-join? 
Are you using a FK join?

> Forwarding a record from a punctuator sometimes it results in a 
> NullPointerException
> 
>
> Key: KAFKA-10645
> URL: https://issues.apache.org/jira/browse/KAFKA-10645
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Filippo Machi
>Priority: Major
>
> Hello,
>  I am working on a java kafka stream application (v. 2.5.0) running on a 
> kubernetes cluster.
> It's a Spring Boot application running with Java 8.
> With the last upgrade to version 2.5.0 I started to see into the logs some 
> NullPointerException that are happening when forwarding a record from a 
> punctuator. 
>  This is the stacktrace of the exception
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_2] Abort 
> sending since an error caught with a previous record (timestamp 
> 1603721062667) to topic reply-reminder-push-sender due to 
> java.lang.NullPointerException\tat 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:240)\tat
>  
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)\tat
>  
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)\t...
>  24 common frames omittedCaused by: java.lang.NullPointerException: null\tat 
> org.apache.kafka.common.record.DefaultRecord.sizeOf(DefaultRecord.java:613)\tat
>  
> org.apache.kafka.common.record.DefaultRecord.recordSizeUpperBound(DefaultRecord.java:633)\tat
>  
> org.apache.kafka.common.record.DefaultRecordBatch.estimateBatchSizeUpperBound(DefaultRecordBatch.java:534)\tat
>  
> org.apache.kafka.common.record.AbstractRecords.estimateSizeInBytesUpperBound(AbstractRecords.java:135)\tat
>  
> org.apache.kafka.common.record.AbstractRecords.estimateSizeInBytesUpperBound(AbstractRecords.java:125)\tat
>  
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:914)\tat
>  
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)\tat
>  
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:181)\t...
>  29 common frames omitted
> {code}
> Checking the code, it looks like it happens when calculating the size of the 
> record. There is one header that is null, but I don't think I can control 
> those headers, right?
> Thanks a lot





[jira] [Resolved] (KAFKA-10601) Add linger semantics to raft

2020-10-27 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10601.
-
Resolution: Fixed

> Add linger semantics to raft
> 
>
> Key: KAFKA-10601
> URL: https://issues.apache.org/jira/browse/KAFKA-10601
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> In order to tune latency/throughput tradeoffs when writing to the metadata 
> log, it is useful to support a linger configuration. This allows the cost of 
> fsync to be amortized at the expense of latency.





[GitHub] [kafka] hachikuji merged pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-27 Thread GitBox


hachikuji merged pull request #9418:
URL: https://github.com/apache/kafka/pull/9418


   







[jira] [Updated] (KAFKA-10648) Add Prefix Scan support to State Stores

2020-10-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10648:

Component/s: streams

> Add Prefix Scan support to State Stores
> ---
>
> Key: KAFKA-10648
> URL: https://issues.apache.org/jira/browse/KAFKA-10648
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
>
> This issue is related to the changes mentioned in:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  
> which seeks to add prefix scan support to State stores. Currently, only 
> RocksDB and InMemory key value stores are being supported.
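Because both supported stores keep keys sorted, a prefix scan reduces to a range query. The sketch below uses `TreeMap` as a stand-in for the state store and a simplified upper bound; real implementations compute the successor key rather than appending `Character.MAX_VALUE`. This is an illustration of the idea, not the KIP-614 API.

```java
import java.util.SortedMap;
import java.util.TreeMap;

public class PrefixScanSketch {
    // All entries whose String key starts with `prefix`, as a range query over
    // the sorted store: [prefix, prefix + '\uffff').
    public static SortedMap<String, String> prefixScan(TreeMap<String, String> store,
                                                       String prefix) {
        // Simplified upper bound: keys equal to prefix + '\uffff'... would be
        // missed; a real implementation computes the exact successor.
        return store.subMap(prefix, prefix + Character.MAX_VALUE);
    }
}
```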





[jira] [Updated] (KAFKA-10648) Add Prefix Scan support to State Stores

2020-10-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10648:

Labels: kip  (was: )

> Add Prefix Scan support to State Stores
> ---
>
> Key: KAFKA-10648
> URL: https://issues.apache.org/jira/browse/KAFKA-10648
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
>
> This issue is related to the changes mentioned in:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  
> which seeks to add prefix scan support to State stores. Currently, only 
> RocksDB and InMemory key value stores are being supported.





[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-10-27 Thread GitBox


apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512951535



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1246,7 +1337,57 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
 // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
 // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-updateConnectionRateQuota(maxConnectionRate)
+updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: 
Option[Int]): Unit = {
+def isIpConnectionRateMetric(metricName: MetricName) = {
+  metricName.name == ConnectionRateMetricName &&
+  metricName.group == MetricsGroup &&
+  metricName.tags.containsKey(IpMetricTag)
+}
+
+def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+  quotaLimit != metric.config.quota.bound
+}
+
+ip match {
+  case Some(addr) =>
+val address = InetAddress.getByName(addr)
+maxConnectionRate match {
+  case Some(rate) =>
+info(s"Updating max connection rate override for $address to 
$rate")
+connectionRatePerIp.put(address, rate)
+  case None =>
+info(s"Removing max connection rate override for $address")
+connectionRatePerIp.remove(address)
+}
+updateConnectionRateQuota(connectionRateForIp(address), 
IpQuotaEntity(address))
+  case None =>
+val newQuota = 
maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate)
+info(s"Updating default max IP connection rate to $newQuota")
+defaultConnectionRatePerIp = newQuota
+val allMetrics = metrics.metrics
+allMetrics.forEach { (metricName, metric) =>
+  if (isIpConnectionRateMetric(metricName) && 
shouldUpdateQuota(metric, newQuota)) {
+info(s"Updating existing connection rate sensor for 
${metricName.tags} to $newQuota")
+metric.config(rateQuotaMetricConfig(newQuota))
+  }

Review comment:
   @splett2 Maybe I was not looking at the right place in this PR, but does 
this PR handle the case where someone sets a non-unlimited per-IP default 
quota? Basically, the /config/ips/ znode. 
   
   Because if the default is set and it is not unlimited, removing the quota 
for an IP should fall back to the configured IP default, and if the IP default 
is not set, then fall back to `DynamicConfig.Ip.DefaultConnectionCreationRate`. 
I think that means we need to have the "" ip in the `connectionRatePerIp` map 
if the default is not unlimited, and use it when creating the per-IP sensor 
with the right quota.
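For illustration, the fallback chain described above could look roughly like this (a minimal sketch with hypothetical names `IpQuotaFallback`, `UNLIMITED`, and a plain map; the real broker code wires this through Kafka's metrics sensors, and the resolution the reviewer is asking for is: per-IP override, else the currently configured default):

```java
import java.net.InetAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class IpQuotaFallback {
    // Hypothetical stand-in for DynamicConfig.Ip.DefaultConnectionCreationRate
    static final int UNLIMITED = Integer.MAX_VALUE;

    final Map<InetAddress, Integer> connectionRatePerIp = new ConcurrentHashMap<>();
    volatile int defaultConnectionRatePerIp = UNLIMITED;

    // Per-IP override wins; otherwise fall back to the current default
    // (which is the configured IP default, or UNLIMITED if none is set)
    int connectionRateForIp(InetAddress ip) {
        return connectionRatePerIp.getOrDefault(ip, defaultConnectionRatePerIp);
    }

    public static void main(String[] args) {
        IpQuotaFallback quotas = new IpQuotaFallback();
        InetAddress ip = InetAddress.getLoopbackAddress();
        quotas.defaultConnectionRatePerIp = 20;   // configured IP default
        quotas.connectionRatePerIp.put(ip, 5);    // per-IP override
        System.out.println(quotas.connectionRateForIp(ip)); // override wins: 5
        quotas.connectionRatePerIp.remove(ip);    // removal falls back to default
        System.out.println(quotas.connectionRateForIp(ip)); // 20
    }
}
```

Under this shape, removing an override simply falls back to whatever default is currently configured, which is the behaviour the comment asks about.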





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ramesh-muthusamy commented on a change in pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-10-27 Thread GitBox


ramesh-muthusamy commented on a change in pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#discussion_r512945677



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -577,15 +596,14 @@ private void resetDelay() {
 numToRevoke = floorTasks;
 for (WorkerLoad existing : existingWorkers) {
 Iterator tasks = existing.tasks().iterator();
+numToRevoke = existing.tasksSize() - ceilTasks;

Review comment:
   @kkonstantine  we are trying to revoke the additional tasks assigned [if 
any] to a worker so that in the next stage the revoked tasks can be reassigned 
to the new workers that get added to the group. Yes, we need to apply this to 
connectors as well; I will update the PR with the changes.
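The revocation math in the diff above can be sketched as follows (a rough sketch with hypothetical names; the real assignor operates on `WorkerLoad` objects rather than plain counts, and `ceilTasks` is the balanced per-worker ceiling):

```java
import java.util.ArrayList;
import java.util.List;

public class RevocationSketch {
    // For each worker, revoke everything above the balanced ceiling,
    // mirroring `numToRevoke = existing.tasksSize() - ceilTasks`
    static List<Integer> toRevoke(List<Integer> taskCounts, int totalWorkers) {
        int totalTasks = taskCounts.stream().mapToInt(Integer::intValue).sum();
        int ceilTasks = (totalTasks + totalWorkers - 1) / totalWorkers; // ceil division
        List<Integer> revoked = new ArrayList<>();
        for (int size : taskCounts) {
            revoked.add(Math.max(0, size - ceilTasks));
        }
        return revoked;
    }

    public static void main(String[] args) {
        // Two existing workers with 8 tasks each; one new, empty worker joins
        // (3 workers total). ceil(16 / 3) = 6, so 2 tasks come off each worker.
        System.out.println(toRevoke(List.of(8, 8), 3)); // [2, 2]
    }
}
```

The revoked tasks then become available for reassignment to the newly joined workers in the next round.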









[GitHub] [kafka] ramesh-muthusamy edited a comment on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-10-27 Thread GitBox


ramesh-muthusamy edited a comment on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-717450100


   @kkonstantine  this is a good scenario, let me test and see if 
   it is already captured. Thanks for test case. 







[GitHub] [kafka] ramesh-muthusamy commented on a change in pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-10-27 Thread GitBox


ramesh-muthusamy commented on a change in pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#discussion_r512938623



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
 if (scheduledRebalance > 0 && now >= scheduledRebalance) {
 // delayed rebalance expired and it's time to assign resources
 log.debug("Delayed rebalance expired. Reassigning lost tasks");
-Optional candidateWorkerLoad = Optional.empty();
+List candidateWorkerLoad = Collections.emptyList();
 if (!candidateWorkersForReassignment.isEmpty()) {
 candidateWorkerLoad = 
pickCandidateWorkerForReassignment(completeWorkerAssignment);
 }
 
-if (candidateWorkerLoad.isPresent()) {
-WorkerLoad workerLoad = candidateWorkerLoad.get();
-log.debug("A candidate worker has been found to assign lost 
tasks: {}", workerLoad.worker());
-lostAssignments.connectors().forEach(workerLoad::assign);
-lostAssignments.tasks().forEach(workerLoad::assign);
+if (!candidateWorkerLoad.isEmpty()) {
+log.debug("Assigning lost tasks to {} candidate workers: {}", 
+candidateWorkerLoad.size(),
+
candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
+Iterator candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+for (String connector : lostAssignments.connectors()) {
+// Loop over the the candidate workers as many times as it 
takes

Review comment:
   It not only balances the tasks for new workers, it also performs revocation 
if there is any overloaded worker. 
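The loop in the diff above cycles over the candidate workers round-robin until every lost task is placed. A self-contained sketch of that pattern (hypothetical names, plain strings instead of the assignor's connector/task types):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinLostTasks {
    // Hand each lost task to the next candidate worker, wrapping around
    // the candidate list as many times as it takes
    static Map<String, List<String>> assign(List<String> lostTasks, List<String> candidates) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        candidates.forEach(w -> assignment.put(w, new ArrayList<>()));
        Iterator<String> it = candidates.iterator();
        for (String task : lostTasks) {
            if (!it.hasNext()) {
                it = candidates.iterator(); // wrap around to the first candidate
            }
            assignment.get(it.next()).add(task);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("t1", "t2", "t3"), List.of("w1", "w2")));
        // {w1=[t1, t3], w2=[t2]}
    }
}
```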









[GitHub] [kafka] bbejeck commented on pull request #9511: MINOR: Add KIP-584 to upgrade.html file

2020-10-27 Thread GitBox


bbejeck commented on pull request #9511:
URL: https://github.com/apache/kafka/pull/9511#issuecomment-717454236


   cherry-picked to 2.7







[GitHub] [kafka] ramesh-muthusamy commented on a change in pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-10-27 Thread GitBox


ramesh-muthusamy commented on a change in pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#discussion_r512938005



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -577,15 +595,14 @@ private void resetDelay() {
 numToRevoke = floorTasks;

Review comment:
   yes , I will remove the same









[GitHub] [kafka] ramesh-muthusamy commented on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-10-27 Thread GitBox


ramesh-muthusamy commented on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-717450100


   @kkonstantine  this is a good scenario, let me test and see it is already 
captured. Thanks for test case. 







[GitHub] [kafka] bbejeck commented on pull request #9511: MINOR: Add KIP-584 to upgrade.html file

2020-10-27 Thread GitBox


bbejeck commented on pull request #9511:
URL: https://github.com/apache/kafka/pull/9511#issuecomment-717449278


   Merged #9511 into trunk







[GitHub] [kafka] bbejeck merged pull request #9511: MINOR: Add KIP-584 to upgrade.html file

2020-10-27 Thread GitBox


bbejeck merged pull request #9511:
URL: https://github.com/apache/kafka/pull/9511


   







[GitHub] [kafka] bbejeck commented on pull request #9511: MINOR: Add KIP-584 to upgrade.html file

2020-10-27 Thread GitBox


bbejeck commented on pull request #9511:
URL: https://github.com/apache/kafka/pull/9511#issuecomment-717447946


   only HTML so merging







[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-10-27 Thread GitBox


apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512928815



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1207,14 +1286,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, 
ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = 
DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = 
createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = 
getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, 
BrokerQuotaEntity)
   private val maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
 
   def inc(listenerName: ListenerName, address: InetAddress, 
acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
 counts.synchronized {
-  waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+  val startThrottleTimeMs = time.milliseconds
+
+  waitForConnectionSlot(listenerName, startThrottleTimeMs, 
acceptorBlockedPercentMeter)
+
+  val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, 
startThrottleTimeMs)

Review comment:
   There are some corner cases here where `startThrottleTimeMs` could be in 
the past if `waitForConnectionSlot()` waited for an active connection slot to 
become available (if we exceeded the limit for the number of active 
connections), or waited for broker-wide or listener-wide rate to get back to 
quota. In some cases, ipThrottleTimeMs would be zero if we checked against the 
current time here.
   
   Since getting System.currentTimeMillis() is not that expensive (as it used 
to be), I think it would be better to revert to `waitForConnectionSlot` getting 
its own time (as before this PR), and then `recordIpConnectionMaybeThrottle` 
getting current time and also calling the code block below and throwing 
ConnectionThrottledException. And then adding a comment here that 
`recordIpConnectionMaybeThrottle` would throw an exception if per-IP quota is 
exceeded. What do you think?
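As a loose illustration of the throttle-time calculation under discussion (this is only the shape of the math, with assumed names; the broker actually derives the delay from the metrics `Sensor`/`QuotaViolationException` machinery, and `recordIpConnectionMaybeThrottle` would take its own timestamp as suggested above):

```java
public class IpThrottleSketch {
    // If the observed per-IP connection rate exceeds the quota, return how
    // long the connection should be delayed so the rate falls back under
    // quota within one quota window; otherwise no throttling (0 ms).
    static long throttleTimeMs(double observedRatePerSec, double quotaPerSec, long windowMs) {
        if (observedRatePerSec <= quotaPerSec) {
            return 0L;
        }
        // Delay proportional to how far over quota we are, over one window
        return (long) (windowMs * (observedRatePerSec - quotaPerSec) / quotaPerSec);
    }

    public static void main(String[] args) {
        System.out.println(throttleTimeMs(15.0, 10.0, 1000L)); // 50% over quota -> 500 ms
        System.out.println(throttleTimeMs(5.0, 10.0, 1000L));  // under quota -> 0 ms
    }
}
```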









[GitHub] [kafka] kkonstantine commented on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-10-27 Thread GitBox


kkonstantine commented on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-717433838


   After writing another test, I confirmed my impression from first reading 
the code. So, this fix would help if all the lost workers rejoined after 
leaving the group at about the same time. But if a completely new worker joins 
in addition to a returning worker, then the fact that we distribute the tasks 
only between the new workers will lead to an imbalance compared to any existing 
workers. For example: 
   
   with 2 workers each having 8 tasks, if 1 worker leaves and rejoins within 
the scheduled delay but another, completely new worker also joins, then we'll 
end up with the worker that never left having 8 tasks and the other two workers 
having 4 tasks each. 
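The arithmetic in that example can be checked with a tiny sketch (a hypothetical helper, not the assignor's code): only the lost tasks are redistributed, and only among the "new" workers, so the survivor's load never shrinks.

```java
import java.util.Arrays;

public class ImbalanceExample {
    // Lost tasks are split only among the "new" workers (returning + brand
    // new), while workers that never left keep their full load untouched.
    static int[] redistribute(int survivorTasks, int lostTasks, int newWorkers) {
        int[] result = new int[newWorkers + 1];
        result[0] = survivorTasks;
        for (int i = 0; i < lostTasks; i++) {
            result[1 + i % newWorkers]++; // round-robin over the new workers only
        }
        return result;
    }

    public static void main(String[] args) {
        // 2 workers x 8 tasks; one leaves and rejoins alongside a new worker
        System.out.println(Arrays.toString(redistribute(8, 8, 2))); // [8, 4, 4]
    }
}
```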







[jira] [Commented] (KAFKA-7987) a broker's ZK session may die on transient auth failure

2020-10-27 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221624#comment-17221624
 ] 

Jun Rao commented on KAFKA-7987:


[~rsivaram], yes, I think this issue still exists on all versions of Kafka. The 
PR for this jira hasn't been merged yet. The authorizer uses a separate 
instance of ZooKeeperClient from the broker. So, they could fail authorization 
independently.

> a broker's ZK session may die on transient auth failure
> ---
>
> Key: KAFKA-7987
> URL: https://issues.apache.org/jira/browse/KAFKA-7987
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Critical
>
> After a transient network issue, we saw the following log in a broker.
> {code:java}
> [23:37:02,102] ERROR SASL authentication with Zookeeper Quorum member failed: 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Server not found in Kerberos database (7))]) occurred when 
> evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client 
> will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
> [23:37:02,102] ERROR [ZooKeeperClient] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> {code}
> The network issue prevented the broker from communicating to ZK. The broker's 
> ZK session then expired, but the broker didn't know that yet since it 
> couldn't establish a connection to ZK. When the network was back, the broker 
> tried to establish a connection to ZK, but failed due to auth failure (likely 
> due to a transient KDC issue). The current logic just ignores the auth 
> failure without trying to create a new ZK session. Then the broker will be 
> permanently in a state that it's alive, but not registered in ZK.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lizthegrey commented on pull request #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm

2020-10-27 Thread GitBox


lizthegrey commented on pull request #8489:
URL: https://github.com/apache/kafka/pull/8489#issuecomment-717421301


   Honeycomb doesn't directly have an interest in this as we use Confluent's 
packages, but we do run Confluent's Kafka distro on ARM64 (successfully).







[GitHub] [kafka] guozhangwang commented on pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-27 Thread GitBox


guozhangwang commented on pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#issuecomment-717420938


   LGTM.







[jira] [Updated] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking

2020-10-27 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10638:

Priority: Major  (was: Blocker)

> QueryableStateIntegrationTest fails due to stricter store checking
> --
>
> Key: KAFKA-10638
> URL: https://issues.apache.org/jira/browse/KAFKA-10638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.8.0, 2.7.1
>
>
> Observed:
> {code:java}
> org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state 
> store source-table because the stream thread is PARTITIONS_ASSIGNED, not 
> RUNNING
>   at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
>   at 
> org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
>   at 
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:52)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:200)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
>   at sun.reflect.GeneratedMethod

[jira] [Comment Edited] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking

2020-10-27 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221610#comment-17221610
 ] 

Bill Bejeck edited comment on KAFKA-10638 at 10/27/20, 5:48 PM:


Hey [~vvcephei] I appreciate you wanting to get a fix in for the test, but I'm 
not sure it meets the criteria for a blocker.

Going back over the previous [2.7 
builds|https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-2.7-jdk8/activity]
 [32-41], it's only failed once in [build 
40|https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-2.7-jdk8/detail/kafka-2.7-jdk8/40/tests/]

For the 2.7.0 release process, I'm going to reduce the severity and move the 
fix version to 2.7.1 and 2.8.0


was (Author: bbejeck):
Hey [~vvcephei] I appreciate you wanting to get a fix in for the test, but I'm 
not sure it meets the criteria for a blocker.

Going back over the previous [2.7 
builds|https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-2.7-jdk8/activity],
 it's only failed once in [build 
40|https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-2.7-jdk8/detail/kafka-2.7-jdk8/40/tests/]

For the 2.7.0 release process, I'm going to reduce the severity and move the 
fix version to 2.7.1 and 2.8.0

> QueryableStateIntegrationTest fails due to stricter store checking
> --
>
> Key: KAFKA-10638
> URL: https://issues.apache.org/jira/browse/KAFKA-10638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.7.0
>
>

[jira] [Updated] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking

2020-10-27 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10638:

Fix Version/s: (was: 2.7.0)
   2.7.1
   2.8.0

> QueryableStateIntegrationTest fails due to stricter store checking
> --
>
> Key: KAFKA-10638
> URL: https://issues.apache.org/jira/browse/KAFKA-10638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0, 2.7.1
>
>

[jira] [Commented] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking

2020-10-27 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221610#comment-17221610
 ] 

Bill Bejeck commented on KAFKA-10638:
-

Hey [~vvcephei] I appreciate you wanting to get a fix in for the test, but I'm 
not sure it meets the criteria for a blocker.

Going back over the previous [2.7 
builds|https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-2.7-jdk8/activity],
 it's only failed once in [build 
40|https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-2.7-jdk8/detail/kafka-2.7-jdk8/40/tests/]

For the 2.7.0 release process, I'm going to reduce the severity and move the 
fix version to 2.7.1 and 2.8.0

> QueryableStateIntegrationTest fails due to stricter store checking
> --
>
> Key: KAFKA-10638
> URL: https://issues.apache.org/jira/browse/KAFKA-10638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.7.0
>
>
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gra

[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-10-27 Thread GitBox


apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512888360



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -526,10 +527,14 @@ private[kafka] abstract class 
AbstractServerThread(connectionQuotas: ConnectionQ
 if (channel != null) {
   debug(s"Closing connection from 
${channel.socket.getRemoteSocketAddress()}")
   connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
-  CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
-  CoreUtils.swallow(channel.close(), this, Level.ERROR)
+  closeSocket(channel)
 }
   }
+
+  protected def closeSocket(channel: SocketChannel): Unit = {

Review comment:
   this method is also called from the derived class.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-10-27 Thread GitBox


apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512887499



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -697,6 +714,31 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 info(s"Rejected connection from ${e.ip}, address already has the 
configured maximum of ${e.count} connections.")
 close(endPoint.listenerName, socketChannel)
 None
+  case e: ConnectionThrottledException =>
+val ip = socketChannel.socket.getInetAddress
+debug(s"Delaying closing of connection from $ip for 
${e.throttleTimeMs} ms")
+val delayQueue = throttledSockets.computeIfAbsent(ip, _ => new 
mutable.Queue[DelayedCloseSocket])

Review comment:
   I agree that it is better to optimize for checking connections vs. the 
overhead of adding/removing to the queue, because `closeThrottledConnections` 
runs pretty often (on a loop), which I think also means that finding that there 
are no connections yet to unthrottle would also be common. Or very few 
connections to unthrottle. So, after reading all your evaluations above, I am 
also leaning towards using a delay queue here.
   
   Not sure if the question #1 about why we need to delay closing a connection 
got answered, so answering just in case. Since we want to throttle accepting 
connections from an IP, closing a connection due to reaching IP quota right 
away would not help with throttling accepting connections since that IP is 
going to reconnect. 
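
   To make the delay-queue idea above concrete: instead of closing a throttled 
connection immediately (the client would just reconnect), park it in a 
`java.util.concurrent.DelayQueue` and close it once its throttle time has 
elapsed; `poll()` returns null until an entry expires, so a loop iteration with 
nothing to close stays cheap. This is a minimal sketch — the class and field 
names are illustrative, not the ones in `SocketServer.scala`.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class ThrottledCloseQueue {
    // One entry per socket whose close is being delayed.
    static final class DelayedClose implements Delayed {
        final String connectionId; // stand-in for the real SocketChannel
        final long closeAtMs;      // absolute wall-clock time to close at

        DelayedClose(String connectionId, long closeAtMs) {
            this.connectionId = connectionId;
            this.closeAtMs = closeAtMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(closeAtMs - System.currentTimeMillis(),
                                TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final DelayQueue<DelayedClose> queue = new DelayQueue<>();

    // Called when a ConnectionThrottledException is raised for a new socket.
    public void delayClose(String connectionId, long throttleTimeMs) {
        queue.add(new DelayedClose(connectionId,
                                   System.currentTimeMillis() + throttleTimeMs));
    }

    // Called on each acceptor loop iteration; closes only expired entries.
    public int closeExpired() {
        int closed = 0;
        DelayedClose entry;
        while ((entry = queue.poll()) != null) {
            // real code would close entry's SocketChannel here
            closed++;
        }
        return closed;
    }

    public int size() {
        return queue.size();
    }
}
```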





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-10-27 Thread GitBox


apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512887499



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -697,6 +714,31 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 info(s"Rejected connection from ${e.ip}, address already has the 
configured maximum of ${e.count} connections.")
 close(endPoint.listenerName, socketChannel)
 None
+  case e: ConnectionThrottledException =>
+val ip = socketChannel.socket.getInetAddress
+debug(s"Delaying closing of connection from $ip for 
${e.throttleTimeMs} ms")
+val delayQueue = throttledSockets.computeIfAbsent(ip, _ => new 
mutable.Queue[DelayedCloseSocket])

Review comment:
   I agree that it is better to optimize for checking connections vs. the 
overhead of adding/removing to the queue, because `closeThrottledConnections` 
runs pretty often (on a loop), which I think also means that finding that there 
are no connections yet to unthrottle would also be common. Or very few 
connections to unthrottle. So, after reading all your evaluations above, I am 
also leaning towards using a delay queue here.
   
   Not sure if the question #1 about why we need to delay closing a connection 
got answered, so answering just in case. Since we want to throttle accepting 
connections from an IP, closing a connection due to reaching IP quota right 
away would not help with throttling accepting connections since that IP is 
going to reconnect. 
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio opened a new pull request #9512: [DRAFT] - KAFKA-10394: generate snapshot

2020-10-27 Thread GitBox


jsancio opened a new pull request #9512:
URL: https://github.com/apache/kafka/pull/9512


   This PR depends on https://github.com/apache/kafka/pull/9505. I will rebase 
this PR once https://github.com/apache/kafka/pull/9505 has been merged.
   
   This PR adds support for generating snapshot for KIP-630.
   
   1. It introduces the interface `SnapshotWriter` and implementation 
`KafkaSnapshotWriter` for creating snapshots on disk.
   2. It introduces the interface `SnapshotReader` and implementation 
`KafkaSnapshotReader` for reading snapshots from disk.
   
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9511: MINOR: Add KIP-584 to upgrade.html file

2020-10-27 Thread GitBox


bbejeck commented on pull request #9511:
URL: https://github.com/apache/kafka/pull/9511#issuecomment-717387683


   Once merged I'll cherry-pick this to 2.7



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9511: MINOR: Add KIP-584 to upgrade.html file

2020-10-27 Thread GitBox


bbejeck commented on pull request #9511:
URL: https://github.com/apache/kafka/pull/9511#issuecomment-717387337


   @kowshik here's the text we discussed offline yesterday for the upgrade 
section.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck opened a new pull request #9511: MINOR: Add KIP-584 to upgrade.html file

2020-10-27 Thread GitBox


bbejeck opened a new pull request #9511:
URL: https://github.com/apache/kafka/pull/9511


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-27 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r512847195



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.protocol.DataOutputStreamWritable;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.raft.RecordSerde;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BatchBuilder {

Review comment:
   Yes, that is fair. Initially I had planned to reuse as much as possible, 
but the existing classes had too much baggage. I wanted implementations which 
were simple and efficient for the needs of the raft layer. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] brbrown25 edited a comment on pull request #9057: KAFKA-10299: Implementing Kafka Connect Hash SMT to allow for hashing…

2020-10-27 Thread GitBox


brbrown25 edited a comment on pull request #9057:
URL: https://github.com/apache/kafka/pull/9057#issuecomment-717322569


   - [x] update with latest master
   - [ ] implement nested field support
   - [ ] implement salt support



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10637) KafkaProducer: IllegalMonitorStateException

2020-10-27 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221527#comment-17221527
 ] 

Chia-Ping Tsai commented on KAFKA-10637:


[~katiforis] Thanks for your report. Could you share the code which can 
reproduce this error with me?
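
For reference, the exception class quoted below arises whenever 
{{Object.wait()}} is invoked without holding the object's monitor — the JVM 
throws {{IllegalMonitorStateException}} ("current thread is not owner"). The 
snippet below is a minimal illustration of that failure mode only; it does not 
reproduce the producer's internal state, and the class name is hypothetical.

```java
public class MonitorDemo {
    // Calling wait() outside synchronized(lock) throws
    // IllegalMonitorStateException; returns true if the exception is seen.
    public static boolean waitWithoutLock(Object lock) {
        try {
            lock.wait(1); // monitor not held -> throws
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            return false;
        }
    }

    // The legal form: the monitor is held when wait() is called.
    public static boolean waitWithLock(Object lock) {
        synchronized (lock) {
            try {
                lock.wait(1); // waits up to 1 ms, then returns normally
                return true;
            } catch (InterruptedException e) {
                return false;
            }
        }
    }
}
```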

> KafkaProducer: IllegalMonitorStateException 
> 
>
> Key: KAFKA-10637
> URL: https://issues.apache.org/jira/browse/KAFKA-10637
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.5.1
>Reporter: Lefteris Katiforis
>Priority: Major
>
> Kafka producer throws the following exception:
> {code:java}
> {\"log\":\"java.lang.IllegalMonitorStateException: current thread is not 
> owner\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415014714Z\"}"}
>  java.base/java.lang.Object.wait(Native 
> Method)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.41502027Z\"}"}
> org.apache.kafka.common.utils.SystemTime.waitObject(SystemTime.java:55)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415024923Z\"}"}
> at 
> org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:119)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415029863Z\"}"}
> org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1029)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415034336Z\"}"}
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:883)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415038722Z\"}"}
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415042939Z\"}"}
> org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:781)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415047238Z\"}"}
> org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:562)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415051555Z\"}"}
> org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:369)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415055882Z\"}"}{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] joshuagrisham commented on pull request #9492: KAFKA-10627: Added support for Connect TimestampConverter to convert multiple fields using a comma-separated list, and changed the Strin

2020-10-27 Thread GitBox


joshuagrisham commented on pull request #9492:
URL: https://github.com/apache/kafka/pull/9492#issuecomment-717335784


   Yeah, unfortunately some of the downstream checks are still failing; it 
seems to be related to this issue: 
https://issues.apache.org/jira/browse/KAFKA-10017
   But all tests related to the changes here are passing, and checkstyleMain, 
checkstyleTest, spotbugsMain, and spotbugsTest all pass for these changes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10650) Use Murmur3 hashing instead of MD5 in SkimpyOffsetMap

2020-10-27 Thread Viktor Somogyi-Vass (Jira)
Viktor Somogyi-Vass created KAFKA-10650:
---

 Summary: Use Murmur3 hashing instead of MD5 in SkimpyOffsetMap
 Key: KAFKA-10650
 URL: https://issues.apache.org/jira/browse/KAFKA-10650
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Viktor Somogyi-Vass
Assignee: Viktor Somogyi-Vass


The usage of MD5 has been uncovered during testing Kafka for FIPS (Federal 
Information Processing Standards) verification.

While MD5 isn't a FIPS incompatibility here as it isn't used for cryptographic 
purposes, I spent some time with this as it isn't ideal either. MD5 is a 
relatively fast crypto hashing algo but there are much better performing 
algorithms for hash tables as it's used in SkimpyOffsetMap.

By applying Murmur3 (that is implemented in Streams) I could achieve a 3x 
faster {{put}} operation and the overall segment cleaning sped up by 30% while 
preserving the same collision rate (both performed within 0.0015 - 0.007, 
mostly with 0.004 median).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10650) Use Murmur3 hashing instead of MD5 in SkimpyOffsetMap

2020-10-27 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-10650:

Description: 
The usage of MD5 has been uncovered during testing Kafka for FIPS (Federal 
Information Processing Standards) verification.

While MD5 isn't a FIPS incompatibility here as it isn't used for cryptographic 
purposes, I spent some time with this as it isn't ideal either. MD5 is a 
relatively fast crypto hashing algo but there are much better performing 
algorithms for hash tables as it's used in SkimpyOffsetMap.

By applying Murmur3 (that is implemented in Streams) I could achieve a 3x 
faster {{put}} operation and the overall segment cleaning sped up by 30% while 
preserving the same collision rate (both performed within 0.0015 - 0.007, 
mostly with 0.004 median).

Murmur3 was chosen because research paper [1] shows Murmur2 is a relatively 
good choice for hash tables. Based on this, and since Murmur3 is already 
available in the project, I used that. 

[1]
https://www.researchgate.net/publication/235663569_Performance_of_the_most_common_non-cryptographic_hash_functions

  was:
The usage of MD5 has been uncovered during testing Kafka for FIPS (Federal 
Information Processing Standards) verification.

While MD5 isn't a FIPS incompatibility here as it isn't used for cryptographic 
purposes, I spent some time with this as it isn't ideal either. MD5 is a 
relatively fast crypto hashing algo but there are much better performing 
algorithms for hash tables as it's used in SkimpyOffsetMap.

By applying Murmur3 (that is implemented in Streams) I could achieve a 3x 
faster {{put}} operation and the overall segment cleaning sped up by 30% while 
preserving the same collision rate (both performed within 0.0015 - 0.007, 
mostly with 0.004 median).


> Use Murmur3 hashing instead of MD5 in SkimpyOffsetMap
> -
>
> Key: KAFKA-10650
> URL: https://issues.apache.org/jira/browse/KAFKA-10650
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> The usage of MD5 has been uncovered during testing Kafka for FIPS (Federal 
> Information Processing Standards) verification.
> While MD5 isn't a FIPS incompatibility here as it isn't used for 
> cryptographic purposes, I spent some time with this as it isn't ideal either. 
> MD5 is a relatively fast crypto hashing algo but there are much better 
> performing algorithms for hash tables as it's used in SkimpyOffsetMap.
> By applying Murmur3 (that is implemented in Streams) I could achieve a 3x 
> faster {{put}} operation and the overall segment cleaning sped up by 30% 
> while preserving the same collision rate (both performed within 0.0015 - 
> 0.007, mostly with 0.004 median).
> Murmur3 was chosen because research paper [1] shows Murmur2 is a relatively 
> good choice for hash tables. Based on this, and since Murmur3 is already 
> available in the project, I used that. 
> [1]
> https://www.researchgate.net/publication/235663569_Performance_of_the_most_common_non-cryptographic_hash_functions
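
As a self-contained illustration of the kind of non-cryptographic hashing the 
ticket proposes, below is a Murmur2-style hash (the variant Kafka's default 
partitioner uses, shown here for brevity; the actual proposal reuses the 
Murmur3 implementation that ships with Kafka Streams) plus a slot-selection 
helper like a hash table such as SkimpyOffsetMap would use. Class and method 
names are illustrative, not Kafka's.

```java
public class OffsetMapHashing {
    // Murmur2-style non-cryptographic hash: fast, with good distribution
    // for hash-table use, unlike MD5 which pays for cryptographic strength.
    public static int murmur2(final byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff)
                  + ((data[i4 + 1] & 0xff) << 8)
                  + ((data[i4 + 2] & 0xff) << 16)
                  + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Handle the trailing 1-3 bytes; switch fall-through is intentional.
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Slot selection: mask off the sign bit, then take the remainder over
    // the number of slots in the map.
    public static int positionOf(final byte[] key, final int slots) {
        return (murmur2(key) & 0x7fffffff) % slots;
    }
}
```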



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] brbrown25 commented on pull request #9057: KAFKA-10299: Implementing Kafka Connect Hash SMT to allow for hashing…

2020-10-27 Thread GitBox


brbrown25 commented on pull request #9057:
URL: https://github.com/apache/kafka/pull/9057#issuecomment-717322569


   - [ ] update with latest master
   - [ ] implement nested field support
   - [ ] implement salt support



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino closed pull request #9510: Add additional MetadataRecordType enums

2020-10-27 Thread GitBox


rondagostino closed pull request #9510:
URL: https://github.com/apache/kafka/pull/9510


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino opened a new pull request #9510: Add additional MetadataRecordType enums

2020-10-27 Thread GitBox


rondagostino opened a new pull request #9510:
URL: https://github.com/apache/kafka/pull/9510


   Note that I had to remove `"mapKey": true` from `FeatureLevelRecord.json` 
because with it I was seeing this stack trace during the build:
   
   ```
   Exception in thread "main" java.lang.RuntimeException: Exception while 
processing src/main/resources/common/metadata/FeatureLevelRecord.json
   at 
org.apache.kafka.message.MessageGenerator.processDirectories(MessageGenerator.java:233)
   at 
org.apache.kafka.message.MessageGenerator.main(MessageGenerator.java:351)
   Caused by: java.lang.RuntimeException: Cannot set mapKey on top level fields.
   at 
org.apache.kafka.message.MessageDataGenerator.generateClass(MessageDataGenerator.java:87)
   at 
org.apache.kafka.message.MessageDataGenerator.generate(MessageDataGenerator.java:67)
   at 
org.apache.kafka.message.MessageDataGenerator.generateAndWrite(MessageDataGenerator.java:55)
   at 
org.apache.kafka.message.MessageGenerator.processDirectories(MessageGenerator.java:225)
   ``` 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9507: KAFKA-10628: remove unneeded javadoc for TopologyTestDriver and some refactor

2020-10-27 Thread GitBox


showuon commented on pull request #9507:
URL: https://github.com/apache/kafka/pull/9507#issuecomment-717223593


   tests/Build/JDK 8: Pass
   tests/Build/JDK 15: unrelated to my change
   failed - 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
 
   tests/Build/JDK 11: unrelated to my change
   failed - kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10108) The cached configs of SslFactory should be updated only if the ssl Engine Factory is updated successfully

2020-10-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10108.

Resolution: Won't Fix

> The cached configs of SslFactory should be updated only if the ssl Engine 
> Factory is updated successfully
> -
>
> Key: KAFKA-10108
> URL: https://issues.apache.org/jira/browse/KAFKA-10108
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> The following cases should NOT change the cached configs of SslFactory.
> 1. validate reconfiguration
> 2.  throw exception when checking the new ssl engine factory



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10125) The partition which is removing should be considered to be under reassignment

2020-10-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10125.

Resolution: Won't Fix

> The partition which is removing should be considered to be under reassignment
> -
>
> Key: KAFKA-10125
> URL: https://issues.apache.org/jira/browse/KAFKA-10125
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> When a reassignment is still in progress, the replica which is either 
> removing or adding should be considered to be under reassignment. However, 
> TopicCommand still print the partition which is removing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 closed pull request #8837: KAFKA-10125 The partition which is removing should be considered to b…

2020-10-27 Thread GitBox


chia7712 closed pull request #8837:
URL: https://github.com/apache/kafka/pull/8837


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 closed pull request #8806: KAFKA-10108 The cached configs of SslFactory should be updated only i…

2020-10-27 Thread GitBox


chia7712 closed pull request #8806:
URL: https://github.com/apache/kafka/pull/8806


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9697) ControlPlaneNetworkProcessorAvgIdlePercent is always NaN

2020-10-27 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221385#comment-17221385
 ] 

Chia-Ping Tsai commented on KAFKA-9697:
---

The patch may break existing metrics, so a KIP is required.

> ControlPlaneNetworkProcessorAvgIdlePercent is always NaN
> 
>
> Key: KAFKA-9697
> URL: https://issues.apache.org/jira/browse/KAFKA-9697
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 2.3.0
>Reporter: James Cheng
>Priority: Major
>  Labels: need-kip
>
> I have a broker running Kafka 2.3.0. The value of 
> kafka.network:type=SocketServer,name=ControlPlaneNetworkProcessorAvgIdlePercent
>  is always "NaN".
> Is that normal, or is there a problem with the metric?
> I am running Kafka 2.3.0. I have not checked this in newer/older versions.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9697) ControlPlaneNetworkProcessorAvgIdlePercent is always NaN

2020-10-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-9697:
--
Labels: need-kip  (was: )

> ControlPlaneNetworkProcessorAvgIdlePercent is always NaN
> 
>
> Key: KAFKA-9697
> URL: https://issues.apache.org/jira/browse/KAFKA-9697
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 2.3.0
>Reporter: James Cheng
>Priority: Major
>  Labels: need-kip
>
> I have a broker running Kafka 2.3.0. The value of 
> kafka.network:type=SocketServer,name=ControlPlaneNetworkProcessorAvgIdlePercent
>  is always "NaN".
> Is that normal, or is there a problem with the metric?
> I am running Kafka 2.3.0. I have not checked this in newer/older versions.
>  
>  





[jira] [Assigned] (KAFKA-9697) ControlPlaneNetworkProcessorAvgIdlePercent is always NaN

2020-10-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-9697:
-

Assignee: (was: Chia-Ping Tsai)

> ControlPlaneNetworkProcessorAvgIdlePercent is always NaN
> 
>
> Key: KAFKA-9697
> URL: https://issues.apache.org/jira/browse/KAFKA-9697
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 2.3.0
>Reporter: James Cheng
>Priority: Major
>
> I have a broker running Kafka 2.3.0. The value of 
> kafka.network:type=SocketServer,name=ControlPlaneNetworkProcessorAvgIdlePercent
>  is always "NaN".
> Is that normal, or is there a problem with the metric?
> I am running Kafka 2.3.0. I have not checked this in newer/older versions.
>  
>  





[GitHub] [kafka] chia7712 commented on pull request #9283: KAFKA-9697 hide the metrics of control.plane.listener.name if it is u…

2020-10-27 Thread GitBox


chia7712 commented on pull request #9283:
URL: https://github.com/apache/kafka/pull/9283#issuecomment-717212130


   As it may break metrics compatibility, I'm going to close it.







[GitHub] [kafka] chia7712 closed pull request #9283: KAFKA-9697 hide the metrics of control.plane.listener.name if it is u…

2020-10-27 Thread GitBox


chia7712 closed pull request #9283:
URL: https://github.com/apache/kafka/pull/9283


   







[GitHub] [kafka] chia7712 commented on pull request #9169: MINOR: the scheduler used to perform rebalance should have thread prefix

2020-10-27 Thread GitBox


chia7712 commented on pull request #9169:
URL: https://github.com/apache/kafka/pull/9169#issuecomment-717211456


   @ijuma Does it need a KIP to resolve a naming conflict in metrics? Although 
I'm wondering whether there is a use case that relies on this conflicted name.







[GitHub] [kafka] chia7712 closed pull request #9469: MINOR: replace FetchRequest.TopicAndPartitionData by Map.Entry

2020-10-27 Thread GitBox


chia7712 closed pull request #9469:
URL: https://github.com/apache/kafka/pull/9469


   







[GitHub] [kafka] chia7712 commented on pull request #9469: MINOR: replace FetchRequest.TopicAndPartitionData by Map.Entry

2020-10-27 Thread GitBox


chia7712 commented on pull request #9469:
URL: https://github.com/apache/kafka/pull/9469#issuecomment-717207972


   @ijuma Thanks for your comment. Closing it due to the bad refactor :(







[GitHub] [kafka] chia7712 commented on pull request #9509: MINOR: rename metrics "iotime-total" to "io-time-total" and "io-waitt…

2020-10-27 Thread GitBox


chia7712 commented on pull request #9509:
URL: https://github.com/apache/kafka/pull/9509#issuecomment-717207173


   @dajac thanks for the reminder. I don't think this patch is worth that 
effort. Closing it.







[GitHub] [kafka] chia7712 closed pull request #9509: MINOR: rename metrics "iotime-total" to "io-time-total" and "io-waitt…

2020-10-27 Thread GitBox


chia7712 closed pull request #9509:
URL: https://github.com/apache/kafka/pull/9509


   







[jira] [Commented] (KAFKA-8124) Beginning offset is after the ending offset for topic partition

2020-10-27 Thread Yu Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221366#comment-17221366
 ] 

Yu Wang commented on KAFKA-8124:


+1 spark2.3.2 with kafka2.0

> Beginning offset is after the ending offset for topic partition
> ---
>
> Key: KAFKA-8124
> URL: https://issues.apache.org/jira/browse/KAFKA-8124
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.0
> Environment: OS : Rhel 7
> server : VM
>Reporter: suseendramani
>Priority: Major
>
>  
> We are getting this issue in production, and the Spark consumer is dying 
> because of an offset issue.
> We observed the following error in Kafka Broker ( that has problems)
> --
> [2019-03-18 14:40:14,100] WARN Unable to reconnect to ZooKeeper service, 
> session 0x1692e9ff4410004 has expired (org.apache.zookeeper.ClientCnxn)
>  [2019-03-18 14:40:14,100] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1692e9ff4410004 has expired, closing socket connection 
> (org.apache.zook
>  eeper.ClientCnxn)
> ---
> Error from other broker when talking to the problematic broker.
>  [2019-03-18 14:40:14,107] INFO [ReplicaFetcher replicaId=3, leaderId=5, 
> fetcherId=0] Error sending fetch request (sessionId=2127346653, 
> epoch=27048427) to
>  node 5: java.nio.channels.ClosedSelectorException. 
> (org.apache.kafka.clients.FetchSessionHandler)
>  
> 
>  
> All topics were having replication factor of 3 and this issue happens when 
> one of the broker was having issues. We are using SCRAM authentication 
> (SHA-256) and SSL.
>  
> Sparks Job died with the following error:
> ERROR 2019-03-18 07:40:57,178 7924 org.apache.spark.executor.Executor 
> [Executor task launch worker for task 16] Exception in task 27.0 in stage 0.0 
> (TID 16)
>  java.lang.AssertionError: assertion failed: Beginning offset 115204574 is 
> after the ending offset 115204516 for topic  partition 37. You 
> either provided an invalid fromOffset, or the Kafka topic has been damaged
>  at scala.Predef$.assert(Predef.scala:170)
>  at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:175)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:109)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  
> ---
>  
> please let me know if you need more details.





[jira] [Updated] (KAFKA-7987) a broker's ZK session may die on transient auth failure

2020-10-27 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-7987:
--
Issue Type: Bug  (was: Improvement)
  Priority: Critical  (was: Major)

> a broker's ZK session may die on transient auth failure
> ---
>
> Key: KAFKA-7987
> URL: https://issues.apache.org/jira/browse/KAFKA-7987
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Critical
>
> After a transient network issue, we saw the following log in a broker.
> {code:java}
> [23:37:02,102] ERROR SASL authentication with Zookeeper Quorum member failed: 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Server not found in Kerberos database (7))]) occurred when 
> evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client 
> will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
> [23:37:02,102] ERROR [ZooKeeperClient] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> {code}
> The network issue prevented the broker from communicating to ZK. The broker's 
> ZK session then expired, but the broker didn't know that yet since it 
> couldn't establish a connection to ZK. When the network was back, the broker 
> tried to establish a connection to ZK, but failed due to auth failure (likely 
> due to a transient KDC issue). The current logic just ignores the auth 
> failure without trying to create a new ZK session. Then the broker will be 
> permanently in a state that it's alive, but not registered in ZK.
>  





[jira] [Commented] (KAFKA-7987) a broker's ZK session may die on transient auth failure

2020-10-27 Thread Rajini Sivaram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221328#comment-17221328
 ] 

Rajini Sivaram commented on KAFKA-7987:
---

[~junrao] This is still an open issue for all versions of Kafka, right? I am 
looking into an authorizer issue where authorizer notifications were not 
processed for a long time. Heap dump shows that the authorizer's 
ZookeeperClient is in AUTH_FAILED state. ZK is Kerberos-enabled and there are a 
couple of authentication failures in the logs due to clock-skew errors, which 
look like the reason why the authorizer's ZooKeeperClient got into this state. 
For the authorizer, we do need to schedule retries in this case. But the issue 
doesn't seem to have affected other operations of the broker in this case, 
presumably because we retry connections for the other ZooKeeperClient when 
there are requests. We should still apply the retry fix to the common 
ZooKeeperClient, right? Since the affected broker didn't pick up ACL updates 
made on other brokers, it is a critical security issue. But I wanted to check 
if we have applied any fixes in this area in newer versions of Kafka. Thanks.
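
The retry behavior discussed above can be illustrated with a minimal, 
self-contained sketch: instead of ignoring AUTH_FAILED, keep attempting to 
re-create the session with bounded backoff until it succeeds or a retry budget 
is exhausted. The class and method names below are hypothetical, not Kafka's 
actual ZooKeeperClient API:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: retry ZK session re-creation on auth failure instead
// of giving up, with bounded exponential backoff.
public class AuthFailureRetrySketch {
    interface SessionFactory { boolean tryCreateSession(); }

    // Returns the attempt number on which the session was created, or -1 if
    // the retry budget was exhausted (caller may then escalate, e.g. shut down).
    static int retryUntilConnected(SessionFactory factory, int maxAttempts,
                                   long initialBackoffMs) throws InterruptedException {
        long backoff = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (factory.tryCreateSession()) return attempt;
            Thread.sleep(backoff);                  // wait before retrying
            backoff = Math.min(backoff * 2, 1000L); // cap the backoff
        }
        return -1;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        // Simulate a transient KDC issue: the first two attempts fail.
        SessionFactory flaky = () -> calls.incrementAndGet() >= 3;
        int attempts = retryUntilConnected(flaky, 10, 1L);
        System.out.println("connected after attempts=" + attempts);
    }
}
```

In the real client, the retry would be scheduled on the client's expiry 
scheduler rather than sleeping inline on the event thread.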


> a broker's ZK session may die on transient auth failure
> ---
>
> Key: KAFKA-7987
> URL: https://issues.apache.org/jira/browse/KAFKA-7987
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>
> After a transient network issue, we saw the following log in a broker.
> {code:java}
> [23:37:02,102] ERROR SASL authentication with Zookeeper Quorum member failed: 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Server not found in Kerberos database (7))]) occurred when 
> evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client 
> will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
> [23:37:02,102] ERROR [ZooKeeperClient] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> {code}
> The network issue prevented the broker from communicating to ZK. The broker's 
> ZK session then expired, but the broker didn't know that yet since it 
> couldn't establish a connection to ZK. When the network was back, the broker 
> tried to establish a connection to ZK, but failed due to auth failure (likely 
> due to a transient KDC issue). The current logic just ignores the auth 
> failure without trying to create a new ZK session. Then the broker will be 
> permanently in a state that it's alive, but not registered in ZK.
>  





[GitHub] [kafka] dajac commented on pull request #9509: MINOR: rename metrics "iotime-total" to "io-time-total" and "io-waitt…

2020-10-27 Thread GitBox


dajac commented on pull request #9509:
URL: https://github.com/apache/kafka/pull/9509#issuecomment-717165385


   @chia7712 Thanks for the PR. Renaming it may break monitoring of existing 
users. As metrics are part of the public API, we may need a KIP to fix this...







[jira] [Commented] (KAFKA-10649) CPU increase after the upgrade to Kafka 2.5

2020-10-27 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221296#comment-17221296
 ] 

Ismael Juma commented on KAFKA-10649:
-

Thanks for the report. This is likely KAFKA-9731. I would suggest upgrading to 
2.6.0.

> CPU increase after the upgrade to Kafka 2.5
> ---
>
> Key: KAFKA-10649
> URL: https://issues.apache.org/jira/browse/KAFKA-10649
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.1
> Environment: Ubuntu 16.04
>Reporter: Valentina Glagoleva
>Priority: Major
>  Labels: performance
> Attachments: CPU_usage_grafana.png
>
>
> After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
> clusters now use 10-15% more CPU than before:
> !CPU_usage_grafana.png|width=613,height=210!
> The increase happened right after the rolling upgrade of the cluster and CPU 
> usage stayed higher than usual since then.
> We made no changes to the application side or usage patterns.
>  The only thing that we noticed is that number of FETCH request seemed to 
> increase a bit as well. These additional fetch requests seem to come from 
> brokers intercommunication.
> What can be the reason for an increase of number of fetch requests and CPU 
> usage?





[GitHub] [kafka] chia7712 commented on a change in pull request #9505: KAFKA-10393: messages for fetch snapshot and fetch

2020-10-27 Thread GitBox


chia7712 commented on a change in pull request #9505:
URL: https://github.com/apache/kafka/pull/9505#discussion_r512566562



##
File path: clients/src/main/resources/common/message/FetchSnapshotRequest.json
##
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 58,
+  "type": "request",
+  "name": "FetchSnapshotRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ReplicaId", "type": "int32", "versions": "0+",
+  "about": "The broker ID of the follower." },
+{ "name": "MaxBytes", "type": "int32", "versions": "0+",
+  "about": "The maximum bytes to fetch from all of the snapshots." },
+{ "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
+  "about": "The topics to fetch.", "fields": [
+  { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+"about": "The name of the topic to fetch." },
+  { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
+"about": "The partitions to fetch.", "fields": [
+{ "name": "Index", "type": "int32", "versions": "0+",

Review comment:
   It seems to me that ```PartitionIndex``` is a more common name in the protocol.

##
File path: clients/src/main/resources/common/message/FetchSnapshotResponse.json
##
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 58,
+  "type": "response",
+  "name": "FetchSnapshotResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", 
"ignorable": true,
+  "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+{ "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": 
false,
+  "about": "The top level response error code." },
+{ "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
+  "about": "The topics to fetch.", "fields": [
+  { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+"about": "The name of the topic to fetch." },
+  { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
+"about": "The partitions to fetch.", "fields": [
+{ "name": "Index", "type": "int32", "versions": "0+",

Review comment:
   ditto
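
For illustration, the suggested rename would look like this (a sketch only: 
the field name and description follow the convention of the existing Fetch 
protocol schemas, and the final choice is up to the PR):

```json
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
  "about": "The partition index." }
```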









[jira] [Resolved] (KAFKA-10647) Only serialize owned partition when consumer protocol version >= 0

2020-10-27 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-10647.
-
Resolution: Fixed

> Only serialize owned partition when consumer protocol version >= 0 
> ---
>
> Key: KAFKA-10647
> URL: https://issues.apache.org/jira/browse/KAFKA-10647
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Blocker
> Fix For: 2.7.0
>
>
> A regression got introduced by https://github.com/apache/kafka/pull/8897. The 
> owned partition field must be ignored for version < 1 otherwise the 
> serialization fails with an unsupported version exception.





[GitHub] [kafka] dajac merged pull request #9506: KAFKA-10647; Only serialize owned partitions when consumer protocol version >= 1

2020-10-27 Thread GitBox


dajac merged pull request #9506:
URL: https://github.com/apache/kafka/pull/9506


   







[GitHub] [kafka] dajac commented on pull request #9506: KAFKA-10647; Only serialize owned partitions when consumer protocol version >= 1

2020-10-27 Thread GitBox


dajac commented on pull request #9506:
URL: https://github.com/apache/kafka/pull/9506#issuecomment-717132358


   @hachikuji @chia7712 Thanks for your review! Merging to trunk and 2.7.







[jira] [Updated] (KAFKA-10649) CPU increase after the upgrade to Kafka 2.5

2020-10-27 Thread Valentina Glagoleva (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentina Glagoleva updated KAFKA-10649:

Description: 
After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
clusters now use 10-15% more CPU than before:

!CPU_usage_grafana.png|width=613,height=210!

The increase happened right after the rolling upgrade of the cluster and CPU 
usage stayed higher than usual since then.

We made no changes to the application side or usage patterns.
 The only thing that we noticed is that number of FETCH request seemed to 
increase a bit as well. These additional fetch requests seem to come from 
Brokers intercommunication.

What can be the reason for an increase of number of fetch requests and CPU 
usage?

  was:
After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
clusters now use 10-15% more CPU than before:

!CPU_usage_grafana.png|width=613,height=210!

The increase happened right after the rolling upgrade of the cluster and CPU 
usage stayed higher than usual since then.

We made no changes to the application side or usage patterns.
 The only thing that we noticed is that number of FETCH request seemed to 
increase a bit as well. This additional FETCH requests seem to come from 
Brokers intercommunication.

What can be the reason for an increase of number of fetch requests and CPU 
usage?


> CPU increase after the upgrade to Kafka 2.5
> ---
>
> Key: KAFKA-10649
> URL: https://issues.apache.org/jira/browse/KAFKA-10649
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.1
> Environment: Ubuntu 16.04
>Reporter: Valentina Glagoleva
>Priority: Major
>  Labels: performance
> Attachments: CPU_usage_grafana.png
>
>
> After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
> clusters now use 10-15% more CPU than before:
> !CPU_usage_grafana.png|width=613,height=210!
> The increase happened right after the rolling upgrade of the cluster and CPU 
> usage stayed higher than usual since then.
> We made no changes to the application side or usage patterns.
>  The only thing that we noticed is that number of FETCH request seemed to 
> increase a bit as well. These additional fetch requests seem to come from 
> Brokers intercommunication.
> What can be the reason for an increase of number of fetch requests and CPU 
> usage?





[jira] [Updated] (KAFKA-10649) CPU increase after the upgrade to Kafka 2.5

2020-10-27 Thread Valentina Glagoleva (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentina Glagoleva updated KAFKA-10649:

Description: 
After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
clusters now use 10-15% more CPU than before:

!CPU_usage_grafana.png|width=613,height=210!

The increase happened right after the rolling upgrade of the cluster and CPU 
usage stayed higher than usual since then.

We made no changes to the application side or usage patterns.
 The only thing that we noticed is that number of FETCH request seemed to 
increase a bit as well. These additional fetch requests seem to come from 
brokers intercommunication.

What can be the reason for an increase of number of fetch requests and CPU 
usage?

  was:
After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
clusters now use 10-15% more CPU than before:

!CPU_usage_grafana.png|width=613,height=210!

The increase happened right after the rolling upgrade of the cluster and CPU 
usage stayed higher than usual since then.

We made no changes to the application side or usage patterns.
 The only thing that we noticed is that number of FETCH request seemed to 
increase a bit as well. These additional fetch requests seem to come from 
Brokers intercommunication.

What can be the reason for an increase of number of fetch requests and CPU 
usage?


> CPU increase after the upgrade to Kafka 2.5
> ---
>
> Key: KAFKA-10649
> URL: https://issues.apache.org/jira/browse/KAFKA-10649
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.1
> Environment: Ubuntu 16.04
>Reporter: Valentina Glagoleva
>Priority: Major
>  Labels: performance
> Attachments: CPU_usage_grafana.png
>
>
> After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
> clusters now use 10-15% more CPU than before:
> !CPU_usage_grafana.png|width=613,height=210!
> The increase happened right after the rolling upgrade of the cluster and CPU 
> usage stayed higher than usual since then.
> We made no changes to the application side or usage patterns.
>  The only thing that we noticed is that number of FETCH request seemed to 
> increase a bit as well. These additional fetch requests seem to come from 
> brokers intercommunication.
> What can be the reason for an increase of number of fetch requests and CPU 
> usage?





[jira] [Updated] (KAFKA-10649) CPU increase after the upgrade to Kafka 2.5

2020-10-27 Thread Valentina Glagoleva (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentina Glagoleva updated KAFKA-10649:

Description: 
After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
clusters now use 10-15% more CPU than before:

!CPU_usage_grafana.png|width=613,height=210!

The increase happened right after the rolling upgrade of the cluster and CPU 
usage stayed higher than usual since then.

We made no changes to the application side or usage patterns.
 The only thing that we noticed is that number of FETCH request seemed to 
increase a bit as well. This additional FETCH requests seem to come from 
Brokers intercommunication.

What can be the reason for an increase of number of fetch requests and CPU 
usage?

  was:
After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
clusters now use 10-15% more CPU than before:

!CPU_usage_grafana.png|width=613,height=210!

The increase happened right after the rolling upgrade of the cluster and CPU 
usage stayed higher than usual since then.

We made no changes to the application side or usage patterns.
The only thing that we noticed is that number of FETCH request seem to increase 
a bit as well. This additional FETCH requests seem to come from Brokers 
intercommunication.


What can be the reason for an increase of number of fetch requests and CPU 
usage?


> CPU increase after the upgrade to Kafka 2.5
> ---
>
> Key: KAFKA-10649
> URL: https://issues.apache.org/jira/browse/KAFKA-10649
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.1
> Environment: Ubuntu 16.04
>Reporter: Valentina Glagoleva
>Priority: Major
>  Labels: performance
> Attachments: CPU_usage_grafana.png
>
>
> After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
> clusters now use 10-15% more CPU than before:
> !CPU_usage_grafana.png|width=613,height=210!
> The increase happened right after the rolling upgrade of the cluster and CPU 
> usage stayed higher than usual since then.
> We made no changes to the application side or usage patterns.
>  The only thing that we noticed is that number of FETCH request seemed to 
> increase a bit as well. This additional FETCH requests seem to come from 
> Brokers intercommunication.
> What can be the reason for an increase of number of fetch requests and CPU 
> usage?





[GitHub] [kafka] showuon commented on pull request #9507: KAFKA-10628: remove unneeded javadoc for TopologyTestDriver and some refactor

2020-10-27 Thread GitBox


showuon commented on pull request #9507:
URL: https://github.com/apache/kafka/pull/9507#issuecomment-717119800


   @chia7712 , thanks for the reminder. I've removed all the unnecessary 
parameters (BOOTSTRAP_SERVERS_CONFIG and APPLICATION_ID_CONFIG) from the tests 
that use TopologyTestDriver in this commit: 
https://github.com/apache/kafka/pull/9507/commits/5a03f5993987ab341cd14e3708190aefa6ad7f0a.
I also made sure it still passes the tests in my local env. Let's see if it 
passes the Jenkins tests. Thanks.







[jira] [Updated] (KAFKA-10649) CPU increase after the upgrade to Kafka 2.5

2020-10-27 Thread Valentina Glagoleva (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentina Glagoleva updated KAFKA-10649:

Description: 
After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
clusters now use 10-15% more CPU than before:

!CPU_usage_grafana.png|width=613,height=210!

The increase happened right after the rolling upgrade of the cluster and CPU 
usage stayed higher than usual since then.

We made no changes to the application side or usage patterns.
The only thing that we noticed is that number of FETCH request seem to increase 
a bit as well. This additional FETCH requests seem to come from Brokers 
intercommunication.


What can be the reason for an increase of number of fetch requests and CPU 
usage?

  was:
After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
clusters now use 10-15% more CPU than before:

!CPU_usage_grafana.png|width=613,height=210!

The increase happened right after the rolling upgrade of the cluster and CPU 
usage stayed higher than usual since then.

We made no changes to the application side or usage patterns.

 

What can be the reason for a CPU usage increase?


> CPU increase after the upgrade to Kafka 2.5
> ---
>
> Key: KAFKA-10649
> URL: https://issues.apache.org/jira/browse/KAFKA-10649
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.1
> Environment: Ubuntu 16.04
>Reporter: Valentina Glagoleva
>Priority: Major
>  Labels: performance
> Attachments: CPU_usage_grafana.png
>
>
> After an upgrade from Kafka 2.2.1 to Kafka 2.5.1 we noticed, that some of our 
> clusters now use 10-15% more CPU than before:
> !CPU_usage_grafana.png|width=613,height=210!
> The increase happened right after the rolling upgrade of the cluster and CPU 
> usage stayed higher than usual since then.
> We made no changes to the application side or usage patterns.
> The only thing that we noticed is that number of FETCH request seem to 
> increase a bit as well. This additional FETCH requests seem to come from 
> Brokers intercommunication.
> What can be the reason for an increase of number of fetch requests and CPU 
> usage?




