[jira] [Created] (KAFKA-7715) Connect should have a parameter to disable WADL output for OPTIONS method

2018-12-07 Thread Oleksandr Diachenko (JIRA)
Oleksandr Diachenko created KAFKA-7715:
--

 Summary: Connect should have a parameter to disable WADL output 
for OPTIONS method
 Key: KAFKA-7715
 URL: https://issues.apache.org/jira/browse/KAFKA-7715
 Project: Kafka
  Issue Type: Improvement
  Components: config, security
Affects Versions: 2.1.0
Reporter: Oleksandr Diachenko
 Fix For: 2.1.1


Currently, the Connect REST API exposes WADL output on the OPTIONS method:
{code:bash}
curl -i -X OPTIONS http://localhost:8083/connectors
HTTP/1.1 200 OK
Date: Fri, 07 Dec 2018 22:51:53 GMT
Content-Type: application/vnd.sun.wadl+xml
Allow: HEAD,POST,GET,OPTIONS
Last-Modified: Fri, 07 Dec 2018 14:51:53 PST
Content-Length: 1331
Server: Jetty(9.4.12.v20180830)


<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<application xmlns="http://wadl.dev.java.net/2009/02">
    <doc xmlns:jersey="http://jersey.java.net/" jersey:generatedBy="Jersey: 2.27 2018-04-10 07:34:57"/>
    <grammars>
        <include href="http://localhost:8083/application.wadl/xsd0.xsd"/>
    </grammars>
    <resources base="http://localhost:8083/">
        <resource path="connectors">
            <method name="GET">
                <request>
                    <param xmlns:xs="http://www.w3.org/2001/XMLSchema" name="forward" style="query" type="xs:boolean"/>
                </request>
                <!-- response elided -->
            </method>
            <method name="POST">
                <request>
                    <param xmlns:xs="http://www.w3.org/2001/XMLSchema" name="forward" style="query" type="xs:boolean"/>
                </request>
                <!-- response elided -->
            </method>
            <!-- remaining methods elided -->
        </resource>
    </resources>
</application>
{code}

This is a potential vulnerability, so it makes sense to add a 
configuration parameter that disables WADL output.
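For context, Jersey 2.x (the JAX-RS runtime behind the Jetty server shown above) already exposes a switch for this, so a Connect configuration parameter could map onto it. A minimal sketch, assuming Jersey 2.x on the classpath; the {{disableWadl}} flag and its wiring are illustrative, not Connect's actual code:

```java
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties;

public class WadlToggle {
    // ServerProperties.WADL_FEATURE_DISABLE is Jersey's property key
    // ("jersey.config.server.wadl.disableWadl"); when set to true, Jersey
    // stops serving /application.wadl and omits the WADL body on OPTIONS.
    public static ResourceConfig configure(boolean disableWadl) {
        ResourceConfig config = new ResourceConfig();
        config.property(ServerProperties.WADL_FEATURE_DISABLE, disableWadl);
        return config;
    }
}
```

With the property set, an OPTIONS request should return only the Allow header, with no WADL payload.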



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5692) Refactor PreferredReplicaLeaderElectionCommand to use AdminClient

2018-12-07 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713395#comment-16713395
 ] 

Jun Rao commented on KAFKA-5692:


[~tombentley], thanks for contributing the PR. Would you still be interested in 
finishing up the PR? If not, perhaps we can unassign the Jira and see if 
someone else wants to finish it up.

> Refactor PreferredReplicaLeaderElectionCommand to use AdminClient
> -
>
> Key: KAFKA-5692
> URL: https://issues.apache.org/jira/browse/KAFKA-5692
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: kip, patch-available
> Fix For: 2.2.0
>
>
> The PreferredReplicaLeaderElectionCommand currently uses a direct connection 
> to zookeeper. The zookeeper dependency should be deprecated and an 
> AdminClient API created to be used instead. 
> This change will require a KIP.





[jira] [Commented] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements

2018-12-07 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713370#comment-16713370
 ] 

Matthias J. Sax commented on KAFKA-7523:


I added you to the list of contributors and assigned the ticket to you. You can 
now also self-assign tickets.

> TransformerSupplier/ProcessorSupplier enhancements
> --
>
> Key: KAFKA-7523
> URL: https://issues.apache.org/jira/browse/KAFKA-7523
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paul Whalen
>Assignee: Paul Whalen
>Priority: Minor
>  Labels: needs-kip
>
> I have found that when writing "low level" {{Processors}} and 
> {{Transformers}} that are stateful, often I want these processors to "own" 
> one or more state stores, the details of which are not important to the 
> business logic of the application.  However, when incorporating these into 
> the topologies defined by the high level API, using {{KStream::transform}} or 
> {{KStream::process}}, I'm forced to specify the stores so the topology is 
> wired up correctly.  This creates an unfortunate pattern where the 
> {{TransformerSupplier}} or {{ProcessorSupplier}}, which (according to the 
> pattern I've been following) holds the information about the name of the 
> state stores, must be defined above the "high level" "fluent API"-style 
> pipeline, which makes it hard to understand the business logic data flow.
>  
> What I currently have to do:
> {code:java}
> TransformerSupplier transformerSupplier = new 
> TransformerSupplierWithState(topology, val -> businessLogic(val));
> builder.stream("in.topic")
> .transform(transformerSupplier, transformerSupplier.stateStoreNames())
> .to("out.topic");{code}
> I have to both define the {{TransformerSupplier}} above the "fluent block", 
> and pass the topology in so I can call {{topology.addStateStore()}} inside 
> the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what 
> the state store names are for that point in the topology. The lambda {{val -> 
> businessLogic(val)}} is really what I want to see in-line because that's the 
> crux of what is happening, along with the name of some factory method 
> describing what the transformer is doing for me internally. This issue is 
> obviously exacerbated when the "fluent block" is much longer than this 
> example; it gets worse the farther away {{val -> businessLogic(val)}} is 
> from {{KStream::transform}}.
>  
> An improvement:
> {code:java}
> builder.stream("in.topic")
> .transform(transformerSupplierWithState(topology, val -> 
> businessLogic(val)))
> .to("out.topic");{code}
> Which implies the existence of a {{KStream::transform}} that takes a single 
> argument that adheres to this interface:
> {code:java}
> interface TransformerSupplierWithState {
> Transformer get();
> String[] stateStoreNames();
> }{code}
> Or better yet, I wouldn't have to pass in the topology, the caller of 
> {{TransformerSupplierWithState}} could also handle the job of "adding" its 
> state stores to the topology:
> {code:java}
> interface TransformerSupplierWithState {
> Transformer get();
> Map stateStores();
> }{code}
> Which would enable my ideal:
> {code:java}
> builder.stream("in.topic")
> .transform(transformerSupplierWithState(val -> businessLogic(val)))
> .to("out.topic");{code}
> I think this would be a huge improvement in the usability of low-level 
> processors with the high-level DSL.
> Please let me know if I'm missing something as to why this cannot or should 
> not happen, or if there is a better forum for this suggestion (presumably it 
> would require a KIP?). I'd be happy to build it as well if there is a chance 
> of it being merged, it doesn't seem like a huge challenge to me.
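The shape being proposed can be sketched in plain Java with simplified stand-ins for the Streams types ({{Transformer}} below is a one-method toy, and {{withState}} is a hypothetical factory, not an existing API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class SupplierSketch {
    // Toy stand-in for org.apache.kafka.streams.kstream.Transformer.
    interface Transformer<V, R> { R transform(V value); }

    // The proposed single-argument supplier: it carries its own store names,
    // so the fluent pipeline never has to repeat them.
    interface TransformerSupplierWithState<V, R> {
        Transformer<V, R> get();
        List<String> stateStoreNames();
    }

    // Hypothetical factory pairing the business logic with the stores it owns.
    static <V, R> TransformerSupplierWithState<V, R> withState(
            Function<V, R> businessLogic, String... stores) {
        return new TransformerSupplierWithState<V, R>() {
            public Transformer<V, R> get() { return businessLogic::apply; }
            public List<String> stateStoreNames() { return Arrays.asList(stores); }
        };
    }
}
```

A {{KStream::transform}} overload accepting such a type could then read the store names itself instead of taking them as a separate varargs argument.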





[jira] [Assigned] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements

2018-12-07 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-7523:
--

Assignee: Paul Whalen

> TransformerSupplier/ProcessorSupplier enhancements
> --
>
> Key: KAFKA-7523
> URL: https://issues.apache.org/jira/browse/KAFKA-7523
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paul Whalen
>Assignee: Paul Whalen
>Priority: Minor
>  Labels: needs-kip
>





[jira] [Commented] (KAFKA-7714) Scala streams API should use Options in left and outer joins

2018-12-07 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713364#comment-16713364
 ] 

Matthias J. Sax commented on KAFKA-7714:


You can find information about the KIP process in the wiki: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

> Scala streams API should use Options in left and outer joins
> 
>
> Key: KAFKA-7714
> URL: https://issues.apache.org/jira/browse/KAFKA-7714
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Charles Crain
>Priority: Minor
>  Labels: needs-kip
>
> The Scala streams DSL for left and outer joins should use Options instead of 
> possibly-null parameters to the joiner functions. As currently written, 
> implementers of joiners for Kafka Streams must account for the right side of 
> a left join (or either side of an outer join) being null, which is not 
> idiomatic Scala.
> Note to reviewer: I would like to contribute this change myself. What is the 
> policy on breaking API changes? Would it be acceptable to change the 
> left/outer join APIs to accept options? Or would I need to implement parallel 
> versions (naming suggestions if so?).
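To make the request concrete in Java terms (the Scala DSL would use {{Option[B]}} where this uses {{Optional<B>}}), an Optional-accepting joiner can be adapted onto the existing null-passing contract without breaking it. The names here are illustrative, not proposed API:

```java
import java.util.Optional;
import java.util.function.BiFunction;

public class OptionJoin {
    // Wraps an Optional-accepting joiner so it can be handed to an API that
    // still passes null for a missing right-hand side.
    public static <A, B, R> BiFunction<A, B, R> fromOptional(
            BiFunction<A, Optional<B>, R> safeJoiner) {
        return (left, right) -> safeJoiner.apply(left, Optional.ofNullable(right));
    }
}
```

The same adapter direction would let parallel Option-based join overloads delegate to the current implementation.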





[jira] [Updated] (KAFKA-7714) Scala streams API should use Options in left and outer joins

2018-12-07 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7714:
---
Labels: needs-kip  (was: )

> Scala streams API should use Options in left and outer joins
> 
>
> Key: KAFKA-7714
> URL: https://issues.apache.org/jira/browse/KAFKA-7714
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Charles Crain
>Priority: Minor
>  Labels: needs-kip
>





[jira] [Commented] (KAFKA-7714) Scala streams API should use Options in left and outer joins

2018-12-07 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713305#comment-16713305
 ] 

John Roesler commented on KAFKA-7714:
-

Hi [~ccrain_kuka],

Thanks for your interest in improving the API!

I actually kicked around the idea of proposing a similar change for the Java 
API, partly for the same reason that Scala introduced `Option`, and partly 
because it gets us out of a semantic trap regarding the interpretation of 
records with `null` values.

It will probably be a minefield, but I wonder if we should at least consider 
making the change in both the Java and Scala APIs...

 

To answer your question about breaking changes... We try really hard to avoid 
it.

We've learned that sometimes, the syntax of Scala makes it impossible to make 
some changes without breaking source compatibility (notably implicits), but I 
think the change you're proposing would be possible to do "safely".

I think we'd introduce a new method with a default implementation calling back 
to the old method, and then mark the old method deprecated. To avoid forcing 
people to implement the deprecated method, we can also add a default 
implementation to it that throws an exception. It's a bummer, because the 
interface doesn't advertise that you have to implement a method, though, so I'm 
open to suggestions...
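The compatibility pattern described above can be sketched as follows (the type and method names are made up for illustration; this is not the actual Streams interface):

```java
import java.util.Optional;

interface Joiner<A, B, R> {
    // Old contract: the right-hand side may be null. Deprecated, with a
    // throwing default so new implementers are not forced to write it.
    @Deprecated
    default R apply(A left, B right) {
        throw new UnsupportedOperationException("implement applySafe instead");
    }

    // New contract: absence is explicit. Defaults to calling the old method,
    // so existing implementations keep working unchanged.
    default R applySafe(A left, Optional<B> right) {
        return apply(left, right.orElse(null));
    }
}
```

The downside noted above is visible here: once both methods have defaults, the interface has no abstract method, so the compiler cannot tell implementers that one of the two must be overridden.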

 

To actually propose a change, you have to go through the KIP process. I'm happy 
to help guide you through the process if you'd like.

-John

> Scala streams API should use Options in left and outer joins
> 
>
> Key: KAFKA-7714
> URL: https://issues.apache.org/jira/browse/KAFKA-7714
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Charles Crain
>Priority: Minor
>





[jira] [Created] (KAFKA-7714) Scala streams API should use Options in left and outer joins

2018-12-07 Thread Charles Crain (JIRA)
Charles Crain created KAFKA-7714:


 Summary: Scala streams API should use Options in left and outer 
joins
 Key: KAFKA-7714
 URL: https://issues.apache.org/jira/browse/KAFKA-7714
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.1.0, 2.0.1
Reporter: Charles Crain


The Scala streams DSL for left and outer joins should use Options instead of 
possibly-null parameters to the joiner functions. As currently written, 
implementers of joiners for Kafka Streams must account for the right side of a 
left join (or either side of an outer join) being null, which is not idiomatic 
Scala.

Note to reviewer: I would like to contribute this change myself. What is the 
policy on breaking API changes? Would it be acceptable to change the left/outer 
join APIs to accept options? Or would I need to implement parallel versions 
(naming suggestions if so?).





[jira] [Commented] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements

2018-12-07 Thread Paul Whalen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713222#comment-16713222
 ] 

Paul Whalen commented on KAFKA-7523:


That's effectively what we've been doing and it's working just fine.  My 
thinking is that a restructuring of (or addition to) the API could make it a 
bit easier for the user to use a low-level Transformer or Processor without 
having to be aware of any state stores it needs to do its job.  This is one of 
the great parts about the DSL I think: it's declarative in a way that allows 
the developer not to think about the state under the hood.  I think it would be 
great to bring some of that ease to users' custom Transformers and Processors.  
I've requested permission on the mailing list to write up a KIP on this.

> TransformerSupplier/ProcessorSupplier enhancements
> --
>
> Key: KAFKA-7523
> URL: https://issues.apache.org/jira/browse/KAFKA-7523
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paul Whalen
>Priority: Minor
>  Labels: needs-kip
>





[jira] [Updated] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-07 Thread Shixiong Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated KAFKA-7703:

Priority: Major  (was: Blocker)

> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> 
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
>Reporter: Shixiong Zhu
>Assignee: Viktor Somogyi
>Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong 
> offset set by another reset request.
> Here is a reproducer: 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" request in background. 
> However, after "seekToEnd" is called, due to a race condition in 
> "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
> between the check 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
>  and the seek 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
>  "KafkaConsumer.position" may return an "earliest" offset.
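The essence of the race can be sketched in a few lines; the names below are illustrative, not the real {{Fetcher}} internals:

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy model of the race: the background reset does "check, then seek" as two
// separate steps, so a user seekToEnd() landing between them gets clobbered.
public class SeekRace {
    private final AtomicReference<Long> position = new AtomicReference<>(null);

    // Buggy shape: check and write are not atomic. Between get() and set(),
    // another thread's seekToEnd() can run and be silently overwritten.
    void resetUnsafe(long earliest) {
        if (position.get() == null) {   // check
            position.set(earliest);     // seek (may clobber a concurrent seek)
        }
    }

    // Safer shape: install the reset result only if nothing changed in between.
    void resetAtomic(long earliest) {
        position.compareAndSet(null, earliest);
    }

    void seekToEnd(long end) { position.set(end); }

    Long current() { return position.get(); }
}
```

With {{compareAndSet}}, a reset that loses the race is simply dropped, so {{position}} keeps the value from the user's seek.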





[jira] [Updated] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-07 Thread Shixiong Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated KAFKA-7703:

Priority: Blocker  (was: Major)

> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> 
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
>Reporter: Shixiong Zhu
>Assignee: Viktor Somogyi
>Priority: Blocker
>





[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment

2018-12-07 Thread Viktor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713043#comment-16713043
 ] 

Viktor Somogyi commented on KAFKA-6794:
---

Also created an early pull request to check whether the algorithm we came up 
with is in line with the goals of this Jira.

> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Viktor Somogyi
>Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which 
> moves all replicas to new brokers. Now 8 replicas are fetching at the same 
> time which means you need to account for 8 times the current producer load 
> plus the catch-up replication. To make matters worse, the replicas won't all 
> become in-sync at the same time; in the worst case, you could have 7 replicas 
> in-sync while one is still catching up. Currently, the old replicas won't be 
> disabled until all new replicas are in-sync. This makes configuring the 
> throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a 
> friendlier approach would be to do it incrementally: bring one replica 
> online, bring it in-sync, then remove one of the old replicas. Repeat until 
> all replicas have been changed. This would reduce the impact of a 
> reassignment and make configuring the throttle easier at the cost of a slower 
> overall reassignment.
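The schedule this describes can be sketched as follows, assuming (as in the example) that the old and new replica sets are disjoint and the same size; the representation is illustrative only:

```java
import java.util.ArrayList;
import java.util.List;

public class IncrementalReassignment {
    // Produces the sequence of intermediate assignments: add one new replica,
    // wait for it to join the ISR, then retire one old replica; repeat.
    public static List<List<Integer>> steps(List<Integer> current, List<Integer> target) {
        List<List<Integer>> plan = new ArrayList<>();
        List<Integer> working = new ArrayList<>(current);
        for (int i = 0; i < target.size(); i++) {
            working.add(target.get(i));            // bring one new replica online
            plan.add(new ArrayList<>(working));    // ...let it catch up in-sync...
            working.remove(current.get(i));        // ...then remove one old replica
            plan.add(new ArrayList<>(working));
        }
        return plan;
    }
}
```

For current=[1,2,3,4] and target=[5,6,7,8], the replica count never exceeds 5, instead of 8 with the all-at-once approach.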





[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment

2018-12-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712987#comment-16712987
 ] 

ASF GitHub Bot commented on KAFKA-6794:
---

viktorsomogyi opened a new pull request #6011: [WIP] KAFKA-6794: Incremental 
partition reassignment
URL: https://github.com/apache/kafka/pull/6011
 
 
   This pull request replaces the current partition reassignment strategy with 
an incremental one.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Viktor Somogyi
>Priority: Major
>





[jira] [Created] (KAFKA-7713) producer io-wait-ratio > 1

2018-12-07 Thread dan norwood (JIRA)
dan norwood created KAFKA-7713:
--

 Summary: producer io-wait-ratio > 1
 Key: KAFKA-7713
 URL: https://issues.apache.org/jira/browse/KAFKA-7713
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.1.0
Reporter: dan norwood


I am running a test on a streams application and gathering JMX measurements to 
determine what is causing some lag. Using `kafka.tools.JmxTool` I was gathering 
the following attributes `'io-ratio', 'io-wait-ratio', 'select-rate', 
'batch-size-avg', 'compression-rate-avg', 'record-size-avg', 
'records-per-request-avg'` on my streams instances' producers. I noticed that I 
was getting `io-wait-ratio > 1`, even though the docs describe it as "The 
fraction of time the I/O thread spent waiting."

 

Some example values from JmxTool for StreamThread-8-producer:
||Metric||Value||
|batch-size-avg|662.2613636|
|compression-rate-avg|0.3425814926|
|io-ratio|1.01E-04|
|*io-wait-ratio*|*1.178371974*|
|record-size-avg|172.2045455|
|records-per-request-avg|38.7167|
|select-rate|3.855527588|
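One possible explanation (an assumption, not a diagnosis of this report): if the ratio is computed as recorded wait time divided by the elapsed time of the sample windows, and the denominator under-counts the true measurement interval (e.g. because of an incomplete first window), the "fraction" can exceed 1. A toy illustration:

```java
public class RatioDemo {
    // A "fraction of time" metric computed as waited / elapsed. If elapsedNs
    // under-counts the real interval over which waitedNs was accumulated,
    // the result exceeds 1.0 even though the true fraction cannot.
    public static double ratio(long waitedNs, long elapsedNs) {
        return (double) waitedNs / elapsedNs;
    }
}
```

For example, 30s of recorded wait divided by a 25s window denominator yields 1.2, much like the 1.178 observed above.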





[jira] [Comment Edited] (KAFKA-7711) Add a bounded flush() API to Kafka Producer

2018-12-07 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712789#comment-16712789
 ] 

Stanislav Kozlovski edited comment on KAFKA-7711 at 12/7/18 12:39 PM:
--

This sounds good to me. Thanks for opening the JIRA, [~kdu] 

Since this is a public interface change, it will require opening a KIP and 
discussion around it.
[~kdu] , are you up to driving this change?


was (Author: enether):
This sounds good to me. Thanks for opening the JIRA, [~kdu]

> Add a bounded flush()  API to Kafka Producer
> 
>
> Key: KAFKA-7711
> URL: https://issues.apache.org/jira/browse/KAFKA-7711
> Project: Kafka
>  Issue Type: Improvement
>Reporter: kun du
>Priority: Minor
>
> Currently, a call to Producer.flush() can hang for an indeterminate amount 
> of time.
> It would be useful to add a bounded flush() API that times out if the 
> producer is unable to flush all batched records within the limit. That way 
> the caller of flush() has a chance to decide what to do next instead of 
> waiting forever.
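Until such an API exists, a caller can bound the wait from the outside. A sketch (not Producer API; the {{Runnable}} would wrap {{producer.flush()}}), with the caveat that the underlying flush keeps running after a timeout — this only frees the caller to react:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedFlush {
    // Runs the unbounded flush on a helper thread and bounds the caller's
    // wait. Returns false if the flush did not complete within timeoutMs.
    public static boolean flushWithin(Runnable flush, long timeoutMs) {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        try {
            ex.submit(flush).get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;                 // caller decides what to do next
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            ex.shutdownNow();
        }
    }
}
```

A built-in {{flush(Duration)}} would avoid the extra thread, which is why the KIP route is still worthwhile.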





[jira] [Commented] (KAFKA-7711) Add a bounded flush() API to Kafka Producer

2018-12-07 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712789#comment-16712789
 ] 

Stanislav Kozlovski commented on KAFKA-7711:


This sounds good to me. Thanks for opening the JIRA, [~kdu]

> Add a bounded flush()  API to Kafka Producer
> 
>
> Key: KAFKA-7711
> URL: https://issues.apache.org/jira/browse/KAFKA-7711
> Project: Kafka
>  Issue Type: Improvement
>Reporter: kun du
>Priority: Minor
>





[jira] [Created] (KAFKA-7712) Handle exceptions from immediately connected channels in Selector

2018-12-07 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-7712:
-

 Summary: Handle exceptions from immediately connected channels in 
Selector
 Key: KAFKA-7712
 URL: https://issues.apache.org/jira/browse/KAFKA-7712
 Project: Kafka
  Issue Type: Bug
  Components: network
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.2.0


We try to handle all possible exceptions in Selector to ensure that channels 
are always closed and their states kept consistent. For immediately connected 
channels, we should ensure that any exception during connection results in the 
channel being closed properly and removed from all maps. This is a very 
unlikely scenario; we already catch the exception, but we should clean up 
properly in the catch block.
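As an illustration of the intended cleanup, here is a generic sketch of the pattern, not Kafka's actual Selector code (the names ChannelSetup and finishConnect are assumptions): if completing an immediately connected channel throws, the catch block both removes the channel from the tracking map and closes it, so no stale state remains.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;

public final class ImmediatelyConnected {

    public interface ChannelSetup<C> {
        void complete(C channel) throws Exception;   // e.g. prepare the transport layer
    }

    public static <K, C extends Closeable> boolean finishConnect(
            K id, C channel, Map<K, C> channels, ChannelSetup<C> setup) {
        channels.put(id, channel);                   // channel is tracked first
        try {
            setup.complete(channel);                 // may throw during connection setup
            return true;
        } catch (Exception e) {
            channels.remove(id);                     // keep the maps consistent...
            try {
                channel.close();                     // ...and close the channel properly
            } catch (IOException ignored) {
            }
            return false;
        }
    }
}
```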






[jira] [Commented] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application

2018-12-07 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712734#comment-16712734
 ] 

Patrik Kleindl commented on KAFKA-7657:
---

[~guozhang] Thanks for the fast analysis. It is not a major problem for us 
currently, so a fix in an upcoming 2.0.x or 2.1 release would be sufficient.

As you guessed correctly, retries is set to 10; I will increase it to 
hopefully prevent such a problem.

> Invalid reporting of stream state in Kafka streams application
> --
>
> Key: KAFKA-7657
> URL: https://issues.apache.org/jira/browse/KAFKA-7657
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Thomas Crowley
>Priority: Major
>  Labels: bug
>
> We have a streams application with 3 instances running, two of which are 
> reporting the state of REBALANCING even after they have been running for 
> days. Restarting the application has no effect on the stream state.
> This seems suspect because each instance appears to be processing messages, 
> and the kafka-consumer-groups CLI tool reports hardly any offset lag in any 
> of the partitions assigned to the REBALANCING consumers. Each partition seems 
> to be processing an equal amount of records too.
> Inspecting the state.dir on disk, it looks like the RocksDB state has been 
> built and hovers at the expected size on disk.
> This problem has persisted for us after we rebuilt our Kafka cluster and 
> reset topics + consumer groups in our dev environment.
> There is nothing in the logs (with level set to DEBUG) of either the broker or 
> the application that suggests something exceptional has happened, causing the 
> application to be stuck REBALANCING.
> We are also running multiple streaming applications where this problem does 
> not exist.
> Two differences between this application and our other streaming applications 
> are:
>  * We have processing.guarantee set to exactly_once
>  * We are using a ValueTransformer which fetches from and puts data on a 
> windowed state store
> The REBALANCING state is returned from both polling the state method of our 
> KafkaStreams instance, and our custom metric which is derived from some logic 
> in a KafkaStreams.StateListener class attached via the setStateListener 
> method.
>  
> While I have provided a bit of context, before I reply with some reproducible 
> code - is there a simple way in which I can determine that my streams 
> application is in a RUNNING state without relying on the same mechanisms as 
> used above?
> Further, given that it seems like my application is actually running - could 
> this perhaps be a bug to do with how the stream state is being reported (in 
> the context of a transactional stream using the processor API)?
>  
>  
>  
>  





[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment

2018-12-07 Thread Viktor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712629#comment-16712629
 ] 

Viktor Somogyi commented on KAFKA-6794:
---

I think this change doesn't need a KIP for now, so I've collected the algorithm 
and some examples here. [~hachikuji], please have a look at it.
h2. Calculating A Reassignment Step

A reassignment step is always calculated from the final target replica (FTR) 
set and the current replica (CR) set.
 # Calculate the replicas to be dropped (DR):
 ## Calculate n = size(CR) - size(FTR)
 ## Filter those replicas from CR which are not in FTR; this is the excess 
replica (ER) set
 ## Sort the ER set so that the leader is last (this ensures the leader is 
dropped only when necessary).
 ## Take the first n replicas of ER; these form the set of dropped replicas
 # Calculate the new replica (NR) to be added by selecting the first replica 
from FTR that is not in CR
 # Create the target replica (TR) set: CR + NR - DR
 # If this is the last step, order the replicas as specified by FTR. This 
means the last step always equals FTR

h2. Performing A Reassignment Step
 # Wait until CR is entirely in ISR. This makes sure we start from a solid 
base for the reassignment.
 # Calculate the next reassignment step as described above, based on the 
reassignment context.
 # Wait until all brokers in the target replica (TR) set of the reassignment 
step are alive. This makes sure reassignment starts only when the target 
brokers can actually perform the step.
 # If we have new replicas in ISR from the previous step, change their state 
to OnlineReplica.
 # Update CR in ZooKeeper with TR: with this, the DR set is dropped and the NR 
set is added.
 # Send a LeaderAndIsr request to all replicas in CR + NR so they are notified 
of the ZooKeeper changes.
 # Start the new replicas in NR by moving them to the NewReplica state.
 # Set CR to TR in memory.
 # Send a LeaderAndIsr request with a potential new leader (if the current 
leader is not in TR), the new CR (using TR) and the same ISR to every broker 
in TR.
 # Move replicas in DR to Offline (forcing those replicas out of ISR).
 # Move replicas in DR to NonExistentReplica (forcing those replicas to be 
deleted).
 # Update the /admin/reassign_partitions path in ZK to remove this partition.
 # After electing a leader, the replica and ISR information changes, so resend 
the UpdateMetadata request to every broker.

h2. Example

The following code block shows how a transition happens from (0, 1, 2) into (3, 
4, 5) where the initial leader is 0.
{noformat}
 (0, 1, 2) // starting assignment
 |
(0, 1, 2, 3)   // +3
 |
(0, 2, 3, 4)   // -1 +4
 |
(0, 3, 4, 5)   // -2 +5
 |
 (3, 4, 5) // -0, new leader (3) is elected, requested order is matched, 
reassignment finished
{noformat}
Let's take a closer look at the third step above:
{noformat}
FTR = (3, 4, 5)
CR = (0, 1, 2, 3)
 
n = size(CR) - size(FTR)  // 1
ER = CR - FTR // (0, 1, 2)
ER = order(ER)// (1, 2, 0)
DR = takeFirst(ER, n) // (1)
 
NR = first(FTR - CR)  // 4
TR = CR + NR - DR // (0, 2, 3, 4)
{noformat}
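The step calculation above can be sketched in code as follows. This is an illustrative sketch, not the controller's implementation; passing the leader in explicitly is an assumption (the controller tracks it elsewhere).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public final class ReassignmentStep {

    // cr: current replica set, ftr: final target replica set, leader: current leader
    public static List<Integer> nextStep(List<Integer> cr, List<Integer> ftr, int leader) {
        int n = Math.max(cr.size() - ftr.size(), 0);                // how many to drop
        // ER: replicas in CR that are not in FTR
        List<Integer> er = cr.stream()
                .filter(r -> !ftr.contains(r))
                .collect(Collectors.toCollection(ArrayList::new));
        // order ER so the leader comes last, dropping it only when unavoidable
        if (er.remove((Integer) leader)) {
            er.add(leader);
        }
        List<Integer> dr = er.subList(0, Math.min(n, er.size()));   // DR: replicas to drop
        // NR: first replica of FTR that is not yet in CR
        Optional<Integer> nr = ftr.stream().filter(r -> !cr.contains(r)).findFirst();
        List<Integer> tr = new ArrayList<>(cr);                     // TR = CR + NR - DR
        tr.removeAll(dr);
        nr.ifPresent(tr::add);
        // last step: emit exactly FTR, in FTR's requested order
        if (tr.size() == ftr.size() && tr.containsAll(ftr)) {
            return new ArrayList<>(ftr);
        }
        return tr;
    }
}
```

Applied repeatedly to the example, this reproduces (0, 1, 2, 3), (0, 2, 3, 4), (0, 3, 4, 5) and finally (3, 4, 5).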

> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Viktor Somogyi
>Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which 
> moves all replicas to new brokers. Now 8 replicas are fetching at the same 
> time which means you need to account for 8 times the current producer load 
> plus the catch-up replication. To make matters worse, the replicas won't all 
> become in-sync at the same time; in the worst case, you could have 7 replicas 
> in-sync while one is still catching up. Currently, the old replicas won't be 
> disabled until all new replicas are in-sync. This makes configuring the 
> throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a 
> friendlier approach would be to do it incrementally: bring one replica 
> online, bring it in-sync, then remove one of the old replicas. Repeat until 
> all replicas have been changed. This would reduce the impact of a 
> reassignment and make configuring the throttle easier at the cost of a slower 
> overall reassignment.





[jira] [Assigned] (KAFKA-7694) Support ZooKeeper based master/secret key management for delegation tokens

2018-12-07 Thread Satish Duggana (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana reassigned KAFKA-7694:
-

Assignee: Satish Duggana

>  Support ZooKeeper based master/secret key management for delegation tokens
> ---
>
> Key: KAFKA-7694
> URL: https://issues.apache.org/jira/browse/KAFKA-7694
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Manikumar
>Assignee: Satish Duggana
>Priority: Major
>
> The master/secret key is used to generate and verify delegation tokens. 
> Currently, the master key/secret is stored as plain text in the 
> server.properties config file. The same key must be configured across all 
> brokers, and a re-deployment is required when the secret needs to be rotated.
> This JIRA is to explore and implement a ZooKeeper based master/secret key 
> management to automate secret key generation and expiration.


