[jira] [Created] (KAFKA-7715) Connect should have a parameter to disable WADL output for OPTIONS method
Oleksandr Diachenko created KAFKA-7715: -- Summary: Connect should have a parameter to disable WADL output for OPTIONS method Key: KAFKA-7715 URL: https://issues.apache.org/jira/browse/KAFKA-7715 Project: Kafka Issue Type: Improvement Components: config, security Affects Versions: 2.1.0 Reporter: Oleksandr Diachenko Fix For: 2.1.1 Currently, the Connect REST API exposes WADL output on the OPTIONS method:
{code:bash}
curl -i -X OPTIONS http://localhost:8083/connectors
HTTP/1.1 200 OK
Date: Fri, 07 Dec 2018 22:51:53 GMT
Content-Type: application/vnd.sun.wadl+xml
Allow: HEAD,POST,GET,OPTIONS
Last-Modified: Fri, 07 Dec 2018 14:51:53 PST
Content-Length: 1331
Server: Jetty(9.4.12.v20180830)

(WADL XML response body omitted; an application.wadl generated by Jersey 2.27 describing the resources under http://localhost:8083/)
{code}
This is a potential vulnerability, so it makes sense to add a configuration parameter that disables WADL output. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
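For reference, Jersey (which serves the Connect REST API) can already suppress WADL generation through its ServerProperties.WADL_FEATURE_DISABLE property ("jersey.config.server.wadl.disableWadl"). A minimal sketch of how a Connect-side switch could forward to it; the rest.wadl.enable name is purely illustrative and not an existing Connect config:
{code:java}
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties;

public class WadlToggleSketch {
    // Sketch only: "wadlEnabled" would come from a hypothetical Connect worker
    // config such as rest.wadl.enable; the Jersey property itself is real.
    public static ResourceConfig configure(boolean wadlEnabled) {
        ResourceConfig resourceConfig = new ResourceConfig();
        if (!wadlEnabled) {
            // Disables the generated /application.wadl resource and the WADL body
            // returned for OPTIONS requests.
            resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);
        }
        return resourceConfig;
    }
}
{code}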
[jira] [Commented] (KAFKA-5692) Refactor PreferredReplicaLeaderElectionCommand to use AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713395#comment-16713395 ] Jun Rao commented on KAFKA-5692: [~tombentley], thanks for contributing the PR. Would you still be interested in finishing up the PR? If not, perhaps we can unassign the Jira and see if someone else wants to finish this up. > Refactor PreferredReplicaLeaderElectionCommand to use AdminClient > - > > Key: KAFKA-5692 > URL: https://issues.apache.org/jira/browse/KAFKA-5692 > Project: Kafka > Issue Type: Improvement >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Minor > Labels: kip, patch-available > Fix For: 2.2.0 > > > The PreferredReplicaLeaderElectionCommand currently uses a direct connection > to zookeeper. The zookeeper dependency should be deprecated and an > AdminClient API created to be used instead. > This change will require a KIP. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
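For context, the intended end state is that the tool goes through the AdminClient rather than ZooKeeper. A hypothetical sketch of what that usage could look like, assuming the KIP adds an electPreferredLeaders-style method; the method name and the result accessor are assumptions, not an API that exists at the time of this comment:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;

public class PreferredLeaderElectionSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicPartition partition = new TopicPartition("my-topic", 0);
            // Ask the controller to elect the preferred replica as leader and wait
            // for the result (assumed API shape).
            admin.electPreferredLeaders(Collections.singleton(partition)).all().get();
        }
    }
}
{code}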
[jira] [Commented] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements
[ https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713370#comment-16713370 ] Matthias J. Sax commented on KAFKA-7523: I added you to the list of contributors and assigned the ticket to you. You can now also self-assign tickets. > TransformerSupplier/ProcessorSupplier enhancements > -- > > Key: KAFKA-7523 > URL: https://issues.apache.org/jira/browse/KAFKA-7523 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Paul Whalen >Assignee: Paul Whalen >Priority: Minor > Labels: needs-kip > > I have found that when writing "low level" {{Processors}} and > {{Transformers}} that are stateful, often I want these processors to "own" > one or more state stores, the details of which are not important to the > business logic of the application. However, when incorporating these into > the topologies defined by the high level API, using {{KStream::transform}} or > {{KStream::process}}, I'm forced to specify the stores so the topology is > wired up correctly. This creates an unfortunate pattern where the > {{TransformerSupplier}} or {{ProcessorSupplier,}} who (according to the > pattern I've been following) holds the information about the name of the > state stores, must be defined above the "high level" "fluent API"-style > pipeline, which makes it hard to understand the business logic data flow. > > What I currently have to do: > {code:java} > TransformerSupplier transformerSupplier = new > TransformerSupplierWithState(topology, val -> businessLogic(val)); > builder.stream("in.topic") > .transform(transformerSupplier, transformerSupplier.stateStoreNames()) > .to("out.topic");{code} > I have to both define the {{TransformerSupplier}} above the "fluent block", > and pass the topology in so I can call {{topology.addStateStore()}} inside > the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what > the state store names are for that point in the topology. The lambda {{val -> > businessLogic(val)}} is really what I want to see in-line because that's the > crux of what is happening, along with the name of some factory method > describing what the transformer is doing for me internally. This issue is > obviously exacerbated when the "fluent block" is much longer than this > example - It gets worse the farther away {{val -> businessLogic(val)}} is > from {{KStream::transform}}. > > An improvement: > {code:java} > builder.stream("in.topic") > .transform(transformerSupplierWithState(topology, val -> > businessLogic(val))) > .to("out.topic");{code} > Which implies the existence of a {{KStream::transform}} that takes a single > argument that adheres to this interface: > {code:java} > interface TransformerSupplierWithState { > Transformer get(); > String[] stateStoreNames(); > }{code} > Or better yet, I wouldn't have to pass in the topology, the caller of > {{TransformerSupplierWithState}} could also handle the job of "adding" its > state stores to the topology: > {code:java} > interface TransformerSupplierWithState { > Transformer get(); > Map stateStores(); > }{code} > Which would enable my ideal: > {code:java} > builder.stream("in.topic") > .transform(transformerSupplierWithState(val -> businessLogic(val))) > .to("out.topic");{code} > I think this would be a huge improvement in the usability of low-level > processors with the high-level DSL. 
> Please let me know if I'm missing something as to why this cannot or should > not happen, or if there is a better forum for this suggestion (presumably it > would require a KIP?). I'd be happy to build it as well if there is a chance > of it being merged, it doesn't seem like a huge challenge to me. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
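To make the proposal concrete, here is a hypothetical sketch of a supplier that owns its store under the proposed contract. Note that TransformerSupplierWithState does not exist in the current API; the interface shape, generics, store name and serdes below are all illustrative assumptions:
{code:java}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// The proposed interface from this ticket (not part of Kafka Streams today).
interface TransformerSupplierWithState<K, V, R> {
    Transformer<K, V, R> get();
    Map<String, StoreBuilder<?>> stateStores();
}

// A supplier that "owns" its state store, so callers never need to know the store name.
class LastValueTransformerSupplier
        implements TransformerSupplierWithState<String, String, KeyValue<String, String>> {

    private static final String STORE = "last-value-store";   // illustrative name

    @Override
    public Transformer<String, String, KeyValue<String, String>> get() {
        return new Transformer<String, String, KeyValue<String, String>>() {
            private KeyValueStore<String, String> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(ProcessorContext context) {
                store = (KeyValueStore<String, String>) context.getStateStore(STORE);
            }

            @Override
            public KeyValue<String, String> transform(String key, String value) {
                String previous = store.get(key);
                store.put(key, value);
                return KeyValue.pair(key, previous);   // emit the previous value per key
            }

            @Override
            public void close() { }
        };
    }

    @Override
    public Map<String, StoreBuilder<?>> stateStores() {
        return Collections.singletonMap(STORE,
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(STORE),
                        Serdes.String(), Serdes.String()));
    }
}
{code}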
[jira] [Assigned] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements
[ https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-7523: -- Assignee: Paul Whalen > TransformerSupplier/ProcessorSupplier enhancements > -- > > Key: KAFKA-7523 > URL: https://issues.apache.org/jira/browse/KAFKA-7523 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Paul Whalen >Assignee: Paul Whalen >Priority: Minor > Labels: needs-kip > > I have found that when writing "low level" {{Processors}} and > {{Transformers}} that are stateful, often I want these processors to "own" > one or more state stores, the details of which are not important to the > business logic of the application. However, when incorporating these into > the topologies defined by the high level API, using {{KStream::transform}} or > {{KStream::process}}, I'm forced to specify the stores so the topology is > wired up correctly. This creates an unfortunate pattern where the > {{TransformerSupplier}} or {{ProcessorSupplier,}} who (according to the > pattern I've been following) holds the information about the name of the > state stores, must be defined above the "high level" "fluent API"-style > pipeline, which makes it hard to understand the business logic data flow. > > What I currently have to do: > {code:java} > TransformerSupplier transformerSupplier = new > TransformerSupplierWithState(topology, val -> businessLogic(val)); > builder.stream("in.topic") > .transform(transformerSupplier, transformerSupplier.stateStoreNames()) > .to("out.topic");{code} > I have to both define the {{TransformerSupplier}} above the "fluent block", > and pass the topology in so I can call {{topology.addStateStore()}} inside > the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what > the state store names are for that point in the topology. The lambda {{val -> > businessLogic(val)}} is really what I want to see in-line because that's the > crux of what is happening, along with the name of some factory method > describing what the transformer is doing for me internally. This issue is > obviously exacerbated when the "fluent block" is much longer than this > example - It gets worse the farther away {{val -> businessLogic(val)}} is > from {{KStream::transform}}. > > An improvement: > {code:java} > builder.stream("in.topic") > .transform(transformerSupplierWithState(topology, val -> > businessLogic(val))) > .to("out.topic");{code} > Which implies the existence of a {{KStream::transform}} that takes a single > argument that adheres to this interface: > {code:java} > interface TransformerSupplierWithState { > Transformer get(); > String[] stateStoreNames(); > }{code} > Or better yet, I wouldn't have to pass in the topology, the caller of > {{TransformerSupplierWithState}} could also handle the job of "adding" its > state stores to the topology: > {code:java} > interface TransformerSupplierWithState { > Transformer get(); > Map stateStores(); > }{code} > Which would enable my ideal: > {code:java} > builder.stream("in.topic") > .transform(transformerSupplierWithState(val -> businessLogic(val))) > .to("out.topic");{code} > I think this would be a huge improvement in the usability of low-level > processors with the high-level DSL. > Please let me know if I'm missing something as to why this cannot or should > not happen, or if there is a better forum for this suggestion (presumably it > would require a KIP?). 
I'd be happy to build it as well if there is a chance > of it being merged, it doesn't seem like a huge challenge to me. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7714) Scala streams API should use Options in left and outer joins
[ https://issues.apache.org/jira/browse/KAFKA-7714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713364#comment-16713364 ] Matthias J. Sax commented on KAFKA-7714: You can find information about the KIP process in the wiki: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals > Scala streams API should use Options in left and outer joins > > > Key: KAFKA-7714 > URL: https://issues.apache.org/jira/browse/KAFKA-7714 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.0.1, 2.1.0 >Reporter: Charles Crain >Priority: Minor > Labels: needs-kip > > The Scala streams DSL for left and outer joins should use Options instead of > possibly-null parameters to the joiner functions. As currently written, > implementers of joiners for Kafka Streams must account for the right side of > a left join (or either side of an outer join) being null, which is not > idiomatic Scala. > Note to reviewer: I would like to contribute this change myself. What is the > policy on breaking API changes? Would it be acceptable to change the > left/outer join APIs to accept options? Or would I need to implement parallel > versions (naming suggestions if so?). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7714) Scala streams API should use Options in left and outer joins
[ https://issues.apache.org/jira/browse/KAFKA-7714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7714: --- Labels: needs-kip (was: ) > Scala streams API should use Options in left and outer joins > > > Key: KAFKA-7714 > URL: https://issues.apache.org/jira/browse/KAFKA-7714 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.0.1, 2.1.0 >Reporter: Charles Crain >Priority: Minor > Labels: needs-kip > > The Scala streams DSL for left and outer joins should use Options instead of > possibly-null parameters to the joiner functions. As currently written, > implementers of joiners for Kafka Streams must account for the right side of > a left join (or either side of an outer join) being null, which is not > idiomatic Scala. > Note to reviewer: I would like to contribute this change myself. What is the > policy on breaking API changes? Would it be acceptable to change the > left/outer join APIs to accept options? Or would I need to implement parallel > versions (naming suggestions if so?). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7714) Scala streams API should use Options in left and outer joins
[ https://issues.apache.org/jira/browse/KAFKA-7714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713305#comment-16713305 ] John Roesler commented on KAFKA-7714: - Hi [~ccrain_kuka], Thanks for your interest in improving the API! I actually kicked around the idea of proposing a similar change for the Java API, partly for the same reason that Scala introduced `Option`, and partly because it gets us out of a semantic trap regarding the interpretation of records with `null` values. It will probably be a minefield, but I wonder if we should at least consider making the change in both the Java and Scala APIs... To answer your question about breaking changes... We try really hard to avoid it. We've learned that sometimes, the syntax of Scala makes it impossible to make some changes without breaking source compatibility (notably implicits), but I think the change you're proposing would be possible to do "safely". I think we'd introduce a new method with a default implementation calling back to the old method, and then mark the old method deprecated. To avoid forcing people to implement the deprecated method, we can also add a default implementation to it that throws an exception. It's a bummer, because the interface doesn't advertise that you have to implement a method, though, so I'm open to suggestions... To actually propose a change, you have to go through the KIP process. I'm happy to help guide you through the process if you'd like. -John > Scala streams API should use Options in left and outer joins > > > Key: KAFKA-7714 > URL: https://issues.apache.org/jira/browse/KAFKA-7714 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.0.1, 2.1.0 >Reporter: Charles Crain >Priority: Minor > > The Scala streams DSL for left and outer joins should use Options instead of > possibly-null parameters to the joiner functions. As currently written, > implementers of joiners for Kafka Streams must account for the right side of > a left join (or either side of an outer join) being null, which is not > idiomatic Scala. > Note to reviewer: I would like to contribute this change myself. What is the > policy on breaking API changes? Would it be acceptable to change the > left/outer join APIs to accept options? Or would I need to implement parallel > versions (naming suggestions if so?). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
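A minimal Java sketch of the interface-evolution pattern described in the comment above (illustrative names only; this is not the actual Streams joiner interface):
{code:java}
import java.util.Optional;

interface Joiner<L, R, OUT> {

    /** Old contract: the right-hand value may be null on a left join. */
    @Deprecated
    default OUT apply(L left, R rightOrNull) {
        // Default body so new implementors are not forced to override the deprecated
        // method; existing implementations continue to override it as before.
        throw new UnsupportedOperationException("apply(L, R) is not implemented");
    }

    /** New contract: absence is modelled explicitly. Defaults to the old method so
     *  that existing implementations keep working unchanged. */
    default OUT apply(L left, Optional<R> right) {
        return apply(left, right.orElse(null));
    }
}
{code}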
[jira] [Created] (KAFKA-7714) Scala streams API should use Options in left and outer joins
Charles Crain created KAFKA-7714: Summary: Scala streams API should use Options in left and outer joins Key: KAFKA-7714 URL: https://issues.apache.org/jira/browse/KAFKA-7714 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.1.0, 2.0.1 Reporter: Charles Crain The Scala streams DSL for left and outer joins should use Options instead of possibly-null parameters to the joiner functions. As currently written, implementers of joiners for Kafka Streams must account for the right side of a left join (or either side of an outer join) being null, which is not idiomatic Scala. Note to reviewer: I would like to contribute this change myself. What is the policy on breaking API changes? Would it be acceptable to change the left/outer join APIs to accept options? Or would I need to implement parallel versions (naming suggestions if so?). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements
[ https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713222#comment-16713222 ] Paul Whalen commented on KAFKA-7523: That's effectively what we've been doing and it's working just fine. My thinking is that a restructuring of (or addition to) the API could make it a bit easier for the user to use a low-level Transformer or Processor without having to be aware of any state stores it needs to do its job. This is one of the great parts about the DSL I think: it's declarative in a way that allows the developer not to think about the state under the hood. I think it would be great to bring some of that ease to users' custom Transformers and Processors. I've requested permission on the mailing list to write up a KIP on this. > TransformerSupplier/ProcessorSupplier enhancements > -- > > Key: KAFKA-7523 > URL: https://issues.apache.org/jira/browse/KAFKA-7523 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Paul Whalen >Priority: Minor > Labels: needs-kip > > I have found that when writing "low level" {{Processors}} and > {{Transformers}} that are stateful, often I want these processors to "own" > one or more state stores, the details of which are not important to the > business logic of the application. However, when incorporating these into > the topologies defined by the high level API, using {{KStream::transform}} or > {{KStream::process}}, I'm forced to specify the stores so the topology is > wired up correctly. This creates an unfortunate pattern where the > {{TransformerSupplier}} or {{ProcessorSupplier,}} who (according to the > pattern I've been following) holds the information about the name of the > state stores, must be defined above the "high level" "fluent API"-style > pipeline, which makes it hard to understand the business logic data flow. > > What I currently have to do: > {code:java} > TransformerSupplier transformerSupplier = new > TransformerSupplierWithState(topology, val -> businessLogic(val)); > builder.stream("in.topic") > .transform(transformerSupplier, transformerSupplier.stateStoreNames()) > .to("out.topic");{code} > I have to both define the {{TransformerSupplier}} above the "fluent block", > and pass the topology in so I can call {{topology.addStateStore()}} inside > the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what > the state store names are for that point in the topology. The lambda {{val -> > businessLogic(val)}} is really what I want to see in-line because that's the > crux of what is happening, along with the name of some factory method > describing what the transformer is doing for me internally. This issue is > obviously exacerbated when the "fluent block" is much longer than this > example - It gets worse the farther away {{val -> businessLogic(val)}} is > from {{KStream::transform}}. 
> > An improvement: > {code:java} > builder.stream("in.topic") > .transform(transformerSupplierWithState(topology, val -> > businessLogic(val))) > .to("out.topic");{code} > Which implies the existence of a {{KStream::transform}} that takes a single > argument that adheres to this interface: > {code:java} > interface TransformerSupplierWithState { > Transformer get(); > String[] stateStoreNames(); > }{code} > Or better yet, I wouldn't have to pass in the topology, the caller of > {{TransformerSupplierWithState}} could also handle the job of "adding" its > state stores to the topology: > {code:java} > interface TransformerSupplierWithState { > Transformer get(); > Map stateStores(); > }{code} > Which would enable my ideal: > {code:java} > builder.stream("in.topic") > .transform(transformerSupplierWithState(val -> businessLogic(val))) > .to("out.topic");{code} > I think this would be a huge improvement in the usability of low-level > processors with the high-level DSL. > Please let me know if I'm missing something as to why this cannot or should > not happen, or if there is a better forum for this suggestion (presumably it > would require a KIP?). I'd be happy to build it as well if there is a chance > of it being merged, it doesn't seem like a huge challenge to me. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
[ https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated KAFKA-7703: Priority: Major (was: Blocker) > KafkaConsumer.position may return a wrong offset after "seekToEnd" is called > > > Key: KAFKA-7703 > URL: https://issues.apache.org/jira/browse/KAFKA-7703 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0 >Reporter: Shixiong Zhu >Assignee: Viktor Somogyi >Priority: Major > > After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong > offset set by another reset request. > Here is a reproducer: > https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246 > In this reproducer, "poll(0)" will send an "earliest" request in background. > However, after "seekToEnd" is called, due to a race condition in > "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen > between the check > https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585 > and the seek > https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605), > "KafkaConsumer.position" may return an "earliest" offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
[ https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated KAFKA-7703: Priority: Blocker (was: Major) > KafkaConsumer.position may return a wrong offset after "seekToEnd" is called > > > Key: KAFKA-7703 > URL: https://issues.apache.org/jira/browse/KAFKA-7703 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0 >Reporter: Shixiong Zhu >Assignee: Viktor Somogyi >Priority: Blocker > > After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong > offset set by another reset request. > Here is a reproducer: > https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246 > In this reproducer, "poll(0)" will send an "earliest" request in background. > However, after "seekToEnd" is called, due to a race condition in > "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen > between the check > https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585 > and the seek > https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605), > "KafkaConsumer.position" may return an "earliest" offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
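A condensed sketch of the calling pattern the linked reproducer exercises. The topic, group and auto.offset.reset setting here are illustrative, and whether the wrong offset is actually observed depends on winning the race described above:
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekToEndRaceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "race-demo");
        props.put("auto.offset.reset", "earliest");   // poll() triggers an "earliest" reset in the background
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition("test-topic", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singleton(tp));
            consumer.poll(Duration.ZERO);                        // kicks off the background offset reset
            consumer.seekToEnd(Collections.singleton(tp));
            // Expected: the log-end offset. With the race in Fetcher.resetOffsetIfNeeded,
            // the pending "earliest" reset can win and an earliest offset is returned instead.
            System.out.println(consumer.position(tp));
        }
    }
}
{code}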
[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713043#comment-16713043 ] Viktor Somogyi commented on KAFKA-6794: --- I've also created an early pull request so we can check whether the algorithm we came up with is in line with the goals of this JIRA. > Support for incremental replica reassignment > > > Key: KAFKA-6794 > URL: https://issues.apache.org/jira/browse/KAFKA-6794 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Viktor Somogyi >Priority: Major > > Say you have a replication factor of 4 and you trigger a reassignment which > moves all replicas to new brokers. Now 8 replicas are fetching at the same > time which means you need to account for 8 times the current producer load > plus the catch-up replication. To make matters worse, the replicas won't all > become in-sync at the same time; in the worst case, you could have 7 replicas > in-sync while one is still catching up. Currently, the old replicas won't be > disabled until all new replicas are in-sync. This makes configuring the > throttle tricky since ISR traffic is not subject to it. > Rather than trying to bring all 4 new replicas online at the same time, a > friendlier approach would be to do it incrementally: bring one replica > online, bring it in-sync, then remove one of the old replicas. Repeat until > all replicas have been changed. This would reduce the impact of a > reassignment and make configuring the throttle easier at the cost of a slower > overall reassignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712987#comment-16712987 ] ASF GitHub Bot commented on KAFKA-6794: --- viktorsomogyi opened a new pull request #6011: [WIP] KAFKA-6794: Incremental partition reassignment URL: https://github.com/apache/kafka/pull/6011 This pull request replaces the current partition reassignment strategy with an incremental one. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support for incremental replica reassignment > > > Key: KAFKA-6794 > URL: https://issues.apache.org/jira/browse/KAFKA-6794 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Viktor Somogyi >Priority: Major > > Say you have a replication factor of 4 and you trigger a reassignment which > moves all replicas to new brokers. Now 8 replicas are fetching at the same > time which means you need to account for 8 times the current producer load > plus the catch-up replication. To make matters worse, the replicas won't all > become in-sync at the same time; in the worst case, you could have 7 replicas > in-sync while one is still catching up. Currently, the old replicas won't be > disabled until all new replicas are in-sync. This makes configuring the > throttle tricky since ISR traffic is not subject to it. > Rather than trying to bring all 4 new replicas online at the same time, a > friendlier approach would be to do it incrementally: bring one replica > online, bring it in-sync, then remove one of the old replicas. Repeat until > all replicas have been changed. This would reduce the impact of a > reassignment and make configuring the throttle easier at the cost of a slower > overall reassignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7713) producer io-wait-ratio > 1
dan norwood created KAFKA-7713: -- Summary: producer io-wait-ratio > 1 Key: KAFKA-7713 URL: https://issues.apache.org/jira/browse/KAFKA-7713 Project: Kafka Issue Type: Bug Affects Versions: 2.1.0 Reporter: dan norwood I am running a test on a streams application and gathering JMX measurements to determine what is causing some lag. Using `kafka.tools.JmxTool` I was gathering the following attributes `'io-ratio', 'io-wait-ratio', 'select-rate', 'batch-size-avg', 'compression-rate-avg', 'record-size-avg', 'records-per-request-avg'` on my streams instances' producers. I noticed that I was getting `io-wait-ratio > 1`, even though according to the docs it is "The fraction of time the I/O thread spent waiting." Some example lines from JmxTool:
|StreamThread-8-producer:batch-size-avg|StreamThread-8-producer:compression-rate-avg|StreamThread-8-producer:io-ratio|*StreamThread-8-producer:io-wait-ratio*|StreamThread-8-producer:record-size-avg|StreamThread-8-producer:records-per-request-avg|StreamThread-8-producer:select-rate|
|662.2613636|0.3425814926|1.01E-04|*1.178371974*|172.2045455|38.7167|3.855527588|
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7711) Add a bounded flush() API to Kafka Producer
[ https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712789#comment-16712789 ] Stanislav Kozlovski edited comment on KAFKA-7711 at 12/7/18 12:39 PM: -- This sounds good to me. Thanks for opening the JIRA, [~kdu] Since this is a public interface change, it will require opening a KIP and discussion around it. [~kdu], are you up to driving this change? was (Author: enether): This sounds good to me. Thanks for opening the JIRA, [~kdu] > Add a bounded flush() API to Kafka Producer > > > Key: KAFKA-7711 > URL: https://issues.apache.org/jira/browse/KAFKA-7711 > Project: Kafka > Issue Type: Improvement >Reporter: kun du >Priority: Minor > > Currently the call to Producer.flush() can hang there for an indeterminate > time. > It would be good to add a bounded flush() API that times out if the producer is > unable to flush all the batched records in a limited time. In this way the > caller of flush() has a chance to decide what to do next instead of just > waiting forever. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7711) Add a bounded flush() API to Kafka Producer
[ https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712789#comment-16712789 ] Stanislav Kozlovski commented on KAFKA-7711: This sounds good to me. Thanks for opening the JIRA, [~kdu] > Add a bounded flush() API to Kafka Producer > > > Key: KAFKA-7711 > URL: https://issues.apache.org/jira/browse/KAFKA-7711 > Project: Kafka > Issue Type: Improvement >Reporter: kun du >Priority: Minor > > Currently the call to Producer.flush() can hang there for an indeterminate > time. > It would be good to add a bounded flush() API that times out if the producer is > unable to flush all the batched records in a limited time. In this way the > caller of flush() has a chance to decide what to do next instead of just > waiting forever. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
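Until such an API exists, a caller-side workaround is to run flush() on a helper thread and bound the wait. The sketch below is illustrative only; a real bounded flush (for example a flush(Duration) overload) would be defined by the KIP, and the helper thread here cannot cancel the flush already running inside the producer:
{code:java}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;

public final class BoundedFlush {
    /** Returns true if flush() completed within timeoutMs, false otherwise. */
    public static boolean flush(KafkaProducer<?, ?> producer, long timeoutMs)
            throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> pending = executor.submit(producer::flush);
            pending.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;                 // all batched records were flushed in time
        } catch (TimeoutException e) {
            return false;                // flush still in progress; the caller decides what to do next
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            executor.shutdownNow();      // note: does not abort the flush already running in the producer
        }
    }
}
{code}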
[jira] [Created] (KAFKA-7712) Handle exceptions from immediately connected channels in Selector
Rajini Sivaram created KAFKA-7712: - Summary: Handle exceptions from immediately connected channels in Selector Key: KAFKA-7712 URL: https://issues.apache.org/jira/browse/KAFKA-7712 Project: Kafka Issue Type: Bug Components: network Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.2.0 We try to handle all possible exceptions in Selector to ensure that channels are always closed and their states kept consistent. For immediately connected channels, we should ensure that any exception during connection results in the channel being closed properly and removed from all maps. This is a very unlikely scenario, but we do already handle the exception. We should clean up properly in the catch block. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
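The sketch below is not the actual org.apache.kafka.common.network.Selector code; it is a plain-NIO illustration, with invented names, of the "immediately connected" path and the kind of catch-block cleanup the ticket asks for:
{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;

public class ImmediatelyConnectedSketch {
    private final Map<String, SocketChannel> channels = new HashMap<>();

    public void connect(String id, InetSocketAddress address, Selector nioSelector) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        try {
            boolean immediatelyConnected = channel.connect(address);   // may complete right away, e.g. on loopback
            SelectionKey key = channel.register(nioSelector, SelectionKey.OP_CONNECT);
            channels.put(id, channel);
            if (immediatelyConnected) {
                // Finish the setup that normally happens when OP_CONNECT fires; if any
                // of it throws, fall through to the cleanup below.
                key.interestOps(0);
                // ... attach per-connection state (the KafkaChannel equivalent) here ...
            }
        } catch (IOException | RuntimeException e) {
            // The cleanup the ticket calls for: close the socket and remove whatever was
            // already registered, so no stale entries are left in the tracking maps.
            channels.remove(id);
            channel.close();
            throw e;
        }
    }
}
{code}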
[jira] [Commented] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712734#comment-16712734 ] Patrik Kleindl commented on KAFKA-7657: --- [~guozhang] Thanks for the fast analysis. It is not a major problem for us currently, so a fix in an upcoming 2.0.x or 2.1 would be sufficient. As you guessed correctly, retries is set to 10 and I will increase it to hopefully prevent such a problem. > Invalid reporting of stream state in Kafka streams application > -- > > Key: KAFKA-7657 > URL: https://issues.apache.org/jira/browse/KAFKA-7657 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: Thomas Crowley >Priority: Major > Labels: bug > > We have a streams application with 3 instances running, two of which are > reporting the state of REBALANCING even after they have been running for > days. Restarting the application has no effect on the stream state. > This seems suspect because each instance appears to be processing messages, > and the kafka-consumer-groups CLI tool reports hardly any offset lag in any > of the partitions assigned to the REBALANCING consumers. Each partition seems > to be processing an equal number of records too. > Inspecting the state.dir on disk, it looks like the RocksDB state has been > built and hovers at the expected size on disk. > This problem has persisted for us after we rebuilt our Kafka cluster and > reset topics + consumer groups in our dev environment. > There is nothing in the logs (with level set to DEBUG) in either the broker or > the application that suggests something exceptional has happened causing the > application to be stuck REBALANCING. > We are also running multiple streaming applications where this problem does > not exist. > Two differences between this application and our other streaming applications > are: > * We have processing.guarantee set to exactly_once > * We are using a ValueTransformer which fetches from and puts data on a > windowed state store > The REBALANCING state is returned from both polling the state method of our > KafkaStreams instance, and our custom metric which is derived from some logic > in a KafkaStreams.StateListener class attached via the setStateListener > method. > > While I have provided a bit of context, before I reply with some reproducible > code - is there a simple way in which I can determine that my streams > application is in a RUNNING state without relying on the same mechanisms as > used above? > Further, given that it seems like my application is actually running - could > this perhaps be a bug to do with how the stream state is being reported (in > the context of a transactional stream using the processor API)? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
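Regarding the question about checking the state without the mechanisms above: both the state listener and the state() method report the same underlying KafkaStreams state, which is why they agree on REBALANCING here. A small sketch of both:
{code:java}
import org.apache.kafka.streams.KafkaStreams;

public class StateProbe {
    // Log every state transition (the same information a custom metric would be derived from).
    public static void attach(KafkaStreams streams) {
        streams.setStateListener((newState, oldState) ->
                System.out.println("streams state " + oldState + " -> " + newState));
    }

    // Point-in-time check, e.g. for a health endpoint.
    public static boolean isRunning(KafkaStreams streams) {
        return streams.state() == KafkaStreams.State.RUNNING;
    }
}
{code}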
[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712629#comment-16712629 ] Viktor Somogyi commented on KAFKA-6794: --- I think this change doesn't need a KIP for now, so I've collected the algorithm and some examples here. [~hachikuji], please have a look at it.
h2. Calculating A Reassignment Step
A reassignment step is always calculated from the final target replica (FTR) set and the current replica (CR) set.
# Calculate the replicas to be dropped (DR):
## Calculate n = size(CR) - size(FTR), the number of replicas to drop in this step.
## Filter those replicas from CR which are not in FTR; this is the excess replica (ER) set.
## Sort the ER set so that the leader is last (this ensures the leader is selected only when needed).
## Take the first n replicas of ER; these are the dropped replicas (DR).
# Calculate the new replica (NR) to be added by selecting the first replica from FTR that is not in CR.
# Create the target replica (TR) set: CR + NR - DR.
# If this is the last step, order the replicas as specified by FTR. This means the last step always equals FTR.
h2. Performing A Reassignment Step
# Wait until CR is entirely in ISR. This makes sure we start off with a solid base for the reassignment.
# Calculate the next reassignment step as described above, based on the reassignment context.
# Wait until all brokers in the target replicas (TR) of the reassignment step are alive. This makes sure the reassignment starts only when the target brokers can actually perform the step.
# If we have new replicas in ISR from the previous step, change their state to OnlineReplica.
# Update CR in ZooKeeper with TR: with this, the DR set is dropped and the NR set is added.
# Send a LeaderAndIsr request to all replicas in CR + NR so they are notified of the ZooKeeper changes.
# Start the new replicas in NR by moving them to the NewReplica state.
# Set CR to TR in memory.
# Send a LeaderAndIsr request with a potential new leader (if the current leader is not in TR), the new CR (using TR) and the same ISR to every broker in TR.
# Move replicas in DR to Offline (forcing those replicas out of ISR).
# Move replicas in DR to NonExistentReplica (forcing those replicas to be deleted).
# Update the /admin/reassign_partitions path in ZK to remove this partition.
# After electing a leader, the replica and ISR information changes, so resend the UpdateMetadata request to every broker.
h2. Example
The following block shows how a transition happens from (0, 1, 2) to (3, 4, 5) where the initial leader is 0.
{noformat}
(0, 1, 2)     // starting assignment
    |
(0, 1, 2, 3)  // +3
    |
(0, 2, 3, 4)  // -1 +4
    |
(0, 3, 4, 5)  // -2 +5
    |
(3, 4, 5)     // -0, new leader (3) is elected, requested order is matched, reassignment finished
{noformat}
Let's take a closer look at the third step above:
{noformat}
FTR = (3, 4, 5)
CR  = (0, 1, 2, 3)
n   = size(CR) - size(FTR)  // 1
ER  = CR - FTR              // (0, 1, 2)
ER  = order(ER)             // (1, 2, 0)
DR  = takeFirst(ER, n)      // (1)
NR  = first(FTR - CR)       // 4
TR  = CR + NR - DR          // (0, 2, 3, 4)
{noformat}
> Support for incremental replica reassignment > > > Key: KAFKA-6794 > URL: https://issues.apache.org/jira/browse/KAFKA-6794 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Viktor Somogyi >Priority: Major > > Say you have a replication factor of 4 and you trigger a reassignment which > moves all replicas to new brokers. 
Now 8 replicas are fetching at the same > time which means you need to account for 8 times the current producer load > plus the catch-up replication. To make matters worse, the replicas won't all > become in-sync at the same time; in the worst case, you could have 7 replicas > in-sync while one is still catching up. Currently, the old replicas won't be > disabled until all new replicas are in-sync. This makes configuring the > throttle tricky since ISR traffic is not subject to it. > Rather than trying to bring all 4 new replicas online at the same time, a > friendlier approach would be to do it incrementally: bring one replica > online, bring it in-sync, then remove one of the old replicas. Repeat until > all replicas have been changed. This would reduce the impact of a > reassignment and make configuring the throttle easier at the cost of a slower > overall reassignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
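A standalone sketch of the step calculation described in the comment above: plain Java with illustrative names, not the controller code from the pull request. It reproduces the worked example when run:
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class IncrementalReassignmentSketch {

    /** Computes the target replica (TR) set of the next step from the current
     *  replicas (CR) and the final target replicas (FTR). */
    static List<Integer> nextStep(List<Integer> cr, List<Integer> ftr, int leader) {
        // Excess replicas (ER): in CR but not in FTR, leader sorted last so it is
        // only dropped when no other candidate remains.
        List<Integer> er = cr.stream()
                .filter(r -> !ftr.contains(r))
                .sorted(Comparator.comparing((Integer r) -> r == leader))
                .collect(Collectors.toList());

        int n = Math.max(cr.size() - ftr.size(), 0);              // replicas to drop in this step
        List<Integer> dr = er.subList(0, Math.min(n, er.size())); // dropped replicas (DR)

        // New replica (NR): first broker in FTR not yet in CR (absent on the last step).
        Integer nr = ftr.stream().filter(r -> !cr.contains(r)).findFirst().orElse(null);

        List<Integer> tr = new ArrayList<>(cr);                   // TR = CR + NR - DR
        tr.removeAll(dr);
        if (nr != null) tr.add(nr);

        // Last step: return the replicas in exactly the order requested by FTR.
        if (ftr.containsAll(tr) && tr.containsAll(ftr)) return new ArrayList<>(ftr);
        return tr;
    }

    public static void main(String[] args) {
        List<Integer> cr = Arrays.asList(0, 1, 2);
        List<Integer> ftr = Arrays.asList(3, 4, 5);
        while (!cr.equals(ftr)) {
            cr = nextStep(cr, ftr, 0);   // leader stays 0 until it is finally dropped
            System.out.println(cr);      // [0,1,2,3] -> [0,2,3,4] -> [0,3,4,5] -> [3,4,5]
        }
    }
}
{code}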
[jira] [Assigned] (KAFKA-7694) Support ZooKeeper based master/secret key management for delegation tokens
[ https://issues.apache.org/jira/browse/KAFKA-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana reassigned KAFKA-7694: - Assignee: Satish Duggana > Support ZooKeeper based master/secret key management for delegation tokens > --- > > Key: KAFKA-7694 > URL: https://issues.apache.org/jira/browse/KAFKA-7694 > Project: Kafka > Issue Type: Sub-task >Reporter: Manikumar >Assignee: Satish Duggana >Priority: Major > > A master/secret key is used to generate and verify delegation tokens. > Currently, the master key/secret is stored as plain text in the server.properties > config file. The same key must be configured across all the brokers. We require a > re-deployment when the secret needs to be rotated. > This JIRA is to explore and implement ZooKeeper-based master/secret key > management to automate secret key generation and expiration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)