[jira] [Updated] (KAFKA-4453) add request prioritization

2019-01-19 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4453:
---
Component/s: core

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
> Fix For: 2.2.0
>
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests in front of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata[1], rejecting 
> ProduceRequests and FetchRequests[2], and data loss (for some 
> unofficial[3] definition of data loss in terms of messages beyond the high 
> watermark)[4].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued-up requests, so requests can get processed with up-to-date state.
> [1] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcast 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a round trip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> [2] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Depending on how long the client 
> acts on the stale metadata, the impact may or may not be visible to a 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the max number of retries, the producer application would not be 
> impacted. On the other hand, if the retries are exhausted, the failed produce 
> will be visible to the producer application.
> [3] The official definition of data loss in Kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in-sync 
> replicas for that partition have applied it to their log.
> [4] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests get processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, report success to the user, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4453) add request prioritization

2019-01-19 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4453:
---
Description: 
Today all requests (client requests, broker requests, controller requests) to a 
broker are put into the same queue. They all have the same priority. So a 
backlog of requests ahead of the controller request will delay the processing 
of controller requests. This causes requests in front of the controller request 
to get processed based on stale state.

Side effects may include giving clients stale metadata[1], rejecting 
ProduceRequests and FetchRequests[2], and data loss (for some unofficial[3] 
definition of data loss in terms of messages beyond the high watermark)[4].

We'd like to minimize the number of requests processed based on stale state. 
With request prioritization, controller requests get processed before regular 
queued-up requests, so requests can get processed with up-to-date state.
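
As a rough illustration of the intended ordering (a sketch only, not the broker 
implementation and not the approach of KIP-291 linked below, which separates 
controller connections and requests from the data plane entirely), request 
handler threads could drain a dedicated controller queue before touching the 
regular backlog. The PrioritizedRequestQueue class below is invented for this 
sketch:

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PrioritizedRequestQueue<R> {
    private final BlockingQueue<R> controllerRequests = new LinkedBlockingQueue<>();
    private final BlockingQueue<R> dataRequests = new LinkedBlockingQueue<>();

    // Network threads enqueue controller requests (LeaderAndIsrRequest,
    // UpdateMetadataRequest, StopReplicaRequest) here...
    public void putControllerRequest(R request) throws InterruptedException {
        controllerRequests.put(request);
    }

    // ...and all regular client/broker requests here.
    public void putDataRequest(R request) throws InterruptedException {
        dataRequests.put(request);
    }

    // Request handler threads call take(): any queued controller request wins
    // over the backlog of regular requests, so broker state gets updated before
    // the backlog is processed against stale state.
    public R take() throws InterruptedException {
        while (true) {
            R controllerRequest = controllerRequests.poll();
            if (controllerRequest != null) {
                return controllerRequest;
            }
            // No controller request pending: serve a regular request, but only
            // wait briefly so newly arrived controller requests are noticed.
            R dataRequest = dataRequests.poll(10, TimeUnit.MILLISECONDS);
            if (dataRequest != null) {
                return dataRequest;
            }
        }
    }
}
{code}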

[1] Say a client's MetadataRequest is sitting in front of a controller's 
UpdateMetadataRequest on a given broker's request queue. Suppose the 
MetadataRequest is for a topic whose partitions have recently undergone 
leadership changes and that these leadership changes are being broadcast from 
the controller in the later UpdateMetadataRequest. Today the broker processes 
the MetadataRequest before processing the UpdateMetadataRequest, meaning the 
metadata returned to the client will be stale. The client will waste a 
round trip sending requests to the stale partition leader, get a 
NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
topic metadata again.
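
To make the staleness concrete: the metadata a client sees is simply whatever 
the broker answering the MetadataRequest currently knows. The AdminClient 
sketch below (hypothetical topic name and bootstrap address) prints the 
partition leaders reported back to the client; until the answering broker has 
processed the controller's UpdateMetadataRequest, the reported leaders can lag 
behind the real ones:

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class LeaderInspector {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address; point this at the cluster being queried.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // The leaders printed here reflect the metadata the answering broker
            // currently has, which may predate the controller's latest update.
            TopicDescription description = admin
                    .describeTopics(Collections.singletonList("my-topic"))
                    .all().get().get("my-topic");
            description.partitions().forEach(p ->
                    System.out.printf("partition %d -> leader %s%n", p.partition(), p.leader()));
        }
    }
}
{code}
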
[2] Clients can issue ProduceRequests to the wrong broker based on stale 
metadata, causing rejected ProduceRequests. Depending on how long the client 
acts on the stale metadata, the impact may or may not be visible to a producer 
application. If the number of rejected ProduceRequests does not exceed the max 
number of retries, the producer application would not be impacted. On the other 
hand, if the retries are exhausted, the failed produce will be visible to the 
producer application.
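
For illustration, a minimal producer sketch (placeholder broker address and 
topic name) showing where a failure finally becomes visible to the application 
once the retries mentioned above are exhausted:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RetryVisibility {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Retriable errors such as NOT_LEADER_FOR_PARTITION are retried internally
        // (with a metadata refresh between attempts) and stay invisible to the caller...
        props.put(ProducerConfig.RETRIES_CONFIG, 5);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    // ...but once retries are exhausted, the failure surfaces here.
                    System.err.println("Produce failed after retries: " + exception);
                } else {
                    System.out.println("Produced to " + metadata.topic() + "-"
                            + metadata.partition() + " @ offset " + metadata.offset());
                }
            });
        }
    }
}
{code}
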
[3] The official definition of data loss in Kafka is when we lose a 
"committed" message. A message is considered "committed" when all in-sync 
replicas for that partition have applied it to their log.
[4] Say a number of ProduceRequests are sitting in front of a controller's 
LeaderAndIsrRequest on a given broker's request queue. Suppose the 
ProduceRequests are for partitions whose leadership has recently shifted 
from the current broker to another broker in the replica set. Today the broker 
processes the ProduceRequests before the LeaderAndIsrRequest, meaning the 
ProduceRequests get processed on the former partition leader. As part 
of becoming a follower for a partition, the broker truncates the log to the 
high watermark. With weaker ack settings such as acks=1, the leader may 
successfully write to its own log, report success to the user, process 
the LeaderAndIsrRequest making the broker a follower of the partition, and 
truncate the log to a point before the user's produced messages. So users have 
a false sense that their produce attempt succeeded while in reality their 
messages got erased. While technically part of what they signed up for with 
acks=1, it can still come as a surprise.
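
For reference, the two ack settings contrasted in [4], expressed as producer 
configuration (a sketch only; the broker address is a placeholder, and the 
topic/broker-side min.insync.replicas setting that usually accompanies acks=all 
is not shown):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class AckSettings {

    // acks=1: the leader acknowledges after its local write. As described in [4],
    // such a write can later be truncated away if the broker becomes a follower
    // before the other replicas copy the message.
    static Properties leaderOnlyAcks() {
        Properties props = baseProps();
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        return props;
    }

    // acks=all: the leader acknowledges only once the in-sync replicas have the
    // message, i.e. once it is "committed" in the sense of [3].
    static Properties committedAcks() {
        Properties props = baseProps();
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return props;
    }

    private static Properties baseProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        return props;
    }
}
{code}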

KIP-291: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%3A+Separating+controller+connections+and+requests+from+the+data+plane]


[jira] [Updated] (KAFKA-4453) add request prioritization

2019-01-19 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4453:
---
Labels: kip  (was: )

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4453) add request prioritization

2018-11-12 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-4453:
---
Issue Type: Improvement  (was: Bug)

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4453) add request prioritization

2018-06-05 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-4453:
--
Description: 
Today all requests (client requests, broker requests, controller requests) to a 
broker are put into the same queue. They all have the same priority. So a 
backlog of requests ahead of the controller request will delay the processing 
of controller requests. This causes requests in front of the controller request 
to get processed based on stale state.

Side effects may include giving clients stale metadata[1], rejecting 
ProduceRequests and FetchRequests[2], and data loss (for some unofficial[3] 
definition of data loss in terms of messages beyond the high watermark)[4].

We'd like to minimize the number of requests processed based on stale state. 
With request prioritization, controller requests get processed before regular 
queued-up requests, so requests can get processed with up-to-date state.

[1] Say a client's MetadataRequest is sitting in front of a controller's 
UpdateMetadataRequest on a given broker's request queue. Suppose the 
MetadataRequest is for a topic whose partitions have recently undergone 
leadership changes and that these leadership changes are being broadcast from 
the controller in the later UpdateMetadataRequest. Today the broker processes 
the MetadataRequest before processing the UpdateMetadataRequest, meaning the 
metadata returned to the client will be stale. The client will waste a 
round trip sending requests to the stale partition leader, get a 
NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
topic metadata again.
[2] Clients can issue ProduceRequests to the wrong broker based on stale 
metadata, causing rejected ProduceRequests. Depending on how long the client 
acts on the stale metadata, the impact may or may not be visible to a producer 
application. If the number of rejected ProduceRequests does not exceed the max 
number of retries, the producer application would not be impacted. On the other 
hand, if the retries are exhausted, the failed produce will be visible to the 
producer application.
[3] The official definition of data loss in Kafka is when we lose a 
"committed" message. A message is considered "committed" when all in-sync 
replicas for that partition have applied it to their log.
[4] Say a number of ProduceRequests are sitting in front of a controller's 
LeaderAndIsrRequest on a given broker's request queue. Suppose the 
ProduceRequests are for partitions whose leadership has recently shifted 
from the current broker to another broker in the replica set. Today the broker 
processes the ProduceRequests before the LeaderAndIsrRequest, meaning the 
ProduceRequests get processed on the former partition leader. As part 
of becoming a follower for a partition, the broker truncates the log to the 
high watermark. With weaker ack settings such as acks=1, the leader may 
successfully write to its own log, report success to the user, process 
the LeaderAndIsrRequest making the broker a follower of the partition, and 
truncate the log to a point before the user's produced messages. So users have 
a false sense that their produce attempt succeeded while in reality their 
messages got erased. While technically part of what they signed up for with 
acks=1, it can still come as a surprise.
