[jira] [Created] (KAFKA-8814) Consumer benchmark test for paused partitions
Sean Glover created KAFKA-8814:
--
Summary: Consumer benchmark test for paused partitions
Key: KAFKA-8814
URL: https://issues.apache.org/jira/browse/KAFKA-8814
Project: Kafka
Issue Type: New Feature
Components: consumer, system tests, tools
Reporter: Sean Glover
Assignee: Sean Glover

A new performance benchmark and a corresponding addition to the {{ConsumerPerformance}} tool to support the paused-partition performance improvement implemented in KAFKA-7548. Before the fix, when the user polled for completed fetched records for partitions that were paused, the consumer would throw the data away because it was no longer fetchable. If the partition was later resumed, the data had to be fetched all over again. The fix caches completed fetched records for paused partitions indefinitely so they can potentially be returned once the partition is resumed.

In the Jira issue KAFKA-7548 there are several informal test results based on a number of different paused-partition scenarios, but it was suggested that a test in the benchmarks test suite would be ideal to demonstrate the performance improvement. In order to implement this benchmark we must add a new feature to {{ConsumerPerformance}}, which is used by the benchmark test suite and the {{kafka-consumer-perf-test.sh}} bin script, to pause partitions. I added the following parameter:

{code:scala}
val pausedPartitionsOpt = parser.accepts("paused-partitions-percent", "The percentage [0-1] of subscribed " +
  "partitions to pause each poll.")
  .withOptionalArg()
  .describedAs("percent")
  .withValuesConvertedBy(regex("^0(\\.\\d+)?|1\\.0$")) // matches [0-1] with decimals
  .ofType(classOf[Float])
  .defaultsTo(0F)
{code}

This allows the user to specify a percentage (represented as a floating-point value in {{0..1}}) of partitions to pause each poll interval. When the value is greater than {{0}}, the tool takes the next _n_ partitions to pause on each poll.
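The per-poll rotation described above can be sketched as follows. This is only an illustrative model, not the actual {{ConsumerPerformance}} patch: the class and method names are hypothetical, and in the real tool the selected partitions would be passed to the Java consumer's {{pause()}} method while the remainder are resumed.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of rotating paused-partition selection; names are
// illustrative, not taken from the ConsumerPerformance patch.
public class PausedPartitionRotation {
    private int cursor = 0; // rotation position, advances each poll interval

    // Pick the next ceil(percent * assigned.size()) partitions to pause,
    // wrapping around so every partition is eventually paused and resumed.
    public <T> List<T> nextToPause(List<T> assigned, float percent) {
        List<T> toPause = new ArrayList<>();
        int n = (int) Math.ceil(assigned.size() * (double) percent);
        if (assigned.isEmpty() || n == 0) return toPause;
        for (int i = 0; i < n; i++) {
            toPause.add(assigned.get((cursor + i) % assigned.size()));
        }
        cursor = (cursor + n) % assigned.size();
        return toPause;
    }
}
{code}

With six assigned partitions and {{--paused-partitions-percent 0.8}}, this selects {{ceil(6 * 0.8) = 5}} partitions per poll, which matches the 5/6 ratio used in the benchmark runs below.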
I ran the test on `trunk` and rebased onto the `2.3.0` tag for the following test summaries of {{kafkatest.benchmarks.core.benchmark_test.Benchmark.test_consumer_throughput}}. The test rotates through pausing {{80%}} of assigned partitions (5/6) each poll interval. I ran this on my laptop.

{{trunk}} ({{aa4ba8eee8e6f52a9d80a98fb2530b5bcc1b9a11}})
{code}
SESSION REPORT (ALL TESTS)
ducktape version: 0.7.5
session_id:       2019-08-18--010
run time:         2 minutes 29.104 seconds
tests run:        1
passed:           1
failed:           0
ignored:          0
test_id:    kafkatest.benchmarks.core.benchmark_test.Benchmark.test_consumer_throughput.paused_partitions_percent=0.8
status:     PASS
run time:   2 minutes 29.048 seconds
{"records_per_sec": 450207.0953, "mb_per_sec": 42.9351}
{code}

{{2.3.0}}
{code}
SESSION REPORT (ALL TESTS)
ducktape version: 0.7.5
session_id:       2019-08-18--011
run time:         2 minutes 41.228 seconds
tests run:        1
passed:           1
failed:           0
ignored:          0
test_id:    kafkatest.benchmarks.core.benchmark_test.Benchmark.test_consumer_throughput.paused_partitions_percent=0.8
status:     PASS
run time:   2 minutes 41.168 seconds
{"records_per_sec": 246574.6024, "mb_per_sec": 23.5152}
{code}

The increase in record and data throughput is significant. Based on other consumer fetch metrics, there are also improvements to the fetch rate. Depending on how often partitions are paused and resumed, it is also possible to save a lot of data transfer between the consumer and broker.

Please see the pull request for the associated changes. I was unsure whether I needed to create a KIP because, while I technically added a new public API to the {{ConsumerPerformance}} tool, it was only to enable this benchmark to run. If you feel that a KIP is necessary, I'll create one.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8813) Race condition when creating topics and changing their configuration
Gwen Shapira created KAFKA-8813:
---
Summary: Race condition when creating topics and changing their configuration
Key: KAFKA-8813
URL: https://issues.apache.org/jira/browse/KAFKA-8813
Project: Kafka
Issue Type: Bug
Reporter: Gwen Shapira

In Partition.createLog we do:

{code:scala}
val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
val log = logManager.getOrCreateLog(topicPartition, config, isNew, isFutureReplica)
{code}

Config changes that arrive after configs are loaded from ZK, but before LogManager adds the partition to `futureLogs` or `currentLogs` (where the dynamic config handler picks up topics whose configs need updating), will be lost.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
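The lost-update window can be seen in a toy model of the interaction. All names here are illustrative stand-ins for the LogManager/config-handler machinery, not the real classes: a dynamic config handler can only update logs it finds in the registry, so a change that arrives between the ZK snapshot and registration is silently dropped.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Toy model of the race (illustrative names, not Kafka internals): a config
// update that arrives between reading the config snapshot and registering
// the log in currentLogs is lost, because the dynamic config handler only
// sees logs that are already registered.
public class ConfigRaceSketch {
    final Map<String, Properties> currentLogs = new HashMap<>(); // registered logs

    // Dynamic config handler: updates only logs already in the registry.
    void onConfigChange(String topicPartition, String key, String value) {
        Properties p = currentLogs.get(topicPartition);
        if (p != null) p.setProperty(key, value); // otherwise the update is silently lost
    }

    // Analogue of createLog: the window is between taking the snapshot and
    // this registration; any onConfigChange in that window is dropped.
    Properties createLog(String topicPartition, Properties snapshotFromZk) {
        currentLogs.put(topicPartition, snapshotFromZk);
        return snapshotFromZk;
    }
}
{code}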
[jira] [Commented] (KAFKA-8755) Stand-by Task of an Optimized Source Table Does Not Write Anything to its State Store
[ https://issues.apache.org/jira/browse/KAFKA-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1691#comment-1691 ] Bruno Cadonna commented on KAFKA-8755:
--

{quote}So it seems that even if we get the checkpoint offset fixed in the restore consumer that we're going to have to do a lot of catch up on failover that we would not have had to do in the non optimized case. Is this expected behavior?{quote}

The catch-up on failover in the optimized and non-optimized cases should be the same, because the data read up to the last written offset of the input topic in the optimized case should be equal to the data we've added to the store. That is basically what the above topology does: it reads from the input topic, writes to the store, and commits (i.e., writes) the offset of the input topic. Then it repeats those actions.

> Stand-by Task of an Optimized Source Table Does Not Write Anything to its
> State Store
> ----
>
> Key: KAFKA-8755
> URL: https://issues.apache.org/jira/browse/KAFKA-8755
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.0
> Reporter: Bruno Cadonna
> Assignee: Chris Pettitt
> Priority: Major
> Labels: newbie
> Attachments: StandbyTaskTest.java
>
> With the following topology:
> {code:java}
> builder.table(
>     INPUT_TOPIC,
>     Consumed.with(Serdes.Integer(), Serdes.Integer()),
>     Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(stateName)
> )
> {code}
> and with topology optimization turned on, Kafka Streams uses the input topic
> {{INPUT_TOPIC}} as the change log topic for state store {{stateName}}. A
> stand-by task for such a topology should read from {{INPUT_TOPIC}} and should
> write the records to its state store so that the streams client that runs the
> stand-by task can take over the execution of the topology in case of a
> failure with an up-to-date replica of the state.
> Currently, the stand-by task described above reads from the input topic but
> does not write the records to its state store. Thus, after a failure the
> stand-by task cannot provide any up-to-date state store and the streams
> client needs to construct the state from scratch before it can take over the
> execution.
> The described behaviour can be reproduced with the attached test.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
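The expected standby behaviour described in the comment above (read a record from the input topic acting as the changelog, write it to the store, then checkpoint the offset) can be sketched as a plain loop. The record and store types here are simplified stand-ins, not the Kafka Streams internals.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for a standby task's restore loop; not the actual
// Streams restoration code.
public class StandbySketch {
    static class Rec {
        final long offset; final int key; final int value;
        Rec(long offset, int key, int value) { this.offset = offset; this.key = key; this.value = value; }
    }

    final Map<Integer, Integer> store = new HashMap<>(); // local state store replica
    long checkpoint = -1L;                               // last checkpointed offset

    // For each record read from the changelog (here, the optimized source
    // topic itself): first write it to the store, then checkpoint the offset.
    // Skipping the store write is exactly the bug this ticket describes.
    void restoreBatch(List<Rec> batch) {
        for (Rec r : batch) {
            store.put(r.key, r.value);
            checkpoint = r.offset;
        }
    }
}
{code}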
[jira] [Commented] (KAFKA-8812) Rebalance Producers - yes, I mean it ;-)
[ https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16909991#comment-16909991 ] Werner Daehn commented on KAFKA-8812:
--

Thanks, will do.

> Rebalance Producers - yes, I mean it ;-)
> ----
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 2.3.0
> Reporter: Werner Daehn
> Priority: Major
>
> Please bear with me. Initially this thought sounds stupid, but it has its
> merits.
>
> How do you build a distributed producer at the moment? You use Kafka Connect,
> which in turn requires a cluster that tells each instance which partitions it
> produces.
> On the consumer side it is different: there, Kafka itself does the data
> distribution. If you have 10 Kafka partitions and 10 consumers, each will get
> data for one partition. With 5 consumers, each will get data from two
> partitions. And if there is only a single consumer active, it gets all the
> data. All of this is managed by Kafka; all you have to do is start as many
> consumers as you want.
>
> I'd like to suggest something similar for the producers. A producer would
> tell Kafka that its source has 10 partitions. The Kafka server then responds
> with a list of partitions this instance shall be responsible for. If it is
> the only producer, the response would be all 10 partitions. If a second
> instance starts up, the first instance would be told to produce data for
> partitions 1-5 and the new one for partitions 6-10. If a producer fails to
> respond with a keep-alive packet, a rebalance happens, informing the active
> producer to take more load, and the dead producer will get an error when it
> sends data again.
> For restart, the producer rebalance also has to send the starting point from
> which to resume producing data, of course. It would be best if this were a
> user-generated pointer and not the topic offset. Then it can be, e.g., the
> database system change number, a database transaction id, or something
> similar.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Resolved] (KAFKA-8812) Rebalance Producers - yes, I mean it ;-)
[ https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Werner Daehn resolved KAFKA-8812.
--
Resolution: Invalid

Will create a KIP.

> Rebalance Producers - yes, I mean it ;-)
> ----
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 2.3.0
> Reporter: Werner Daehn
> Priority: Major
>
> Please bear with me. Initially this thought sounds stupid, but it has its
> merits.
>
> How do you build a distributed producer at the moment? You use Kafka Connect,
> which in turn requires a cluster that tells each instance which partitions it
> produces.
> On the consumer side it is different: there, Kafka itself does the data
> distribution. If you have 10 Kafka partitions and 10 consumers, each will get
> data for one partition. With 5 consumers, each will get data from two
> partitions. And if there is only a single consumer active, it gets all the
> data. All of this is managed by Kafka; all you have to do is start as many
> consumers as you want.
>
> I'd like to suggest something similar for the producers. A producer would
> tell Kafka that its source has 10 partitions. The Kafka server then responds
> with a list of partitions this instance shall be responsible for. If it is
> the only producer, the response would be all 10 partitions. If a second
> instance starts up, the first instance would be told to produce data for
> partitions 1-5 and the new one for partitions 6-10. If a producer fails to
> respond with a keep-alive packet, a rebalance happens, informing the active
> producer to take more load, and the dead producer will get an error when it
> sends data again.
> For restart, the producer rebalance also has to send the starting point from
> which to resume producing data, of course. It would be best if this were a
> user-generated pointer and not the topic offset. Then it can be, e.g., the
> database system change number, a database transaction id, or something
> similar.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8812) Rebalance Producers - yes, I mean it ;-)
[ https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16909990#comment-16909990 ] Ron Dagostino commented on KAFKA-8812:
--

If you are serious about this, then I think the way to move it forward is to write a [Kafka Improvement Proposal|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals].

> Rebalance Producers - yes, I mean it ;-)
> ----
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 2.3.0
> Reporter: Werner Daehn
> Priority: Major
>
> Please bear with me. Initially this thought sounds stupid, but it has its
> merits.
>
> How do you build a distributed producer at the moment? You use Kafka Connect,
> which in turn requires a cluster that tells each instance which partitions it
> produces.
> On the consumer side it is different: there, Kafka itself does the data
> distribution. If you have 10 Kafka partitions and 10 consumers, each will get
> data for one partition. With 5 consumers, each will get data from two
> partitions. And if there is only a single consumer active, it gets all the
> data. All of this is managed by Kafka; all you have to do is start as many
> consumers as you want.
>
> I'd like to suggest something similar for the producers. A producer would
> tell Kafka that its source has 10 partitions. The Kafka server then responds
> with a list of partitions this instance shall be responsible for. If it is
> the only producer, the response would be all 10 partitions. If a second
> instance starts up, the first instance would be told to produce data for
> partitions 1-5 and the new one for partitions 6-10. If a producer fails to
> respond with a keep-alive packet, a rebalance happens, informing the active
> producer to take more load, and the dead producer will get an error when it
> sends data again.
> For restart, the producer rebalance also has to send the starting point from
> which to resume producing data, of course. It would be best if this were a
> user-generated pointer and not the topic offset. Then it can be, e.g., the
> database system change number, a database transaction id, or something
> similar.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8812) Rebalance Producers - yes, I mean it ;-)
Werner Daehn created KAFKA-8812:
---
Summary: Rebalance Producers - yes, I mean it ;-)
Key: KAFKA-8812
URL: https://issues.apache.org/jira/browse/KAFKA-8812
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 2.3.0
Reporter: Werner Daehn

Please bear with me. Initially this thought sounds stupid, but it has its merits.

How do you build a distributed producer at the moment? You use Kafka Connect, which in turn requires a cluster that tells each instance which partitions it produces. On the consumer side it is different: there, Kafka itself does the data distribution. If you have 10 Kafka partitions and 10 consumers, each will get data for one partition. With 5 consumers, each will get data from two partitions. And if there is only a single consumer active, it gets all the data. All of this is managed by Kafka; all you have to do is start as many consumers as you want.

I'd like to suggest something similar for the producers. A producer would tell Kafka that its source has 10 partitions. The Kafka server then responds with a list of partitions this instance shall be responsible for. If it is the only producer, the response would be all 10 partitions. If a second instance starts up, the first instance would be told to produce data for partitions 1-5 and the new one for partitions 6-10. If a producer fails to respond with a keep-alive packet, a rebalance happens, informing the active producer to take more load, and the dead producer will get an error when it sends data again.

For restart, the producer rebalance also has to send the starting point from which to resume producing data, of course. It would be best if this were a user-generated pointer and not the topic offset. Then it can be, e.g., the database system change number, a database transaction id, or something similar.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
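The distribution described in this proposal (10 source partitions split over the active producer instances, e.g. partitions 1-5 and 6-10 for two instances) is essentially range assignment, analogous to what Kafka already does on the consumer side. A minimal, hypothetical sketch of the assignment arithmetic (0-based partition numbers, unlike the 1-based ranges in the proposal text):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Illustrative range assignment, not a proposed Kafka API: numPartitions is
// split as evenly as possible over numInstances, with the first
// (numPartitions % numInstances) instances taking one extra partition.
public class RangeAssignSketch {
    public static List<List<Integer>> assign(int numPartitions, int numInstances) {
        List<List<Integer>> result = new ArrayList<>();
        int base = numPartitions / numInstances;   // partitions every instance gets
        int extra = numPartitions % numInstances;  // leftover, spread over the first instances
        int next = 0;
        for (int i = 0; i < numInstances; i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int j = 0; j < count; j++) range.add(next++);
            result.add(range);
        }
        return result;
    }
}
{code}

On a rebalance (an instance joining or missing its keep-alive), the same arithmetic would simply be re-run with the new instance count, which is what makes the load take-over in the proposal automatic.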