[jira] [Created] (KAFKA-7857) Add AbstractCoordinatorConfig class to consolidate consumer coordinator configs between Consumer and Connect

2019-01-22 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7857:
--

 Summary: Add AbstractCoordinatorConfig class to consolidate 
consumer coordinator configs between Consumer and Connect
 Key: KAFKA-7857
 URL: https://issues.apache.org/jira/browse/KAFKA-7857
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen
Assignee: Boyang Chen


Right now there are a lot of duplicate configuration concerning client 
coordinator shared across ConsumerConfig and DistributedConfig (connect 
config). It makes sense to extract all coordinator related configs into a 
separate config class to reduce code redundancy.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7859) Replace LeaveGroup request/response with automated protocol

2019-01-22 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7859:
--

 Summary: Replace LeaveGroup request/response with automated 
protocol
 Key: KAFKA-7859
 URL: https://issues.apache.org/jira/browse/KAFKA-7859
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7858) Replace JoinGroup request/response with automated protocol

2019-01-22 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7858:
--

 Summary: Replace JoinGroup request/response with automated protocol
 Key: KAFKA-7858
 URL: https://issues.apache.org/jira/browse/KAFKA-7858
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7862) Modify JoinGroup logic to incorporate group.instance.id change

2019-01-23 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7862:
--

 Summary: Modify JoinGroup logic to incorporate group.instance.id 
change
 Key: KAFKA-7862
 URL: https://issues.apache.org/jira/browse/KAFKA-7862
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen


The step one for KIP-345 join group logic change to corporate with static 
membership.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6995) Make config "internal.leave.group.on.close" public

2019-01-23 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-6995.

Resolution: Won't Fix

We are planning to deprecate this config with the introduction of KIP-345. 

> Make config "internal.leave.group.on.close" public
> --
>
> Key: KAFKA-6995
> URL: https://issues.apache.org/jira/browse/KAFKA-6995
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
>
> We are proposing to make the config "internal.leave.group.on.close" public. 
> The reason is that for heavy state application the sticky assignment won't 
> work because each stream worker will leave group during rolling restart, and 
> there is a possibility that some members are left and rejoined while others 
> are still awaiting restart. This would then cause multiple rebalance because 
> after the ongoing rebalance is done, we are expecting late members to rejoin 
> and move state from `stable` to `prepareBalance`. To solve this problem, 
> heavy state application needs to use this config to avoid member list update, 
> so that at most one rebalance will be triggered at a proper time when all the 
> members are rejoined during rolling restart. This should just be one line 
> change.
> Code here:
> * internal.leave.group.on.close
>  * Whether or not the consumer should leave the group on close. If set to 
> false then a rebalance
>  * won't occur until session.timeout.ms expires.
>  *
>  * 
>  * Note: this is an internal configuration and could be changed in the future 
> in a backward incompatible way
>  *
>  */
>  static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
> "internal.leave.group.on.close";



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7859) Replace LeaveGroup request/response with automated protocol

2019-01-31 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-7859.

Resolution: Fixed

> Replace LeaveGroup request/response with automated protocol
> ---
>
> Key: KAFKA-7859
> URL: https://issues.apache.org/jira/browse/KAFKA-7859
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7816) Windowed topic should have window size as part of the metadata

2019-01-31 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-7816.

Resolution: Won't Fix

Not a real use case

> Windowed topic should have window size as part of the metadata
> --
>
> Key: KAFKA-7816
> URL: https://issues.apache.org/jira/browse/KAFKA-7816
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently the Kafka window store topics require a windowed serde to properly 
> deserialize the records. One of the required config is `window.size.ms`, 
> which indicates the diff between (window.end - window.start). For space 
> efficiency, KStream only stores the windowed record with window start time, 
> because as long as the restore consumer knows size of the window, it would 
> properly derive the window end time by adding window.size.ms to window start 
> time.
> However, this makes the reuse of window topic very hard because another user 
> has to config the correct window size in order to deserialize the data. When 
> we extract the customized consumer as a template, every time new user has to 
> define their own window size. If we do wild-card matching consumer, things 
> could be even worse to work because different topics may have different 
> window size and user has to read through the application code to find that 
> info.
> To make the decoding of window topic easier, we are proposing to add a new 
> config to TopicMetadata called `windowSize` which could be used for 
> applications to properly deserialize the data without requirement to config a 
> window size. This could also make client side serde API easier. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7899) Command line tool to invalidate group metadata for clean assignment

2019-02-05 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7899:
--

 Summary: Command line tool to invalidate group metadata for clean 
assignment
 Key: KAFKA-7899
 URL: https://issues.apache.org/jira/browse/KAFKA-7899
 Project: Kafka
  Issue Type: New Feature
  Components: consumer, streams
Affects Versions: 1.1.0
Reporter: Boyang Chen
Assignee: Boyang Chen


Right now the group metadata will affect consumers under sticky assignment, 
since it persists previous topic partition assignment which affects the 
judgement of consumer leader. Specifically for KStream applications (under 
1.1), if we are scaling up the cluster, it is hard to balance the traffic since 
most tasks would still go to "previous round active" assignments, even though 
we hope them to move towards other hosts.

It would be preferable to have a tool to invalidate the group metadata stored 
on broker, so that for sticky assignment we could have a clean start.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7899) Command line tool to invalidate group metadata for clean assignment

2019-02-05 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-7899.

Resolution: Invalid

It appears that the metadata should be generated by the client instead of 
broker. So basically this is invalid issue.

> Command line tool to invalidate group metadata for clean assignment
> ---
>
> Key: KAFKA-7899
> URL: https://issues.apache.org/jira/browse/KAFKA-7899
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer, streams
>Affects Versions: 1.1.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Right now the group metadata will affect consumers under sticky assignment, 
> since it persists previous topic partition assignment which affects the 
> judgement of consumer leader. Specifically for KStream applications (under 
> 1.1), if we are scaling up the cluster, it is hard to balance the traffic 
> since most tasks would still go to "previous round active" assignments, even 
> though we hope them to move towards other hosts.
> It would be preferable to have a tool to invalidate the group metadata stored 
> on broker, so that for sticky assignment we could have a clean start.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7903) Replace OffsetCommit request/response with automated protocol

2019-02-06 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7903:
--

 Summary: Replace OffsetCommit request/response with automated 
protocol
 Key: KAFKA-7903
 URL: https://issues.apache.org/jira/browse/KAFKA-7903
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7995) Augment singleton protocol type to list for Kafka Consumer

2019-02-24 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7995:
--

 Summary: Augment singleton protocol type to list for Kafka 
Consumer  
 Key: KAFKA-7995
 URL: https://issues.apache.org/jira/browse/KAFKA-7995
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, core
Reporter: Boyang Chen


Right now Kafka consumer protocol uses a singleton marker to distinguish Kafka 
Connect worker and normal consumer. This is not upgrade-friendly approach since 
the protocol type could potential change over time. A better approach is to 
support multiple candidacies so that the no downtime protocol type switch could 
achieve.

For example, if we are trying to upgrade a Kafka Streams application towards a 
protocol type called "stream", right now there is no way to do this without 
downtime since broker will reject changing protocol type to a different one 
unless the group is back to empty. If we allow new member to provide a list of 
protocol type instead ("consumer", "stream"), there would be no compatibility 
issue.

Alternative approach is to invent an admin API to change group's protocol type 
on runtime. However, the burden introduced on administrator is not trivial, 
since we need to guarantee the operation series to be correct, otherwise we 
will see limp-upgrade experience in the midpoint, for example while we are 
changing protocol type there was unexpected rebalance that causes old members 
join failure.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7995) Augment singleton protocol type to list for Kafka Consumer

2019-02-24 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-7995:
--

Assignee: (was: Boyang Chen)

> Augment singleton protocol type to list for Kafka Consumer  
> 
>
> Key: KAFKA-7995
> URL: https://issues.apache.org/jira/browse/KAFKA-7995
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> Right now Kafka consumer protocol uses a singleton marker to distinguish 
> Kafka Connect worker and normal consumer. This is not upgrade-friendly 
> approach since the protocol type could potential change over time. A better 
> approach is to support multiple candidacies so that the no downtime protocol 
> type switch could achieve.
> For example, if we are trying to upgrade a Kafka Streams application towards 
> a protocol type called "stream", right now there is no way to do this without 
> downtime since broker will reject changing protocol type to a different one 
> unless the group is back to empty. If we allow new member to provide a list 
> of protocol type instead ("consumer", "stream"), there would be no 
> compatibility issue.
> Alternative approach is to invent an admin API to change group's protocol 
> type on runtime. However, the burden introduced on administrator is not 
> trivial, since we need to guarantee the operation series to be correct, 
> otherwise we will see limp-upgrade experience in the midpoint, for example 
> while we are changing protocol type there was unexpected rebalance that 
> causes old members join failure.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7995) Augment singleton protocol type to list for Kafka Consumer

2019-02-24 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-7995:
--

Assignee: Boyang Chen

> Augment singleton protocol type to list for Kafka Consumer  
> 
>
> Key: KAFKA-7995
> URL: https://issues.apache.org/jira/browse/KAFKA-7995
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> Right now Kafka consumer protocol uses a singleton marker to distinguish 
> Kafka Connect worker and normal consumer. This is not upgrade-friendly 
> approach since the protocol type could potential change over time. A better 
> approach is to support multiple candidacies so that the no downtime protocol 
> type switch could achieve.
> For example, if we are trying to upgrade a Kafka Streams application towards 
> a protocol type called "stream", right now there is no way to do this without 
> downtime since broker will reject changing protocol type to a different one 
> unless the group is back to empty. If we allow new member to provide a list 
> of protocol type instead ("consumer", "stream"), there would be no 
> compatibility issue.
> Alternative approach is to invent an admin API to change group's protocol 
> type on runtime. However, the burden introduced on administrator is not 
> trivial, since we need to guarantee the operation series to be correct, 
> otherwise we will see limp-upgrade experience in the midpoint, for example 
> while we are changing protocol type there was unexpected rebalance that 
> causes old members join failure.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6386) Deprecate KafkaStreams constructor taking StreamsConfig parameter

2017-12-22 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301556#comment-16301556
 ] 

Boyang Chen commented on KAFKA-6386:


Hi I'm new to Kafka repo and would like to contribute. Is there a way I could 
assign this ticket to myself?

> Deprecate KafkaStreams constructor taking StreamsConfig parameter
> -
>
> Key: KAFKA-6386
> URL: https://issues.apache.org/jira/browse/KAFKA-6386
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, needs-kip, newbie
>
> Currently, {{KafkaStreams}} constructor has overloads that take either 
> {{Properties}} or {{StreamsConfig}} a parameters.
> Because {{StreamsConfig}} is immutable and is created from a {{Properties}} 
> object itself, the constructors accepting {{StreamsConfig}} are not useful 
> and adds only boiler plate code. Thus, we should deprecate those constructors 
> in order to remove them eventually.
> This JIRA includes a public API changes and thus requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6386) Deprecate KafkaStreams constructor taking StreamsConfig parameter

2017-12-22 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16302157#comment-16302157
 ] 

Boyang Chen commented on KAFKA-6386:


Thanks a lot Guozhang! [~guozhang]

> Deprecate KafkaStreams constructor taking StreamsConfig parameter
> -
>
> Key: KAFKA-6386
> URL: https://issues.apache.org/jira/browse/KAFKA-6386
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Boyang Chen
>Priority: Minor
>  Labels: beginner, needs-kip, newbie
>
> Currently, {{KafkaStreams}} constructor has overloads that take either 
> {{Properties}} or {{StreamsConfig}} a parameters.
> Because {{StreamsConfig}} is immutable and is created from a {{Properties}} 
> object itself, the constructors accepting {{StreamsConfig}} are not useful 
> and adds only boiler plate code. Thus, we should deprecate those constructors 
> in order to remove them eventually.
> This JIRA includes a public API changes and thus requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient

2017-12-26 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16303641#comment-16303641
 ] 

Boyang Chen commented on KAFKA-6363:


[~h314to] Hello man, do you still want to work on this? If not, I could take 
this one over :)

> Use MockAdminClient for any unit tests that depend on AdminClient
> -
>
> Key: KAFKA-6363
> URL: https://issues.apache.org/jira/browse/KAFKA-6363
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>  Labels: newbie
>
> Today we have a few unit tests other than KafkaAdminClientTest that relies on 
> MockKafkaAdminClientEnv.
> About this class and MockKafkaAdminClientEnv, my thoughts:
> 1. MockKafkaAdminClientEnv is actually using a MockClient for the inner 
> KafkaClient; it should be only used for the unit test of KafkaAdminClient 
> itself.
> 2. For any other unit tests on classes that depend on AdminClient, we should 
> be using the MockAdminClient that mocks the whole AdminClient.
> So I suggest 1) in TopicAdminTest use MockAdminClient instead; 2) in 
> KafkaAdminClientTest use MockClient and added a new static constructor that 
> takes a KafkaClient; 3) remove the MockKafkaAdminClientEnv.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6381) Improve KafkaStreamsTest for localThreadMetadata

2017-12-26 Thread Boyang Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-6381:
--

Assignee: Boyang Chen

> Improve KafkaStreamsTest for localThreadMetadata
> 
>
> Key: KAFKA-6381
> URL: https://issues.apache.org/jira/browse/KAFKA-6381
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, we have a rather basic test for 
> {{KafkaStreams#localThreadMetadata}} (ie, 
> {{KafkaStreamsTests#shouldReturnThreadMetadata()}}.
> In a PR discussion [~guozhang] mentioned:
> {quote}
> I'm wondering if it is better to fill into the topology with some processor 
> and then check the following active and standby tasks as non-empty would 
> cover more scenarios? In addition, we can use waitForCondition to wait for 
> the state to become RUNNING and the tasks list become non-empty.
> {quote}
> We should extend the current test method and/or add new test methods to cover 
> this and other scenario.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6381) Improve KafkaStreamsTest for localThreadMetadata

2017-12-26 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16303758#comment-16303758
 ] 

Boyang Chen commented on KAFKA-6381:


I shall take a look on this one :) @mjsax

> Improve KafkaStreamsTest for localThreadMetadata
> 
>
> Key: KAFKA-6381
> URL: https://issues.apache.org/jira/browse/KAFKA-6381
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, we have a rather basic test for 
> {{KafkaStreams#localThreadMetadata}} (ie, 
> {{KafkaStreamsTests#shouldReturnThreadMetadata()}}.
> In a PR discussion [~guozhang] mentioned:
> {quote}
> I'm wondering if it is better to fill into the topology with some processor 
> and then check the following active and standby tasks as non-empty would 
> cover more scenarios? In addition, we can use waitForCondition to wait for 
> the state to become RUNNING and the tasks list become non-empty.
> {quote}
> We should extend the current test method and/or add new test methods to cover 
> this and other scenario.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6381) Improve KafkaStreamsTest for localThreadMetadata

2018-02-04 Thread Boyang Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-6381:
--

Assignee: (was: Boyang Chen)

> Improve KafkaStreamsTest for localThreadMetadata
> 
>
> Key: KAFKA-6381
> URL: https://issues.apache.org/jira/browse/KAFKA-6381
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Currently, we have a rather basic test for 
> {{KafkaStreams#localThreadMetadata}} (ie, 
> {{KafkaStreamsTests#shouldReturnThreadMetadata()}}.
> In a PR discussion [~guozhang] mentioned:
> {quote}
> I'm wondering if it is better to fill into the topology with some processor 
> and then check the following active and standby tasks as non-empty would 
> cover more scenarios? In addition, we can use waitForCondition to wait for 
> the state to become RUNNING and the tasks list become non-empty.
> {quote}
> We should extend the current test method and/or add new test methods to cover 
> this and other scenario.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6381) Improve KafkaStreamsTest for localThreadMetadata

2018-02-04 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351866#comment-16351866
 ] 

Boyang Chen commented on KAFKA-6381:


[~mjsax] I did have a try on this one, and found that I could not come up with 
a valid mock KStreams. So leaving this one unassigned right now since I might 
not have time to tackle it. Sorry for taking so long to response!

> Improve KafkaStreamsTest for localThreadMetadata
> 
>
> Key: KAFKA-6381
> URL: https://issues.apache.org/jira/browse/KAFKA-6381
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Currently, we have a rather basic test for 
> {{KafkaStreams#localThreadMetadata}} (ie, 
> {{KafkaStreamsTests#shouldReturnThreadMetadata()}}.
> In a PR discussion [~guozhang] mentioned:
> {quote}
> I'm wondering if it is better to fill into the topology with some processor 
> and then check the following active and standby tasks as non-empty would 
> cover more scenarios? In addition, we can use waitForCondition to wait for 
> the state to become RUNNING and the tasks list become non-empty.
> {quote}
> We should extend the current test method and/or add new test methods to cover 
> this and other scenario.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer

2018-03-27 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-6723:
--

 Summary: Separate "max.poll.record" for restore consumer and 
common consumer
 Key: KAFKA-6723
 URL: https://issues.apache.org/jira/browse/KAFKA-6723
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Boyang Chen
Assignee: Boyang Chen


Currently, Kafka Streams use `max.poll.record` config for both restore consumer 
and normal stream consumer. In reality, they are doing different processing 
workloads, and in order to speed up the restore speed, restore consumer is 
supposed to have a higher throughput by setting `max.poll.record` higher. The 
change involved is trivial: 
[https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]

However, this is still a public API change (introducing a new config name), so 
we need a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer

2018-03-27 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416749#comment-16416749
 ] 

Boyang Chen commented on KAFKA-6723:


[~guozhang] [~liquanpei] [~mjsax] Thoughts on this?

> Separate "max.poll.record" for restore consumer and common consumer
> ---
>
> Key: KAFKA-6723
> URL: https://issues.apache.org/jira/browse/KAFKA-6723
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, Kafka Streams use `max.poll.record` config for both restore 
> consumer and normal stream consumer. In reality, they are doing different 
> processing workloads, and in order to speed up the restore speed, restore 
> consumer is supposed to have a higher throughput by setting `max.poll.record` 
> higher. The change involved is trivial: 
> [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]
> However, this is still a public API change (introducing a new config name), 
> so we need a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer

2018-03-28 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417697#comment-16417697
 ] 

Boyang Chen commented on KAFKA-6723:


[~mjsax]  [~guozhang] I haven't seen a concrete design for KAFKA-6657. So am I 
expecting sth like:

default.max.poll.records

restore.max.poll.records

global.max.poll.records

If that one could cover cases by creating new consumer config parameters, I 
guess this Jira sounds duplicate. Btw, I think the default value should be the 
same.

 

> Separate "max.poll.record" for restore consumer and common consumer
> ---
>
> Key: KAFKA-6723
> URL: https://issues.apache.org/jira/browse/KAFKA-6723
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, Kafka Streams use `max.poll.record` config for both restore 
> consumer and normal stream consumer. In reality, they are doing different 
> processing workloads, and in order to speed up the restore speed, restore 
> consumer is supposed to have a higher throughput by setting `max.poll.record` 
> higher. The change involved is trivial: 
> [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]
> However, this is still a public API change (introducing a new config name), 
> so we need a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6657) Add StreamsConfig prefix for different consumers

2018-03-28 Thread Boyang Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-6657:
--

Assignee: Boyang Chen

> Add StreamsConfig prefix for different consumers
> 
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Boyang Chen
>Priority: Major
>  Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows to pass in different configs for different clients by 
> prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers, (1) the main 
> consumer (2) the restore consumer and (3) the global consumer (that is a 
> restore consumer as well atm).
> For some use cases, it's required to set different configs for different 
> consumers. Thus, we should add two new prefix for restore and global 
> consumer. We might also consider to extend `KafkaClientSupplier` and add a 
> `getGlobalConsumer()` method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6657) Add StreamsConfig prefix for different consumers

2018-03-28 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418337#comment-16418337
 ] 

Boyang Chen commented on KAFKA-6657:


[~guozhang]thanks Guozhang! I will take a look at this one.

> Add StreamsConfig prefix for different consumers
> 
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Boyang Chen
>Priority: Major
>  Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows to pass in different configs for different clients by 
> prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers, (1) the main 
> consumer (2) the restore consumer and (3) the global consumer (that is a 
> restore consumer as well atm).
> For some use cases, it's required to set different configs for different 
> consumers. Thus, we should add two new prefix for restore and global 
> consumer. We might also consider to extend `KafkaClientSupplier` and add a 
> `getGlobalConsumer()` method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6657) Add StreamsConfig prefix for different consumers

2018-03-31 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421559#comment-16421559
 ] 

Boyang Chen commented on KAFKA-6657:


Pull request here: https://github.com/apache/kafka/pull/4805

> Add StreamsConfig prefix for different consumers
> 
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Boyang Chen
>Priority: Major
>  Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows to pass in different configs for different clients by 
> prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers, (1) the main 
> consumer (2) the restore consumer and (3) the global consumer (that is a 
> restore consumer as well atm).
> For some use cases, it's required to set different configs for different 
> consumers. Thus, we should add two new prefix for restore and global 
> consumer. We might also consider to extend `KafkaClientSupplier` and add a 
> `getGlobalConsumer()` method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6840) support windowing in ktable API

2018-04-30 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-6840:
--

 Summary: support windowing in ktable API
 Key: KAFKA-6840
 URL: https://issues.apache.org/jira/browse/KAFKA-6840
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.1.0
Reporter: Boyang Chen
Assignee: Boyang Chen


The StreamsBuilder provides table() API to materialize a changelog topic into a 
local key-value store (KTable), which is very convenient. However, current 
underlying implementation does not support materializing one topic to a 
windowed key-value store, which in certain cases would be very useful. 

To make up the gap, we proposed a new API in StreamsBuilder that could get a 
windowed Ktable.

The table() API in StreamsBuilder looks like this:

public synchronized  KTable table(final String topic,

  final Consumed consumed,

  final Materialized> materialized) {

    Objects.requireNonNull(topic, "topic can't be null");

    Objects.requireNonNull(consumed, "consumed can't be null");

    Objects.requireNonNull(materialized, "materialized can't be null");

    
materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);

    return internalStreamsBuilder.table(topic,

    new ConsumedInternal<>(consumed),

    new 
MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));

    }

 

Where we could see that the store type is given as KeyValueStore. There is no 
flexibility to change it to WindowStore.

 

To maintain compatibility of the existing API, we have two options to define a 
new API:

1.Overload existing KTable struct

public synchronized  KTable, V> windowedTable(final String 
topic,

  final Consumed consumed,

  final Materialized> materialized);

 

This could give developer an alternative to use windowed table instead. 
However, this implies that we need to make sure all the KTable logic still 
works as expected, such as join, aggregation, etc, so the challenge would be 
making sure all current KTable logics work.

 

2.Define a new type called WindowedKTable

public synchronized  WindowedKTable windowedTable(final String 
topic,

  final Consumed consumed,

  final Materialized> materialized);

The benefit of doing this is that we don’t need to worry about the existing 
functionality of KTable. However, the cost is to introduce redundancy of common 
operation logic. When upgrading common functionality, we need to take care of 
both types.

We could fill in more details in the KIP. Right now I would like to hear some 
feedbacks on the two approaches, thank you!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6840) support windowing in ktable API

2018-04-30 Thread Boyang Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-6840:
---
Labels: api  (was: )

> support windowing in ktable API
> ---
>
> Key: KAFKA-6840
> URL: https://issues.apache.org/jira/browse/KAFKA-6840
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: api
>
> The StreamsBuilder provides table() API to materialize a changelog topic into 
> a local key-value store (KTable), which is very convenient. However, current 
> underlying implementation does not support materializing one topic to a 
> windowed key-value store, which in certain cases would be very useful. 
> To make up the gap, we proposed a new API in StreamsBuilder that could get a 
> windowed Ktable.
> The table() API in StreamsBuilder looks like this:
> public synchronized  KTable table(final String topic,
>   final Consumed 
> consumed,
>   final Materialized KeyValueStore> materialized) {
>     Objects.requireNonNull(topic, "topic can't be null");
>     Objects.requireNonNull(consumed, "consumed can't be null");
>     Objects.requireNonNull(materialized, "materialized can't be null");
>     
> materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
>     return internalStreamsBuilder.table(topic,
>     new ConsumedInternal<>(consumed),
>     new 
> MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
>     }
>  
> Where we could see that the store type is given as KeyValueStore. There is no 
> flexibility to change it to WindowStore.
>  
> To maintain compatibility of the existing API, we have two options to define 
> a new API:
> 1.Overload existing KTable struct
> public synchronized  KTable, V> windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
>  
> This could give developer an alternative to use windowed table instead. 
> However, this implies that we need to make sure all the KTable logic still 
> works as expected, such as join, aggregation, etc, so the challenge would be 
> making sure all current KTable logics work.
>  
> 2.Define a new type called WindowedKTable
> public synchronized  WindowedKTable windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
> The benefit of doing this is that we don’t need to worry about the existing 
> functionality of KTable. However, the cost is to introduce redundancy of 
> common operation logic. When upgrading common functionality, we need to take 
> care of both types.
> We could fill in more details in the KIP. Right now I would like to hear some 
> feedbacks on the two approaches, thank you!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6840) support windowing in ktable API

2018-05-02 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461659#comment-16461659
 ] 

Boyang Chen commented on KAFKA-6840:


Sounds good, I will give it a try [~mjsax]

> support windowing in ktable API
> ---
>
> Key: KAFKA-6840
> URL: https://issues.apache.org/jira/browse/KAFKA-6840
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: api, needs-kip
>
> The StreamsBuilder provides table() API to materialize a changelog topic into 
> a local key-value store (KTable), which is very convenient. However, current 
> underlying implementation does not support materializing one topic to a 
> windowed key-value store, which in certain cases would be very useful. 
> To make up the gap, we proposed a new API in StreamsBuilder that could get a 
> windowed Ktable.
> The table() API in StreamsBuilder looks like this:
> public synchronized  KTable table(final String topic,
>   final Consumed 
> consumed,
>   final Materialized KeyValueStore> materialized) {
>     Objects.requireNonNull(topic, "topic can't be null");
>     Objects.requireNonNull(consumed, "consumed can't be null");
>     Objects.requireNonNull(materialized, "materialized can't be null");
>     
> materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
>     return internalStreamsBuilder.table(topic,
>     new ConsumedInternal<>(consumed),
>     new 
> MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
>     }
>  
> Where we could see that the store type is given as KeyValueStore. There is no 
> flexibility to change it to WindowStore.
>  
> To maintain compatibility of the existing API, we have two options to define 
> a new API:
> 1.Overload existing KTable struct
> public synchronized  KTable, V> windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
>  
> This could give developer an alternative to use windowed table instead. 
> However, this implies that we need to make sure all the KTable logic still 
> works as expected, such as join, aggregation, etc, so the challenge would be 
> making sure all current KTable logics work.
>  
> 2.Define a new type called WindowedKTable
> public synchronized  WindowedKTable windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
> The benefit of doing this is that we don’t need to worry about the existing 
> functionality of KTable. However, the cost is to introduce redundancy of 
> common operation logic. When upgrading common functionality, we need to take 
> care of both types.
> We could fill in more details in the KIP. Right now I would like to hear some 
> feedbacks on the two approaches, thank you!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6657) Add StreamsConfig prefix for different consumers

2018-05-02 Thread Boyang Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-6657.

Resolution: Delivered

The code is merged: https://github.com/apache/kafka/pull/4805

> Add StreamsConfig prefix for different consumers
> 
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Boyang Chen
>Priority: Major
>  Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows to pass in different configs for different clients by 
> prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers, (1) the main 
> consumer (2) the restore consumer and (3) the global consumer (that is a 
> restore consumer as well atm).
> For some use cases, it's required to set different configs for different 
> consumers. Thus, we should add two new prefix for restore and global 
> consumer. We might also consider to extend `KafkaClientSupplier` and add a 
> `getGlobalConsumer()` method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6840) support windowing in ktable API

2018-05-03 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463297#comment-16463297
 ] 

Boyang Chen commented on KAFKA-6840:


[~mjsax] Sorry haven't updated this thread for a while. This is the 
KeyValueStoreMaterializer code:

```

public StoreBuilder> materialize() {
 KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) 
materialized.storeSupplier();
 if (supplier == null) {
 final String name = materialized.storeName();
 supplier = Stores.persistentKeyValueStore(name);
 }
 final StoreBuilder> builder = 
Stores.keyValueStoreBuilder(supplier,
 materialized.keySerde(),
 materialized.valueSerde());

 if (materialized.loggingEnabled()) {
 builder.withLoggingEnabled(materialized.logConfig());
 } else {
 builder.withLoggingDisabled();
 }

 if (materialized.cachingEnabled()) {
 builder.withCachingEnabled();
 }
 return builder;
}```

 

Note that the api ` Stores.persistentKeyValueStore(name);` doesn't have an 
equivalent one for persistentWindowStore, who needs retentionPeriod, 
numSegments, windowSize etc. Do we want to provide default values to all these 
configs, or let user have an option to pass them in?

 

> support windowing in ktable API
> ---
>
> Key: KAFKA-6840
> URL: https://issues.apache.org/jira/browse/KAFKA-6840
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: api, needs-kip
>
> The StreamsBuilder provides table() API to materialize a changelog topic into 
> a local key-value store (KTable), which is very convenient. However, current 
> underlying implementation does not support materializing one topic to a 
> windowed key-value store, which in certain cases would be very useful. 
> To make up the gap, we proposed a new API in StreamsBuilder that could get a 
> windowed Ktable.
> The table() API in StreamsBuilder looks like this:
> public synchronized  KTable table(final String topic,
>   final Consumed 
> consumed,
>   final Materialized KeyValueStore> materialized) {
>     Objects.requireNonNull(topic, "topic can't be null");
>     Objects.requireNonNull(consumed, "consumed can't be null");
>     Objects.requireNonNull(materialized, "materialized can't be null");
>     
> materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
>     return internalStreamsBuilder.table(topic,
>     new ConsumedInternal<>(consumed),
>     new 
> MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
>     }
>  
> Where we could see that the store type is given as KeyValueStore. There is no 
> flexibility to change it to WindowStore.
>  
> To maintain compatibility of the existing API, we have two options to define 
> a new API:
> 1.Overload existing KTable struct
> public synchronized  KTable, V> windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
>  
> This could give developer an alternative to use windowed table instead. 
> However, this implies that we need to make sure all the KTable logic still 
> works as expected, such as join, aggregation, etc, so the challenge would be 
> making sure all current KTable logics work.
>  
> 2.Define a new type called WindowedKTable
> public synchronized  WindowedKTable windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
> The benefit of doing this is that we don’t need to worry about the existing 
> functionality of KTable. However, the cost is to introduce redundancy of 
> common operation logic. When upgrading common functionality, we need to take 
> care of both types.
> We could fill in more details in the KIP. Right now I would like to hear some 
> feedbacks on the two approaches, thank you!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6840) support windowing in ktable API

2018-05-03 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463304#comment-16463304
 ] 

Boyang Chen commented on KAFKA-6840:


I think alternatively we could extend class `Materialized` with those configs, 
by passing in retention, numSegments etc, how does that sound? I will keep 
trying this way now.

> support windowing in ktable API
> ---
>
> Key: KAFKA-6840
> URL: https://issues.apache.org/jira/browse/KAFKA-6840
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: api, needs-kip
>
> The StreamsBuilder provides table() API to materialize a changelog topic into 
> a local key-value store (KTable), which is very convenient. However, current 
> underlying implementation does not support materializing one topic to a 
> windowed key-value store, which in certain cases would be very useful. 
> To make up the gap, we proposed a new API in StreamsBuilder that could get a 
> windowed Ktable.
> The table() API in StreamsBuilder looks like this:
> public synchronized  KTable table(final String topic,
>   final Consumed 
> consumed,
>   final Materialized KeyValueStore> materialized) {
>     Objects.requireNonNull(topic, "topic can't be null");
>     Objects.requireNonNull(consumed, "consumed can't be null");
>     Objects.requireNonNull(materialized, "materialized can't be null");
>     
> materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
>     return internalStreamsBuilder.table(topic,
>     new ConsumedInternal<>(consumed),
>     new 
> MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
>     }
>  
> Where we could see that the store type is given as KeyValueStore. There is no 
> flexibility to change it to WindowStore.
>  
> To maintain compatibility of the existing API, we have two options to define 
> a new API:
> 1.Overload existing KTable struct
> public synchronized  KTable, V> windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
>  
> This could give developer an alternative to use windowed table instead. 
> However, this implies that we need to make sure all the KTable logic still 
> works as expected, such as join, aggregation, etc, so the challenge would be 
> making sure all current KTable logics work.
>  
> 2.Define a new type called WindowedKTable
> public synchronized  WindowedKTable windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
> The benefit of doing this is that we don’t need to worry about the existing 
> functionality of KTable. However, the cost is to introduce redundancy of 
> common operation logic. When upgrading common functionality, we need to take 
> care of both types.
> We could fill in more details in the KIP. Right now I would like to hear some 
> feedbacks on the two approaches, thank you!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6840) support windowing in ktable API

2018-05-04 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464310#comment-16464310
 ] 

Boyang Chen commented on KAFKA-6840:


Yep, forget to update. One more thing is that:

public  KTable table(final AutoOffsetReset offsetReset, final 
String topic, final 
org.apache.kafka.streams.processor.StateStoreSupplier 
storeSupplier);

could not overload by replacing KeyValueStore with WindowStore since it will 
hit `Method has the same erasure` error. Do you have an idea about that? 
[~mjsax]

 

> support windowing in ktable API
> ---
>
> Key: KAFKA-6840
> URL: https://issues.apache.org/jira/browse/KAFKA-6840
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: api, needs-kip
>
> The StreamsBuilder provides table() API to materialize a changelog topic into 
> a local key-value store (KTable), which is very convenient. However, current 
> underlying implementation does not support materializing one topic to a 
> windowed key-value store, which in certain cases would be very useful. 
> To make up the gap, we proposed a new API in StreamsBuilder that could get a 
> windowed Ktable.
> The table() API in StreamsBuilder looks like this:
> public synchronized  KTable table(final String topic,
>   final Consumed 
> consumed,
>   final Materialized KeyValueStore> materialized) {
>     Objects.requireNonNull(topic, "topic can't be null");
>     Objects.requireNonNull(consumed, "consumed can't be null");
>     Objects.requireNonNull(materialized, "materialized can't be null");
>     
> materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
>     return internalStreamsBuilder.table(topic,
>     new ConsumedInternal<>(consumed),
>     new 
> MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
>     }
>  
> Where we could see that the store type is given as KeyValueStore. There is no 
> flexibility to change it to WindowStore.
>  
> To maintain compatibility of the existing API, we have two options to define 
> a new API:
> 1.Overload existing KTable struct
> public synchronized  KTable, V> windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
>  
> This could give developer an alternative to use windowed table instead. 
> However, this implies that we need to make sure all the KTable logic still 
> works as expected, such as join, aggregation, etc, so the challenge would be 
> making sure all current KTable logics work.
>  
> 2.Define a new type called WindowedKTable
> public synchronized  WindowedKTable windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
> The benefit of doing this is that we don’t need to worry about the existing 
> functionality of KTable. However, the cost is to introduce redundancy of 
> common operation logic. When upgrading common functionality, we need to take 
> care of both types.
> We could fill in more details in the KIP. Right now I would like to hear some 
> feedbacks on the two approaches, thank you!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6840) support windowing in ktable API

2018-05-05 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464919#comment-16464919
 ] 

Boyang Chen commented on KAFKA-6840:


[~mjsax] thanks for the reply. I think I might mention the wrong function, and 
the function I want to show is actually this one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java#L218]

Even with `table(String, Materialized)` I feel the Materialized could not be 
overloaded due to the same erasure error. 

On the other hand, I looked around and found some implementations like this in 
TimeWindowedKStream:

 KTable, VR> aggregate(final Initializer initializer,
 final Aggregator aggregator,
 final Materialized> materialized);

Link: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java]

My concern is that here we provide a ` KTable, VR>` output to the 
user. If we only change Materialized and return a `KTable` from 
`table(String, Materialized)` whose underlying implementation is actually 
, V>, we are not supporting consistent API across different 
implementations. If we also want to return , V> from the table API, 
we do need public API change.

> support windowing in ktable API
> ---
>
> Key: KAFKA-6840
> URL: https://issues.apache.org/jira/browse/KAFKA-6840
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: api, needs-kip
>
> The StreamsBuilder provides table() API to materialize a changelog topic into 
> a local key-value store (KTable), which is very convenient. However, current 
> underlying implementation does not support materializing one topic to a 
> windowed key-value store, which in certain cases would be very useful. 
> To make up the gap, we proposed a new API in StreamsBuilder that could get a 
> windowed Ktable.
> The table() API in StreamsBuilder looks like this:
> public synchronized  KTable table(final String topic,
>   final Consumed 
> consumed,
>   final Materialized KeyValueStore> materialized) {
>     Objects.requireNonNull(topic, "topic can't be null");
>     Objects.requireNonNull(consumed, "consumed can't be null");
>     Objects.requireNonNull(materialized, "materialized can't be null");
>     
> materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
>     return internalStreamsBuilder.table(topic,
>     new ConsumedInternal<>(consumed),
>     new 
> MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
>     }
>  
> Where we could see that the store type is given as KeyValueStore. There is no 
> flexibility to change it to WindowStore.
>  
> To maintain compatibility of the existing API, we have two options to define 
> a new API:
> 1.Overload existing KTable struct
> public synchronized  KTable, V> windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
>  
> This could give developer an alternative to use windowed table instead. 
> However, this implies that we need to make sure all the KTable logic still 
> works as expected, such as join, aggregation, etc, so the challenge would be 
> making sure all current KTable logics work.
>  
> 2.Define a new type called WindowedKTable
> public synchronized  WindowedKTable windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
> The benefit of doing this is that we don’t need to worry about the existing 
> functionality of KTable. However, the cost is to introduce redundancy of 
> common operation logic. When upgrading common functionality, we need to take 
> care of both types.
> We could fill in more details in the KIP. Right now I would like to hear some 
> feedbacks on the two approaches, thank you!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6840) support windowing in ktable API

2018-05-07 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465540#comment-16465540
 ] 

Boyang Chen commented on KAFKA-6840:


Sounds like a plan! I will start preparing a KIP and code for this, thank you! 
[~mjsax]

> support windowing in ktable API
> ---
>
> Key: KAFKA-6840
> URL: https://issues.apache.org/jira/browse/KAFKA-6840
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: api, needs-kip
>
> The StreamsBuilder provides table() API to materialize a changelog topic into 
> a local key-value store (KTable), which is very convenient. However, current 
> underlying implementation does not support materializing one topic to a 
> windowed key-value store, which in certain cases would be very useful. 
> To make up the gap, we proposed a new API in StreamsBuilder that could get a 
> windowed Ktable.
> The table() API in StreamsBuilder looks like this:
> public synchronized  KTable table(final String topic,
>   final Consumed 
> consumed,
>   final Materialized KeyValueStore> materialized) {
>     Objects.requireNonNull(topic, "topic can't be null");
>     Objects.requireNonNull(consumed, "consumed can't be null");
>     Objects.requireNonNull(materialized, "materialized can't be null");
>     
> materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
>     return internalStreamsBuilder.table(topic,
>     new ConsumedInternal<>(consumed),
>     new 
> MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
>     }
>  
> Where we could see that the store type is given as KeyValueStore. There is no 
> flexibility to change it to WindowStore.
>  
> To maintain compatibility of the existing API, we have two options to define 
> a new API:
> 1.Overload existing KTable struct
> public synchronized  KTable, V> windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
>  
> This could give developer an alternative to use windowed table instead. 
> However, this implies that we need to make sure all the KTable logic still 
> works as expected, such as join, aggregation, etc, so the challenge would be 
> making sure all current KTable logics work.
>  
> 2.Define a new type called WindowedKTable
> public synchronized  WindowedKTable windowedTable(final String 
> topic,
>   final Consumed 
> consumed,
>   final Materialized WindowStore> materialized);
> The benefit of doing this is that we don’t need to worry about the existing 
> functionality of KTable. However, the cost is to introduce redundancy of 
> common operation logic. When upgrading common functionality, we need to take 
> care of both types.
> We could fill in more details in the KIP. Right now I would like to hear some 
> feedbacks on the two approaches, thank you!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6896) add producer metrics exporting in KafkaStreams.java

2018-05-10 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-6896:
--

 Summary: add producer metrics exporting in KafkaStreams.java
 Key: KAFKA-6896
 URL: https://issues.apache.org/jira/browse/KAFKA-6896
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Boyang Chen
Assignee: Boyang Chen


We would like to also export the producer metrics from {{StreamThread}} just 
like consumer metrics, so that we could gain more visibility of stream 
application. The approach is to pass in the {{threadProducer}}into the 
StreamThread so that we could export its metrics in dynamic.

Note that this is a pure internal change that doesn't require a KIP, and in the 
future we also want to export admin client metrics. A followup KIP for admin 
client will be created once this is merged.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6896) add producer metrics exporting in KafkaStreams.java

2018-05-10 Thread Boyang Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-6896:
---
Description: 
We would like to also export the producer metrics from {{StreamThread}} just 
like consumer metrics, so that we could gain more visibility of stream 
application. The approach is to pass in the \{{threadProducer}}into the 
StreamThread so that we could export its metrics in dynamic.

Note that this is a pure internal change that doesn't require a KIP, and in the 
future we also want to export admin client metrics. A followup KIP for admin 
client will be created once this is merged.

Pull request here: https://github.com/apache/kafka/pull/4998

  was:
We would like to also export the producer metrics from {{StreamThread}} just 
like consumer metrics, so that we could gain more visibility of stream 
application. The approach is to pass in the {{threadProducer}}into the 
StreamThread so that we could export its metrics in dynamic.

Note that this is a pure internal change that doesn't require a KIP, and in the 
future we also want to export admin client metrics. A followup KIP for admin 
client will be created once this is merged.


> add producer metrics exporting in KafkaStreams.java
> ---
>
> Key: KAFKA-6896
> URL: https://issues.apache.org/jira/browse/KAFKA-6896
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> We would like to also export the producer metrics from {{StreamThread}} just 
> like consumer metrics, so that we could gain more visibility of stream 
> application. The approach is to pass in the \{{threadProducer}}into the 
> StreamThread so that we could export its metrics in dynamic.
> Note that this is a pure internal change that doesn't require a KIP, and in 
> the future we also want to export admin client metrics. A followup KIP for 
> admin client will be created once this is merged.
> Pull request here: https://github.com/apache/kafka/pull/4998



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6986) Export Admin Client metrics through Stream Threads

2018-06-02 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-6986:
--

 Summary: Export Admin Client metrics through Stream Threads
 Key: KAFKA-6986
 URL: https://issues.apache.org/jira/browse/KAFKA-6986
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen
 Fix For: 2.0.0


We already exported producer and consumer metrics through KafkaStreams class:

[https://github.com/apache/kafka/pull/4998]

It makes sense to also export the Admin client metrics, however this might 
involve some public API change that needs a KIP.

I have allocated a KIP page here:

[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80453500]

If any new contributor wishes to take over this one, please let me know. I will 
revisit and close this ticket in one or two months later in case no one claims 
it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6986) Export Admin Client metrics through Stream Threads

2018-06-05 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16501418#comment-16501418
 ] 

Boyang Chen commented on KAFKA-6986:


Hey there,

Of course! Feel free to reassign this to you and let me know if you need any 
help :)

> Export Admin Client metrics through Stream Threads
> --
>
> Key: KAFKA-6986
> URL: https://issues.apache.org/jira/browse/KAFKA-6986
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Minor
>  Labels: Newcomer, beginner, newbie
> Fix For: 2.0.0
>
>
> We already exported producer and consumer metrics through KafkaStreams class:
> [https://github.com/apache/kafka/pull/4998]
> It makes sense to also export the Admin client metrics, however this might 
> involve some public API change that needs a KIP.
> I have allocated a KIP page here:
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80453500]
> If any new contributor wishes to take over this one, please let me know. I 
> will revisit and close this ticket in one or two months later in case no one 
> claims it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6995) Make config "internal.leave.group.on.close" public

2018-06-05 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-6995:
--

 Summary: Make config "internal.leave.group.on.close" public
 Key: KAFKA-6995
 URL: https://issues.apache.org/jira/browse/KAFKA-6995
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, streams
Reporter: Boyang Chen
Assignee: Boyang Chen


We are proposing to make the config "internal.leave.group.on.close" public. The 
reason is that for heavy state application the sticky assignment won't work 
because each stream worker will leave group during rolling restart, and there 
is a possibility that some members are left and rejoined while others are still 
awaiting restart. This would then cause multiple rebalance because after the 
ongoing rebalance is done, we are expecting late members to rejoin and move 
state from `stable` to `prepareBalance`. To solve this problem, heavy state 
application needs to use this config to avoid member list update, so that at 
most one rebalance will be triggered at a proper time when all the members are 
rejoined during rolling restart. This should just be one line change.

Code here:

* internal.leave.group.on.close
 * Whether or not the consumer should leave the group on close. If set to 
false then a rebalance
 * won't occur until session.timeout.ms expires.
 *
 * 
 * Note: this is an internal configuration and could be changed in the future 
in a backward incompatible way
 *
 */
 static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
"internal.leave.group.on.close";



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6986) Export Admin Client metrics through Stream Threads

2018-06-05 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16501921#comment-16501921
 ] 

Boyang Chen commented on KAFKA-6986:


[~shung] I think you need to get contributor access first. [~mjsax] may help 
you set up so that you could assign ticket to yourself.

> Export Admin Client metrics through Stream Threads
> --
>
> Key: KAFKA-6986
> URL: https://issues.apache.org/jira/browse/KAFKA-6986
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Minor
>  Labels: Newcomer, beginner, newbie
> Fix For: 2.0.0
>
>
> We already exported producer and consumer metrics through KafkaStreams class:
> [https://github.com/apache/kafka/pull/4998]
> It makes sense to also export the Admin client metrics, however this might 
> involve some public API change that needs a KIP.
> I have allocated a KIP page here:
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80453500]
> If any new contributor wishes to take over this one, please let me know. I 
> will revisit and close this ticket in one or two months later in case no one 
> claims it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6995) Make config "internal.leave.group.on.close" public

2018-06-05 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502119#comment-16502119
 ] 

Boyang Chen commented on KAFKA-6995:


Original config was added here: https://issues.apache.org/jira/browse/KAFKA-4881

> Make config "internal.leave.group.on.close" public
> --
>
> Key: KAFKA-6995
> URL: https://issues.apache.org/jira/browse/KAFKA-6995
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> We are proposing to make the config "internal.leave.group.on.close" public. 
> The reason is that for heavy state application the sticky assignment won't 
> work because each stream worker will leave group during rolling restart, and 
> there is a possibility that some members are left and rejoined while others 
> are still awaiting restart. This would then cause multiple rebalance because 
> after the ongoing rebalance is done, we are expecting late members to rejoin 
> and move state from `stable` to `prepareBalance`. To solve this problem, 
> heavy state application needs to use this config to avoid member list update, 
> so that at most one rebalance will be triggered at a proper time when all the 
> members are rejoined during rolling restart. This should just be one line 
> change.
> Code here:
> * internal.leave.group.on.close
>  * Whether or not the consumer should leave the group on close. If set to 
> false then a rebalance
>  * won't occur until session.timeout.ms expires.
>  *
>  * 
>  * Note: this is an internal configuration and could be changed in the future 
> in a backward incompatible way
>  *
>  */
>  static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
> "internal.leave.group.on.close";



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6995) Make config "internal.leave.group.on.close" public

2018-06-06 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504209#comment-16504209
 ] 

Boyang Chen commented on KAFKA-6995:


Basically leave group request will make the rebalance faster because we could 
detect consumer offline faster. In our case of rolling restart, it is favorable 
not to do so because leaving group explicitly will break the existing member 
list, which means there are always hosts restarted early and leave the group, 
while hosts restarted late will not be considered for rebalance. The incomplete 
rebalance will trigger multiple times until the consumption is stabilized.

Where did Kafka Streams set this config to false? I didn't find that code, 
could you elaborate on that? [~mjsax]

> Make config "internal.leave.group.on.close" public
> --
>
> Key: KAFKA-6995
> URL: https://issues.apache.org/jira/browse/KAFKA-6995
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
>
> We are proposing to make the config "internal.leave.group.on.close" public. 
> The reason is that for heavy state application the sticky assignment won't 
> work because each stream worker will leave group during rolling restart, and 
> there is a possibility that some members are left and rejoined while others 
> are still awaiting restart. This would then cause multiple rebalance because 
> after the ongoing rebalance is done, we are expecting late members to rejoin 
> and move state from `stable` to `prepareBalance`. To solve this problem, 
> heavy state application needs to use this config to avoid member list update, 
> so that at most one rebalance will be triggered at a proper time when all the 
> members are rejoined during rolling restart. This should just be one line 
> change.
> Code here:
> * internal.leave.group.on.close
>  * Whether or not the consumer should leave the group on close. If set to 
> false then a rebalance
>  * won't occur until session.timeout.ms expires.
>  *
>  * 
>  * Note: this is an internal configuration and could be changed in the future 
> in a backward incompatible way
>  *
>  */
>  static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
> "internal.leave.group.on.close";



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6995) Make config "internal.leave.group.on.close" public

2018-06-06 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504209#comment-16504209
 ] 

Boyang Chen edited comment on KAFKA-6995 at 6/7/18 4:00 AM:


Basically leave group request will make the rebalance faster because we could 
detect consumer offline faster. In our case of rolling restart, it is favorable 
not to do so because leaving group explicitly will break the existing member 
list, which means there are always hosts restarted early and leave the group, 
while hosts restarted late will not be considered for rebalance. The incomplete 
rebalance will trigger multiple times until the consumer list is stabilized.

Where did Kafka Streams set this config to false? I didn't find that code, 
could you elaborate on that? [~mjsax]


was (Author: bchen225242):
Basically leave group request will make the rebalance faster because we could 
detect consumer offline faster. In our case of rolling restart, it is favorable 
not to do so because leaving group explicitly will break the existing member 
list, which means there are always hosts restarted early and leave the group, 
while hosts restarted late will not be considered for rebalance. The incomplete 
rebalance will trigger multiple times until the consumption is stabilized.

Where did Kafka Streams set this config to false? I didn't find that code, 
could you elaborate on that? [~mjsax]

> Make config "internal.leave.group.on.close" public
> --
>
> Key: KAFKA-6995
> URL: https://issues.apache.org/jira/browse/KAFKA-6995
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
>
> We are proposing to make the config "internal.leave.group.on.close" public. 
> The reason is that for heavy state application the sticky assignment won't 
> work because each stream worker will leave group during rolling restart, and 
> there is a possibility that some members are left and rejoined while others 
> are still awaiting restart. This would then cause multiple rebalance because 
> after the ongoing rebalance is done, we are expecting late members to rejoin 
> and move state from `stable` to `prepareBalance`. To solve this problem, 
> heavy state application needs to use this config to avoid member list update, 
> so that at most one rebalance will be triggered at a proper time when all the 
> members are rejoined during rolling restart. This should just be one line 
> change.
> Code here:
> * internal.leave.group.on.close
>  * Whether or not the consumer should leave the group on close. If set to 
> false then a rebalance
>  * won't occur until session.timeout.ms expires.
>  *
>  * 
>  * Note: this is an internal configuration and could be changed in the future 
> in a backward incompatible way
>  *
>  */
>  static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
> "internal.leave.group.on.close";



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6995) Make config "internal.leave.group.on.close" public

2018-06-07 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505048#comment-16505048
 ] 

Boyang Chen commented on KAFKA-6995:


[~mjsax]thanks for the explanation! I realized that the config was set to false 
once you pointed out, where previously I misunderstood the config. Just one 
thing I want to confirm is that, whether should we make this config public so 
that developer has an option to set it true? The thing is that if we are 
dealing with stateless application, it would be ideal to trigger rebalance when 
some machine gets killed or go offline in devOps, because the state transfer is 
0 and standby task could take over pretty quick.

I will create another ticket to track one new issue I discovered yesterday. 
Thanks!

> Make config "internal.leave.group.on.close" public
> --
>
> Key: KAFKA-6995
> URL: https://issues.apache.org/jira/browse/KAFKA-6995
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
>
> We are proposing to make the config "internal.leave.group.on.close" public. 
> The reason is that for heavy state application the sticky assignment won't 
> work because each stream worker will leave group during rolling restart, and 
> there is a possibility that some members are left and rejoined while others 
> are still awaiting restart. This would then cause multiple rebalance because 
> after the ongoing rebalance is done, we are expecting late members to rejoin 
> and move state from `stable` to `prepareBalance`. To solve this problem, 
> heavy state application needs to use this config to avoid member list update, 
> so that at most one rebalance will be triggered at a proper time when all the 
> members are rejoined during rolling restart. This should just be one line 
> change.
> Code here:
> * internal.leave.group.on.close
>  * Whether or not the consumer should leave the group on close. If set to 
> false then a rebalance
>  * won't occur until session.timeout.ms expires.
>  *
>  * 
>  * Note: this is an internal configuration and could be changed in the future 
> in a backward incompatible way
>  *
>  */
>  static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
> "internal.leave.group.on.close";



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7018) persist memberId for consumer restart

2018-06-07 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7018:
--

 Summary: persist memberId for consumer restart
 Key: KAFKA-7018
 URL: https://issues.apache.org/jira/browse/KAFKA-7018
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, streams
Reporter: Boyang Chen
Assignee: Boyang Chen


In group coordinator, there is a logic to neglect join group request from 
existing follower consumers:
{code:java}
case Empty | Stable =>
  if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
// if the member id is unknown, register the member to the group
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, 
clientHost, protocolType, protocols, group, responseCallback)
  } else {
val member = group.get(memberId)
if (group.isLeader(memberId) || !member.matches(protocols)) {
  // force a rebalance if a member has changed metadata or if the leader 
sends JoinGroup.
  // The latter allows the leader to trigger rebalances for changes 
affecting assignment
  // which do not affect the member metadata (such as topic metadata 
changes for the consumer)
  updateMemberAndRebalance(group, member, protocols, responseCallback)
} else {
  // for followers with no actual change to their metadata, just return 
group information
  // for the current generation which will allow them to issue SyncGroup
  responseCallback(JoinGroupResult(
members = Map.empty,
memberId = memberId,
generationId = group.generationId,
subProtocol = group.protocolOrNull,
leaderId = group.leaderOrNull,
error = Errors.NONE))
}
{code}
While looking at the AbstractCoordinator, I found that the generation was 
hard-coded as 

NO_GENERATION on restart, which means we will send UNKNOWN_MEMBER_ID in the 
first join group request. This means we will treat the restarted consumer as a 
new member, so the rebalance will be triggered until session timeout.

I'm trying to clarify the following things before we extend the discussion:
 # Whether my understanding of the above logic is right (Hope [~mjsax] could 
help me double check)
 # Whether it makes sense to persist last round of memberId for consumers? We 
currently only need this feature in stream application, but will do no harm if 
we also use it for consumer in general. This would be a nice-to-have feature on 
consumer restart when we configured the loading-previous-memberId to true. If 
we failed, simply use the UNKNOWN_MEMBER_ID
 # The behavior could also be changed on the broker side, but I suspect it is 
very risky. So far client side change should be the least effort. The end goal 
is to avoid excessive rebalance from the same consumer restart, so if you feel 
server side change could also help, we could further discuss.

Thank you for helping out! [~mjsax] [~guozhang]

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6995) Make config "internal.leave.group.on.close" public

2018-06-07 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505383#comment-16505383
 ] 

Boyang Chen commented on KAFKA-6995:


Thanks for the explanation, I understand the concern here. I agree this is an 
advanced config for new users, but may worth exposing because this makes end 
user clear about what kind of rebalance behavior will trigger after a service 
concussion. I could work on the documentation to make this clear to the end 
user what this means, and by setting this to `true` or `false` will have what 
corresponding outcomes. 

If we have clear documentation on this, it would be ideal to leave this config 
to the end user.

> Make config "internal.leave.group.on.close" public
> --
>
> Key: KAFKA-6995
> URL: https://issues.apache.org/jira/browse/KAFKA-6995
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
>
> We are proposing to make the config "internal.leave.group.on.close" public. 
> The reason is that for heavy state application the sticky assignment won't 
> work because each stream worker will leave group during rolling restart, and 
> there is a possibility that some members are left and rejoined while others 
> are still awaiting restart. This would then cause multiple rebalance because 
> after the ongoing rebalance is done, we are expecting late members to rejoin 
> and move state from `stable` to `prepareBalance`. To solve this problem, 
> heavy state application needs to use this config to avoid member list update, 
> so that at most one rebalance will be triggered at a proper time when all the 
> members are rejoined during rolling restart. This should just be one line 
> change.
> Code here:
> * internal.leave.group.on.close
>  * Whether or not the consumer should leave the group on close. If set to 
> false then a rebalance
>  * won't occur until session.timeout.ms expires.
>  *
>  * 
>  * Note: this is an internal configuration and could be changed in the future 
> in a backward incompatible way
>  *
>  */
>  static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
> "internal.leave.group.on.close";



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7018) persist memberId for consumer restart

2018-06-07 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505706#comment-16505706
 ] 

Boyang Chen commented on KAFKA-7018:


Thanks Guozhang! This could be done by our side, by persisting some local file 
mapping from client-id to member-id (if the client-id is fixed through 
iteration), as long as we agree on the design. Hey [~hachikuji], could you 
share more thoughts here?

cc [~ishiihara]

> persist memberId for consumer restart
> -
>
> Key: KAFKA-7018
> URL: https://issues.apache.org/jira/browse/KAFKA-7018
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In group coordinator, there is a logic to neglect join group request from 
> existing follower consumers:
> {code:java}
> case Empty | Stable =>
>   if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
> // if the member id is unknown, register the member to the group
> addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, 
> clientHost, protocolType, protocols, group, responseCallback)
>   } else {
> val member = group.get(memberId)
> if (group.isLeader(memberId) || !member.matches(protocols)) {
>   // force a rebalance if a member has changed metadata or if the leader 
> sends JoinGroup.
>   // The latter allows the leader to trigger rebalances for changes 
> affecting assignment
>   // which do not affect the member metadata (such as topic metadata 
> changes for the consumer)
>   updateMemberAndRebalance(group, member, protocols, responseCallback)
> } else {
>   // for followers with no actual change to their metadata, just return 
> group information
>   // for the current generation which will allow them to issue SyncGroup
>   responseCallback(JoinGroupResult(
> members = Map.empty,
> memberId = memberId,
> generationId = group.generationId,
> subProtocol = group.protocolOrNull,
> leaderId = group.leaderOrNull,
> error = Errors.NONE))
> }
> {code}
> While looking at the AbstractCoordinator, I found that the generation was 
> hard-coded as 
> NO_GENERATION on restart, which means we will send UNKNOWN_MEMBER_ID in the 
> first join group request. This means we will treat the restarted consumer as 
> a new member, so the rebalance will be triggered until session timeout.
> I'm trying to clarify the following things before we extend the discussion:
>  # Whether my understanding of the above logic is right (Hope [~mjsax] could 
> help me double check)
>  # Whether it makes sense to persist last round of memberId for consumers? We 
> currently only need this feature in stream application, but will do no harm 
> if we also use it for consumer in general. This would be a nice-to-have 
> feature on consumer restart when we configured the loading-previous-memberId 
> to true. If we failed, simply use the UNKNOWN_MEMBER_ID
>  # The behavior could also be changed on the broker side, but I suspect it is 
> very risky. So far client side change should be the least effort. The end 
> goal is to avoid excessive rebalance from the same consumer restart, so if 
> you feel server side change could also help, we could further discuss.
> Thank you for helping out! [~mjsax] [~guozhang]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6995) Make config "internal.leave.group.on.close" public

2018-06-08 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506626#comment-16506626
 ] 

Boyang Chen commented on KAFKA-6995:


[~mjsax] That would be a viable solution. However, me as an end user would be 
beneficial to know that leaving group on close would introduce what kind of 
outcome. Even though we could adjust the config based on stateful or stateless 
nature of the job, if the job is stateful but maintains a really lightweight 
state which me as an user feels not necessary to worry about state transfer 
comparing with starving the process. In this case, I would like to see a really 
quick rebalance (faster than minimum session timeout) instead of waiting until 
session timeout.

> Make config "internal.leave.group.on.close" public
> --
>
> Key: KAFKA-6995
> URL: https://issues.apache.org/jira/browse/KAFKA-6995
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
>
> We are proposing to make the config "internal.leave.group.on.close" public. 
> The reason is that for heavy state application the sticky assignment won't 
> work because each stream worker will leave group during rolling restart, and 
> there is a possibility that some members are left and rejoined while others 
> are still awaiting restart. This would then cause multiple rebalance because 
> after the ongoing rebalance is done, we are expecting late members to rejoin 
> and move state from `stable` to `prepareBalance`. To solve this problem, 
> heavy state application needs to use this config to avoid member list update, 
> so that at most one rebalance will be triggered at a proper time when all the 
> members are rejoined during rolling restart. This should just be one line 
> change.
> Code here:
> * internal.leave.group.on.close
>  * Whether or not the consumer should leave the group on close. If set to 
> false then a rebalance
>  * won't occur until session.timeout.ms expires.
>  *
>  * 
>  * Note: this is an internal configuration and could be changed in the future 
> in a backward incompatible way
>  *
>  */
>  static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
> "internal.leave.group.on.close";



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7018) persist memberId for consumer restart

2018-06-16 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514939#comment-16514939
 ] 

Boyang Chen commented on KAFKA-7018:


A summary of the idea:

When leader sends join request, it will always trigger another rebalance 
because there could be metadata change on the topic. So we need to make sure 
other members are aware of that.

Let's imagine a condition where every member joins with previous generation 
info, then:

If follower joins after leader, the stage would be starting from 
prepareRebalance, all the join group request from followers will be retained 
and make member state to awaitJoinCallBack. 

If followers are joining before leader joins, they would send another sync 
group request. 
 * If leader changes group state to prepareRebalance, we refuse the sync group 
request and they would rejoin.
 * If leader haven’t changed the group state to prepareRebalance, sync group 
request would success and follower starts sending heartbeat. In 
handleHeartbeat() function, eventually the leader will move state towards 
prepareRebalance, so the rebalance in progress error will be triggered.

Code logic here:

 *_} else if (error == Errors.REBALANCE_IN_PROGRESS) {_*

    *_log.debug("Attempt to heartbeat failed since group is 
rebalancing");_*

    *_requestRejoin();_*

    *_future.raise(Errors.REBALANCE_IN_PROGRESS);_*

    *_}_* 

So now the only thing we need to do is on client side to make sure member id 
keeps the same through restart.

 

> persist memberId for consumer restart
> -
>
> Key: KAFKA-7018
> URL: https://issues.apache.org/jira/browse/KAFKA-7018
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In group coordinator, there is a logic to neglect join group request from 
> existing follower consumers:
> {code:java}
> case Empty | Stable =>
>   if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
> // if the member id is unknown, register the member to the group
> addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, 
> clientHost, protocolType, protocols, group, responseCallback)
>   } else {
> val member = group.get(memberId)
> if (group.isLeader(memberId) || !member.matches(protocols)) {
>   // force a rebalance if a member has changed metadata or if the leader 
> sends JoinGroup.
>   // The latter allows the leader to trigger rebalances for changes 
> affecting assignment
>   // which do not affect the member metadata (such as topic metadata 
> changes for the consumer)
>   updateMemberAndRebalance(group, member, protocols, responseCallback)
> } else {
>   // for followers with no actual change to their metadata, just return 
> group information
>   // for the current generation which will allow them to issue SyncGroup
>   responseCallback(JoinGroupResult(
> members = Map.empty,
> memberId = memberId,
> generationId = group.generationId,
> subProtocol = group.protocolOrNull,
> leaderId = group.leaderOrNull,
> error = Errors.NONE))
> }
> {code}
> While looking at the AbstractCoordinator, I found that the generation was 
> hard-coded as 
> NO_GENERATION on restart, which means we will send UNKNOWN_MEMBER_ID in the 
> first join group request. This means we will treat the restarted consumer as 
> a new member, so the rebalance will be triggered until session timeout.
> I'm trying to clarify the following things before we extend the discussion:
>  # Whether my understanding of the above logic is right (Hope [~mjsax] could 
> help me double check)
>  # Whether it makes sense to persist last round of memberId for consumers? We 
> currently only need this feature in stream application, but will do no harm 
> if we also use it for consumer in general. This would be a nice-to-have 
> feature on consumer restart when we configured the loading-previous-memberId 
> to true. If we failed, simply use the UNKNOWN_MEMBER_ID
>  # The behavior could also be changed on the broker side, but I suspect it is 
> very risky. So far client side change should be the least effort. The end 
> goal is to avoid excessive rebalance from the same consumer restart, so if 
> you feel server side change could also help, we could further discuss.
> Thank you for helping out! [~mjsax] [~guozhang]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7018) persist memberId for consumer restart

2018-06-16 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514939#comment-16514939
 ] 

Boyang Chen edited comment on KAFKA-7018 at 6/17/18 12:06 AM:
--

A summary of the idea:

When leader sends join request, it will always trigger another rebalance 
because there could be metadata change on the topic. So we need to make sure 
other members are aware of that.

Let's imagine a condition where every member joins with previous generation 
info, then:

If follower joins after leader, the stage would be starting from 
prepareRebalance, all the join group request from followers will be retained 
and make member state to awaitJoinCallBack.

If followers are joining before leader joins, they would send another sync 
group request.
 * If leader changes group state to prepareRebalance, we refuse the sync group 
request and they would rejoin.
 * If leader haven’t changed the group state to prepareRebalance, sync group 
request would success and follower starts sending heartbeat. In 
handleHeartbeat() function, eventually the leader will move state towards 
prepareRebalance, so the rebalance in progress error will be triggered.

Code logic here:

 *_} else if (error == Errors.REBALANCE_IN_PROGRESS) {_*

   *_log.debug("Attempt to heartbeat failed since group is 
rebalancing");_*

   *_requestRejoin();_*

   *_future.raise(Errors.REBALANCE_IN_PROGRESS);_*

   *_}_*

So now the only thing we need to do is on client side to make sure member id 
keeps the same through restart, don't need to worry about the join sequence of 
follower/leader.

 


was (Author: bchen225242):
A summary of the idea:

When leader sends join request, it will always trigger another rebalance 
because there could be metadata change on the topic. So we need to make sure 
other members are aware of that.

Let's imagine a condition where every member joins with previous generation 
info, then:

If follower joins after leader, the stage would be starting from 
prepareRebalance, all the join group request from followers will be retained 
and make member state to awaitJoinCallBack. 

If followers are joining before leader joins, they would send another sync 
group request. 
 * If leader changes group state to prepareRebalance, we refuse the sync group 
request and they would rejoin.
 * If leader haven’t changed the group state to prepareRebalance, sync group 
request would success and follower starts sending heartbeat. In 
handleHeartbeat() function, eventually the leader will move state towards 
prepareRebalance, so the rebalance in progress error will be triggered.

Code logic here:

 *_} else if (error == Errors.REBALANCE_IN_PROGRESS) {_*

    *_log.debug("Attempt to heartbeat failed since group is 
rebalancing");_*

    *_requestRejoin();_*

    *_future.raise(Errors.REBALANCE_IN_PROGRESS);_*

    *_}_* 

So now the only thing we need to do is on client side to make sure member id 
keeps the same through restart.

 

> persist memberId for consumer restart
> -
>
> Key: KAFKA-7018
> URL: https://issues.apache.org/jira/browse/KAFKA-7018
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In group coordinator, there is a logic to neglect join group request from 
> existing follower consumers:
> {code:java}
> case Empty | Stable =>
>   if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
> // if the member id is unknown, register the member to the group
> addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, 
> clientHost, protocolType, protocols, group, responseCallback)
>   } else {
> val member = group.get(memberId)
> if (group.isLeader(memberId) || !member.matches(protocols)) {
>   // force a rebalance if a member has changed metadata or if the leader 
> sends JoinGroup.
>   // The latter allows the leader to trigger rebalances for changes 
> affecting assignment
>   // which do not affect the member metadata (such as topic metadata 
> changes for the consumer)
>   updateMemberAndRebalance(group, member, protocols, responseCallback)
> } else {
>   // for followers with no actual change to their metadata, just return 
> group information
>   // for the current generation which will allow them to issue SyncGroup
>   responseCallback(JoinGroupResult(
> members = Map.empty,
> memberId = memberId,
> generationId = group.generationId,
> subProtocol = group.protocolOrNull,
> leaderId = group.leaderOrNull,
> error = Errors.NONE))
> }
> {code}
> While looking at the AbstractCoordinator, I fo

[jira] [Created] (KAFKA-7071) specify number of partitions when using repartition logic

2018-06-18 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7071:
--

 Summary: specify number of partitions when using repartition logic
 Key: KAFKA-7071
 URL: https://issues.apache.org/jira/browse/KAFKA-7071
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Boyang Chen
Assignee: Boyang Chen


Hey there, I was wondering whether it makes sense to specify number of 
partitions of the output topic from a repartition operation like groupBy, 
flatMap, etc. The current DSL doesn't support adding a customized repartition 
topic which has customized number of partitions. For example, I want to reduce 
the input topic from 8 partitions to 1, there is no easy solution but to create 
a new topic instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7071) specify number of partitions when using repartition logic

2018-06-18 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-7071:
---
Description: 
Hey there, I was wondering whether it makes sense to specify number of 
partitions of the output topic from a repartition operation like groupBy, 
flatMap, etc. The current DSL doesn't support adding a customized repartition 
topic which has customized number of partitions. For example, I want to reduce 
the input topic from 8 partitions to 1, there is no easy solution but to create 
a new topic instead.

cc [~guozhang] [~liquanpei]

  was:Hey there, I was wondering whether it makes sense to specify number of 
partitions of the output topic from a repartition operation like groupBy, 
flatMap, etc. The current DSL doesn't support adding a customized repartition 
topic which has customized number of partitions. For example, I want to reduce 
the input topic from 8 partitions to 1, there is no easy solution but to create 
a new topic instead.


> specify number of partitions when using repartition logic
> -
>
> Key: KAFKA-7071
> URL: https://issues.apache.org/jira/browse/KAFKA-7071
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Hey there, I was wondering whether it makes sense to specify number of 
> partitions of the output topic from a repartition operation like groupBy, 
> flatMap, etc. The current DSL doesn't support adding a customized repartition 
> topic which has customized number of partitions. For example, I want to 
> reduce the input topic from 8 partitions to 1, there is no easy solution but 
> to create a new topic instead.
> cc [~guozhang] [~liquanpei]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7071) specify number of partitions when using repartition logic

2018-06-18 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516326#comment-16516326
 ] 

Boyang Chen commented on KAFKA-7071:


[~mjsax] looks so, I will close this and reassign the ticket of 6037 to myself.

> specify number of partitions when using repartition logic
> -
>
> Key: KAFKA-7071
> URL: https://issues.apache.org/jira/browse/KAFKA-7071
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Hey there, I was wondering whether it makes sense to specify number of 
> partitions of the output topic from a repartition operation like groupBy, 
> flatMap, etc. The current DSL doesn't support adding a customized repartition 
> topic which has customized number of partitions. For example, I want to 
> reduce the input topic from 8 partitions to 1, there is no easy solution but 
> to create a new topic instead.
> cc [~guozhang] [~liquanpei]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6037) Make Sub-topology Parallelism Tunable

2018-06-18 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-6037:
--

Assignee: Boyang Chen

> Make Sub-topology Parallelism Tunable
> -
>
> Key: KAFKA-6037
> URL: https://issues.apache.org/jira/browse/KAFKA-6037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
>
> Today the downstream sub-topology's parallelism (aka the number of tasks) are 
> purely dependent on the upstream sub-topology's parallelism, which ultimately 
> depends on the source topic's num. partitions. However this does not work 
> perfectly with dynamic scaling scenarios.
> Imagine if your have a simple aggregation application, it would have two 
> sub-topologies cut by the repartition topic, the first sub-topology would be 
> very light as it reads from input topics and write to repartition topic based 
> on the agg-key; the second sub-topology would do the actual work with the agg 
> state store, etc, hence is heavy computational. Right now the first and 
> second topology will always have the same number of tasks as the repartition 
> topic num.partitions is defined to be the same as the source topic 
> num.partitions, so to scale up we have to increase the number of input topic 
> partitions.
> One way to improve on that, is to use a default large number for repartition 
> topics and also allow users to override it (either through DSL code, or 
> through config). Doing this different sub-topologies would have different 
> number of tasks, i.e. parallelism units. In addition, users may also use this 
> config to "hint" the DSL translator to NOT create the repartition topics 
> (i.e. to not break up the sub-topologies) when she has the knowledge of the 
> data format.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7071) specify number of partitions when using repartition logic

2018-06-18 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-7071.

Resolution: Duplicate

Dup with 6037

> specify number of partitions when using repartition logic
> -
>
> Key: KAFKA-7071
> URL: https://issues.apache.org/jira/browse/KAFKA-7071
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Hey there, I was wondering whether it makes sense to specify number of 
> partitions of the output topic from a repartition operation like groupBy, 
> flatMap, etc. The current DSL doesn't support adding a customized repartition 
> topic which has customized number of partitions. For example, I want to 
> reduce the input topic from 8 partitions to 1, there is no easy solution but 
> to create a new topic instead.
> cc [~guozhang] [~liquanpei]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6037) Make Sub-topology Parallelism Tunable

2018-06-18 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516329#comment-16516329
 ] 

Boyang Chen commented on KAFKA-6037:


I will take a look at it this week, thank you [~mjsax] for pointing out~

> Make Sub-topology Parallelism Tunable
> -
>
> Key: KAFKA-6037
> URL: https://issues.apache.org/jira/browse/KAFKA-6037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
>
> Today the downstream sub-topology's parallelism (aka the number of tasks) are 
> purely dependent on the upstream sub-topology's parallelism, which ultimately 
> depends on the source topic's num. partitions. However this does not work 
> perfectly with dynamic scaling scenarios.
> Imagine if your have a simple aggregation application, it would have two 
> sub-topologies cut by the repartition topic, the first sub-topology would be 
> very light as it reads from input topics and write to repartition topic based 
> on the agg-key; the second sub-topology would do the actual work with the agg 
> state store, etc, hence is heavy computational. Right now the first and 
> second topology will always have the same number of tasks as the repartition 
> topic num.partitions is defined to be the same as the source topic 
> num.partitions, so to scale up we have to increase the number of input topic 
> partitions.
> One way to improve on that, is to use a default large number for repartition 
> topics and also allow users to override it (either through DSL code, or 
> through config). Doing this different sub-topologies would have different 
> number of tasks, i.e. parallelism units. In addition, users may also use this 
> config to "hint" the DSL translator to NOT create the repartition topics 
> (i.e. to not break up the sub-topologies) when she has the knowledge of the 
> data format.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6037) Make Sub-topology Parallelism Tunable

2018-06-18 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516386#comment-16516386
 ] 

Boyang Chen commented on KAFKA-6037:


I took a quick look of KIP 221:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Repartition+Topic+Hints+in+Streams]
It seems to be a reasonable API to be added to existing repartition operations. 
If you [~guozhang] have no objections, I could try with this API on one of the 
DSL first. 

> Make Sub-topology Parallelism Tunable
> -
>
> Key: KAFKA-6037
> URL: https://issues.apache.org/jira/browse/KAFKA-6037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
>
> Today the downstream sub-topology's parallelism (aka the number of tasks) are 
> purely dependent on the upstream sub-topology's parallelism, which ultimately 
> depends on the source topic's num. partitions. However this does not work 
> perfectly with dynamic scaling scenarios.
> Imagine if your have a simple aggregation application, it would have two 
> sub-topologies cut by the repartition topic, the first sub-topology would be 
> very light as it reads from input topics and write to repartition topic based 
> on the agg-key; the second sub-topology would do the actual work with the agg 
> state store, etc, hence is heavy computational. Right now the first and 
> second topology will always have the same number of tasks as the repartition 
> topic num.partitions is defined to be the same as the source topic 
> num.partitions, so to scale up we have to increase the number of input topic 
> partitions.
> One way to improve on that, is to use a default large number for repartition 
> topics and also allow users to override it (either through DSL code, or 
> through config). Doing this different sub-topologies would have different 
> number of tasks, i.e. parallelism units. In addition, users may also use this 
> config to "hint" the DSL translator to NOT create the repartition topics 
> (i.e. to not break up the sub-topologies) when she has the knowledge of the 
> data format.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


<    5   6   7   8   9   10