[jira] [Commented] (KAFKA-4453) add request prioritization

2019-01-10 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739822#comment-16739822
 ] 

Mayuresh Gharat commented on KAFKA-4453:


Hi [~sriharsha], 
Yes, the PR is already up. I got +1 from [~jjkoshy] yesterday. We are waiting 
to see if [~junrao] has any other concerns with the patch.


> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority, so a 
> backlog of requests ahead of a controller request will delay the processing 
> of that controller request. This causes the requests in front of the 
> controller request to be processed based on stale state.
> Side effects may include giving clients stale metadata\[1\], rejecting 
> ProduceRequests and FetchRequests\[2\], and data loss (for some 
> unofficial\[3\] definition of data loss in terms of messages beyond the high 
> watermark)\[4\].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued-up requests, so requests can be processed with up-to-date state.
> \[1\] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest in a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes, and that these leadership changes are being broadcast 
> by the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a round trip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start over and query the 
> topic metadata again.
> \[2\] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Depending on how long the client 
> acts on the stale metadata, the impact may or may not be visible to the 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the maximum number of retries, the producer application is not 
> impacted. If the retries are exhausted, however, the failed produce 
> will be visible to the producer application.
> \[3\] The official definition of data loss in Kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in-sync 
> replicas for that partition have applied it to their log.
> \[4\] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest in a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates its log to 
> the high watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users get a false sense that their produce attempt succeeded, 
> while in reality their messages were erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.
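The prioritization described above can be sketched as a two-level queue that always drains controller requests before the shared data-plane backlog. This is a minimal illustration, not the actual KafkaRequestHandler/RequestChannel code; the class and method names are hypothetical:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: controller requests bypass the shared data-plane queue.
public class PrioritizedRequestChannel {
    private final BlockingQueue<String> controllerQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> dataQueue = new LinkedBlockingQueue<>();

    public void send(String request, boolean fromController) throws InterruptedException {
        (fromController ? controllerQueue : dataQueue).put(request);
    }

    // Request handler threads call this: controller requests (e.g.
    // UpdateMetadataRequest, LeaderAndIsrRequest) are always served before any
    // backlog of client ProduceRequests/FetchRequests.
    public String receive() throws InterruptedException {
        String controllerRequest = controllerQueue.poll(); // non-blocking check
        return controllerRequest != null ? controllerRequest : dataQueue.take();
    }

    public static void main(String[] args) throws InterruptedException {
        PrioritizedRequestChannel channel = new PrioritizedRequestChannel();
        channel.send("ProduceRequest", false);
        channel.send("LeaderAndIsrRequest", true);
        // The controller request jumps ahead of the produce backlog.
        System.out.println(channel.receive()); // prints LeaderAndIsrRequest
        System.out.println(channel.receive()); // prints ProduceRequest
    }
}
```

With a single shared queue, the LeaderAndIsrRequest would instead wait behind every queued ProduceRequest, which is exactly the stale-state window described in \[4\].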



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7681) new metric for request thread utilization by request type

2018-12-11 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718138#comment-16718138
 ] 

Mayuresh Gharat commented on KAFKA-7681:


Hi [~junrao],

# IIUC, after inspecting the code and your suggestion above, this seems doable 
by adding a new metric like "localTimeRate" or 
"RequestHandlerThreadpoolUtilizationRate" and exposing it as a Meter(), 
like we have for {code:java} totalProduceRequestRate {code}.
This will give us the rate of local time for each request, which approximates 
the usage of the RequestHandlerThreadPool.

# I was thinking more along the lines of having a ratio that would give us an 
instantaneous value (Gauge), like we have for "NetworkProcessorAvgIdlePercent", 
but for each request type, wherein
{code:java}
value = (Total sampled local time of a request) / (Total of sampled local times 
of all the requests)
{code}
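A minimal sketch of the per-request-type ratio proposed in 2) might look like the following. The names are hypothetical and this is not the actual Kafka metrics code, which uses the Yammer metrics library:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: track sampled local time per request type and report
// each type's share of total request-handler-thread time as a gauge-like ratio.
public class RequestLocalTimeRatio {
    private final Map<String, Long> localTimeMsByType = new HashMap<>();

    public void record(String requestType, long localTimeMs) {
        localTimeMsByType.merge(requestType, localTimeMs, Long::sum);
    }

    // value = (total sampled local time of a request type)
    //       / (total sampled local time of all request types)
    public double ratio(String requestType) {
        long total = localTimeMsByType.values().stream().mapToLong(Long::longValue).sum();
        return total == 0 ? 0.0 : (double) localTimeMsByType.getOrDefault(requestType, 0L) / total;
    }

    public static void main(String[] args) {
        RequestLocalTimeRatio metric = new RequestLocalTimeRatio();
        metric.record("Produce", 300);
        metric.record("Fetch", 600);
        metric.record("Metadata", 100);
        // Fetch accounted for 600 of 1000 sampled ms of handler time.
        System.out.println(metric.ratio("Fetch")); // prints 0.6
    }
}
```

A real implementation would need windowed samples (so the ratio reflects recent load) rather than the ever-growing totals used in this sketch.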






> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which 
> request type is using the thread pool the most. It would be useful to add a 
> metric that tracks the fraction of request thread pool usage by request type. 
> This would be equivalent to (request rate) * (request local time in ms) / 
> 1000, but more direct. This would require a new KIP.
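The equivalence stated in the description can be checked with a quick calculation (the numbers below are illustrative only, not from any benchmark):

```java
// Illustrative only: the fraction of request-handler-thread capacity used by
// one request type, computed as (request rate) * (request local time in ms) / 1000.
public class ThreadUsageFraction {
    static double fraction(double requestsPerSec, double localTimeMs) {
        return requestsPerSec * localTimeMs / 1000.0;
    }

    public static void main(String[] args) {
        // e.g. 200 Produce requests/sec, each taking 2 ms of handler time,
        // consume the equivalent of 0.4 request-handler threads.
        System.out.println(fraction(200, 2)); // prints 0.4
    }
}
```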





[jira] [Comment Edited] (KAFKA-7681) new metric for request thread utilization by request type

2018-12-11 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718138#comment-16718138
 ] 

Mayuresh Gharat edited comment on KAFKA-7681 at 12/11/18 10:57 PM:
---

Hi [~junrao],

1)
IIUC, after inspecting the code and your suggestion above, this seems doable by 
adding a new metric like "localTimeRate" or 
"RequestHandlerThreadpoolUtilizationRate" and exposing it as a Meter(), 
like we have for {code:java} totalProduceRequestRate {code}.
This will give us the rate of local time for each request, which approximates 
the usage of the RequestHandlerThreadPool.

2)
I was thinking more along the lines of having a ratio that would give us an 
instantaneous value (Gauge), like we have for "NetworkProcessorAvgIdlePercent", 
but for each request type, wherein
{code:java}
value = (Total sampled local time of a request) / (Total of sampled local times 
of all the requests)
{code}

I can start putting up a KIP, if we think that 1) would suffice.




was (Author: mgharat):
Hi [~junrao],

1)
IIUC, After inspecting the code and your suggestion above, this seems doable by 
adding a new metric type like "localTimeRate" or 
"RequestHandlerThreadpoolUitlizationRate" or similar and having it as a Meter() 
like we have for {code:java} totalProduceRequestRate {code}.
This will give us the rate of local time for each request which ~ usage of the 
RequestHandlerThreadPool.

2)
I was thinking more on the lines of having a ratio, that would give us 
instantaneous value (Guage) like we have for "NetworkProcessorAvgIdlePercent", 
but for each request type wherein the 
{code:java}
value = (Total sampled Local Time of A Request) / (Total of sampled local times 
of all the requests) 
{code}

I can start putting up a KIP, if we think that 1) would suffice.





[jira] [Comment Edited] (KAFKA-7681) new metric for request thread utilization by request type

2018-12-11 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718138#comment-16718138
 ] 

Mayuresh Gharat edited comment on KAFKA-7681 at 12/11/18 10:56 PM:
---

Hi [~junrao],

1)
IIUC, after inspecting the code and your suggestion above, this seems doable by 
adding a new metric like "localTimeRate" or 
"RequestHandlerThreadpoolUtilizationRate" and exposing it as a Meter(), 
like we have for {code:java} totalProduceRequestRate {code}.
This will give us the rate of local time for each request, which approximates 
the usage of the RequestHandlerThreadPool.

2)
I was thinking more along the lines of having a ratio that would give us an 
instantaneous value (Gauge), like we have for "NetworkProcessorAvgIdlePercent", 
but for each request type, wherein
{code:java}
value = (Total sampled local time of a request) / (Total of sampled local times 
of all the requests)
{code}

I can start putting up a KIP, if we think that 1) would suffice.




was (Author: mgharat):
Hi [~junrao],

#  IIUC, After inspecting the code and your suggestion above, this seems doable 
by adding a new metric type like "localTimeRate" or 
"RequestHandlerThreadpoolUitlizationRate" or similar and having it as a Meter() 
like we have for {code:java} totalProduceRequestRate {code}.
This will give us the rate of local time for each request which ~ usage of the 
RequestHandlerThreadPool.

# I was thinking more on the lines of having a ratio, that would give us 
instantaneous value (Guage) like we have for "NetworkProcessorAvgIdlePercent", 
but for each request type wherein the 
{code:java}
value = (Total sampled Local Time of A Request) / (Total of sampled local times 
of all the requests) 
{code}








[jira] [Commented] (KAFKA-7681) new metric for request thread utilization by request type

2018-12-10 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715264#comment-16715264
 ] 

Mayuresh Gharat commented on KAFKA-7681:


[~junrao], I will try to give this a shot. 



[jira] [Assigned] (KAFKA-7681) new metric for request thread utilization by request type

2018-12-10 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat reassigned KAFKA-7681:
--

Assignee: Mayuresh Gharat



[jira] [Commented] (KAFKA-7681) new metric for request thread utilization by request type

2018-11-27 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701387#comment-16701387
 ] 

Mayuresh Gharat commented on KAFKA-7681:


Hi [~junrao], 
This seems more like the Broker Topic Metrics that we have today but at the 
RequestHandler level and not for a specific topic. Is my understanding correct?



[jira] [Updated] (KAFKA-4453) add request prioritization

2018-11-12 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-4453:
---
Issue Type: Improvement  (was: Bug)



[jira] [Commented] (KAFKA-7546) Java implementation for Authorizer

2018-10-31 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670335#comment-16670335
 ] 

Mayuresh Gharat commented on KAFKA-7546:


[~bansalp], I believe that would depend on what principal your broker is using, 
and the operations would mostly be the ones described in 
kafka.security.auth.Operation. The ones I can remember off the top of my head 
are: Read, Write, Describe, Create.

At LinkedIn, we add the broker principal as an admin principal dynamically at 
runtime, so if we get a request with that principal, we don't check ACLs for 
it. This is with the assumption/guarantee that no one outside the Kafka broker 
hosts will be able to access the broker cert.

Hope this helps.
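The admin-principal shortcut described above can be sketched roughly as follows. The class and method names are hypothetical; the real pluggable interface is the Scala trait kafka.security.auth.Authorizer, which differs from this simplified shape:

```java
import java.util.Set;

// Hypothetical sketch of an authorizer that skips ACL checks for broker/admin
// principals, as described above. Not the actual Kafka Authorizer API.
public class AdminAwareAuthorizer {
    private final Set<String> adminPrincipals;

    public AdminAwareAuthorizer(Set<String> adminPrincipals) {
        this.adminPrincipals = adminPrincipals;
    }

    public boolean authorize(String principal, String operation, String resource) {
        // Requests from the broker's own (admin) principal bypass ACL checks
        // entirely, under the assumption that only brokers hold that identity.
        if (adminPrincipals.contains(principal)) {
            return true;
        }
        return aclAllows(principal, operation, resource);
    }

    // Placeholder for an ACL lookup backed by e.g. a SQL database, as the
    // reporter of KAFKA-7546 intends.
    private boolean aclAllows(String principal, String operation, String resource) {
        return false; // deny by default in this sketch
    }

    public static void main(String[] args) {
        AdminAwareAuthorizer auth = new AdminAwareAuthorizer(Set.of("User:broker"));
        System.out.println(auth.authorize("User:broker", "Write", "Topic:test")); // prints true
        System.out.println(auth.authorize("User:alice", "Write", "Topic:test"));  // prints false
    }
}
```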

> Java implementation for Authorizer
> --
>
> Key: KAFKA-7546
> URL: https://issues.apache.org/jira/browse/KAFKA-7546
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Pradeep Bansal
>Priority: Major
> Attachments: AuthorizerImpl.PNG
>
>
> I am using Kafka with authentication and authorization. I want to plug in my 
> own implementation of Authorizer, which doesn't use ZooKeeper but instead 
> keeps the permission mapping in a SQL database. Is it possible to write the 
> Authorizer code in Java?





[jira] [Updated] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-26 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-7548:
---
Description: 
Today when we call KafkaConsumer.poll(), it fetches data from Kafka 
asynchronously and puts it into a local buffer (completedFetches).

If we now pause some TopicPartitions and call KafkaConsumer.poll(), we might 
throw away any data buffered locally for these TopicPartitions. Generally, if 
an application pauses some TopicPartitions, it is likely to resume those 
TopicPartitions in the near future, which would require the KafkaConsumer to 
re-issue a fetch for the same data it had buffered earlier for these 
TopicPartitions. This is wasted effort from the application's point of view.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance of stream applications like Samza. We ran a 
benchmark to compare the "before-fix" and "after-fix" versions.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a predefined number of partitions on every poll call. The partitions to pause 
were chosen randomly for each poll() call.
 * Time to run benchmark = 60 seconds.
 * MaxPollRecords = 1
 * Number of TopicPartitions subscribed = 10

||Number Of Partitions Paused||Number of Records consumed (Before fix)||Number 
of Records consumed (After fix)||
|9|2087|4884693|

 

  was:
In KafkaConsumer, when we do a poll, we fetch data asynchronously from kafka 
brokers that is buffered in completedFetch queue. Now if we pause a few 
partitions, in next call to poll we remove the completedFetches for those 
paused partitions. Normally, if an application is calling pause on 
topicPartitions, it is likely to return to those topicPartitions in near future 
and when it does, with the current design we would have to re-fetch that data.

At Linkedin, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance for stream applications like Samza. We ran a 
benchmark were we compared what is the throughput w.r.t to different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high volume topic and paused 
predefined number partitions for every poll call. The partitions to pause were 
chosen randomly for each poll() call.
 * Time to run Benchmark = 60 seconds.
 * MaxPollRecords = 1
 * Number of TopicPartition subscribed  = 10 

||Number Of Partitions Paused||Number of Records consumed (Before fix)||Number 
of Records consumed (After fix)||
|9|2087|4884693|

 


> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> ---
>
> Key: KAFKA-7548
> URL: https://issues.apache.org/jira/browse/KAFKA-7548
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> Today when we call KafkaConsumer.poll(), it fetches data from Kafka 
> asynchronously and puts it into a local buffer (completedFetches).
> If we now pause some TopicPartitions and call KafkaConsumer.poll(), we might 
> throw away any data buffered locally for these TopicPartitions. Generally, 
> if an application pauses some TopicPartitions, it is likely to resume those 
> TopicPartitions in the near future, which would require the KafkaConsumer to 
> re-issue a fetch for the same data it had buffered earlier for these 
> TopicPartitions. This is wasted effort from the application's point of view.
> At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
> would improve the performance of stream applications like Samza. We ran a 
> benchmark to compare the "before-fix" and "after-fix" versions.
> We had a consumer subscribed to 10 partitions of a high-volume topic and 
> paused a predefined number of partitions on every poll call. The partitions 
> to pause were chosen randomly for each poll() call.
>  * Time to run benchmark = 60 seconds.
>  * MaxPollRecords = 1
>  * Number of TopicPartitions subscribed = 10
> ||Number Of Partitions Paused||Number of Records consumed (Before 
> fix)||Number of Records consumed (After fix)||
> |9|2087|4884693|
>  
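The behavior change described above can be sketched with a toy fetch buffer that either drops or retains completed fetches for paused partitions. This is a hypothetical illustration of the idea, not the actual Fetcher code in the Kafka clients:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Hypothetical sketch of the fix: keep completed fetches for paused partitions
// in the buffer instead of discarding them, so resuming the partition can be
// served from the buffer without a re-fetch over the network.
public class PauseAwareFetchBuffer {
    record CompletedFetch(String topicPartition, int records) {}

    private final Deque<CompletedFetch> completedFetches = new ArrayDeque<>();
    private final Set<String> paused = new HashSet<>();
    private final boolean retainPausedData; // true = "after fix" behaviour

    public PauseAwareFetchBuffer(boolean retainPausedData) {
        this.retainPausedData = retainPausedData;
    }

    public void add(CompletedFetch fetch) { completedFetches.add(fetch); }
    public void pause(String tp) { paused.add(tp); }
    public void resume(String tp) { paused.remove(tp); }

    // Called from poll(): before the fix, buffered data for paused partitions
    // was dropped; after the fix it is kept and merely skipped over.
    public Integer nextRecords() {
        Iterator<CompletedFetch> it = completedFetches.iterator();
        while (it.hasNext()) {
            CompletedFetch f = it.next();
            if (paused.contains(f.topicPartition())) {
                if (!retainPausedData) it.remove(); // old behaviour: throw away
                continue;                           // skip paused data either way
            }
            it.remove();
            return f.records();
        }
        return null; // nothing consumable; a real consumer would issue a fetch
    }

    public static void main(String[] args) {
        PauseAwareFetchBuffer buffer = new PauseAwareFetchBuffer(true);
        buffer.add(new CompletedFetch("t-0", 100));
        buffer.pause("t-0");
        System.out.println(buffer.nextRecords()); // prints null: t-0 is paused
        buffer.resume("t-0");
        System.out.println(buffer.nextRecords()); // prints 100: buffered data survived the pause
    }
}
```

With retainPausedData set to false, the second call would also return null and the data would have to be fetched again, which is the wasted round trip the benchmark numbers quantify.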





[jira] [Updated] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-26 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-7548:
---
Description: 
In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
brokers and buffer it in the completedFetches queue. If we then pause a few 
partitions, the next call to poll removes the completedFetches for those 
paused partitions. Normally, if an application pauses some topicPartitions, it 
is likely to return to those topicPartitions in the near future, and when it 
does, with the current design we would have to re-fetch that data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance of stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a predefined number of partitions on every poll call. The partitions to pause 
were chosen randomly for each poll() call.
 * Time to run benchmark = 60 seconds.
 * MaxPollRecords = 1
 * Number of TopicPartitions subscribed = 10

||Number Of Partitions Paused||Number of Records consumed (Before fix)||Number 
of Records consumed (After fix)||
|9|2087|4884693|

 

  was:
In KafkaConsumer, when we do a poll, we fetch data asynchronously from kafka 
brokers that is buffered in completedFetch queue. Now if we pause a few 
partitions, in next call to poll we remove the completedFetches for those 
paused partitions. Normally, if an application is calling pause on 
topicPartitions, it is likely to return to those topicPartitions in near future 
and when it does, with the current design we would have to re-fetch that data.

At Linkedin, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance for stream applications like Samza. We ran a 
benchmark were we compared what is the throughput w.r.t to different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high volume topic and paused 
predefined number partitions for every poll call. The partitions to pause were 
chosen randomly for each poll() call. We ran this benchmark multiple times 
pausing different number for of partitions for each run. Here are the results :

 
 * Time to run Benchmark = 60 seconds.
 * MaxPollRecords = 1
 * Number of TopicPartition subscribed  = 10

*Before fix*

 
||Number Of Partitions Paused||Number of Records consumed||
|0|6424753|
|2|10495|
|4|5004|
|6|3152|
|8|2237|
|9|2087|

 
 *After fix*
||Number Of Partitions Paused||Number of Records consumed||
|0|5846512|
|2|5269557|
|4|5213496|
|6|4519645|
|8|4383300|
|9|4884693|

 
  


> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> ---
>
> Key: KAFKA-7548
> URL: https://issues.apache.org/jira/browse/KAFKA-7548
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
> brokers and buffer it in the completedFetches queue. If we then pause a few 
> partitions, the next call to poll removes the completedFetches for those 
> paused partitions. Normally, if an application pauses some topicPartitions, 
> it is likely to return to those topicPartitions in the near future, and when 
> it does, with the current design we would have to re-fetch that data.
> At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
> would improve the performance of stream applications like Samza. We ran a 
> benchmark where we compared the throughput for different values of 
> maxPollRecords.
> We had a consumer subscribed to 10 partitions of a high-volume topic and 
> paused a predefined number of partitions on every poll call. The partitions 
> to pause were chosen randomly for each poll() call.
>  * Time to run benchmark = 60 seconds.
>  * MaxPollRecords = 1
>  * Number of TopicPartitions subscribed = 10
> ||Number Of Partitions Paused||Number of Records consumed (Before 
> fix)||Number of Records consumed (After fix)||
> |9|2087|4884693|
>  





[jira] [Updated] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-26 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-7548:
---
Description: 
In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
brokers and buffer it in the completedFetches queue. If we then pause a few 
partitions, the next call to poll removes the completedFetches for those 
paused partitions. Normally, if an application pauses some topicPartitions, it 
is likely to return to those topicPartitions in the near future, and when it 
does, with the current design we would have to re-fetch that data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance of stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a predefined number of partitions on every poll call. The partitions to pause 
were chosen randomly for each poll() call. We ran this benchmark multiple 
times, pausing a different number of partitions for each run. Here are the 
results:

 * Time to run benchmark = 60 seconds.
 * MaxPollRecords = 1
 * Number of TopicPartitions subscribed = 10

*Before fix*

 
||Number Of Partitions Paused||Number of Records consumed||
|0|6424753|
|2|10495|
|4|5004|
|6|3152|
|8|2237|
|9|2087|

 
 *After fix*
||Number Of Partitions Paused||Number of Records consumed||
|0|5846512|
|2|5269557|
|4|5213496|
|6|4519645|
|8|4383300|
|9|4884693|

 
  

  was:
In KafkaConsumer, when we do a poll, we fetch data asynchronously from kafka 
brokers that is buffered in completedFetch queue. Now if we pause a few 
partitions, it seems that in next call to poll we remove the completedFetches 
for those paused partitions. Normally, if an application is calling pause on 
topicPartitions, it is likely to return to those topicPartitions in near future 
and when it does, with the current design we would have to re-fetch that data.

At Linkedin, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance for stream applications like Samza. We ran a 
benchmark were we compared what is the throughput w.r.t to different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high volume topic and paused 
different number of partitions for every poll call. The partitions to pause 
were chosen randomly for each poll() call. Here are the results :

 
 * Time to run Benchmark = 60 seconds.
 * MaxPollRecords = 1
 * Number of TopicPartition subscribed  = 10

*Before fix* 

 
||Number Of Partitions Paused||Number of Records consumed ||
|0|6424753|
|2|10495|
|4|5004|
|6|3152|
|8|2237|
|9|2087|

 
 *After fix* 
||Number Of Partitions Paused||Number of Records consumed||
|0|5846512|
|2|5269557|
|4|5213496|
|6|4519645|
|8|4383300|
|9|4884693|

 
 


> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> ---
>
> Key: KAFKA-7548
> URL: https://issues.apache.org/jira/browse/KAFKA-7548
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
> brokers, and that data is buffered in the completedFetch queue. Now if we 
> pause a few partitions, in the next call to poll we remove the 
> completedFetches for those paused partitions. Normally, if an application is 
> calling pause on topicPartitions, it is likely to return to those 
> topicPartitions in the near future, and when it does, with the current design 
> we would have to re-fetch that data.
> At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
> would improve the performance for stream applications like Samza. We ran a 
> benchmark where we compared the throughput for different values of 
> maxPollRecords.
> We had a consumer subscribed to 10 partitions of a high-volume topic and 
> paused a predefined number of partitions for every poll call. The partitions 
> to pause were chosen randomly for each poll() call. We ran this benchmark 
> multiple times, pausing a different number of partitions for each run. Here 
> are the results:
>  
>  * Time to run Benchmark = 60 seconds.
>  * MaxPollRecords = 1
>  * Number of TopicPartitions subscribed = 10
> *Before fix*
>  
> ||Number Of Partitions Paused||Number of Records consumed||
> |0|6424753|
> |2|10495|
> |4|5004|
> |6|3152|
> |8|2237|
> |9|2087|
>  
>  *After fix*
> ||Number Of Partitions Paused||Number of Records consumed||
> |0|5846512|
> |2|5269557|
> |4|5213496|
> |6|4519645|
> |8|4383300|
> |9|4884693|
>  
>   



--
This message was sent by Atlassian JIRA

[jira] [Updated] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-26 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-7548:
---
Description: 
In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
brokers, and that data is buffered in the completedFetch queue. Now if we pause 
a few partitions, it seems that in the next call to poll we remove the 
completedFetches for those paused partitions. Normally, if an application is 
calling pause on topicPartitions, it is likely to return to those 
topicPartitions in the near future, and when it does, with the current design 
we would have to re-fetch that data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance for stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. The partitions to pause 
were chosen randomly for each poll() call. Here are the results:

 
 * Time to run Benchmark = 60 seconds.
 * MaxPollRecords = 1
 * Number of TopicPartitions subscribed = 10

*Before fix* 

 
||Number Of Partitions Paused||Number of Records consumed ||
|0|6424753|
|2|10495|
|4|5004|
|6|3152|
|8|2237|
|9|2087|

 
 *After fix* 
||Number Of Partitions Paused||Number of Records consumed||
|0|5846512|
|2|5269557|
|4|5213496|
|6|4519645|
|8|4383300|
|9|4884693|

 
 

  was:
In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
brokers, and that data is buffered in the completedFetch queue. Now if we pause 
a few partitions, it seems that in the next call to poll we remove the 
completedFetches for those paused partitions. Normally, if an application is 
calling pause on topicPartitions, it is likely to return to those 
topicPartitions in the near future, and when it does, with the current design 
we would have to re-fetch that data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance for stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:

 

 

*Before fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|


> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> ---
>
> Key: KAFKA-7548
> URL: https://issues.apache.org/jira/browse/KAFKA-7548
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
> brokers, and that data is buffered in the completedFetch queue. Now if we 
> pause a few partitions, it seems that in the next call to poll we remove the 
> completedFetches for those paused partitions. Normally, if an application is 
> calling pause on topicPartitions, it is likely to return to those 
> topicPartitions in the near future, and when it does, with the current design 
> we would have to re-fetch that data.
> At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
> would improve the performance for stream applications like Samza. We ran a 
> benchmark where we compared the throughput for different values of 
> maxPollRecords.
> We had a consumer subscribed to 10 partitions of a high-volume topic and 
> paused a different number of partitions for every poll call. 

[jira] [Commented] (KAFKA-7556) KafkaConsumer.beginningOffsets does not return actual first offsets

2018-10-26 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665433#comment-16665433
 ] 

Mayuresh Gharat commented on KAFKA-7556:


One more question: did you try this with a non-log-compacted topic?

Was the response similar, [~rob_v]?

 

Thanks,

 

Mayuresh

> KafkaConsumer.beginningOffsets does not return actual first offsets
> ---
>
> Key: KAFKA-7556
> URL: https://issues.apache.org/jira/browse/KAFKA-7556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 1.0.0
>Reporter: Robert V
>Priority: Critical
>  Labels: documentation, usability
> Fix For: 2.2.0
>
>
> h2. Description of the problem
> The method `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets` 
> claims in its Javadoc documentation that it would 'Get the first offset for 
> the given partitions.'.
> I used it with a compacted topic, and it always returned offset 0 for all 
> partitions.
>  Not sure if using a compacted topic actually matters, but I'm enclosing this 
> information anyway.
> Given a Kafka topic with retention set, and old log files being removed as a 
> result of that, the effective start offset of those partitions moves forward; 
> it will simply be greater than offset 0.
> However, calling the `beginningOffsets` method always returns offset 0 as the 
> first offset.
> In contrast, when the method 
> `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes` is called 
> with a timestamp of 0L (UNIX epoch, 1st Jan 1970), it correctly returns the 
> effective start offset for each partition.
> Output of using 
> `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets`: 
> {code:java}
> {test.topic-87=0, test.topic-54=0, test.topic-21=0, test.topic-79=0, 
> test.topic-46=0, test.topic-13=0, test.topic-70=0, test.topic-37=0, 
> test.topic-12=0, test.topic-95=0, test.topic-62=0, test.topic-29=0, 
> test.topic-4=0, test.topic-88=0, test.topic-55=0, test.topic-22=0, 
> test.topic-80=0, test.topic-47=0, test.topic-14=0, test.topic-71=0, 
> test.topic-38=0, test.topic-5=0, test.topic-96=0, test.topic-63=0, 
> test.topic-30=0, test.topic-56=0, test.topic-23=0, test.topic-89=0, 
> test.topic-48=0, test.topic-15=0, test.topic-81=0, test.topic-72=0, 
> test.topic-39=0, test.topic-6=0, test.topic-64=0, test.topic-31=0, 
> test.topic-97=0, test.topic-24=0, test.topic-90=0, test.topic-57=0, 
> test.topic-16=0, test.topic-82=0, test.topic-49=0, test.topic-40=0, 
> test.topic-7=0, test.topic-73=0, test.topic-32=0, test.topic-98=0, 
> test.topic-65=0, test.topic-91=0, test.topic-58=0, test.topic-25=0, 
> test.topic-83=0, test.topic-50=0, test.topic-17=0, test.topic-8=0, 
> test.topic-74=0, test.topic-41=0, test.topic-0=0, test.topic-99=0, 
> test.topic-66=0, test.topic-33=0, test.topic-92=0, test.topic-59=0, 
> test.topic-26=0, test.topic-84=0, test.topic-51=0, test.topic-18=0, 
> test.topic-75=0, test.topic-42=0, test.topic-9=0, test.topic-67=0, 
> test.topic-34=0, test.topic-1=0, test.topic-85=0, test.topic-60=0, 
> test.topic-27=0, test.topic-77=0, test.topic-52=0, test.topic-19=0, 
> test.topic-76=0, test.topic-43=0, test.topic-10=0, test.topic-93=0, 
> test.topic-68=0, test.topic-35=0, test.topic-2=0, test.topic-86=0, 
> test.topic-53=0, test.topic-28=0, test.topic-78=0, test.topic-45=0, 
> test.topic-20=0, test.topic-69=0, test.topic-44=0, test.topic-11=0, 
> test.topic-94=0, test.topic-61=0, test.topic-36=0, test.topic-3=0}
> {code}
> Output of using 
> `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes`:
> {code:java}
> {test.topic-87=(timestamp=1511264434285, offset=289), 
> test.topic-54=(timestamp=1511265134993, offset=45420), 
> test.topic-21=(timestamp=1511265534207, offset=63643), 
> test.topic-79=(timestamp=1511270338275, offset=380750), 
> test.topic-46=(timestamp=1511266883588, offset=266379), 
> test.topic-13=(timestamp=1511265900538, offset=98512), 
> test.topic-70=(timestamp=1511266972452, offset=118522), 
> test.topic-37=(timestamp=1511264396370, offset=763), 
> test.topic-12=(timestamp=1511265504886, offset=61108), 
> test.topic-95=(timestamp=1511289492800, offset=847647), 
> test.topic-62=(timestamp=1511265831298, offset=68299), 
> test.topic-29=(timestamp=1511278767417, offset=548361), 
> test.topic-4=(timestamp=1511269316679, offset=144855), 
> test.topic-88=(timestamp=1511265608468, offset=107831), 
> test.topic-55=(timestamp=1511267449288, offset=129241), 
> test.topic-22=(timestamp=1511283134114, offset=563095), 
> test.topic-80=(timestamp=1511277334877, offset=534859), 
> test.topic-47=(timestamp=1511265530689, offset=71608), 
> test.topic-14=(timestamp=1511266308829, offset=80962), 
> test.topic-71=(timestamp=1511265474740, offset=83607), 
> 

[jira] [Commented] (KAFKA-7556) KafkaConsumer.beginningOffsets does not return actual first offsets

2018-10-26 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665428#comment-16665428
 ] 

Mayuresh Gharat commented on KAFKA-7556:


Hi [~rob_v],

This seems like a bug to me. Let me take a look now. I am assigning this ticket 
to myself for now, and will unassign it (so others can pick it up) if I cannot 
get it done in a timely manner.

 

Thanks,

 

Mayuresh
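The discrepancy reported above can be stated as a toy calculation (illustrative only; the function below is made up for this sketch and is not Kafka code): once retention deletes old segments, the first available offset of a partition is strictly greater than 0, and that is the value `beginningOffsets` should report.

```python
def first_available_offset(next_offset, retained_records):
    """Toy model of a partition's log start offset: the next offset to be
    assigned, minus however many records retention has kept."""
    return next_offset - retained_records

# A partition that has appended 100000 records, of which retention kept 40000:
print(first_available_offset(100000, 40000))  # 60000, not 0
```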

> KafkaConsumer.beginningOffsets does not return actual first offsets
> ---
>
> Key: KAFKA-7556
> URL: https://issues.apache.org/jira/browse/KAFKA-7556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 1.0.0
>Reporter: Robert V
>Priority: Critical
>  Labels: documentation, usability
> Fix For: 2.2.0
>
>
> h2. Description of the problem
> The method `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets` 
> claims in its Javadoc documentation that it would 'Get the first offset for 
> the given partitions.'.
> I used it with a compacted topic, and it always returned offset 0 for all 
> partitions.
>  Not sure if using a compacted topic actually matters, but I'm enclosing this 
> information anyway.
> Given a Kafka topic with retention set, and old log files being removed as a 
> result of that, the effective start offset of those partitions moves forward; 
> it will simply be greater than offset 0.
> However, calling the `beginningOffsets` method always returns offset 0 as the 
> first offset.
> In contrast, when the method 
> `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes` is called 
> with a timestamp of 0L (UNIX epoch, 1st Jan 1970), it correctly returns the 
> effective start offset for each partition.
> Output of using 
> `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets`: 
> {code:java}
> {test.topic-87=0, test.topic-54=0, test.topic-21=0, test.topic-79=0, 
> test.topic-46=0, test.topic-13=0, test.topic-70=0, test.topic-37=0, 
> test.topic-12=0, test.topic-95=0, test.topic-62=0, test.topic-29=0, 
> test.topic-4=0, test.topic-88=0, test.topic-55=0, test.topic-22=0, 
> test.topic-80=0, test.topic-47=0, test.topic-14=0, test.topic-71=0, 
> test.topic-38=0, test.topic-5=0, test.topic-96=0, test.topic-63=0, 
> test.topic-30=0, test.topic-56=0, test.topic-23=0, test.topic-89=0, 
> test.topic-48=0, test.topic-15=0, test.topic-81=0, test.topic-72=0, 
> test.topic-39=0, test.topic-6=0, test.topic-64=0, test.topic-31=0, 
> test.topic-97=0, test.topic-24=0, test.topic-90=0, test.topic-57=0, 
> test.topic-16=0, test.topic-82=0, test.topic-49=0, test.topic-40=0, 
> test.topic-7=0, test.topic-73=0, test.topic-32=0, test.topic-98=0, 
> test.topic-65=0, test.topic-91=0, test.topic-58=0, test.topic-25=0, 
> test.topic-83=0, test.topic-50=0, test.topic-17=0, test.topic-8=0, 
> test.topic-74=0, test.topic-41=0, test.topic-0=0, test.topic-99=0, 
> test.topic-66=0, test.topic-33=0, test.topic-92=0, test.topic-59=0, 
> test.topic-26=0, test.topic-84=0, test.topic-51=0, test.topic-18=0, 
> test.topic-75=0, test.topic-42=0, test.topic-9=0, test.topic-67=0, 
> test.topic-34=0, test.topic-1=0, test.topic-85=0, test.topic-60=0, 
> test.topic-27=0, test.topic-77=0, test.topic-52=0, test.topic-19=0, 
> test.topic-76=0, test.topic-43=0, test.topic-10=0, test.topic-93=0, 
> test.topic-68=0, test.topic-35=0, test.topic-2=0, test.topic-86=0, 
> test.topic-53=0, test.topic-28=0, test.topic-78=0, test.topic-45=0, 
> test.topic-20=0, test.topic-69=0, test.topic-44=0, test.topic-11=0, 
> test.topic-94=0, test.topic-61=0, test.topic-36=0, test.topic-3=0}
> {code}
> Output of using 
> `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes`:
> {code:java}
> {test.topic-87=(timestamp=1511264434285, offset=289), 
> test.topic-54=(timestamp=1511265134993, offset=45420), 
> test.topic-21=(timestamp=1511265534207, offset=63643), 
> test.topic-79=(timestamp=1511270338275, offset=380750), 
> test.topic-46=(timestamp=1511266883588, offset=266379), 
> test.topic-13=(timestamp=1511265900538, offset=98512), 
> test.topic-70=(timestamp=1511266972452, offset=118522), 
> test.topic-37=(timestamp=1511264396370, offset=763), 
> test.topic-12=(timestamp=1511265504886, offset=61108), 
> test.topic-95=(timestamp=1511289492800, offset=847647), 
> test.topic-62=(timestamp=1511265831298, offset=68299), 
> test.topic-29=(timestamp=1511278767417, offset=548361), 
> test.topic-4=(timestamp=1511269316679, offset=144855), 
> test.topic-88=(timestamp=1511265608468, offset=107831), 
> test.topic-55=(timestamp=1511267449288, offset=129241), 
> test.topic-22=(timestamp=1511283134114, offset=563095), 
> test.topic-80=(timestamp=1511277334877, offset=534859), 
> test.topic-47=(timestamp=1511265530689, offset=71608), 
> 

[jira] [Commented] (KAFKA-7546) Java implementation for Authorizer

2018-10-25 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663321#comment-16663321
 ] 

Mayuresh Gharat commented on KAFKA-7546:


Hi [~bansalp],

 

Kafka provides Authorizer as a pluggable interface, so you should be able to 
write your own implementation. You will need to set your implementation's class 
name in the "authorizer.class.name" broker config. At LinkedIn, we have our own 
implementation of Authorizer that does not use ZooKeeper for authorization but 
instead talks to our in-house ACL store. You should be able to do the same.
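As a sketch of that wiring (the class name below is hypothetical, standing in for whatever implementation you write), the broker configuration would be:

```properties
# server.properties: hypothetical custom authorizer backed by a SQL ACL store
authorizer.class.name=com.example.security.SqlAuthorizer
```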

 

Hope this helps.

 

Thanks,

 

Mayuresh

> Java implementation for Authorizer
> --
>
> Key: KAFKA-7546
> URL: https://issues.apache.org/jira/browse/KAFKA-7546
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Pradeep Bansal
>Priority: Major
>
> I am using Kafka with authentication and authorization. I wanted to plug in 
> my own implementation of Authorizer, which doesn't use ZooKeeper but instead 
> keeps its permission mapping in a SQL database. Is it possible to write 
> Authorizer code in Java?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-25 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-7548:
---
Description: 
In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
brokers, and that data is buffered in the completedFetch queue. Now if we pause 
a few partitions, it seems that in the next call to poll we remove the 
completedFetches for those paused partitions. Normally, if an application is 
calling pause on topicPartitions, it is likely to return to those 
topicPartitions in the near future, and when it does, with the current design 
we would have to re-fetch that data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance for stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:

 

 

*Before fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|

  was:
In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
brokers, and that data is buffered in the completedFetch queue. Now if we pause 
a few partitions, it seems that in the next call to poll we remove the 
completedFetches for those paused partitions. Normally, if an application is 
calling pause on topicPartitions, it is likely to return to those 
topicPartitions in the near future, and when it does, with the current design 
we would have to re-fetch that data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance for stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:
{noformat}
*no* further _formatting_ is done here{noformat}
*Before fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|


> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> ---
>
> Key: KAFKA-7548
> URL: https://issues.apache.org/jira/browse/KAFKA-7548
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
> brokers, and that data is 

[jira] [Updated] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-25 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-7548:
---
Description: 
In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
brokers, and that data is buffered in the completedFetch queue. Now if we pause 
a few partitions, it seems that in the next call to poll we remove the 
completedFetches for those paused partitions. Normally, if an application is 
calling pause on topicPartitions, it is likely to return to those 
topicPartitions in the near future, and when it does, with the current design 
we would have to re-fetch that data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance for stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:
{noformat}
*no* further _formatting_ is done here{noformat}
*Before fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|

  was:
In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
brokers, and that data is buffered in the completedFetch queue. Now if we pause 
a few partitions, it seems that in the next call to poll we remove the 
completedFetches for those paused partitions. Normally, if an application is 
calling pause on topicPartitions, it is likely to return to those 
topicPartitions in the near future, and when it does, with the current design 
we would have to re-fetch that data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance for stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:

*Before fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|


> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> ---
>
> Key: KAFKA-7548
> URL: https://issues.apache.org/jira/browse/KAFKA-7548
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
> brokers, and that data is 

[jira] [Updated] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-25 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-7548:
---
Description: 
In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
brokers, and that data is buffered in the completedFetch queue. Now if we pause 
a few partitions, it seems that in the next call to poll we remove the 
completedFetches for those paused partitions. Normally, if an application is 
calling pause on topicPartitions, it is likely to return to those 
topicPartitions in the near future, and when it does, with the current design 
we would have to re-fetch that data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance for stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:

*Before fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|

  was:
In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
brokers, and that data is buffered in the completedFetch queue. Now if we pause 
a few partitions, it seems that in the next call to poll we remove the 
completedFetches for those paused partitions. Normally, if an application is 
calling pause on topicPartitions, it is likely to return to those 
topicPartitions in the near future, and when it does, with the current design 
we would have to re-fetch that data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance for stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:

*Before fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Number of Partitions Paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|


> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> ---
>
> Key: KAFKA-7548
> URL: https://issues.apache.org/jira/browse/KAFKA-7548
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
> brokers and buffer it in the completedFetches queue. Now if we pause a few 
> partitions, it seems that 

[jira] [Created] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-25 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-7548:
--

 Summary: KafkaConsumer should not throw away already fetched data 
for paused partitions.
 Key: KAFKA-7548
 URL: https://issues.apache.org/jira/browse/KAFKA-7548
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat


In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
brokers and buffer it in the completedFetches queue. Now if we pause a few 
partitions, it seems that in the next call to poll we remove the completedFetches 
for those paused partitions. Normally, if an application is calling pause on 
topicPartitions, it is likely to return to those topicPartitions in the near future, 
and when it does, with the current design we would have to re-fetch that data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve performance for stream applications like Samza. We ran a 
benchmark where we compared throughput for different values of maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:

*Before fix (records consumed)*
|Partitions paused \ maxPollRecords|10|5|1|
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

 

*After fix (records consumed)*
|Partitions paused \ maxPollRecords|10|5|1|
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7096) Consumer should drop the data for unassigned topic partitions

2018-06-25 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522768#comment-16522768
 ] 

Mayuresh Gharat commented on KAFKA-7096:


[~lindong] : [https://github.com/apache/kafka/pull/5289]

 

> Consumer should drop the data for unassigned topic partitions
> -
>
> Key: KAFKA-7096
> URL: https://issues.apache.org/jira/browse/KAFKA-7096
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> Currently, if a client has assigned topics T1, T2, T3 and calls poll(), the 
> poll might fetch data for partitions of all 3 topics T1, T2, T3. Now if the 
> client unassigns some topics (for example T3) and calls poll(), we still hold 
> the data (for T3) in the completedFetches queue until we actually reach the 
> buffered data for the unassigned topics (T3 in our example) on subsequent 
> poll() calls, at which point we drop that data. This holding of the data is 
> unnecessary.
> When a client creates a topic, it takes time for the broker to fetch the ACLs 
> for the topic. During this time, if the client issues a fetchRequest for the 
> topic, it will get a response for the partitions of this topic. The response 
> consists of a TopicAuthorizationException for each of the partitions. The 
> response for each partition is wrapped in a completedFetch and added to the 
> completedFetches queue. Now when the client calls the next poll(), it sees the 
> TopicAuthorizationException from the first buffered completedFetch. At this 
> point the client chooses to sleep for 1.5 min as a backoff (as per the 
> design), hoping that the broker fetches the ACL from the ACL store in the 
> meantime. In fact, the broker has already fetched the ACL by this time. When 
> the client calls poll() after the sleep, it again sees the 
> TopicAuthorizationException from the second completedFetch and sleeps again. 
> So it takes (1.5 * 60 * partitions) seconds before the client can see any 
> data. With this patch, when the client sees the first 
> TopicAuthorizationException, it can call assign(emptySet), which gets rid of 
> the buffered completedFetches (those with the TopicAuthorizationException), 
> and it can call assign(topicPartitions) again before calling poll(). With 
> this patch we found that the client was able to get the records as soon as 
> the broker fetched the ACLs from the ACL store.





[jira] [Created] (KAFKA-7096) Consumer should drop the data for unassigned topic partitions

2018-06-25 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-7096:
--

 Summary: Consumer should drop the data for unassigned topic 
partitions
 Key: KAFKA-7096
 URL: https://issues.apache.org/jira/browse/KAFKA-7096
 Project: Kafka
  Issue Type: Improvement
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat


Currently, if a client has assigned topics T1, T2, T3 and calls poll(), the 
poll might fetch data for partitions of all 3 topics T1, T2, T3. Now if the 
client unassigns some topics (for example T3) and calls poll(), we still hold 
the data (for T3) in the completedFetches queue until we actually reach the 
buffered data for the unassigned topics (T3 in our example) on subsequent 
poll() calls, at which point we drop that data. This holding of the data is 
unnecessary.

When a client creates a topic, it takes time for the broker to fetch the ACLs 
for the topic. During this time, if the client issues a fetchRequest for the 
topic, it will get a response for the partitions of this topic. The response 
consists of a TopicAuthorizationException for each of the partitions. The 
response for each partition is wrapped in a completedFetch and added to the 
completedFetches queue. Now when the client calls the next poll(), it sees the 
TopicAuthorizationException from the first buffered completedFetch. At this 
point the client chooses to sleep for 1.5 min as a backoff (as per the design), 
hoping that the broker fetches the ACL from the ACL store in the meantime. In 
fact, the broker has already fetched the ACL by this time. When the client 
calls poll() after the sleep, it again sees the TopicAuthorizationException 
from the second completedFetch and sleeps again. So it takes 
(1.5 * 60 * partitions) seconds before the client can see any data. With this 
patch, when the client sees the first TopicAuthorizationException, it can call 
assign(emptySet), which gets rid of the buffered completedFetches (those with 
the TopicAuthorizationException), and it can call assign(topicPartitions) again 
before calling poll(). With this patch we found that the client was able to get 
the records as soon as the broker fetched the ACLs from the ACL store.
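The clear-and-reassign workaround can be sketched with a small self-contained model. The class and method names are illustrative (not the actual consumer internals); the key point modeled is that changing the assignment drops buffered fetches for partitions that fall out of the assignment:

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Toy model of the workaround described above; names are illustrative,
// not the actual consumer internals.
public class AssignClearsBuffer {
    // Each buffered entry stands in for a completedFetch that wrapped
    // a TopicAuthorizationException for one partition.
    private final Queue<String> completedFetches = new ArrayDeque<>();
    private final Set<String> assignment = new HashSet<>();

    // assign() replaces the assignment and drops buffered fetches for
    // partitions no longer assigned. This is what lets assign(emptySet)
    // flush the stale error responses all at once.
    void assign(Set<String> partitions) {
        assignment.clear();
        assignment.addAll(partitions);
        completedFetches.removeIf(tp -> !assignment.contains(tp));
    }

    void bufferErrorResponse(String topicPartition) {
        completedFetches.add(topicPartition);
    }

    int bufferedErrorCount() {
        return completedFetches.size();
    }

    public static void main(String[] args) {
        AssignClearsBuffer consumer = new AssignClearsBuffer();
        Set<String> partitions = Set.of("T3-0", "T3-1", "T3-2", "T3-3");
        consumer.assign(partitions);
        // Broker has not loaded the topic's ACLs yet: every partition's
        // fetch response is an authorization error.
        partitions.forEach(consumer::bufferErrorResponse);
        // Without the workaround the client backs off 1.5 min per buffered
        // error: 1.5 * 60 * 4 = 360 seconds in the worst case here.
        // The workaround: clear the assignment, then re-assign.
        consumer.assign(Set.of());
        consumer.assign(partitions);
        System.out.println(consumer.bufferedErrorCount()); // 0: next poll() fetches fresh data
    }
}
```

With the buffer emptied, the next poll() issues fresh fetch requests, which succeed because the broker has loaded the ACLs by then.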





[jira] [Assigned] (KAFKA-4433) Kafka Controller Does not send a LeaderAndIsr to old leader of a topicPartition during reassignment, if the old leader is not a part of the new assigned replicas

2018-06-18 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat reassigned KAFKA-4433:
--

Assignee: (was: Mayuresh Gharat)

> Kafka Controller Does not send a LeaderAndIsr to old leader of a 
> topicPartition during reassignment, if the old leader is not a part of the 
> new assigned replicas
> -
>
> Key: KAFKA-4433
> URL: https://issues.apache.org/jira/browse/KAFKA-4433
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: Mayuresh Gharat
>Priority: Critical
>  Labels: reliability
>
> Consider the following scenario:
> old replicas {[1,2,3], Leader = 1} are reassigned to new replicas 
> {[2,3,4], Leader = 2}.
> In this case broker 1 does not receive a LeaderAndIsr request to become a 
> follower.
> This happens because of the following: in 
> val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, 
> currentLeaderAndIsr) in PartitionStateMachine.electLeaderForPartition(...), 
> the replicas returned by ReassignedPartitionLeaderSelector.selectLeader() are 
> only the new replicas, which are then sent the LeaderAndIsrRequest. So the 
> old replica never receives this LeaderAndIsr.
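The scenario above can be reduced to a small sketch of who receives the LeaderAndIsrRequest. The method names and broker ids are illustrative (this is not the controller code, which is Scala); the point is that computing recipients from the new replica set alone misses the departing leader, while the union of old and new replicas includes it:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the reported bug: the reassigned-partition leader selector
// returns only the new replicas, so the LeaderAndIsrRequest recipients
// miss an old leader that is not in the new replica set.
// Method names and broker ids are illustrative.
public class LeaderAndIsrRecipients {
    // Buggy behavior: recipients are only the new replicas.
    static Set<Integer> recipientsBuggy(List<Integer> newReplicas) {
        return new HashSet<>(newReplicas);
    }

    // One possible fix: also notify the replicas being removed, i.e. send
    // to the union of old and new replicas.
    static Set<Integer> recipientsFixed(List<Integer> oldReplicas,
                                        List<Integer> newReplicas) {
        Set<Integer> all = new HashSet<>(oldReplicas);
        all.addAll(newReplicas);
        return all;
    }

    public static void main(String[] args) {
        List<Integer> oldReplicas = List.of(1, 2, 3); // Leader = 1
        List<Integer> newReplicas = List.of(2, 3, 4); // Leader = 2
        // Broker 1 (the old leader) is absent here, so it never learns
        // it should step down and become a follower.
        System.out.println(recipientsBuggy(newReplicas));
        // The union includes broker 1.
        System.out.println(recipientsFixed(oldReplicas, newReplicas));
    }
}
```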


