[jira] [Created] (KAFKA-8814) Consumer benchmark test for paused partitions

2019-08-18 Thread Sean Glover (JIRA)
Sean Glover created KAFKA-8814:
--

 Summary: Consumer benchmark test for paused partitions
 Key: KAFKA-8814
 URL: https://issues.apache.org/jira/browse/KAFKA-8814
 Project: Kafka
  Issue Type: New Feature
  Components: consumer, system tests, tools
Reporter: Sean Glover
Assignee: Sean Glover


A new performance benchmark and a corresponding addition to the 
{{ConsumerPerformance}} tool to support the paused-partition performance 
improvement implemented in KAFKA-7548.  Before the fix, when the user polled 
for completed fetched records for partitions that were paused, the consumer 
would throw the data away because it was no longer fetchable.  If the 
partition was later resumed, the data had to be fetched all over again.  The 
fix caches completed fetches for paused partitions indefinitely so they can 
potentially be returned once the partition is resumed.
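The caching behavior described above can be modeled roughly as follows. This is a simplified, hypothetical sketch (the class and method names are invented for illustration, not the actual {{Fetcher}} internals): completed fetches for paused partitions are parked instead of discarded, so a later poll after resume can return them without refetching.

```java
import java.util.*;

// Simplified model of the KAFKA-7548 fix: completed fetches for paused
// partitions are retained instead of discarded (hypothetical sketch,
// not the real consumer Fetcher implementation).
class PausedFetchCache {
    private final Map<Integer, List<String>> completedFetches = new HashMap<>();
    private final Set<Integer> paused = new HashSet<>();

    void addCompletedFetch(int partition, List<String> records) {
        completedFetches.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(records);
    }

    void pause(int partition) { paused.add(partition); }
    void resume(int partition) { paused.remove(partition); }

    // poll() returns records for non-paused partitions only; paused
    // partitions keep their completed fetches for a later poll.
    Map<Integer, List<String>> poll() {
        Map<Integer, List<String>> result = new HashMap<>();
        Iterator<Map.Entry<Integer, List<String>>> it = completedFetches.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, List<String>> e = it.next();
            if (!paused.contains(e.getKey())) {
                result.put(e.getKey(), e.getValue());
                // Before the fix, entries for paused partitions were dropped
                // here too, forcing a refetch after resume.
                it.remove();
            }
        }
        return result;
    }
}
```

Pausing a partition now leaves its buffered records in place, and the first poll after resume returns them.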

The Jira issue KAFKA-7548 includes several informal test results based on a 
number of different paused-partition scenarios, but it was suggested that a 
test in the benchmarks test suite would be ideal to demonstrate the 
performance improvement.  In order to implement this benchmark we must add a 
new feature to {{ConsumerPerformance}}, which is used by both the benchmark 
test suite and the {{kafka-consumer-perf-test.sh}} bin script, that pauses 
partitions.  I added the following parameter:

{code:scala}
val pausedPartitionsOpt = parser.accepts("paused-partitions-percent",
    "The percentage [0-1] of subscribed partitions to pause each poll.")
  .withOptionalArg()
  .describedAs("percent")
  .withValuesConvertedBy(regex("^0(\\.\\d+)?|1\\.0$")) // matches [0-1] with decimals
  .ofType(classOf[Float])
  .defaultsTo(0F)
{code}

This allows the user to specify the fraction (a floating-point value in 
{{0..1}}) of partitions to pause each poll interval.  When the value is 
greater than {{0}}, the next _n_ partitions are paused each interval.  I ran 
{{kafkatest.benchmarks.core.benchmark_test.Benchmark.test_consumer_throughput}} 
on {{trunk}} and rebased onto the {{2.3.0}} tag; the test summaries follow.  
The test rotates through pausing {{80%}} of assigned partitions (5/6) each 
poll interval.  I ran this on my laptop.
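The rotating selection can be sketched like this. It is a hypothetical illustration of the "next _n_ partitions" behavior, not the actual {{ConsumerPerformance}} code; the use of a ceiling when converting the fraction to a count is an assumption (it matches 5 of 6 partitions for {{0.8}}, as in the run below).

```java
import java.util.*;

// Hypothetical sketch of the rotating pause selection: each poll interval,
// pause the "next" n partitions, where n is derived from the configured
// fraction (assumed here to round up: ceil(6 * 0.8) = 5 of 6 partitions).
class PauseRotation {
    private int cursor = 0;

    List<Integer> nextPauseSet(List<Integer> assigned, float percent) {
        int n = (int) Math.ceil(assigned.size() * percent);
        List<Integer> toPause = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            toPause.add(assigned.get((cursor + i) % assigned.size()));
        }
        cursor = (cursor + n) % assigned.size(); // advance for the next poll
        return toPause;
    }
}
```

Successive calls rotate which partitions sit out, so every partition still makes progress over time.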

{{trunk}} ({{aa4ba8eee8e6f52a9d80a98fb2530b5bcc1b9a11}})

{code}

SESSION REPORT (ALL TESTS)
ducktape version: 0.7.5
session_id:   2019-08-18--010
run time: 2 minutes 29.104 seconds
tests run:1
passed:   1
failed:   0
ignored:  0

test_id:
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_consumer_throughput.paused_partitions_percent=0.8
status: PASS
run time:   2 minutes 29.048 seconds
{"records_per_sec": 450207.0953, "mb_per_sec": 42.9351}

{code}

{{2.3.0}}

{code}

SESSION REPORT (ALL TESTS)
ducktape version: 0.7.5
session_id:   2019-08-18--011
run time: 2 minutes 41.228 seconds
tests run:1
passed:   1
failed:   0
ignored:  0

test_id:
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_consumer_throughput.paused_partitions_percent=0.8
status: PASS
run time:   2 minutes 41.168 seconds
{"records_per_sec": 246574.6024, "mb_per_sec": 23.5152}

{code}

The increase in record and data throughput is significant.  Other consumer 
fetch metrics also show improvements to the fetch rate.  Depending on how 
often partitions are paused and resumed, this can also save a significant 
amount of data transfer between the consumer and the broker.
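For reference, the relative improvement implied by the two runs works out to roughly 1.83x on both metrics:

```java
// Relative throughput improvement implied by the two benchmark runs above.
class ThroughputComparison {
    static double speedup(double trunkRate, double baselineRate) {
        return trunkRate / baselineRate;
    }

    public static void main(String[] args) {
        // records/sec: 450207.0953 (trunk) vs 246574.6024 (2.3.0)
        System.out.printf("records/sec speedup: %.2fx%n",
                speedup(450207.0953, 246574.6024));
        // MB/sec: 42.9351 (trunk) vs 23.5152 (2.3.0)
        System.out.printf("MB/sec speedup: %.2fx%n",
                speedup(42.9351, 23.5152));
    }
}
```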

Please see the pull request for the associated changes.  I was unsure whether 
I needed to create a KIP: while I technically added a new public API to the 
{{ConsumerPerformance}} tool, it was only to enable this benchmark to run.  
If you feel that a KIP is necessary, I'll create one.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8813) Race condition when creating topics and changing their configuration

2019-08-18 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-8813:
---

 Summary: Race condition when creating topics and changing their 
configuration
 Key: KAFKA-8813
 URL: https://issues.apache.org/jira/browse/KAFKA-8813
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira


In Partition.createLog we do:

{code:scala}
val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
val log = logManager.getOrCreateLog(topicPartition, config, isNew, isFutureReplica)
{code}

Config changes that arrive after the configs are loaded from ZK, but before 
LogManager has added the partition to `futureLogs` or `currentLogs` (which is 
where the dynamic config handler picks up topics whose configs need 
updating), will be lost.
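The window can be illustrated deterministically. This is a hypothetical model (the class and method names are invented, not the real Partition/LogManager code): a config update that lands after the snapshot is read but before the log is registered never reaches the new log.

```java
import java.util.*;

// Deterministic model of the check-then-act window described above
// (hypothetical classes, not the actual Partition/LogManager code).
class ConfigRace {
    final Map<String, String> zkConfig = new HashMap<>();               // source of truth
    final Map<String, Map<String, String>> liveLogs = new HashMap<>();  // registered logs

    // Step 1 of createLog: snapshot the current config.
    Map<String, String> readConfig() { return new HashMap<>(zkConfig); }

    // Dynamic config handler: only updates logs that are already registered.
    void onConfigChange(String key, String value) {
        zkConfig.put(key, value);
        for (Map<String, String> cfg : liveLogs.values()) cfg.put(key, value);
    }

    // Step 2 of createLog: register the log using the earlier snapshot.
    void registerLog(String topicPartition, Map<String, String> snapshot) {
        liveLogs.put(topicPartition, snapshot);
    }
}
```

Interleaving an update between the two steps leaves the newly registered log with the stale value.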





[jira] [Commented] (KAFKA-8755) Stand-by Task of an Optimized Source Table Does Not Write Anything to its State Store

2019-08-18 Thread Bruno Cadonna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1691#comment-1691
 ] 

Bruno Cadonna commented on KAFKA-8755:
--

{quote}So it seems that even if we get the checkpoint offset fixed in the 
restore consumer that we're going to have to do a lot of catch up on failover 
that we would not have had to do in the non optimized case. Is this expected 
behavior?{quote}

The catch-up on failover in the optimized and non-optimized cases should be 
the same, because in the optimized case the data read from the input topic up 
to the last written offset should be equal to the data added to the store. 
That is basically what the above topology does: it reads from the input 
topic, writes to the store, and commits (i.e., writes) the offset of the 
input topic, then repeats those actions.
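The equivalence argument can be sketched as follows. This is a hypothetical model of the loop just described (invented names, not Kafka Streams code); the invariant is that replaying the input up to the committed offset rebuilds the store, so catch-up is the same whether the changelog is the input topic (optimized) or a separate topic.

```java
import java.util.*;

// Hypothetical model of the loop above: read from the input topic, write
// to the store, then commit the offset. Replaying the input up to the
// committed offset rebuilds an identical store.
class SourceTableTask {
    final Map<Integer, Integer> store = new HashMap<>();
    long committedOffset = -1L;

    void poll(List<int[]> records, long lastOffset) {
        for (int[] kv : records) store.put(kv[0], kv[1]); // write to the store
        committedOffset = lastOffset;                     // then commit the offset
    }

    // Rebuild a store by replaying input records up to a committed offset.
    static Map<Integer, Integer> replay(List<int[]> input, long upToOffset) {
        Map<Integer, Integer> rebuilt = new HashMap<>();
        for (int i = 0; i <= upToOffset && i < input.size(); i++) {
            rebuilt.put(input.get(i)[0], input.get(i)[1]);
        }
        return rebuilt;
    }
}
```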

> Stand-by Task of an Optimized Source Table Does Not Write Anything to its 
> State Store
> -
>
> Key: KAFKA-8755
> URL: https://issues.apache.org/jira/browse/KAFKA-8755
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Bruno Cadonna
>Assignee: Chris Pettitt
>Priority: Major
>  Labels: newbie
> Attachments: StandbyTaskTest.java
>
>
> With the following topology:
> {code:java}
> builder.table(
> INPUT_TOPIC, 
> Consumed.with(Serdes.Integer(), Serdes.Integer()), 
> Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(stateName)
> )
> {code}
> and with topology optimization turned on, Kafka Streams uses the input topic 
> {{INPUT_TOPIC}} as the change log topic for state store {{stateName}}. A 
> stand-by task for such a topology should read from {{INPUT_TOPIC}} and should 
> write the records to its state store so that the streams client that runs the 
> stand-by task can take over the execution of the topology in case of a 
> failure with an up-to-date replica of the state.
> Currently, the stand-by task described above reads from the input topic but 
> does not write the records to its state store. Thus, after a failure the 
> stand-by task cannot provide any up-to-date state store and the streams 
> client needs to construct the state from scratch before it can take over the 
> execution.
> The described behaviour can be reproduced with the attached test.





[jira] [Commented] (KAFKA-8812) Rebalance Producers - yes, I mean it ;-)

2019-08-18 Thread Werner Daehn (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16909991#comment-16909991
 ] 

Werner Daehn commented on KAFKA-8812:
-

Thanks, will do.

> Rebalance Producers - yes, I mean it ;-)
> 
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Werner Daehn
>Priority: Major
>
> Please bear with me. Initially this thought sounds stupid, but it has its 
> merits.
>  
> How do you build a distributed producer at the moment? You use Kafka Connect 
> which in turn requires a cluster that tells which instance is producing what 
> partitions.
> On the consumer side it is different. There Kafka itself does the data 
> distribution. If you have 10 Kafka partitions and 10 consumers, each will get 
> data for one partition. With 5 consumers, each will get data from two 
> partitions. And if there is only a single consumer active, it gets all data. 
> All is managed by Kafka, all you have to do is start as many consumers as you 
> want.
>  
> I'd like to suggest something similar for the producers. A producer would 
> tell Kafka that its source has 10 partitions. The Kafka server then responds 
> with a list of partitions this instance shall be responsible for. If it is 
> the only producer, the response would be all 10 partitions. If it is the 
> second instance starting up, the first instance would get the information it 
> should produce data for partition 1-5 and the new one for partition 6-10. If 
> the producer fails to respond with an alive packet, a rebalance does happen, 
> informing the active producer to take more load and the dead producer will 
> get an error when sending data again.
> For restart, the producer rebalance has to send the starting point where to 
> start producing the data onwards from as well, of course. Would be best if 
> this is a user generated pointer and not the topic offset. Then it can be 
> e.g. the database system change number, a database transaction id or 
> something similar.
>  





[jira] [Resolved] (KAFKA-8812) Rebalance Producers - yes, I mean it ;-)

2019-08-18 Thread Werner Daehn (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Werner Daehn resolved KAFKA-8812.
-
Resolution: Invalid

Will create a KIP.

> Rebalance Producers - yes, I mean it ;-)
> 
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Werner Daehn
>Priority: Major
>
> Please bear with me. Initially this thought sounds stupid, but it has its 
> merits.
>  
> How do you build a distributed producer at the moment? You use Kafka Connect 
> which in turn requires a cluster that tells which instance is producing what 
> partitions.
> On the consumer side it is different. There Kafka itself does the data 
> distribution. If you have 10 Kafka partitions and 10 consumers, each will get 
> data for one partition. With 5 consumers, each will get data from two 
> partitions. And if there is only a single consumer active, it gets all data. 
> All is managed by Kafka, all you have to do is start as many consumers as you 
> want.
>  
> I'd like to suggest something similar for the producers. A producer would 
> tell Kafka that its source has 10 partitions. The Kafka server then responds 
> with a list of partitions this instance shall be responsible for. If it is 
> the only producer, the response would be all 10 partitions. If it is the 
> second instance starting up, the first instance would get the information it 
> should produce data for partition 1-5 and the new one for partition 6-10. If 
> the producer fails to respond with an alive packet, a rebalance does happen, 
> informing the active producer to take more load and the dead producer will 
> get an error when sending data again.
> For restart, the producer rebalance has to send the starting point where to 
> start producing the data onwards from as well, of course. Would be best if 
> this is a user generated pointer and not the topic offset. Then it can be 
> e.g. the database system change number, a database transaction id or 
> something similar.
>  





[jira] [Commented] (KAFKA-8812) Rebalance Producers - yes, I mean it ;-)

2019-08-18 Thread Ron Dagostino (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16909990#comment-16909990
 ] 

Ron Dagostino commented on KAFKA-8812:
--

If you are serious about this then I think the way to move it forward is to 
write a [Kafka Improvement Proposal 
|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]

> Rebalance Producers - yes, I mean it ;-)
> 
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Werner Daehn
>Priority: Major
>
> Please bear with me. Initially this thought sounds stupid, but it has its 
> merits.
>  
> How do you build a distributed producer at the moment? You use Kafka Connect 
> which in turn requires a cluster that tells which instance is producing what 
> partitions.
> On the consumer side it is different. There Kafka itself does the data 
> distribution. If you have 10 Kafka partitions and 10 consumers, each will get 
> data for one partition. With 5 consumers, each will get data from two 
> partitions. And if there is only a single consumer active, it gets all data. 
> All is managed by Kafka, all you have to do is start as many consumers as you 
> want.
>  
> I'd like to suggest something similar for the producers. A producer would 
> tell Kafka that its source has 10 partitions. The Kafka server then responds 
> with a list of partitions this instance shall be responsible for. If it is 
> the only producer, the response would be all 10 partitions. If it is the 
> second instance starting up, the first instance would get the information it 
> should produce data for partition 1-5 and the new one for partition 6-10. If 
> the producer fails to respond with an alive packet, a rebalance does happen, 
> informing the active producer to take more load and the dead producer will 
> get an error when sending data again.
> For restart, the producer rebalance has to send the starting point where to 
> start producing the data onwards from as well, of course. Would be best if 
> this is a user generated pointer and not the topic offset. Then it can be 
> e.g. the database system change number, a database transaction id or 
> something similar.
>  





[jira] [Created] (KAFKA-8812) Rebalance Producers - yes, I mean it ;-)

2019-08-18 Thread Werner Daehn (JIRA)
Werner Daehn created KAFKA-8812:
---

 Summary: Rebalance Producers - yes, I mean it ;-)
 Key: KAFKA-8812
 URL: https://issues.apache.org/jira/browse/KAFKA-8812
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.3.0
Reporter: Werner Daehn


Please bear with me. Initially this thought sounds stupid, but it has its merits.

 

How do you build a distributed producer at the moment? You use Kafka Connect, 
which in turn requires a cluster that decides which instance produces which 
partitions.

On the consumer side it is different. There Kafka itself does the data 
distribution. If you have 10 Kafka partitions and 10 consumers, each will get 
data for one partition. With 5 consumers, each will get data from two 
partitions. And if there is only a single consumer active, it gets all data. 
All is managed by Kafka, all you have to do is start as many consumers as you 
want.

 

I'd like to suggest something similar for the producers. A producer would tell 
Kafka that its source has 10 partitions. The Kafka server then responds with 
the list of partitions this instance shall be responsible for. If it is the 
only producer, the response would be all 10 partitions. If a second instance 
starts up, the first instance would be told to produce data for partitions 1-5 
and the new one for partitions 6-10. If a producer fails to respond with a 
keep-alive packet, a rebalance happens, informing the remaining active 
producer to take on more load; the dead producer will get an error the next 
time it sends data.

For a restart, the producer rebalance also has to send the starting point from 
which to resume producing data. It would be best if this were a user-generated 
pointer rather than the topic offset; then it could be, e.g., a database 
system change number, a database transaction id, or something similar.
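The proposed assignment could be sketched like this. It is purely illustrative (no such broker API exists today): N source partitions split into contiguous ranges across the currently alive producer instances, matching the 10-partition example in the description.

```java
import java.util.*;

// Illustrative sketch of the proposed producer-side assignment (no such
// broker API exists today): split N source partitions into contiguous
// ranges across the alive producer instances.
class ProducerRebalance {
    static List<List<Integer>> assign(int numPartitions, int numProducers) {
        List<List<Integer>> assignments = new ArrayList<>();
        int base = numPartitions / numProducers;
        int extra = numPartitions % numProducers;
        int next = 1; // partitions numbered 1..N, as in the description above
        for (int i = 0; i < numProducers; i++) {
            int size = base + (i < extra ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int j = 0; j < size; j++) range.add(next++);
            assignments.add(range);
        }
        return assignments;
    }
}
```

With one producer the response is all 10 partitions; with two, the ranges are 1-5 and 6-10, and a rebalance would simply recompute the ranges for the surviving instances.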

 


