[jira] [Commented] (KAFKA-4453) add request prioritization

2019-04-10 Thread Di Shang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814078#comment-16814078
 ] 

Di Shang commented on KAFKA-4453:
-

Hi,

The new metric introduced here breaks our metric parser because it reports a NaN value:

"kafka.network,SocketServer,ControlPlaneNetworkProcessorAvgIdlePercent": "NaN",

[https://github.com/apache/kafka/blob/2.2/core/src/main/scala/kafka/network/SocketServer.scala#L143]

Is NaN really the best default value for this metric? Can we use a concrete number 
instead?
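
For anyone else hitting this before it changes upstream, a minimal workaround sketch (not Kafka code; the object name and sample map are illustrative assumptions about a collector that reads the gauge as a Double) that drops the NaN sample instead of forwarding it:

{code:scala}
object IdleMetricSanitizer {
  // ControlPlaneNetworkProcessorAvgIdlePercent reports NaN when no
  // control.plane.listener.name is configured, so treat NaN as "no sample"
  // rather than emitting a value downstream parsers may reject.
  def sanitize(raw: Double): Option[Double] =
    if (raw.isNaN) None
    else Some(math.min(math.max(raw, 0.0), 1.0)) // idle percent is expected in [0.0, 1.0]

  def main(args: Array[String]): Unit = {
    val samples = Map(
      "ControlPlaneNetworkProcessorAvgIdlePercent" -> Double.NaN,
      "NetworkProcessorAvgIdlePercent"             -> 0.87)
    val cleaned = samples.flatMap { case (name, v) => sanitize(v).map(name -> _) }
    println(cleaned) // Map(NetworkProcessorAvgIdlePercent -> 0.87)
  }
}
{code}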

 

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests in front of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata[1], rejecting 
> ProduceRequests and FetchRequests[2], and data loss (for some unofficial[3] 
> definition of data loss in terms of messages beyond the high watermark)[4].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued-up requests, so requests can get processed with up-to-date state.
> [1] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcast 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a round trip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
>  [2] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Depending on how long the client 
> acts on the stale metadata, the impact may or may not be visible to a 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the max number of retries, the producer application is not impacted. 
> On the other hand, if the retries are exhausted, the failed produce will be 
> visible to the producer application.
>  [3] The official definition of data loss in Kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in-sync 
> replicas for that partition have applied it to their log.
>  [4] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are processed on the former partition leader. As part of 
> becoming a follower for a partition, the broker truncates the log to the 
> high watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While this is technically part of 
> what they signed up for with acks=1, it can still come as a surprise.
> KIP-291: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%3A+Separating+controller+connections+and+requests+from+the+data+plane]
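
To make footnotes [3] and [4] concrete, here is a small hedged sketch (broker address, topic, and serializer choices are placeholders, not taken from this ticket) showing the producer-side acks setting the description refers to:

{code:scala}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object AcksExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // acks=1: only the leader has to append the record, so an acknowledged write can
    // still be truncated if a later LeaderAndIsrRequest turns that broker into a
    // follower (scenario [4] above). acks=all waits for the in-sync replicas, which
    // matches the "committed" definition in [3].
    props.put(ProducerConfig.ACKS_CONFIG, "all")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord("my-topic", "key", "value"))
    producer.close()
  }
}
{code}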



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4453) add request prioritization

2019-02-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16773791#comment-16773791
 ] 

ASF GitHub Bot commented on KAFKA-4453:
---

ijuma commented on pull request #2181: KAFKA-4453: add request prioritization
URL: https://github.com/apache/kafka/pull/2181
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-4453) add request prioritization

2019-01-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16741620#comment-16741620
 ] 

ASF GitHub Bot commented on KAFKA-4453:
---

junrao commented on pull request #5921: KAFKA-4453 : Added code to separate 
controller connections and requests from the data plane
URL: https://github.com/apache/kafka/pull/5921
 
 
   
 





[jira] [Commented] (KAFKA-4453) add request prioritization

2019-01-10 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739822#comment-16739822
 ] 

Mayuresh Gharat commented on KAFKA-4453:


Hi [~sriharsha], 
Yes, the PR is already up. I got +1 from [~jjkoshy] yesterday. We are waiting 
to see if [~junrao] has any other concerns with the patch.




[jira] [Commented] (KAFKA-4453) add request prioritization

2019-01-10 Thread Sriharsha Chintalapani (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739798#comment-16739798
 ] 

Sriharsha Chintalapani commented on KAFKA-4453:
---

Hi [~mgharat], checking to see if you are still working on this JIRA. We are 
interested in this feature as well.



[jira] [Commented] (KAFKA-4453) add request prioritization

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689957#comment-16689957
 ] 

ASF GitHub Bot commented on KAFKA-4453:
---

gitlw closed pull request #5783: KAFKA-4453: Separating controller connections 
and requests from the data plane (KIP-291)
URL: https://github.com/apache/kafka/pull/5783
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ecf6fbf33f1..29e10383afe 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -108,14 +108,16 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
   private def addNewBroker(broker: Broker) {
     val messageQueue = new LinkedBlockingQueue[QueueItem]
     debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
-    val brokerNode = broker.node(config.interBrokerListenerName)
+    val controlPlaneListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+    val controlPlaneSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+    val brokerNode = broker.node(controlPlaneListenerName)
     val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
     val networkClient = {
       val channelBuilder = ChannelBuilders.clientChannelBuilder(
-        config.interBrokerSecurityProtocol,
+        controlPlaneSecurityProtocol,
         JaasContext.Type.SERVER,
         config,
-        config.interBrokerListenerName,
+        controlPlaneListenerName,
         config.saslMechanismInterBrokerProtocol,
         config.saslInterBrokerHandshakeRequestEnable
       )
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 00b09688c5b..0bc8a2a6ff9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -40,6 +40,7 @@ object RequestChannel extends Logging {
   private val requestLogger = Logger("kafka.request.logger")
 
   val RequestQueueSizeMetric = "RequestQueueSize"
+  val ControlPlaneRequestQueueSizeMetric = "ControlPlaneRequestQueueSize"
   val ResponseQueueSizeMetric = "ResponseQueueSize"
   val ProcessorMetricTag = "processor"
 
@@ -272,13 +273,13 @@ object RequestChannel extends Logging {
   }
 }
 
-class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
+class RequestChannel(val queueSize: Int, val requestQueueSizeMetric: String) extends KafkaMetricsGroup {
   import RequestChannel._
   val metrics = new RequestChannel.Metrics
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
 
-  newGauge(RequestQueueSizeMetric, new Gauge[Int] {
+  newGauge(requestQueueSizeMetric, new Gauge[Int] {
     def value = requestQueue.size
   })
 
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 1365f90f763..c3dac79f5f9 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -65,8 +65,14 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics")
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
-  val requestChannel = new RequestChannel(maxQueuedRequests)
-  private val processors = new ConcurrentHashMap[Int, Processor]()
+  val dataRequestChannel = new RequestChannel(maxQueuedRequests, RequestChannel.RequestQueueSizeMetric)
+  var controlPlaneRequestChannel: RequestChannel = null
+  if (config.controlPlaneListenerName.isDefined) {
+    controlPlaneRequestChannel = new RequestChannel(20, RequestChannel.ControlPlaneRequestQueueSizeMetric)
+  }
+  private val dataProcessors = new ConcurrentHashMap[Int, Processor]()
+  // there should be only one controller processor, however we use a map to store it so that we can reuse the logic for data processors
+  private[network] val controlPlaneProcessors = 

[jira] [Commented] (KAFKA-4453) add request prioritization

2018-11-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689832#comment-16689832
 ] 

ASF GitHub Bot commented on KAFKA-4453:
---

MayureshGharat opened a new pull request #5921: KAFKA-4453 : Added code to 
separate controller connections and requests from the data plane
URL: https://github.com/apache/kafka/pull/5921
 
 
   KIP-291 implementation: Added code to separate controller connections and 
requests from the data plane.
   
   - Tested with a local deployment that controller requests are handled by 
the control plane and other requests are handled by the data plane.
   - Also added unit tests to verify the functionality.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-4453) add request prioritization

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646868#comment-16646868
 ] 

ASF GitHub Bot commented on KAFKA-4453:
---

gitlw opened a new pull request #5783: KAFKA-4453: Separating controller 
connections and requests from the data plane (KIP-291)
URL: https://github.com/apache/kafka/pull/5783
 
 
   Separating controller connections and requests from the data plane (KIP-291)
   
   - Tested that there are no dedicated control plane endpoints or threads when 
control.plane.listener.name is not set.
   - Tested that controller requests are handled by the control plane threads when
   control.plane.listener.name is set (see the configuration sketch below).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
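
As a companion to the testing notes above, here is a hedged configuration sketch of a broker that enables a dedicated control plane listener. Listener names, hosts, and ports are made up for illustration; the control.plane.listener.name property follows KIP-291, and this is a sketch rather than a recommended production setup.

{code:scala}
import java.util.Properties
import kafka.server.KafkaConfig

object ControlPlaneConfigSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("broker.id", "1")
    props.put("zookeeper.connect", "localhost:2181")
    // A dedicated CONTROLLER listener carries controller requests; INTERNAL carries data.
    props.put("listeners", "CONTROLLER://host1:9090,INTERNAL://host1:9092")
    props.put("listener.security.protocol.map", "CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT")
    props.put("control.plane.listener.name", "CONTROLLER")
    props.put("inter.broker.listener.name", "INTERNAL")

    val config = KafkaConfig.fromProps(props)
    // When control.plane.listener.name is set, the broker creates the separate
    // control plane acceptor/processors and request channel added in this PR.
    println(config.controlPlaneListenerName)
  }
}
{code}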



