[jira] [Resolved] (KAFKA-8569) Integrate the poll timeout warning with leave group call

2019-06-27 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8569.

Resolution: Fixed

> Integrate the poll timeout warning with leave group call
> 
>
> Key: KAFKA-8569
> URL: https://issues.apache.org/jira/browse/KAFKA-8569
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Under static membership, we may be polluting our log by seeing a bunch of 
> consecutive warning message upon poll timeout.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8356) Add static membership to Round Robin assignor

2019-06-27 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8356.

Resolution: Fixed

> Add static membership to Round Robin assignor
> -
>
> Key: KAFKA-8356
> URL: https://issues.apache.org/jira/browse/KAFKA-8356
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8538) Add `group.instance.id` to DescribeGroup for better visibility

2019-06-27 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8538.

Resolution: Fixed

> Add `group.instance.id` to DescribeGroup for better visibility
> --
>
> Key: KAFKA-8538
> URL: https://issues.apache.org/jira/browse/KAFKA-8538
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : kafka-trunk-jdk8 #3755

2019-06-27 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-8610) Don't use /bin/bash in scripts

2019-06-27 Thread Richard Lee (JIRA)
Richard Lee created KAFKA-8610:
--

 Summary: Don't use /bin/bash in scripts
 Key: KAFKA-8610
 URL: https://issues.apache.org/jira/browse/KAFKA-8610
 Project: Kafka
  Issue Type: Improvement
Reporter: Richard Lee


On small container installations (such as alpine), /bin/bash is not installed. 
It appears the scripts in the /bin directory would mostly work with /bin/sh. 
Please use a simpler shell for shell scripts so that they are more portable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-429: Kafka Consumer Incremental Rebalance Protocol

2019-06-27 Thread Boyang Chen
Thank you Sophie for the update. Is this also reflected on the KIP?

On Thu, Jun 27, 2019 at 3:28 PM Sophie Blee-Goldman 
wrote:

> We would like to tack on some rebalance-related metrics as part of this KIP
> as well. The details can be found in the sub-task JIRA:
> https://issues.apache.org/jira/browse/KAFKA-8609
>
> On Thu, May 30, 2019 at 5:09 PM Guozhang Wang  wrote:
>
> > +1 (binding) from me as well.
> >
> > Thanks to everyone who have voted! I'm closing this vote thread with a
> > tally:
> >
> > binding +1: 3 (Guozhang, Harsha, Matthias)
> >
> > non-binding +1: 2 (Boyang, Liquan)
> >
> >
> > Guozhang
> >
> > On Wed, May 22, 2019 at 9:22 PM Matthias J. Sax 
> > wrote:
> >
> > > +1 (binding)
> > >
> > >
> > > On 5/22/19 7:37 PM, Harsha wrote:
> > > > +1 (binding). Thanks for the KIP looking forward for this to be
> > avaiable
> > > in consumers.
> > > >
> > > > Thanks,
> > > > Harsha
> > > >
> > > > On Wed, May 22, 2019, at 12:24 AM, Liquan Pei wrote:
> > > >> +1 (non-binding)
> > > >>
> > > >> On Tue, May 21, 2019 at 11:34 PM Boyang Chen 
> > > wrote:
> > > >>
> > > >>> Thank you Guozhang for all the hard work.
> > > >>>
> > > >>> +1 (non-binding)
> > > >>>
> > > >>> 
> > > >>> From: Guozhang Wang 
> > > >>> Sent: Wednesday, May 22, 2019 1:32 AM
> > > >>> To: dev
> > > >>> Subject: [VOTE] KIP-429: Kafka Consumer Incremental Rebalance
> > Protocol
> > > >>>
> > > >>> Hello folks,
> > > >>>
> > > >>> I'd like to start the voting for KIP-429 now, details can be found
> > > here:
> > > >>>
> > > >>>
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-RebalanceCallbackErrorHandling
> > > >>>
> > > >>> And the on-going PRs available for review:
> > > >>>
> > > >>> Part I: https://github.com/apache/kafka/pull/6528
> > > >>> Part II: https://github.com/apache/kafka/pull/6778
> > > >>>
> > > >>>
> > > >>> Thanks
> > > >>> -- Guozhang
> > > >>>
> > > >>
> > > >>
> > > >> --
> > > >> Liquan Pei
> > > >> Software Engineer, Confluent Inc
> > > >>
> > >
> > >
> >
> > --
> > -- Guozhang
> >
>


Re: KIP-457: Add DISCONNECTED state to Kafka Streams

2019-06-27 Thread Richard Yu
 Hi Matthias and Hachikuji,
Sorry for getting back to you so late. Currently on a trip, so I hadn't got the 
time to respond.
Currently, I'm not sure which approach we should do ATM, considering that 
Guozhang posed multiple possibilities in the previous email.Do you have any 
preferences as to which approach we should take?
It would greatly help in the implementation of the issue.
Cheers,Richard
On Thursday, June 13, 2019, 4:55:29 PM GMT+8, Richard Yu 
 wrote:  
 
  Hi Guozhang,
Thanks for the input! Then I guess from the approach you have listed above, no 
API changes will be needed in Kafka consumer then. That will greatly simplify 
things, although when implementing these approaches, there might be some 
unexpected issues which might show up.
Cheers,Richard
    On Thursday, June 13, 2019, 4:29:29 AM GMT+8, Guozhang Wang 
 wrote:  
 
 Hi Richard,

Sorry for getting late on this, I've finally get some time to take a look
at https://github.com/apache/kafka/pull/6594 as well as the KIP itself.
Here are some thoughts:

1. The main motivation of this KIP is to be able to distinguish the case
where

a. "Streams client is in an unhealthy situation and hence cannot proceed"
(which we have an ERROR state) and
b. "Streams client is perfectly healthy, but it cannot get to the target
brokers and hence cannot proceed", and this should also be distinguishable
from
c. "both Streams and brokers are healthy, there's just no data available
for processing and hence cannot proceed").

And we want to have a way to notify the users about the second case b)
distinguished from the others .

2. Following this, when I first thought about the solution I was thinking
about adding a new state in the FSM of Kafka Streams, but after reviewing
the code and the KIP, I felt this may be an overkill to complicate the FSM.
Now I'm wondering if we can achieve the same thing with a single metric.
For example:

2.a) we know that in Streams we always rely on consumer membership to
allocate partitions to instances, which means that the heartbeat thread has
to be working if the consumer wants to ever receive some data, what we can
do is to let users monitor on this metric directly, e.g. if the
heartbeat-rate drops to zero BUT the state is still in RUNNING it means we
are in case b) above.

2.b) if we want to provide a streams-level metric out-of-the-box rather
than letting users to monitor on consumer metrics, another idea is to
leverage on existing "public Set assignment()" of
KafkaConsumer, and record the time when it returns empty, meaning that
nothing was assigned. And expose this as a boolean metric indicating
nothing was assigned and hence we are likely in case b) above --- note this
could also mean that we have fewer partitions than necessary so that some
instance does not have any assignment indeed, which is not the same as b),
but I feel consolidating these to cases with a single metric seem also fine.



Guozhang




On Wed, Apr 17, 2019 at 2:30 PM Richard Yu 
wrote:

> Alright, so I made a few changes to the KIP.
> I realized that there might be an easier way to give the user information
> on the connection state of Kafka Streams.
> In implementation, if one wishes to have DISCONNECTED as a state, then one
> would have to factor in proper state transitions.
> The other approach that is now outlined in the KIP. Instead, we could just
> add a method which I think achieves the same effect.
> If any of you thinks there is wrong with this approach, please let me know.
> :)
>
> Cheers,
> Richard
>
> On Wed, Apr 17, 2019 at 11:49 AM Richard Yu 
> wrote:
>
> > I just realized something.
> >
> > Hi Matthias, might need your input here.
> > I realized that when implementing this change, as noted in the JIRA, we
> > would need to "check the behaviour of the consumer" since its consumer's
> > connection with broker that we are dealing with.
> >
> > So doesn't that mean we would also be dealing with consumer API changes
> as
> > well?
> > I don't think consumer has any methods which would give us the state of a
> > connection either.
> >
> > - Richard
> >
> > On Wed, Apr 17, 2019 at 8:43 AM Richard Yu 
> > wrote:
> >
> >> Hi Micheal,
> >>
> >> Yeah, those are some points I should've clarified.
> >> No problem. Have got it done.
> >>
> >>
> >>
> >> On Wed, Apr 17, 2019 at 6:42 AM Michael Noll 
> >> wrote:
> >>
> >>> Richard,
> >>>
> >>> thanks for looking into this!
> >>>
> >>> However, I have some concerns. The KIP you created (
> >>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams
> >>> )
> >>> doesn't yet address open questions such as the ones mentioned by
> >>> Matthias:
> >>>
> >>> 1) What is the difference between DEAD and the proposed DISCONNECTED?
> >>> This
> >>> should be defined in the KIP.
> >>>
> >>> 2) Difference between your KIP and the JIRA (
> >>> https://issues.apache.org/jira/browse/KAFKA-6520): In the JIRA ticket,
> >>> the
> >>> DISCONNECTED state was proposed 

Request for Permission to Create KIP

2019-06-27 Thread Maulin Vasavada
Hi

Can you please give me permission to Create KIP?

My username: maulin.vasavada

Thank you.
Maulin


Re: [VOTE] KIP-429: Kafka Consumer Incremental Rebalance Protocol

2019-06-27 Thread Sophie Blee-Goldman
We would like to tack on some rebalance-related metrics as part of this KIP
as well. The details can be found in the sub-task JIRA:
https://issues.apache.org/jira/browse/KAFKA-8609

On Thu, May 30, 2019 at 5:09 PM Guozhang Wang  wrote:

> +1 (binding) from me as well.
>
> Thanks to everyone who have voted! I'm closing this vote thread with a
> tally:
>
> binding +1: 3 (Guozhang, Harsha, Matthias)
>
> non-binding +1: 2 (Boyang, Liquan)
>
>
> Guozhang
>
> On Wed, May 22, 2019 at 9:22 PM Matthias J. Sax 
> wrote:
>
> > +1 (binding)
> >
> >
> > On 5/22/19 7:37 PM, Harsha wrote:
> > > +1 (binding). Thanks for the KIP looking forward for this to be
> avaiable
> > in consumers.
> > >
> > > Thanks,
> > > Harsha
> > >
> > > On Wed, May 22, 2019, at 12:24 AM, Liquan Pei wrote:
> > >> +1 (non-binding)
> > >>
> > >> On Tue, May 21, 2019 at 11:34 PM Boyang Chen 
> > wrote:
> > >>
> > >>> Thank you Guozhang for all the hard work.
> > >>>
> > >>> +1 (non-binding)
> > >>>
> > >>> 
> > >>> From: Guozhang Wang 
> > >>> Sent: Wednesday, May 22, 2019 1:32 AM
> > >>> To: dev
> > >>> Subject: [VOTE] KIP-429: Kafka Consumer Incremental Rebalance
> Protocol
> > >>>
> > >>> Hello folks,
> > >>>
> > >>> I'd like to start the voting for KIP-429 now, details can be found
> > here:
> > >>>
> > >>>
> > >>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-RebalanceCallbackErrorHandling
> > >>>
> > >>> And the on-going PRs available for review:
> > >>>
> > >>> Part I: https://github.com/apache/kafka/pull/6528
> > >>> Part II: https://github.com/apache/kafka/pull/6778
> > >>>
> > >>>
> > >>> Thanks
> > >>> -- Guozhang
> > >>>
> > >>
> > >>
> > >> --
> > >> Liquan Pei
> > >> Software Engineer, Confluent Inc
> > >>
> >
> >
>
> --
> -- Guozhang
>


Build failed in Jenkins: kafka-trunk-jdk11 #662

2019-06-27 Thread Apache Jenkins Server
See 


Changes:

[rajinisivaram] MINOR: Fix failing upgrade test by supporting both

[rajinisivaram] MINOR:  Support listener config overrides in system tests 
(#6981)

--
[...truncated 2.47 MB...]
org.apache.kafka.connect.transforms.CastTest > testConfigInvalidSchemaType 
STARTED

org.apache.kafka.connect.transforms.CastTest > testConfigInvalidSchemaType 
PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessString STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessString PASSED

org.apache.kafka.connect.transforms.CastTest > castFieldsWithSchema STARTED

org.apache.kafka.connect.transforms.CastTest > castFieldsWithSchema PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessFieldConversion STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessFieldConversion PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigInvalidTargetType STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigInvalidTargetType PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessStringToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessStringToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > testKey STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > testKey PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessDateToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessDateToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToString STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToString PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimeToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimeToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToDate STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToDate PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToTime STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToTime PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToUnix STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToUnix PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaUnixToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaUnixToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessIdentity STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessIdentity PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaFieldConversion STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaFieldConversion PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaDateToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaDateToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaIdentity STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaIdentity PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToDate STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToDate PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToTime STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToTime PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToUnix STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToUnix PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigMissingFormat STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigMissingFormat PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigNoTargetType STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigNoTargetType PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaStringToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaStringToTimestamp PASSED


[jira] [Created] (KAFKA-8609) Add consumer metrics for rebalances

2019-06-27 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8609:
--

 Summary: Add consumer metrics for rebalances
 Key: KAFKA-8609
 URL: https://issues.apache.org/jira/browse/KAFKA-8609
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sophie Blee-Goldman


We would like to track some additional metrics on the consumer side related to 
rebalancing as part of this KIP, including

 

1) total rebalance latency (latency from start to completion of rebalance)

2) per-callback latency (time spent in onPartitionsRevoked, 
onPartitionsAssigned, onPartitionsLost)

3) join/sync group latency (response_received_time - request_sent_time)

4) rebalance rate (# rebalances/day)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-476: Add Java AdminClient interface

2019-06-27 Thread Matthias J. Sax
@Andy:

What about the factory methods of `AdminClient` class? Should they be
deprecated?

One nit about the KIP: can you maybe insert "code blocks" to highlight
the API changes? Code blocks would simplify to read the KIP a lot.


-Matthias

On 6/26/19 6:56 AM, Ryanne Dolan wrote:
> +1 (non-binding)
> 
> Thanks.
> Ryanne
> 
> On Tue, Jun 25, 2019 at 10:21 PM Satish Duggana 
> wrote:
> 
>> +1 (non-binding)
>>
>> On Wed, Jun 26, 2019 at 8:37 AM Satish Duggana 
>> wrote:
>>>
>>> +1 Matthias/Andy.
>>> IMHO, interface is about the contract, it should not have/expose any
>>> implementation. I am fine with either way as it is more of taste or
>>> preference.
>>>
>>> Agree with Ismael/Colin/Ryanne on not deprecating for good reasons.
>>>
>>>
>>> On Mon, Jun 24, 2019 at 8:33 PM Andy Coates  wrote:

 I agree Matthias.

 (In Scala, such factory methods are on a companion object. As Java
>> doesn't
 have the concept of a companion object, an equivalent would be a
>> utility
 class with a similar name...)

 However, I'll update the KIP to include the factory method on the
>> interface.

 On Fri, 21 Jun 2019 at 23:40, Matthias J. Sax 
>> wrote:

> I still think, that an interface does not need to know anything about
> its implementation. But I am also fine if we add a factory method to
>> the
> new interface if that is preferred by most people.
>
>
> -Matthias
>
> On 6/21/19 7:10 AM, Ismael Juma wrote:
>> This is even more reason not to deprecate immediately, there is
>> very
> little
>> maintenance cost for us. We should be mindful that many of our
>> users (eg
>> Spark, Flink, etc.) typically allow users to specify the kafka
>> clients
>> version and hence avoid using new classes/interfaces for some
>> time. They
>> would get a bunch of warnings they cannot do anything about apart
>> from
>> suppressing.
>>
>> Ismael
>>
>> On Fri, Jun 21, 2019 at 4:00 AM Andy Coates 
>> wrote:
>>
>>> Hi Ismael,
>>>
>>> I’m happy enough to not deprecate the existing `AdminClient`
>> class as
> part
>>> of this change.
>>>
>>> However, note that, the class will likely be empty, i.e. all
>> methods and
>>> implementations will be inherited from the interface:
>>>
>>> public abstract class AdminClient implements Admin {
>>> }
>>>
>>> Not marking it as deprecated has the benefit that users won’t see
>> any
>>> deprecation warnings on the next release. Conversely, deprecating
>> it
> will
>>> mean we can choose to remove this, now pointless class, in the
>> future
> if we
>>> choose.
>>>
>>> That’s my thinking for deprecation, but as I’ve said I’m happy
>> either
> way.
>>>
>>> Andy
>>>
 On 18 Jun 2019, at 16:09, Ismael Juma  wrote:

 I agree with Ryanne, I think we should avoid deprecating
>> AdminClient
> and
 causing so much churn for users who don't actually care about
>> this
> niche
 use case.

 Ismael

 On Tue, Jun 18, 2019 at 6:43 AM Andy Coates 
>> wrote:

> Hi Ryanne,
>
> If we don't change the client code, then everywhere will still
>> expect
> subclasses of `AdminClient`, so the interface will be of no
>> use, i.e.
> I
> can't write a class that implements the new interface and pass
>> it to
> the
> client code.
>
> Thanks,
>
> Andy
>
> On Mon, 17 Jun 2019 at 19:01, Ryanne Dolan <
>> ryannedo...@gmail.com>
>>> wrote:
>
>> Andy, while I agree that the new interface is useful, I'm not
> convinced
>> adding an interface requires deprecating AdminClient and
>> changing so
>>> much
>> client code. Why not just add the Admin interface, have
>> AdminClient
>> implement it, and have done?
>>
>> Ryanne
>>
>> On Mon, Jun 17, 2019 at 12:09 PM Andy Coates <
>> a...@confluent.io>
>>> wrote:
>>
>>> Hi all,
>>>
>>> I think I've addressed all concerns. Let me know if I've
>> not.  Can I
> call
>>> another round of votes please?
>>>
>>> Thanks,
>>>
>>> Andy
>>>
>>> On Fri, 14 Jun 2019 at 04:55, Satish Duggana <
>>> satish.dugg...@gmail.com
>>
>>> wrote:
>>>
 Hi Andy,
 Thanks for the KIP. This is a good change and it gives the
>> user a
>> better
 handle on Admin client usage. I agree with the proposal
>> except the
> new
 `Admin` interface having all the methods from `AdminClient`
> abstract
>>> class.
 It should be kept clean having only the admin operations as
>> methods
>> from
 KafkaClient abstract class but not the 

Build failed in Jenkins: kafka-trunk-jdk8 #3754

2019-06-27 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: Fix typos in upgrade guide (#7005)

--
[...truncated 4.81 MB...]
java.lang.NullPointerException
at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinTest.cleanup(KStreamKTableJoinTest.java:84)

org.apache.kafka.streams.kstream.internals.KStreamKTableJoinTest > 
shouldLogAndMeterWhenSkippingNullLeftValue STARTED
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinTest.shouldLogAndMeterWhenSkippingNullLeftValue
 failed, log available in 


org.apache.kafka.streams.kstream.internals.KStreamKTableJoinTest > 
shouldLogAndMeterWhenSkippingNullLeftValue FAILED
java.lang.IllegalStateException: Shutdown in progress
at 
java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
at java.lang.Runtime.addShutdownHook(Runtime.java:211)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:262)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:232)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:242)
at 
org.apache.kafka.test.StreamsTestUtils.getStreamsConfig(StreamsTestUtils.java:53)
at 
org.apache.kafka.test.StreamsTestUtils.getStreamsConfig(StreamsTestUtils.java:61)
at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinTest.setUp(KStreamKTableJoinTest.java:76)

java.lang.NullPointerException
at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinTest.cleanup(KStreamKTableJoinTest.java:84)

org.apache.kafka.streams.kstream.internals.KStreamKTableJoinTest > 
shouldJoinOnlyIfMatchFoundOnStreamUpdates STARTED
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinTest.shouldJoinOnlyIfMatchFoundOnStreamUpdates
 failed, log available in 


org.apache.kafka.streams.kstream.internals.KStreamKTableJoinTest > 
shouldJoinOnlyIfMatchFoundOnStreamUpdates FAILED
java.lang.IllegalStateException: Shutdown in progress
at 
java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
at java.lang.Runtime.addShutdownHook(Runtime.java:211)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:262)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:232)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:242)
at 
org.apache.kafka.test.StreamsTestUtils.getStreamsConfig(StreamsTestUtils.java:53)
at 
org.apache.kafka.test.StreamsTestUtils.getStreamsConfig(StreamsTestUtils.java:61)
at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinTest.setUp(KStreamKTableJoinTest.java:76)

java.lang.NullPointerException
at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinTest.cleanup(KStreamKTableJoinTest.java:84)

org.apache.kafka.streams.kstream.internals.KStreamPeekTest > 
shouldNotAllowNullAction STARTED
org.apache.kafka.streams.kstream.internals.KStreamPeekTest.shouldNotAllowNullAction
 failed, log available in 


org.apache.kafka.streams.kstream.internals.KStreamPeekTest > 
shouldNotAllowNullAction FAILED
java.lang.IllegalStateException: Shutdown in progress
at 
java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
at java.lang.Runtime.addShutdownHook(Runtime.java:211)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:262)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:232)
at org.apache.kafka.test.TestUtils.tempDirectory(TestUtils.java:242)
at 
org.apache.kafka.test.StreamsTestUtils.getStreamsConfig(StreamsTestUtils.java:53)
at 
org.apache.kafka.test.StreamsTestUtils.getStreamsConfig(StreamsTestUtils.java:61)
at 
org.apache.kafka.streams.kstream.internals.KStreamPeekTest.(KStreamPeekTest.java:43)

org.apache.kafka.streams.kstream.internals.KStreamPeekTest > 
shouldObserveStreamElements STARTED
org.apache.kafka.streams.kstream.internals.KStreamPeekTest.shouldObserveStreamElements
 failed, log available in 


org.apache.kafka.streams.kstream.internals.KStreamPeekTest > 

Re: [DISCUSS] KIP-470: TopologyTestDriver test input and output usability improvements

2019-06-27 Thread Matthias J. Sax
Thanks Jukka!

The idea to use `Instant/Duration` was a proposal. If we think it's not
a good one, we could still stay with `long`. Because `ProducerRecord`
and `ConsumerRecord` are both based on `long,` it might make sense to
keep `long`?

> The result of converting millis to Instant directly generates
>> rather non readable test code and changing from long to Instant correctly
>> require understand what is the case it is testing.

This might be a good indicator the using `Instant/Duration` might not be
a good idea.

Would be nice to get feedback from others.

About adding new methods that we deprecate immediately: I don't think we
should do this. IMHO, there are two kind of users, one that immediately
rewrite their code to move off deprecated methods. Those won't use the
new+deprecated ones anyway. Other uses migrate their code slowly and
would just not rewrite their code at all, and thus also not use the
new+deprecated methods.

> Checking my own tests I was able to migrate the most of my code with
> search without further thinking about the logic to this new
> approach. The result of converting millis to Instant directly generates
> rather non readable test code and changing from long to Instant correctly
> require understand what is the case it is testing.

Not sure if I can follow here. You first say, you could easily migrate
your code, but than you say it was not easily possible? Can you clarify
your experience upgrading your test code?


-Matthias


On 6/27/19 12:21 AM, Jukka Karvanen wrote:
> Hi,
> 
>>> (4) Should we switch from `long` for timestamps to `Instant` and
> `Duration` ?
>> This version startTimestamp is Instant and autoAdvance Duration in
> Initialization and with manual configured collection pipe methods.
>> Now timestamp of TestRecord is still Long and similarly single record
> pipeInput still has long as parameter.
>> Should these also converted to to Instant type?
>> Should there be both long and Instant parallel?
>> I expect there are existing test codebase which would be easier to migrate
> if long could be still used.
> Now added Instant version to TestRecord and pipeInput method.
> 
> Checking my own tests I was able to migrate the most of my code with
> search without further thinking about the logic to this new
> approach. The result of converting millis to Instant directly generates
> rather non readable test code and changing from long to Instant correctly
> require understand what is the case it is testing.
> 
> That is why version with long left still as deprecated for easier migration
> for existing test.
> Also TopologyTestDriver constructor and advanceWallClockTime  method
> modified with same approach.
> 
> Jukka
> 
> 
> ma 24. kesäk. 2019 klo 16.47 Bill Bejeck (bbej...@gmail.com) kirjoitti:
> 
>> Hi Jukka
>>
>>> These topic objects are only interfacing TopologyTestDriver, not
>> affecting
>>> the internal functionality of it. In my plan the internal data structures
>>> are using those Producer/ConsumerRecords as earlier. That way I don't see
>>> how those could be affected.
>>
>> I mistakenly thought the KIP was proposing to completely remove
>> Producer/ConsumerRecords in favor of TestRecord.  But taking another quick
>> look I can see the plan for using the TestRecord objects.  Thanks for the
>> clarification.
>>
>> -Bill
>>
>> On Sat, Jun 22, 2019 at 2:26 AM Jukka Karvanen <
>> jukka.karva...@jukinimi.com>
>> wrote:
>>
>>> Hi All,
>>> Hi Matthias,
>>>
 (1) It's a little confusing that you list all method (existing, proposed
 to deprecate, and new one) of `TopologyTestDriver` in the KIP. Maybe
 only list the ones you propose to deprecate and the new ones you want to
 add?
>>>
>>> Ok. Unmodified methods removed.
>>>
 (2) `TopologyTestDriver#createInputTopic`: might it be worth to add
 overload to initialize the timetamp and auto-advance feature directly?
 Otherwise, uses always need to call `configureTiming` as an extra call?
>>>
>>> Added with Instant and Duration parameters.
>>>
 (3) `TestInputTopic#configureTiming()`: maybe rename to
>>> `reconfigureTiming()` ?
>>>
>>> I removed this method when we have now initialization in constructor.
>>> You can recreate TestInputTopic if needing to reconfigure timing.
>>>
>>>
 (4) Should we switch from `long` for timestamps to `Instant` and
>>> `Duration` ?
>>> This version startTimestamp is Instant and autoAdvance Duration in
>>> Initialization and with manual configured collection pipe methods.
>>> Now timestamp of TestRecord is still Long and similarly single record
>>> pipeInput still has long as parameter.
>>> Should these also converted to to Instant type?
>>> Should there be both long and Instant parallel?
>>> I expect there are existing test codebase which would be easier to
>> migrate
>>> if long could be still used.
>>>
>>> Also should advanceWallClockTime  in TopologyTestDriver changed(or added
>>> alternative) for Duration parameter.
>>>
>>>
 (5) Why do we have redundant 

Re: [DISCUSS] KIP-480 : Sticky Partitioner

2019-06-27 Thread Colin McCabe
On Thu, Jun 27, 2019, at 01:31, Ismael Juma wrote:
> Thanks for the KIP Justine. It looks pretty good. A few comments:
> 
> 1. Should we favor partitions that are not under replicated? This is
> something that Netflix did too.

This seems like it could lead to cascading failures, right?  If a partition 
becomes under-replicated because there is too much traffic, the producer stops 
sending to it, which puts even more load on the remaining partitions, which are 
even more likely to fail then, etc.  It also will create unbalanced load 
patterns on the consumers.

> 
> 2. If there's no measurable performance difference, I agree with Stanislav
> that Optional would be better than Integer.
> 
> 3. We should include the javadoc for the newly introduced method that
> specifies it and its parameters. In particular, it would good to specify if
> it gets called when an explicit partition id has been provided.

Agreed.

best,
Colin

> 
> Ismael
> 
> On Mon, Jun 24, 2019, 2:04 PM Justine Olshan  wrote:
> 
> > Hello,
> > This is the discussion thread for KIP-480: Sticky Partitioner.
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
> >
> > Thank you,
> > Justine Olshan
> >
>


Re: [DISCUSS] KIP-455: Create an Administrative API for Replica Reassignment

2019-06-27 Thread Colin McCabe
On Thu, Jun 27, 2019, at 08:58, Jason Gustafson wrote:
> >
> > We'd remove nodes from targetReplicas just as soon as they entered the
> > ISR.  They would become regular replicas at that point.
> 
> 
> I think we can save a lot of back and forth by working through an example.
> Suppose we have the following initial state:
> 
> replicas: [1, 2, 3]
> isr: [1, 2, 3]
> targetReplicas: []
> 
> We want to reassign to [4, 5, 6]. My understanding is that leads to the
> following state:
> 
> replicas: [1, 2, 3]
> isr: [1, 2, 3]
> targetReplicas: [4, 5, 6]
> 
> Let's say replica 4 comes into the ISR first:
> 
> replicas: [1, 2, 3]
> isr: [1, 2, 3, 4]
> targetReplicas: [4, 5, 6]
> 
> What happens now?

The state transitions to:
replicas: [1, 2, 3, 4]
isr: [1, 2, 3, 4]
targetReplicas: [4, 5, 6]

The full sequence would be something like this:
R: [1, 2, 3], I: [1, 2, 3], T: [4, 5, 6]
R: [1, 2, 3], I: [1, 2, 3, 4], T: [4, 5, 6]
R: [1, 2, 3, 4], I: [1, 2, 3, 4], T: [4, 5, 6]
R: [1, 2, 3, 4], I: [1, 2, 3, 4, 5], T: [4, 5, 6]
R: [1, 2, 3, 4, 5], I: [1, 2, 3, 4, 5], T: [4, 5, 6]
R: [1, 2, 3, 4, 5], I: [1, 2, 3, 4, 5, 6], T: [4, 5, 6]
R: [1, 2, 3, 4, 5, 6], I: [1, 2, 3, 4, 5, 6], T: [4, 5, 6]
R: [4, 5, 6], I: [4, 5, 6], T: null

Here's another example:
R: [1, 2, 3], I: [1, 2, 3], T: [2, 3, 4]
R: [1, 2, 3], I: [1, 2, 3, 4], T: [2, 3, 4]
R: [1, 2, 3, 4], I: [1, 2, 3, 4], T: [2, 3, 4]
R: [2, 3, 4], I: [2, 3, 4], T: null

Basically, "target replicas" represents what you want the partition assignment 
to be.  "Replicas" represents what it currently is.  In other words, "target 
replicas" has the role that was formerly played by the assignment shown in 
/admin/partition_reassignments.

The main thing that is different is that we don't want to automatically add all 
the target replicas to the normal replica set.  The reason why we don't want to 
do this is because it removes the information we need for cancellation: namely, 
whether each replica was one of the original ones, or one of the ones we're 
reassigning to.  A secondary reason is because nothing useful will come of 
telling users they have a bunch of replicas that don't really exist.  We may 
not have even started copying the first byte over to a target replica, so it 
doesn't make sense to treat it like a normal replica that's not in the ISR 
because it's lagging slightly.

best,
Colin


> 
> (Sorry if I'm being dense, it's just not clear to me exactly what the
> expected transitions are from here.)
> 
> 
> Thanks,
> Jason
> 
> 
> On Wed, Jun 26, 2019 at 2:12 PM Colin McCabe  wrote:
> 
> > On Wed, Jun 26, 2019, at 12:02, Jason Gustafson wrote:
> > > Hi Colin,
> > >
> > > Responses below and another question:
> > >
> > > > I guess the thought process here is that most reassignment tools want
> > to
> > > > know about all the reassignments that are going on.  If you don't know
> > all
> > > > the pending reassignments, then it's hard to say whether adding a new
> > one
> > > > is a good idea, or cancelling an existing one.  So I guess I can't
> > think of
> > > > a case where a reassignment tool would want a partial set rather than
> > the
> > > > full one.
> > >
> > > UIs often have "drill into" options. If there is a large ongoing
> > > reassignment, I can see wanting to limit the scope to a specific topic.
> > Any
> > > downside that I'm missing?
> > >
> >
> > We could add a mode that only lists a given set of partitions.  To be
> > consistent with how we handle topics, that could be a separate "describe"
> > method.  I don't think there's any downside, except some extra code to
> > write.
> >
> > > > Good question.  It will be the current behavior.  Basically, we
> > immediately
> > > > try to replicate to all the targetReplicas.  As they enter the ISR,
> > they
> > > > leave "targetReplicas" and enter "replicas."  Making this more
> > incremental
> > > > would be a good follow-on improvement.
> > >
> > > The current behavior is to wait until all target replicas are in the ISR
> > > before making a change. Are you saying that we will keep this behavior?
> >
> > We'd remove nodes from targetReplicas just as soon as they entered the
> > ISR.  They would become regular replicas at that point.
> >
> > >
> > > > When the /admin/reassign_partitions znode is empty, we'll listen for
> > > > updates.  When an update is made, we'll treat it like a call to
> > > > AlterPartitionReassignments.  Whenever we have zero pending
> > reassignments,
> > > > we'll delete the /admin/reassign_partitions znode.  If the znode
> > already
> > > > exists, we don't listen on it (same as now).
> > >
> > >
> > > So just to be a bit more explicit, what you are saying is that we will
> > keep
> > > the reassignment state under /admin/reassign_partitions as we do
> > currently,
> > > but we will update the target_replicas field in /partition/state
> > following
> > > this new logic. Then as soon as the current replica set matches the
> > target
> > > assignment, we will remove the 

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-27 Thread Matthias J. Sax
Thanks for bringing this issue to our attention. Great find @Joe!

Adding the instruction field to the `subscription` sounds like a good
solution. What I don't understand atm: for which case would we need to
send unnecessary tombstone? I thought that the `instruction` field helps
to avoid any unnecessary tombstone? Seems I a missing case?

Also for my own understanding: the `instruction` is only part of the
message? It is no necessary to store it in the RHS auxiliary store, right?

About right/full-outer joins. Agreed. Getting left-joins would be awesome!

About upgrading: Good call John! Adding a version byte for subscription
and response is good forward thinking. I personally prefer version
numbers, too, as they carry more information.

Thanks for all the hard to everybody involved!


-Matthias

On 6/27/19 1:44 PM, John Roesler wrote:
> Hi Adam,
> 
> Hah! Yeah, I felt a headache coming on myself when I realized this
> would be a concern.
> 
> For what it's worth, I'd also lean toward versioning. It seems more
> explicit and more likely to keep us all sane in the long run. Since we
> don't _think_ our wire protocol will be subject to a lot of revisions,
> we can just use one byte. The worst case is that we run out of numbers
> and reserve the last one to mean, "consult another field for the
> actual version number". It seems like a single byte on each message
> isn't too much to pay.
> 
> Since you point it out, we might as well put a version number on the
> SubscriptionResponseWrapper as well. It may not be needed, but if we
> ever need it, even just once, we'll be glad we have it.
> 
> Regarding the instructions field, we can also serialize the enum very
> compactly as a single byte (which is the same size a boolean takes
> anyway), so it seems like an Enum in Java-land and a byte on the wire
> is a good choice.
> 
> Agreed on the right and full outer joins, it doesn't seem necessary
> right now, although I am happy to see the left join "join" the party,
> since as you said, we were so close to it anyway. Can you also add it
> to the KIP?
> 
> Thanks as always for your awesome efforts on this,
> -John
> 
> On Thu, Jun 27, 2019 at 3:04 PM Adam Bellemare  
> wrote:
>>
>> You're stretching my brain, John!
>>
>> I prefer STRATEGY 1 because it solves the problem in a simple way, and
>> allows us to deprecate support for older message types as we go (ie, we
>> only support the previous 3 versions, so V5,V4,V3, but not v2 or V1).
>>
>> STRATEGY 2 is akin to Avro schemas between two microservices - there are
>> indeed cases where a breaking change must be made, and forward
>> compatibility will provide us with no out other than requiring a full stop
>> and full upgrade for all nodes, shifting us back towards STRATEGY 1.
>>
>> My preference is STRATEGY 1 with instructions as an ENUM, and we can
>> certainly include a version. Would it make sense to include a version
>> number in  SubscriptionResponseWrapper as well? Currently we don't have any
>> instructions in there, as I removed the boolean, but it is certainly
>> plausible that it could happen in the future. I don't *think* we'll need
>> it, but I also didn't think we'd need it for SubscriptionWrapper and here
>> we are.
>>
>> Thanks for the thoughts, and the info on the right-key. That was
>> enlightening, though I can't think of a use-case for it *at this point in
>> time*. :)
>>
>> Adam
>>
>>
>>
>> On Thu, Jun 27, 2019 at 12:29 PM John Roesler  wrote:
>>
>>> I think I agree with you, right joins (and therefore full outer joins)
>>> don't make sense here, because the result is a keyed table, where the
>>> key is the PK of the left-hand side. So, when you have a
>>> right-hand-side record with no incoming FK references, you would want
>>> to produce a join result like `nullKey: (null, rhsValue)`, but we
>>> don't currently allow null keys in Streams. It actually is possible to
>>> define them, and therefore to add right- and full-outer foreign-key
>>> joins later, but it's non-trivial in a streaming context with
>>> continuously updated results. (See the PS if you're curious what I'm
>>> thinking). You're correct, right- and full-outer joins are trivial on
>>> our current 1:1 table joins because they are equi-joins.
>>>
>>> Regarding the transition, it sounds like what you're proposing is that
>>> we would say, "adding a foreign-key join to your topology requires a
>>> full application reset (or a new application id)". This is also an
>>> acceptable constraint to place on the feature, but not strictly
>>> necessary. Since 2.3, it's now possible to give all the state in your
>>> application stable names. This means that it's no longer true that
>>> adding a node to your topology graph would break its structure, and it
>>> does become possible to add new operators and simply restart the app.
>>> Revisiting my prior thought, though, I think the problem is not
>>> specific to your feature. For example, adding a new grouped
>>> aggregation would produce a new 

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-27 Thread John Roesler
Hi Adam,

Hah! Yeah, I felt a headache coming on myself when I realized this
would be a concern.

For what it's worth, I'd also lean toward versioning. It seems more
explicit and more likely to keep us all sane in the long run. Since we
don't _think_ our wire protocol will be subject to a lot of revisions,
we can just use one byte. The worst case is that we run out of numbers
and reserve the last one to mean, "consult another field for the
actual version number". It seems like a single byte on each message
isn't too much to pay.

Since you point it out, we might as well put a version number on the
SubscriptionResponseWrapper as well. It may not be needed, but if we
ever need it, even just once, we'll be glad we have it.

Regarding the instructions field, we can also serialize the enum very
compactly as a single byte (which is the same size a boolean takes
anyway), so it seems like an Enum in Java-land and a byte on the wire
is a good choice.

Agreed on the right and full outer joins, it doesn't seem necessary
right now, although I am happy to see the left join "join" the party,
since as you said, we were so close to it anyway. Can you also add it
to the KIP?

Thanks as always for your awesome efforts on this,
-John

On Thu, Jun 27, 2019 at 3:04 PM Adam Bellemare  wrote:
>
> You're stretching my brain, John!
>
> I prefer STRATEGY 1 because it solves the problem in a simple way, and
> allows us to deprecate support for older message types as we go (ie, we
> only support the previous 3 versions, so V5,V4,V3, but not v2 or V1).
>
> STRATEGY 2 is akin to Avro schemas between two microservices - there are
> indeed cases where a breaking change must be made, and forward
> compatibility will provide us with no out other than requiring a full stop
> and full upgrade for all nodes, shifting us back towards STRATEGY 1.
>
> My preference is STRATEGY 1 with instructions as an ENUM, and we can
> certainly include a version. Would it make sense to include a version
> number in  SubscriptionResponseWrapper as well? Currently we don't have any
> instructions in there, as I removed the boolean, but it is certainly
> plausible that it could happen in the future. I don't *think* we'll need
> it, but I also didn't think we'd need it for SubscriptionWrapper and here
> we are.
>
> Thanks for the thoughts, and the info on the right-key. That was
> enlightening, though I can't think of a use-case for it *at this point in
> time*. :)
>
> Adam
>
>
>
> On Thu, Jun 27, 2019 at 12:29 PM John Roesler  wrote:
>
> > I think I agree with you, right joins (and therefore full outer joins)
> > don't make sense here, because the result is a keyed table, where the
> > key is the PK of the left-hand side. So, when you have a
> > right-hand-side record with no incoming FK references, you would want
> > to produce a join result like `nullKey: (null, rhsValue)`, but we
> > don't currently allow null keys in Streams. It actually is possible to
> > define them, and therefore to add right- and full-outer foreign-key
> > joins later, but it's non-trivial in a streaming context with
> > continuously updated results. (See the PS if you're curious what I'm
> > thinking). You're correct, right- and full-outer joins are trivial on
> > our current 1:1 table joins because they are equi-joins.
> >
> > Regarding the transition, it sounds like what you're proposing is that
> > we would say, "adding a foreign-key join to your topology requires a
> > full application reset (or a new application id)". This is also an
> > acceptable constraint to place on the feature, but not strictly
> > necessary. Since 2.3, it's now possible to give all the state in your
> > application stable names. This means that it's no longer true that
> > adding a node to your topology graph would break its structure, and it
> > does become possible to add new operators and simply restart the app.
> > Revisiting my prior thought, though, I think the problem is not
> > specific to your feature. For example, adding a new grouped
> > aggregation would produce a new repartition topic, but the repartition
> > topic partitions might get assigned to old nodes in the middle of a
> > rolling bounce, and they would need to just ignore them. This
> > requirement is the same for the repartition topics in the FK join, so
> > it's orthogonal to your design.
> >
> > Back to the first concern, though, I'm not sure I followed the
> > explanation. As a thought experiment, let's imagine that Joe hadn't
> > taken the time to experiment with your feature branch. We wouldn't
> > have noticed the problem until the feature was already released in
> > 2.4. So the wire protocol on that PK->FK subscription topic would have
> > been V1: "FK,PK,HASH,BOOLEAN". Then, Joe would have let us know the
> > problem once they picked up the feature, so we would want to implement
> > your proposed fix and change the wire protocol to V2:
> > "FK,PK,HASH,INSTRUCTIONS" in 2.5. Upon rolling out the update, we
> > would see both 2.4 

Re: [DISCUSS] KIP-479: Add Materialized to Join

2019-06-27 Thread Matthias J. Sax
Thanks for the KIP Bill!

Great discussion to far.

About John's idea about querying upstream stores and don't materialize a
store: I agree with Bill that this seems to be an orthogonal question,
and it might be better to treat it as an independent optimization and
exclude from this KIP.

> What should be the behavior if there is no store
> configured (e.g., if Materialized with only serdes) and querying is
> enabled?

IMHO, this could be an error case. If one wants to query a store, they
need to provide a name -- if you don't know the name, how would you
actually query the store (even if it would be possible to get the name
from the `TopologyDescription`, it seems clumsy).

If we don't want to throw an error, materializing seems to be the right
option, to exclude "query optimization" from this KIP. I would be ok
with this option, even if it's clumsy to get the name from
`TopologyDescription`; hence, I would prefer to treat it as an error.

> To get back to the current behavior, users would have to
> add a "bytes store supplier" to the Materialized to indicate that,
> yes, they really want a state store there.

This sound like a quite subtle semantic difference on how to use the
API. Might be hard to explain to users. I would prefer to not introduce it.



About Guozhang's points:

1a) That is actually a good point. However, I believe we cannot get
around this issue easily, and it seems ok to me, to expose the actual
store type we are using. (More thoughts later.)

1b) I don't see an issue with allowing users to query all stores? What
is the rational behind it? What do we gain by not allowing it?

2) While I understand what you are saying, we also want/need to have a
way in the PAPI to allow users adding "internal/private" non-queryable
stores to a topology. That's possible via
`Materialized#withQueryingDisabled()`. We could also update
`Topology#addStateStore(StoreBuilder, boolean isQueryable, String...)`
to address this. Again, I agree with Bill that the current API is built
in a certain way, and if we want to change it, it should be a separate
KIP, as it seems to be an orthogonal concern.

> Instead, we just restrict KIP-307 to NOT
> use the Joined.name for state store names and always use internal names as
> well, which admittedly indeed leaves a hole of not being able to cover all
> internal names here

I think it's important to close this gap. Naming entities seems to a
binary feature: if there is a gap, the feature is more or less useless,
rendering KIP-307 void.



I like John's detailed list of required features and what
Materialized/WindowByteStoreSuppliers offers. My take is, that adding
Materialized including the required run-time checks is the best option
we have, for the following reasons:

 - the main purpose of this KIP is to close the naming gap what we achieve
 - we can allow people to use the new in-memory store
 - we allow people to enable/disable caching
 - we unify the API
 - we decouple querying from naming
 - it's a small API change

Adding an overload and only passing in a name, would address the main
purpose of the KIP. However, it falls short on all the other "goodies".
As you mentioned, passing in `Materialized` might not be perfect and
maybe we need to deprecate is at some point; but this is also true for
passing in just a name.

I am also not convinced, that a `StreamJoinStore` would resolve all the
issues. In the end, as long as we are using a `WindowedStore`
internally, we need to expose this "implemenation detail" to users to
allow them to plug in a custom store. Adding `Materialized` seem to be
the best short-term fix from my point of view.


-Matthias


On 6/27/19 9:56 AM, Guozhang Wang wrote:
> Hi John,
> 
> I actually feels better about a new interface but I'm not sure if we would
> need the full configuration of store / log / cache, now or in the future
> ever for stream-stream join.
> 
> Right now I feel that 1) we want to improve our implementation of
> stream-stream join, and potentially also allow users to customize this
> implementation but with a more suitable interface than the current
> WindowStore interface, how to do that is less clear and execution-wise it's
> (arguably..) not urgent; 2) we want to close the last gap (Stream-stream
> join) of allowing users to specify all internal names to help on backward
> compatibility, which is urgent.
> 
> Therefore if we want to unblock 2) from 1) in the near term, I feel
> slightly inclined to just add overload functions that takes in a store name
> for stream-stream joins only -- and admittedly, in the future this function
> maybe deprecated -- i.e. if we have to do something that we "may regret" in
> the future, I'd like to pick the least intrusive option.
> 
> About `Joined#withStoreName`: since the Joined class itself is also used in
> other join types, I feel less comfortable to have a `Joined#withStoreName`
> which is only going to be used by stream-stream join. Or maybe I miss
> something here about the 

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-27 Thread Adam Bellemare
You're stretching my brain, John!

I prefer STRATEGY 1 because it solves the problem in a simple way, and
allows us to deprecate support for older message types as we go (ie, we
only support the previous 3 versions, so V5,V4,V3, but not v2 or V1).

STRATEGY 2 is akin to Avro schemas between two microservices - there are
indeed cases where a breaking change must be made, and forward
compatibility will provide us with no out other than requiring a full stop
and full upgrade for all nodes, shifting us back towards STRATEGY 1.

My preference is STRATEGY 1 with instructions as an ENUM, and we can
certainly include a version. Would it make sense to include a version
number in  SubscriptionResponseWrapper as well? Currently we don't have any
instructions in there, as I removed the boolean, but it is certainly
plausible that it could happen in the future. I don't *think* we'll need
it, but I also didn't think we'd need it for SubscriptionWrapper and here
we are.

Thanks for the thoughts, and the info on the right-key. That was
enlightening, though I can't think of a use-case for it *at this point in
time*. :)

Adam



On Thu, Jun 27, 2019 at 12:29 PM John Roesler  wrote:

> I think I agree with you, right joins (and therefore full outer joins)
> don't make sense here, because the result is a keyed table, where the
> key is the PK of the left-hand side. So, when you have a
> right-hand-side record with no incoming FK references, you would want
> to produce a join result like `nullKey: (null, rhsValue)`, but we
> don't currently allow null keys in Streams. It actually is possible to
> define them, and therefore to add right- and full-outer foreign-key
> joins later, but it's non-trivial in a streaming context with
> continuously updated results. (See the PS if you're curious what I'm
> thinking). You're correct, right- and full-outer joins are trivial on
> our current 1:1 table joins because they are equi-joins.
>
> Regarding the transition, it sounds like what you're proposing is that
> we would say, "adding a foreign-key join to your topology requires a
> full application reset (or a new application id)". This is also an
> acceptable constraint to place on the feature, but not strictly
> necessary. Since 2.3, it's now possible to give all the state in your
> application stable names. This means that it's no longer true that
> adding a node to your topology graph would break its structure, and it
> does become possible to add new operators and simply restart the app.
> Revisiting my prior thought, though, I think the problem is not
> specific to your feature. For example, adding a new grouped
> aggregation would produce a new repartition topic, but the repartition
> topic partitions might get assigned to old nodes in the middle of a
> rolling bounce, and they would need to just ignore them. This
> requirement is the same for the repartition topics in the FK join, so
> it's orthogonal to your design.
>
> Back to the first concern, though, I'm not sure I followed the
> explanation. As a thought experiment, let's imagine that Joe hadn't
> taken the time to experiment with your feature branch. We wouldn't
> have noticed the problem until the feature was already released in
> 2.4. So the wire protocol on that PK->FK subscription topic would have
> been V1: "FK,PK,HASH,BOOLEAN". Then, Joe would have let us know the
> problem once they picked up the feature, so we would want to implement
> your proposed fix and change the wire protocol to V2:
> "FK,PK,HASH,INSTRUCTIONS" in 2.5. Upon rolling out the update, we
> would see both 2.4 nodes encountering V2 messages and 2.5 nodes
> encountering V1 messages. How can they both detect that they are
> attempting to process a newer or older protocol? If they can detect
> it, then what should they do?
>
> From experience, there are two basic solutions to this problem:
>
> STRATEGY1. Add a protocol version to the message (could be a number at
> the start of the message payload, or it could be a number in the
> message headers, not sure if it matters much. Payload is probably more
> compact, since the header would need a name.) In this case, the 2.4
> worker would know that it's max protocol version is V1, and when it
> sees the V2 message, it knows that it can't handle it properly. Rather
> than doing something wrong, it would just not do anything. This means
> it would stop the task, if not shut down the whole instance. On the
> other hand, a 2.5 worker would have some defined logic for how to
> handle all versions (V1 and V2), so once the upgrade is complete, all
> messages can be processed.
>
> STRATEGY2. Make the schema forward-compatible. Basically, we ensure
> that new fields can only be appended to the message schema, and that
> older workers using only a prefix of the full message would still
> behave correctly. Using the example above, we'd instead evolve the
> schema to V2': "FK,PK,HASH,BOOLEAN,INSTRUCTIONS", and continue to set
> the boolean field to true for the "new" 

Re: [DISCUSS] KIP-480 : Sticky Partitioner

2019-06-27 Thread Justine Olshan
Moving the previous comment to the PR discussion. :)

On Thu, Jun 27, 2019 at 10:51 AM Justine Olshan 
wrote:

> I was going through fixing some of the overloaded methods and I realized I
> overloaded the RecordAccumulator constructor. I added a parameter to
> include the partitioner so I can call my method. However, the tests for the
> record accumulator do not have a partitioner. There is the potential for a
> NPE when calling this method. Currently, none of the tests will enter the
> code block, but I was wondering if would be a good idea to include a
> partitioner != null in the if statement as well. I'm open to other
> suggestions if this is not clear about what is going on.
>
> Ismael,
> Oh I see now. It seems like Netflix just checks if the leader is available
> as well.
> I'll look into the case where no replica is down.
>
>
>
> On Thu, Jun 27, 2019 at 10:39 AM Ismael Juma  wrote:
>
>> Hey Justine.
>>
>> Available could mean that some replicas are down but the leader is
>> available. The suggestion was to try a partition where no replica was down
>> if it's available. Such partitions are safer in general. There could be
>> some downsides too, so worth thinking about the trade-offs.
>>
>> Ismael
>>
>> On Thu, Jun 27, 2019, 10:24 AM Justine Olshan 
>> wrote:
>>
>> > Ismael,
>> >
>> > Thanks for the feedback!
>> >
>> > For 1, currently the sticky partitioner favors "available partitions."
>> From
>> > my understanding, these are partitions that are not under-replicated. If
>> > that is not the same, please let me know.
>> > As for 2, I've switched to Optional, and the few tests I've run so far
>> > suggest the performance is the same.
>> > And for 3, I've added a javadoc to my next commit, so that should be up
>> > soon.
>> >
>> > Thanks,
>> > Justine
>> >
>> > On Thu, Jun 27, 2019 at 1:31 AM Ismael Juma  wrote:
>> >
>> > > Thanks for the KIP Justine. It looks pretty good. A few comments:
>> > >
>> > > 1. Should we favor partitions that are not under replicated? This is
>> > > something that Netflix did too.
>> > >
>> > > 2. If there's no measurable performance difference, I agree with
>> > Stanislav
>> > > that Optional would be better than Integer.
>> > >
>> > > 3. We should include the javadoc for the newly introduced method that
>> > > specifies it and its parameters. In particular, it would good to
>> specify
>> > if
>> > > it gets called when an explicit partition id has been provided.
>> > >
>> > > Ismael
>> > >
>> > > On Mon, Jun 24, 2019, 2:04 PM Justine Olshan 
>> > wrote:
>> > >
>> > > > Hello,
>> > > > This is the discussion thread for KIP-480: Sticky Partitioner.
>> > > >
>> > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
>> > > >
>> > > > Thank you,
>> > > > Justine Olshan
>> > > >
>> > >
>> >
>>
>


Re: [DISCUSS] KIP-480 : Sticky Partitioner

2019-06-27 Thread Justine Olshan
I was going through fixing some of the overloaded methods and I realized I
overloaded the RecordAccumulator constructor. I added a parameter to
include the partitioner so I can call my method. However, the tests for the
record accumulator do not have a partitioner. There is the potential for a
NPE when calling this method. Currently, none of the tests will enter the
code block, but I was wondering if would be a good idea to include a
partitioner != null in the if statement as well. I'm open to other
suggestions if this is not clear about what is going on.

Ismael,
Oh I see now. It seems like Netflix just checks if the leader is available
as well.
I'll look into the case where no replica is down.



On Thu, Jun 27, 2019 at 10:39 AM Ismael Juma  wrote:

> Hey Justine.
>
> Available could mean that some replicas are down but the leader is
> available. The suggestion was to try a partition where no replica was down
> if it's available. Such partitions are safer in general. There could be
> some downsides too, so worth thinking about the trade-offs.
>
> Ismael
>
> On Thu, Jun 27, 2019, 10:24 AM Justine Olshan 
> wrote:
>
> > Ismael,
> >
> > Thanks for the feedback!
> >
> > For 1, currently the sticky partitioner favors "available partitions."
> From
> > my understanding, these are partitions that are not under-replicated. If
> > that is not the same, please let me know.
> > As for 2, I've switched to Optional, and the few tests I've run so far
> > suggest the performance is the same.
> > And for 3, I've added a javadoc to my next commit, so that should be up
> > soon.
> >
> > Thanks,
> > Justine
> >
> > On Thu, Jun 27, 2019 at 1:31 AM Ismael Juma  wrote:
> >
> > > Thanks for the KIP Justine. It looks pretty good. A few comments:
> > >
> > > 1. Should we favor partitions that are not under replicated? This is
> > > something that Netflix did too.
> > >
> > > 2. If there's no measurable performance difference, I agree with
> > Stanislav
> > > that Optional would be better than Integer.
> > >
> > > 3. We should include the javadoc for the newly introduced method that
> > > specifies it and its parameters. In particular, it would good to
> specify
> > if
> > > it gets called when an explicit partition id has been provided.
> > >
> > > Ismael
> > >
> > > On Mon, Jun 24, 2019, 2:04 PM Justine Olshan 
> > wrote:
> > >
> > > > Hello,
> > > > This is the discussion thread for KIP-480: Sticky Partitioner.
> > > >
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
> > > >
> > > > Thank you,
> > > > Justine Olshan
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-480 : Sticky Partitioner

2019-06-27 Thread Ismael Juma
Hey Justine.

Available could mean that some replicas are down but the leader is
available. The suggestion was to try a partition where no replica was down
if it's available. Such partitions are safer in general. There could be
some downsides too, so worth thinking about the trade-offs.

Ismael

On Thu, Jun 27, 2019, 10:24 AM Justine Olshan  wrote:

> Ismael,
>
> Thanks for the feedback!
>
> For 1, currently the sticky partitioner favors "available partitions." From
> my understanding, these are partitions that are not under-replicated. If
> that is not the same, please let me know.
> As for 2, I've switched to Optional, and the few tests I've run so far
> suggest the performance is the same.
> And for 3, I've added a javadoc to my next commit, so that should be up
> soon.
>
> Thanks,
> Justine
>
> On Thu, Jun 27, 2019 at 1:31 AM Ismael Juma  wrote:
>
> > Thanks for the KIP Justine. It looks pretty good. A few comments:
> >
> > 1. Should we favor partitions that are not under replicated? This is
> > something that Netflix did too.
> >
> > 2. If there's no measurable performance difference, I agree with
> Stanislav
> > that Optional would be better than Integer.
> >
> > 3. We should include the javadoc for the newly introduced method that
> > specifies it and its parameters. In particular, it would good to specify
> if
> > it gets called when an explicit partition id has been provided.
> >
> > Ismael
> >
> > On Mon, Jun 24, 2019, 2:04 PM Justine Olshan 
> wrote:
> >
> > > Hello,
> > > This is the discussion thread for KIP-480: Sticky Partitioner.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
> > >
> > > Thank you,
> > > Justine Olshan
> > >
> >
>


Re: [DISCUSS] KIP-480 : Sticky Partitioner

2019-06-27 Thread Justine Olshan
Ismael,

Thanks for the feedback!

For 1, currently the sticky partitioner favors "available partitions." From
my understanding, these are partitions that are not under-replicated. If
that is not the same, please let me know.
As for 2, I've switched to Optional, and the few tests I've run so far
suggest the performance is the same.
And for 3, I've added a javadoc to my next commit, so that should be up
soon.

Thanks,
Justine

On Thu, Jun 27, 2019 at 1:31 AM Ismael Juma  wrote:

> Thanks for the KIP Justine. It looks pretty good. A few comments:
>
> 1. Should we favor partitions that are not under replicated? This is
> something that Netflix did too.
>
> 2. If there's no measurable performance difference, I agree with Stanislav
> that Optional would be better than Integer.
>
> 3. We should include the javadoc for the newly introduced method that
> specifies it and its parameters. In particular, it would good to specify if
> it gets called when an explicit partition id has been provided.
>
> Ismael
>
> On Mon, Jun 24, 2019, 2:04 PM Justine Olshan  wrote:
>
> > Hello,
> > This is the discussion thread for KIP-480: Sticky Partitioner.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
> >
> > Thank you,
> > Justine Olshan
> >
>


Re: [DISCUSS] KIP-479: Add Materialized to Join

2019-06-27 Thread Guozhang Wang
Hi John,

I actually feels better about a new interface but I'm not sure if we would
need the full configuration of store / log / cache, now or in the future
ever for stream-stream join.

Right now I feel that 1) we want to improve our implementation of
stream-stream join, and potentially also allow users to customize this
implementation but with a more suitable interface than the current
WindowStore interface, how to do that is less clear and execution-wise it's
(arguably..) not urgent; 2) we want to close the last gap (Stream-stream
join) of allowing users to specify all internal names to help on backward
compatibility, which is urgent.

Therefore if we want to unblock 2) from 1) in the near term, I feel
slightly inclined to just add overload functions that takes in a store name
for stream-stream joins only -- and admittedly, in the future this function
maybe deprecated -- i.e. if we have to do something that we "may regret" in
the future, I'd like to pick the least intrusive option.

About `Joined#withStoreName`: since the Joined class itself is also used in
other join types, I feel less comfortable to have a `Joined#withStoreName`
which is only going to be used by stream-stream join. Or maybe I miss
something here about the "latter" case that you are referring to?



Guozhang

On Mon, Jun 24, 2019 at 12:16 PM John Roesler  wrote:

> Thanks Guozhang,
>
> Yep. Maybe we can consider just exactly what the join needs:
>
> > the WindowStore itself is fine, if overly broad,
> > since the only two methods we need are `window.put(key, value,
> > context().timestamp())` and `WindowStoreIterator iter =
> > window.fetch(key, timeFrom, timeTo)`.
>
> One "middle ground" would be to extract _this_ into a new store
> interface, which only supports these API calls, like
> StreamJoinStore. This would give us the latitude we need to
> efficiently support the exact operation without concerning ourselves
> with all the other things a WindowStore can do (which are unreachable
> for the join use case). It would also let us drop "store duplicates"
> from the main WindowStore interface, since it only exists to support
> the join use case.
>
> If we were to add a new StreamJoinStore interface, then it'd be
> straightforward how we could add also
> `Materialized.as(StreamJoinBytesStoreSupplier)` and use Materialized,
> or alternatively add the ability to set the bytes store on Joined.
>
> Personally, I'm kind of leaning toward the latter (and also doing
> `Joined#withStoreName`), since adding the new interface to
> Materialized then also pollutes the interface for its _actual_ use
> case of materializing a table view. Of course, to solve the immediate
> problem, all we need is the store name, but we might feel better about
> adding the store name to Joined if we _also_ feel like in the future,
> we would add store/log/cache configuration to Joined as well.
>
> -John
>
> On Mon, Jun 24, 2019 at 12:56 PM Guozhang Wang  wrote:
> >
> > Hello John,
> >
> > My main concern is exactly the first point at the bottom of your analysis
> > here: "* configure the bytes store". I'm not sure if using a window bytes
> > store would be ideal for stream-stream windowed join; e.g. we could
> > consider two dimensional list sorted by timestamps and then by keys to do
> > the join, whereas a windowed bytes store is basically sorted by key
> first,
> > then by timestamp. If we expose the Materialized to let user pass in a
> > windowed bytes store, then we would need to change that if we want to
> > replace it with a different implementation interface.
> >
> >
> > Guozhang
> >
> > On Mon, Jun 24, 2019 at 8:59 AM John Roesler  wrote:
> >
> > > Hey Guozhang and Bill,
> > >
> > > For what it's worth, I agree with you both!
> > >
> > > I think it might help the discussion to look concretely at what
> > > Materialized does:
> > > * set a WindowBytesStoreSupplier
> > > * set a name
> > > * set the key/value serdes
> > > * disable/enable/configure change-logging
> > > * disable/enable caching
> > > * configure retention
> > >
> > > Further, looking into the WindowBytesStoreSupplier, the interface lets
> you:
> > > * get the segment interval
> > > * get the window size
> > > * get whether "duplicates" are enabled
> > > * get the retention period
> > > * (obviously) get a WindowStore
> > >
> > > We know that Materialized isn't exactly what we need for stream joins,
> > > but we can see how close Materialized is to what we need. If it is
> > > close, maybe we can use it and document the gaps, and if it is not
> > > close, then maybe we should just add what we need to Joined.
> > > Stream Join's requirements for its stores:
> > > * a multimap store (i.e., it keeps duplicates) for storing general
> > > (not windowed) keyed records associated with their insertion time, and
> > > allows efficient time-bounded lookups and also efficient purges of old
> > > data.
> > > ** Note, a properly configured WindowBytesStoreSupplier satisfies this
> > > requirement, and the 

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-27 Thread John Roesler
I think I agree with you, right joins (and therefore full outer joins)
don't make sense here, because the result is a keyed table, where the
key is the PK of the left-hand side. So, when you have a
right-hand-side record with no incoming FK references, you would want
to produce a join result like `nullKey: (null, rhsValue)`, but we
don't currently allow null keys in Streams. It actually is possible to
define them, and therefore to add right- and full-outer foreign-key
joins later, but it's non-trivial in a streaming context with
continuously updated results. (See the PS if you're curious what I'm
thinking). You're correct, right- and full-outer joins are trivial on
our current 1:1 table joins because they are equi-joins.

Regarding the transition, it sounds like what you're proposing is that
we would say, "adding a foreign-key join to your topology requires a
full application reset (or a new application id)". This is also an
acceptable constraint to place on the feature, but not strictly
necessary. Since 2.3, it's now possible to give all the state in your
application stable names. This means that it's no longer true that
adding a node to your topology graph would break its structure, and it
does become possible to add new operators and simply restart the app.
Revisiting my prior thought, though, I think the problem is not
specific to your feature. For example, adding a new grouped
aggregation would produce a new repartition topic, but the repartition
topic partitions might get assigned to old nodes in the middle of a
rolling bounce, and they would need to just ignore them. This
requirement is the same for the repartition topics in the FK join, so
it's orthogonal to your design.

Back to the first concern, though, I'm not sure I followed the
explanation. As a thought experiment, let's imagine that Joe hadn't
taken the time to experiment with your feature branch. We wouldn't
have noticed the problem until the feature was already released in
2.4. So the wire protocol on that PK->FK subscription topic would have
been V1: "FK,PK,HASH,BOOLEAN". Then, Joe would have let us know the
problem once they picked up the feature, so we would want to implement
your proposed fix and change the wire protocol to V2:
"FK,PK,HASH,INSTRUCTIONS" in 2.5. Upon rolling out the update, we
would see both 2.4 nodes encountering V2 messages and 2.5 nodes
encountering V1 messages. How can they both detect that they are
attempting to process a newer or older protocol? If they can detect
it, then what should they do?

>From experience, there are two basic solutions to this problem:

STRATEGY1. Add a protocol version to the message (could be a number at
the start of the message payload, or it could be a number in the
message headers, not sure if it matters much. Payload is probably more
compact, since the header would need a name.) In this case, the 2.4
worker would know that it's max protocol version is V1, and when it
sees the V2 message, it knows that it can't handle it properly. Rather
than doing something wrong, it would just not do anything. This means
it would stop the task, if not shut down the whole instance. On the
other hand, a 2.5 worker would have some defined logic for how to
handle all versions (V1 and V2), so once the upgrade is complete, all
messages can be processed.

STRATEGY2. Make the schema forward-compatible. Basically, we ensure
that new fields can only be appended to the message schema, and that
older workers using only a prefix of the full message would still
behave correctly. Using the example above, we'd instead evolve the
schema to V2': "FK,PK,HASH,BOOLEAN,INSTRUCTIONS", and continue to set
the boolean field to true for the "new" foreign key. Then, 2.4 workers
encountering the a "new FK" message would just see the prefix of the
payload that makes sense to them, and they would still continue
processing the messages as they always have. Only after the 2.5 code
is fully rolled out to the cluster would we be sure to see the desired
behavior. Note: in the reverse case, a 2.5 worker knows how to fully
parse the new message format, even if it plans to ignore the BOOLEAN
field.

There are some tradeoffs between these strategies: STRATEGY1 ensures
that all messages are only handled by workers that can properly handle
them, although it results in processing stalls while there are still
old nodes in the cluster. STRATEGY2 ensures that all messages can be
processed by all nodes, so there are no stalls, but we can never
remove fields from the message, so if there are a lot of revisions in
the future, the payloads will become bloated. Also, it's not clear
that you can actually pull off STRATEGY2 in all cases. If there's some
new kind of message you want to send that has no way to be correctly
processed at all under the 2.4 code paths, the prefix thing simply
doesn't work. Etc.

Also, note that you can modify the above strategies by instead
designing the message fields for extensibility. E.g., if you make the
instructions 

Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

2019-06-27 Thread Guozhang Wang
Hello folks,

As 2.3 is released now, I'd like to bump up this KIP discussion again for
your reviews.


Guozhang


On Thu, May 23, 2019 at 4:44 PM Guozhang Wang  wrote:

> Hello Patrik,
>
> Since we are rolling out 2.3 and everyone is busy with the release now
> this KIP does not have much discussion involved yet and will slip into the
> next release cadence.
>
> This KIP itself contains several parts itself: 1. refactoring the existing
> metrics hierarchy to cleanup some redundancy and also get more clarity; 2.
> add instance-level metrics like rebalance and state metrics, as well as
> other static metrics.
>
>
> Guozhang
>
>
>
> On Thu, May 23, 2019 at 5:34 AM Patrik Kleindl  wrote:
>
>> Hi Guozhang
>> Thanks for the KIP, this looks very helpful.
>> Could you please provide more detail on the metrics planned for the state?
>> We were just considering how to implement this ourselves because we need
>> to
>> track the history of stage changes.
>> The idea was to have an accumulated "seconds in state x" metric for every
>> state.
>> The new rebalance metric might solve part of our use case, but it is
>> interesting what you have planned for the state metric.
>> best regards
>> Patrik
>>
>> On Fri, 29 Mar 2019 at 18:56, Guozhang Wang  wrote:
>>
>> > Hello folks,
>> >
>> > I'd like to propose the following KIP to improve the Kafka Streams
>> metrics
>> > mechanism to users. This includes 1) a minor change in the public
>> > StreamsMetrics API, and 2) a major cleanup on the Streams' own built-in
>> > metrics hierarchy.
>> >
>> > Details can be found here:
>> >
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
>> >
>> > I'd love to hear your thoughts and feedbacks. Thanks!
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-455: Create an Administrative API for Replica Reassignment

2019-06-27 Thread Jason Gustafson
>
> We'd remove nodes from targetReplicas just as soon as they entered the
> ISR.  They would become regular replicas at that point.


I think we can save a lot of back and forth by working through an example.
Suppose we have the following initial state:

replicas: [1, 2, 3]
isr: [1, 2, 3]
targetReplicas: []

We want to reassign to [4, 5, 6]. My understanding is that leads to the
following state:

replicas: [1, 2, 3]
isr: [1, 2, 3]
targetReplicas: [4, 5, 6]

Let's say replica 4 comes into the ISR first:

replicas: [1, 2, 3]
isr: [1, 2, 3, 4]
targetReplicas: [4, 5, 6]

What happens now?

(Sorry if I'm being dense, it's just not clear to me exactly what the
expected transitions are from here.)


Thanks,
Jason


On Wed, Jun 26, 2019 at 2:12 PM Colin McCabe  wrote:

> On Wed, Jun 26, 2019, at 12:02, Jason Gustafson wrote:
> > Hi Colin,
> >
> > Responses below and another question:
> >
> > > I guess the thought process here is that most reassignment tools want
> to
> > > know about all the reassignments that are going on.  If you don't know
> all
> > > the pending reassignments, then it's hard to say whether adding a new
> one
> > > is a good idea, or cancelling an existing one.  So I guess I can't
> think of
> > > a case where a reassignment tool would want a partial set rather than
> the
> > > full one.
> >
> > UIs often have "drill into" options. If there is a large ongoing
> > reassignment, I can see wanting to limit the scope to a specific topic.
> Any
> > downside that I'm missing?
> >
>
> We could add a mode that only lists a given set of partitions.  To be
> consistent with how we handle topics, that could be a separate "describe"
> method.  I don't think there's any downside, except some extra code to
> write.
>
> > > Good question.  It will be the current behavior.  Basically, we
> immediately
> > > try to replicate to all the targetReplicas.  As they enter the ISR,
> they
> > > leave "targetReplicas" and enter "replicas."  Making this more
> incremental
> > > would be a good follow-on improvement.
> >
> > The current behavior is to wait until all target replicas are in the ISR
> > before making a change. Are you saying that we will keep this behavior?
>
> We'd remove nodes from targetReplicas just as soon as they entered the
> ISR.  They would become regular replicas at that point.
>
> >
> > > When the /admin/reassign_partitions znode is empty, we'll listen for
> > > updates.  When an update is made, we'll treat it like a call to
> > > AlterPartitionReassignments.  Whenever we have zero pending
> reassignments,
> > > we'll delete the /admin/reassign_partitions znode.  If the znode
> already
> > > exists, we don't listen on it (same as now).
> >
> >
> > So just to be a bit more explicit, what you are saying is that we will
> keep
> > the reassignment state under /admin/reassign_partitions as we do
> currently,
> > but we will update the target_replicas field in /partition/state
> following
> > this new logic. Then as soon as the current replica set matches the
> target
> > assignment, we will remove the /admin/reassign_partitions znode. Right?
>
> One clarification: I wasn't proposing that the controller should write to
> /admin/reassign_partitions.  We will just remove the znode when we
> transition to having no ongoing reassignments.  There's no guarantee that
> what is in the znode reflects the current reassignments that are going on.
> The only thing you can know is that if the znode exists, there is at least
> one reassignment going on.  But if someone makes a new reassignment with
> the AlterPartitionReassignments API, it won't appear in the znode.
>
> Another thing to note is that if the znode exists and you overwrite it,
> your updates will be ignored.  This matches the current behavior of this
> znode.  Apparently some applications don't know about this behavior and try
> to update the znode while a reassignment is going on, but it has no
> effect-- other than making what is in ZK misleading if someone checks.
> This is, again, existing behavior :(
>
> It's not a good API by any means.  For example, what if someone else
> overwrites your znode update before the controller has a chance to read
> it?  Unfortunate, but there's no really good way to fix this without
> transitioning away from direct ZooKeeper access.  We'll transition the
> command line tools immediately, but there will be some external management
> tools that will lag a bit.
>
> >
> > Actually I'm still a bit confused about one aspect of this proposal. You
> > are suggesting that we should leave the reassignment out of the Metadata.
> > That is fine, but what does that mean as far as the consistency of the
> > metadata we expose to clients while a reassignment is active? Currently
> the
> > Metadata includes the following:
> >
> > 1. Current leader
> > 2. Current ISR
> > 3. Current assigned replicas
> >
> > Can you explain how the reassignment will affect this state?  As the
> target
> > replicas are coming into sync, they will 

Re: [DISCUSS] KIP-455: Create an Administrative API for Replica Reassignment

2019-06-27 Thread Viktor Somogyi-Vass
Hey Colin,

So in my understanding this is how the controller handles a reassignment in
a simple scenario:
1. When an AlterPartitionReassignmentRequest arrives it updates the
partitions' ZK data in
/brokers/topics/[topic]/partitions/[partitionId]/state with targetReplicas
2. Sends out LeaderAndIsr requests to the reassigned partitions to update
their targetReplica set
3. Starts the new replicas
4. Waits until a new replica comes in sync, mark them online
5. Sends out an UpdateMetadata request to the replicas that the new
partitions became ISR
6. Updates the partition state in ZK by removing the new ISR from the
targetReplica set
7. Sends out a LeaderAndIsr request with the targetReplicas not containing
the new ISR. This request may contain a change in leadership info if leader
change is needed
8. If the reassigned replica has to be deleted, then take them offline and
then non-existent
9. Send out an UpdateMetadata request with the new ISR information

Is this correct? Let me know if I missed something, I just wanted to
assemble the picture a little bit. I think it would be useful to add this
(or the corrected version if needed :) ) to the KIP with an example maybe
as Jason suggested. Would help a lot in understandability.

Regards,
Viktor

On Wed, Jun 26, 2019 at 11:12 PM Colin McCabe  wrote:

> On Wed, Jun 26, 2019, at 12:02, Jason Gustafson wrote:
> > Hi Colin,
> >
> > Responses below and another question:
> >
> > > I guess the thought process here is that most reassignment tools want
> to
> > > know about all the reassignments that are going on.  If you don't know
> all
> > > the pending reassignments, then it's hard to say whether adding a new
> one
> > > is a good idea, or cancelling an existing one.  So I guess I can't
> think of
> > > a case where a reassignment tool would want a partial set rather than
> the
> > > full one.
> >
> > UIs often have "drill into" options. If there is a large ongoing
> > reassignment, I can see wanting to limit the scope to a specific topic.
> Any
> > downside that I'm missing?
> >
>
> We could add a mode that only lists a given set of partitions.  To be
> consistent with how we handle topics, that could be a separate "describe"
> method.  I don't think there's any downside, except some extra code to
> write.
>
> > > Good question.  It will be the current behavior.  Basically, we
> immediately
> > > try to replicate to all the targetReplicas.  As they enter the ISR,
> they
> > > leave "targetReplicas" and enter "replicas."  Making this more
> incremental
> > > would be a good follow-on improvement.
> >
> > The current behavior is to wait until all target replicas are in the ISR
> > before making a change. Are you saying that we will keep this behavior?
>
> We'd remove nodes from targetReplicas just as soon as they entered the
> ISR.  They would become regular replicas at that point.
>
> >
> > > When the /admin/reassign_partitions znode is empty, we'll listen for
> > > updates.  When an update is made, we'll treat it like a call to
> > > AlterPartitionReassignments.  Whenever we have zero pending
> reassignments,
> > > we'll delete the /admin/reassign_partitions znode.  If the znode
> already
> > > exists, we don't listen on it (same as now).
> >
> >
> > So just to be a bit more explicit, what you are saying is that we will
> keep
> > the reassignment state under /admin/reassign_partitions as we do
> currently,
> > but we will update the target_replicas field in /partition/state
> following
> > this new logic. Then as soon as the current replica set matches the
> target
> > assignment, we will remove the /admin/reassign_partitions znode. Right?
>
> One clarification: I wasn't proposing that the controller should write to
> /admin/reassign_partitions.  We will just remove the znode when we
> transition to having no ongoing reassignments.  There's no guarantee that
> what is in the znode reflects the current reassignments that are going on.
> The only thing you can know is that if the znode exists, there is at least
> one reassignment going on.  But if someone makes a new reassignment with
> the AlterPartitionReassignments API, it won't appear in the znode.
>
> Another thing to note is that if the znode exists and you overwrite it,
> your updates will be ignored.  This matches the current behavior of this
> znode.  Apparently some applications don't know about this behavior and try
> to update the znode while a reassignment is going on, but it has no
> effect-- other than making what is in ZK misleading if someone checks.
> This is, again, existing behavior :(
>
> It's not a good API by any means.  For example, what if someone else
> overwrites your znode update before the controller has a chance to read
> it?  Unfortunate, but there's no really good way to fix this without
> transitioning away from direct ZooKeeper access.  We'll transition the
> command line tools immediately, but there will be some external management
> tools that will lag a bit.
>
> >
> > Actually I'm 

Re: [DISCUSS] KIP-385: Provide configuration allowing consumer to no throw away prefetched data

2019-06-27 Thread Sean Glover
Hi everyone,

I want to revive a solution to this issue.  I created a new PR that
accomodates Jason Gustafson's suggestion in the original PR to re-add
paused completed fetches back to the completed fetches queue for less
bookeeping.  If someone could jump in and do a review it would be
appreciated!

I also updated KAFKA-7548 with more information.

Patch: https://github.com/apache/kafka/pull/6988 (original:
https://github.com/apache/kafka/pull/5844)
Jira: https://issues.apache.org/jira/browse/KAFKA-7548
Sample project:
https://github.com/seglo/kafka-consumer-tests/tree/seglo/KAFKA-7548
Grafana snapshot:
https://snapshot.raintank.io/dashboard/snapshot/RDFTsgNvzP5bTmuc8X6hq7vLixp9tUtL?orgId=2

Regards,
Sean

On Wed, Oct 31, 2018 at 8:41 PM Zahari Dichev 
wrote:

> Just looked at it,
>
> Great work. Thanks a lot for the patch. This should certainly improve
> things !
>
> Zahari
>
> On Wed, Oct 31, 2018 at 6:25 PM  wrote:
>
> > Hi there, I  will take a look first thing i get home.
> >
> > Zahari
> >
> > > On 31 Oct 2018, at 18:23, Mayuresh Gharat 
> > wrote:
> > >
> > > Hi Colin, Zahari,
> > >
> > > Wanted to check if you can review the patch and let me know, if we need
> > to
> > > make any changes?
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Fri, Oct 26, 2018 at 1:41 PM Zahari Dichev 
> > > wrote:
> > >
> > >> Thanks for participating the discussion. Indeed, I learned quite a
> lot.
> > >> Will take a look at the patch as well and spend some time hunting for
> > some
> > >> other interesting issue to work on :)
> > >>
> > >> Cheers,
> > >> Zahari
> > >>
> > >>> On Fri, Oct 26, 2018 at 8:49 PM Colin McCabe 
> > wrote:
> > >>>
> > >>> Hi Zahari,
> > >>>
> > >>> I think we can retire the KIP, since the KAFKA-7548 patch should
> solve
> > >> the
> > >>> issue without any changes that require a KIP.  This is actually the
> > best
> > >>> thing we could do for our users, since things will "just work" more
> > >>> efficiently without a lot of configuration knobs.
> > >>>
> > >>> I think you did an excellent job raising this issue and discussing
> it.
> > >>> It's a very good contribution to the project even if you don't end up
> > >>> writing the patch yourself.  I'm going to take a look at the patch
> > today.
> > >>> If you want to take a look, that would also be good.
> > >>>
> > >>> best,
> > >>> Colin
> > >>>
> > >>>
> >  On Thu, Oct 25, 2018, at 12:25, Zahari Dichev wrote:
> >  Hi there Mayuresh,
> > 
> >  Great to heat that this is actually working well in production for
> > some
> >  time now. I have changed the details of the KIP to reflect the fact
> > >> that
> > >>> as
> >  already discussed - we do not really need any kind of configuration
> as
> > >>> this
> >  data should not be thrown away at all.  Submitting a PR sounds
> great,
> >  although I feel a bit jealous you (LinkedIn) beat me to my first
> kafka
> >  commit  ;)  Not sure how things stand with the voting process ?
> > 
> >  Zahari
> > 
> > 
> > 
> >  On Thu, Oct 25, 2018 at 7:39 PM Mayuresh Gharat <
> > >>> gharatmayures...@gmail.com>
> >  wrote:
> > 
> > > Hi Colin/Zahari,
> > >
> > > I have created a ticket for the similar/same feature :
> > > https://issues.apache.org/jira/browse/KAFKA-7548
> > > We (Linkedin) had a use case in Samza at Linkedin when they moved
> > >> from
> > >>> the
> > > SimpleConsumer to KafkaConsumer and they wanted to do this pause
> and
> > >>> resume
> > > pattern.
> > > They realized there was performance degradation when they started
> > >> using
> > > KafkaConsumer.assign() and pausing and unPausing partitions. We
> > >>> realized
> > > that not throwing away the prefetched data for paused partitions
> > >> might
> > > improve the performance. We wrote a benchmark (I can share it if
> > >>> needed) to
> > > prove this. I have attached the findings in the ticket.
> > > We have been running the hotfix internally for quite a while now.
> > >> When
> > > samza ran this fix in production, they realized 30% improvement in
> > >>> there
> > > app performance.
> > > I have the patch ready on our internal branch and would like to
> > >> submit
> > >>> a PR
> > > for this on the above ticket asap.
> > > I am not sure, if we need a separate config for this as we haven't
> > >>> seen a
> > > lot of memory overhead due to this in our systems. We have had this
> > >>> running
> > > in production for a considerable amount of time without any issues.
> > > It would be great if you guys can review the PR once its up and see
> > >> if
> > >>> that
> > > satisfies your requirement. If it doesn't then we can think more on
> > >> the
> > > config driven approach.
> > > Thoughts??
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > >
> > > On Thu, Oct 25, 2018 at 8:21 AM Colin McCabe 
> > >>> wrote:
> > >
> > >> Hi 

[DISCUSS] KIP-435: Internal Partition Reassignment Batching

2019-06-27 Thread Viktor Somogyi-Vass
Hi All,

I've renamed my KIP as its name was a bit confusing so we'll continue it in
this thread.
The previous thread for record:
https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E

A short summary of the KIP:
In case of a vast partition reassignment (thousands of partitions at once)
Kafka can collapse under the increased replication traffic. This KIP will
mitigate it by introducing internal batching done by the controller.
Besides putting a bandwidth limit on the replication it is useful to batch
partition movements as fewer number of partitions will use the available
bandwidth for reassignment and they finish faster.
The main control handles are:
- the number of parallel leader movements,
- the number of parallel partition movements
- and the number of parallel replica movements.

Thank you for the feedback and ideas so far in the previous thread and I'm
happy to receive more.

Regards,
Viktor


Re: [DISCUSS] KIP-435: Incremental Partition Reassignment

2019-06-27 Thread Viktor Somogyi-Vass
Hi Colin,

Certainly there will be some interaction and good idea with that you said,
I've added it to my KIP.
Will start a new discussion thread and link this one.

Viktor

On Wed, Jun 26, 2019 at 11:39 PM Colin McCabe  wrote:

> Hi Viktor,
>
> Good point.  Sorry, I should have read the KIP more closely.
>
> It would be good to change the title of the mail thread to reflect the new
> title of the KIP, "Internal Partition Reassignment Batching."
>
> I do think there will be some interaction with KIP-455 here.  One example
> is that we'll want a way of knowing what target replicas are currently
> being worked on.  So maybe we'll have to add a field to the structures
> returned by listPartitionReassignments.
>
> best,
> Colin
>
>
> On Wed, Jun 26, 2019, at 06:20, Viktor Somogyi-Vass wrote:
> > Hey Colin,
> >
> > I think there's some confusion here so I might change the name of this.
> So
> > KIP-435 is about the internal batching of reassignments (so purely a
> > controller change) and not about client side APIs. As per this moment
> these
> > kind of improvements are listed on KIP-455's future work section so in my
> > understanding KIP-455 won't touch that :).
> > Let me know if I'm missing any points here.
> >
> > Viktor
> >
> > On Tue, Jun 25, 2019 at 9:02 PM Colin McCabe  wrote:
> >
> > > Hi Viktor,
> > >
> > > Now that the 2.3 release is over, we're going to be turning our
> attention
> > > back to working on KIP-455, which provides an API for partition
> > > reassignment, and also solves the incremental reassignment problem.
> Sorry
> > > about the pause, but I had to focus on the stuff that was going into
> 2.3.
> > >
> > > I think last time we talked about this, the consensus was that KIP-455
> > > supersedes KIP-435, since KIP-455 supports incremental reassignment.
> We
> > > also don't want to add more technical debt in the form of a new
> > > ZooKeeper-based API that we'll have to support for a while.  So let's
> focus
> > > on KIP-455 here.  We have more resources now so I think we'll be able
> to
> > > get it done soonish.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Tue, Jun 25, 2019, at 08:09, Viktor Somogyi-Vass wrote:
> > > > Hi All,
> > > >
> > > > I have added another improvement to this, which is to limit the
> parallel
> > > > leader movements. I think I'll soon (maybe late this week or early
> next)
> > > > start a vote on this too if there are no additional feedback.
> > > >
> > > > Thanks,
> > > > Viktor
> > > >
> > > > On Mon, Apr 29, 2019 at 1:26 PM Viktor Somogyi-Vass <
> > > viktorsomo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Folks,
> > > > >
> > > > > I've updated the KIP with the batching which would work on both
> replica
> > > > > and partition level. To explain it briefly: for instance if the
> replica
> > > > > level is set to 2 and partition level is set to 3, then 2x3=6
> replica
> > > > > reassignment would be in progress at the same time. In case of
> > > reassignment
> > > > > for a single partition from (0, 1, 2, 3, 4) to (5, 6, 7, 8, 9) we
> would
> > > > > form the batches (0, 1) → (5, 6); (2, 3) → (7, 8) and 4 → 9 and
> would
> > > > > execute the reassignment in this order.
> > > > >
> > > > > Let me know what you think.
> > > > >
> > > > > Best,
> > > > > Viktor
> > > > >
> > > > > On Mon, Apr 15, 2019 at 7:01 PM Viktor Somogyi-Vass <
> > > > > viktorsomo...@gmail.com> wrote:
> > > > >
> > > > >> A follow up on the batching topic to clarify my points above.
> > > > >>
> > > > >> Generally I think that batching should be a core feature as Colin
> said
> > > > >> the controller should possess all information that are related.
> > > > >> Also Cruise Control (or really any 3rd party admin system) might
> build
> > > > >> upon this to give more holistic approach to balance brokers. We
> may
> > > cater
> > > > >> them with APIs that act like building blocks to make their life
> > > easier like
> > > > >> incrementalization, batching, cancellation and rollback but I
> think
> > > the
> > > > >> more advanced we go we'll need more advanced control surface and
> > > Kafka's
> > > > >> basic tooling might not be suitable for that.
> > > > >>
> > > > >> Best,
> > > > >> Viktor
> > > > >>
> > > > >>
> > > > >> On Mon, 15 Apr 2019, 18:22 Viktor Somogyi-Vass, <
> > > viktorsomo...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>> Hey Guys,
> > > > >>>
> > > > >>> I'll reply to you all in this email:
> > > > >>>
> > > > >>> @Jun:
> > > > >>> 1. yes, it'd be a good idea to add this feature, I'll write this
> into
> > > > >>> the KIP. I was actually thinking about introducing a dynamic
> config
> > > called
> > > > >>> reassignment.parallel.partition.count and
> > > > >>> reassignment.parallel.replica.count. The first property would
> > > control how
> > > > >>> many partition reassignment can we do concurrently. The second
> would
> > > go one
> > > > >>> level in granularity and would control how many replicas do we
> want
> > > to move
> > > > >>> for a 

[jira] [Created] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO & Cache truncation

2019-06-27 Thread Di Campo (JIRA)
Di Campo created KAFKA-8608:
---

 Summary: Broker shows WARN on reassignment partitions on new 
brokers: Replica LEO & Cache truncation
 Key: KAFKA-8608
 URL: https://issues.apache.org/jira/browse/KAFKA-8608
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.1.1
 Environment: Kafka 2.1.1

Reporter: Di Campo


I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster where 
there were 32 topics and 64 partitions on each, replication 3.

Running reassigning partitions. 

On each run, I can see the following WARN messages, but when the reassignment 
partition process finishes, it all seems OK. ISR is OK (count is 3 in every 
partition).

But I get the following messages types, one per partition:

 
{code:java}
[2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch 
entry EpochEntry(epoch=24, startOffset=51540) caused truncation of conflicting 
entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). Cache now contains 
5 entries. (kafka.server.epoch.LeaderEpochFileCache) {code}
-> This relates to cache, so I suppose it's pretty safe.
{code:java}
[2019-06-27 12:42:04,250] WARN [ReplicaManager broker=1] Leader 1 failed to 
record follower 3's position 47981 since the replica is not recognized to be 
one of the assigned replicas 1,2,5 for partition visitors-0.0.1-28. Empty 
records will be returned for this partition. (kafka.server.ReplicaManager){code}
-> This is scary. I'm not sure about the severity of this, but it looks like it 
may be missing records? 
{code:java}
[2019-06-27 12:42:03,709] WARN [ReplicaManager broker=1] While recording the 
replica LEO, the partition visitors-0.0.1-58 hasn't been created. 
(kafka.server.ReplicaManager){code}
-> Here, these partitions are created. 


First of all - am I supposed to be missing data here? I am assuming I don't, 
and so far I don't see traces of losing anything.

If so, I'm not sure what these messages are trying to say here. Should they 
really be at WARN level? If so - what are they warning about? 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-412: Extend Admin API to support dynamic application log levels

2019-06-27 Thread Stanislav Kozlovski
Hey there everybody,

We're moving along. I wanted to come back with some updates we came up with
while iterating on the PR:
- Removed the ALL log level - all the rest log levels are quite standard
but this is not that expected. Simpler is always better :)
- We have changed the DescribeConfigs API and the Log4jController MBean's
getter methods to return the ROOT log level for a logger which does not
have its log level explicitly set. This is more user-friendly

Changes are reflected in the KIP

Thanks,
Stanislav

On Wed, Jun 12, 2019 at 1:34 PM Stanislav Kozlovski 
wrote:

> Hello,
>
> I finished work on an initial PR for this KIP, it is ready for review here
> - https://github.com/apache/kafka/pull/6903
>
> During implementation, I noticed a couple of holes in the KIP and did the
> following changes:
> * add LogLevelConfig public class and a few more log levels (7 total - ALL,
> TRACE, DEBUG, INFO, WARN, ERROR, FATAL)
> * add a new ConfigResource - DYNAMIC_BROKER_LOGGER_CONFIG
> * DELETE operations in the IncrementalAlterConfigRequest will change the
> logger's level to the ROOT logger's currently-set level. (we previously
> thought we could return to the default - log4j does not seem to allow us to
> do that)
>
> Feel free to re-read the KIP (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-412%3A+Extend+Admin+API+to+support+dynamic+application+log+levels)
>  or
> go through its page history
>
> Thanks,
> Stanislav
>
> On Thu, Feb 21, 2019 at 12:17 PM Stanislav Kozlovski <
> stanis...@confluent.io> wrote:
>
>> Thanks for the interest everybody. This KIP has passed with 3 binding
>> votes (Harsha, Gwen, Rajini) and  5 non-binding votes (Mickael, Jonathan,
>> Dongjin, Satish, Andrew)
>>
>> On Thu, Feb 21, 2019 at 9:00 AM Satish Duggana 
>> wrote:
>>
>>> Thanks for the KIP
>>> +1 (non-binding)
>>>
>>> On Thu, Feb 21, 2019 at 9:38 AM Harsha  wrote:
>>> >
>>> > +1 (binding).
>>> >
>>> > Thanks,
>>> > Harsha
>>> >
>>> > On Tue, Feb 19, 2019, at 7:53 AM, Andrew Schofield wrote:
>>> > > Thanks for the KIP.
>>> > >
>>> > > +1 (non-binding)
>>> > >
>>> > > On 18/02/2019, 12:48, "Stanislav Kozlovski" 
>>> wrote:
>>> > >
>>> > > Hey everybody, I'm starting a VOTE thread for KIP-412. This
>>> feature should
>>> > > significantly improve the flexibility and ease in debugging
>>> Kafka in run
>>> > > time
>>> > >
>>> > > KIP-412 -
>>> > >
>>> > >
>>> https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-412%253A%2BExtend%2BAdmin%2BAPI%2Bto%2Bsupport%2Bdynamic%2Bapplication%2Blog%2Blevelsdata=02%7C01%7C%7C69bc63a9d7864e25ec3c08d69596eec4%7C84df9e7fe9f640afb435%7C1%7C0%7C636860872825557120sdata=XAnMhy6EPC7JkB77NBBhLR%2FvE7XrTutuS5Rlt%2FDpwfU%3Dreserved=0
>>> > >
>>> > >
>>> > > --
>>> > > Best,
>>> > > Stanislav
>>> > >
>>> > >
>>> > >
>>>
>>
>>
>> --
>> Best,
>> Stanislav
>>
>
>
> --
> Best,
> Stanislav
>


-- 
Best,
Stanislav


Re: [DISCUSS] KIP-373: Allow users to create delegation tokens for other users

2019-06-27 Thread Viktor Somogyi-Vass
Hi Folks,

I took over this issue from Manikumar. Recently another motivation have
been raised in Spark for this (SPARK-28173) and I think it'd be great to
continue this task.
I updated the KIP and will wait for a few days to get some feedback then
proceed for the vote.

Thanks,
Viktor

On Tue, Dec 11, 2018 at 8:29 AM Manikumar  wrote:

> Hi Harsha,
>
> Thanks for the review.
>
> With this KIP a designated superuser can create tokens without requiring
> individual user credentials.
> Any client can authenticate brokers using the created tokens. We may not
> call this as impersonation,
> since the clients API calls are executing on their own authenticated
> connections.
>
> Thanks,
> Manikumar
>
> On Fri, Dec 7, 2018 at 11:56 PM Harsha  wrote:
>
> > Hi Mani,
> >  Overall KIP looks good to me. Can we call this Impersonation
> > support, which is what the KIP is doing?
> > Also instead of using super.uses as the config which essentially giving
> > cluster-wide support to the users, we can introduce impersonation.users
> as
> > a config and users listed in the config are allowed to impersonate other
> > users.
> >
> > Thanks,
> > Harsha
> >
> >
> > On Fri, Dec 7, 2018, at 3:58 AM, Manikumar wrote:
> > > Bump up! to get some attention.
> > >
> > > BTW, recently Apache Spark added  support for Kafka delegation token.
> > > https://issues.apache.org/jira/browse/SPARK-25501
> > >
> > > On Fri, Dec 7, 2018 at 5:27 PM Manikumar 
> > wrote:
> > >
> > > > Bump up! to get some attention.
> > > >
> > > > BTW, recently Apache Spark added for Kafka delegation token support.
> > > > https://issues.apache.org/jira/browse/SPARK-25501
> > > >
> > > > On Tue, Sep 25, 2018 at 9:56 PM Manikumar  >
> > > > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> I have created a KIP that proposes to allow users to create
> delegation
> > > >> tokens for other users.
> > > >>
> > > >>
> > > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-373%3A+Allow+users+to+create+delegation+tokens+for+other+users
> > > >>
> > > >> Please take a look when you get a chance.
> > > >>
> > > >> Thanks,
> > > >> Manikumar
> > > >>
> > > >
> >
>


[jira] [Created] (KAFKA-8607) Reduce AdminClient Metadata request rate when invalid node id is given

2019-06-27 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-8607:
--

 Summary: Reduce AdminClient Metadata request rate when invalid 
node id is given
 Key: KAFKA-8607
 URL: https://issues.apache.org/jira/browse/KAFKA-8607
 Project: Kafka
  Issue Type: Task
Affects Versions: 2.3.0, 2.2.0, 2.1.0, 2.0.0
Reporter: Stanislav Kozlovski


While testing KAFKA-7800 (KIP-412), we were playing around with the config 
command CLI and [noticed that it hangs for very 
long|https://github.com/apache/kafka/pull/6903#discussion_r297434016] when 
given an invalid broker id.

After investigating a bit more, I noticed that we endlessly retry metadata 
updates. Locally, my AdminClient issued 78 requests for 10 seconds - averaging 
at a rate of 7.8 requests/sec. The call times out after 2 minutes by default - 
we end up sending 1149 requests.



This respects the "retry.backoff.ms" config  but it may be better to have some 
sort of exponential backoff to ease the needless load on the cluster.
It is unlikely for this to be a high-impact change but it sounds worth it to 
have the protection. Orchestration systems like Kubernetes make it easier for a 
user to mass-deploy a wrong config and inadvertenly DDoS his cluster via 
metadata requests



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-480 : Sticky Partitioner

2019-06-27 Thread Ismael Juma
Thanks for the KIP Justine. It looks pretty good. A few comments:

1. Should we favor partitions that are not under replicated? This is
something that Netflix did too.

2. If there's no measurable performance difference, I agree with Stanislav
that Optional would be better than Integer.

3. We should include the javadoc for the newly introduced method that
specifies it and its parameters. In particular, it would good to specify if
it gets called when an explicit partition id has been provided.

Ismael

On Mon, Jun 24, 2019, 2:04 PM Justine Olshan  wrote:

> Hello,
> This is the discussion thread for KIP-480: Sticky Partitioner.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
>
> Thank you,
> Justine Olshan
>


[jira] [Created] (KAFKA-8606) Provide a method to fetch committed offsets for a collection of TopicPartition

2019-06-27 Thread ov7a (JIRA)
ov7a created KAFKA-8606:
---

 Summary: Provide a method to fetch committed offsets for a 
collection of TopicPartition
 Key: KAFKA-8606
 URL: https://issues.apache.org/jira/browse/KAFKA-8606
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Affects Versions: 2.2.1, 2.3.0
Reporter: ov7a


Currently KafkaConsumer has methods for fetching begging offsets, end offsets 
and offsets for times, all of them accepting a collection of TopicPartition.

There is a method to fetch committed offset for single TopicPartition, but 
there is no public API to fetch commited offsets for a collection of 
TopicPartition. So, If one wants to fetch all committed offsets for topic, a 
request per partition is created.

Note that ConsumerCoordinator.fetchCommittedOffsets which called internally in 
"committed" method does accept a collection of TopicPartition. 





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-470: TopologyTestDriver test input and output usability improvements

2019-06-27 Thread Jukka Karvanen
Hi,

>>(4) Should we switch from `long` for timestamps to `Instant` and
`Duration` ?
>This version startTimestamp is Instant and autoAdvance Duration in
Initialization and with manual configured collection pipe methods.
>Now timestamp of TestRecord is still Long and similarly single record
pipeInput still has long as parameter.
>Should these also converted to to Instant type?
>Should there be both long and Instant parallel?
>I expect there are existing test codebase which would be easier to migrate
if long could be still used.
Now added Instant version to TestRecord and pipeInput method.

Checking my own tests I was able to migrate the most of my code with
search without further thinking about the logic to this new
approach. The result of converting millis to Instant directly generates
rather non readable test code and changing from long to Instant correctly
require understand what is the case it is testing.

That is why version with long left still as deprecated for easier migration
for existing test.
Also TopologyTestDriver constructor and advanceWallClockTime  method
modified with same approach.

Jukka


ma 24. kesäk. 2019 klo 16.47 Bill Bejeck (bbej...@gmail.com) kirjoitti:

> Hi Jukka
>
> > These topic objects are only interfacing TopologyTestDriver, not
> affecting
> > the internal functionality of it. In my plan the internal data structures
> > are using those Producer/ConsumerRecords as earlier. That way I don't see
> > how those could be affected.
>
> I mistakenly thought the KIP was proposing to completely remove
> Producer/ConsumerRecords in favor of TestRecord.  But taking another quick
> look I can see the plan for using the TestRecord objects.  Thanks for the
> clarification.
>
> -Bill
>
> On Sat, Jun 22, 2019 at 2:26 AM Jukka Karvanen <
> jukka.karva...@jukinimi.com>
> wrote:
>
> > Hi All,
> > Hi Matthias,
> >
> > >(1) It's a little confusing that you list all method (existing, proposed
> > >to deprecate, and new one) of `TopologyTestDriver` in the KIP. Maybe
> > >only list the ones you propose to deprecate and the new ones you want to
> > >add?
> >
> > Ok. Unmodified methods removed.
> >
> > >(2) `TopologyTestDriver#createInputTopic`: might it be worth to add
> > >overload to initialize the timetamp and auto-advance feature directly?
> > >Otherwise, uses always need to call `configureTiming` as an extra call?
> >
> > Added with Instant and Duration parameters.
> >
> > >(3) `TestInputTopic#configureTiming()`: maybe rename to
> > `reconfigureTiming()` ?
> >
> > I removed this method when we have now initialization in constructor.
> > You can recreate TestInputTopic if needing to reconfigure timing.
> >
> >
> > >(4) Should we switch from `long` for timestamps to `Instant` and
> > `Duration` ?
> > This version startTimestamp is Instant and autoAdvance Duration in
> > Initialization and with manual configured collection pipe methods.
> > Now timestamp of TestRecord is still Long and similarly single record
> > pipeInput still has long as parameter.
> > Should these also converted to to Instant type?
> > Should there be both long and Instant parallel?
> > I expect there are existing test codebase which would be easier to
> migrate
> > if long could be still used.
> >
> > Also should advanceWallClockTime  in TopologyTestDriver changed(or added
> > alternative) for Duration parameter.
> >
> >
> > > (5) Why do we have redundant getters? Or set with `getX()` and one
> > set without `get`-prefix?
> >
> > The methods without get-prefix are for compatibility with
> ProducerRecord /
> > ConsumerRecord and I expect would make migration to TestRecord easier.
> > Standard getters in TestRecord enable writing test ignoring selected
> fields
> > with hamcrest like this:
> >
> > assertThat(outputTopic.readRecord(), allOf(
> > hasProperty("key", equalTo(1L)),
> > hasProperty("value", equalTo("Hello")),
> > hasProperty("headers", equalTo(headers;
> >
> >
> > That's why I have currently both methods.
> >
> > Jukka
> >
> >
> > pe 21. kesäk. 2019 klo 22.20 Matthias J. Sax (matth...@confluent.io)
> > kirjoitti:
> >
> > > Thanks for the KIP. The idea to add InputTopic and OutputTopic
> > > abstractions is really neat!
> > >
> > >
> > > Couple of minor comment:
> > >
> > > (1) It's a little confusing that you list all method (existing,
> proposed
> > > to deprecate, and new one) of `TopologyTestDriver` in the KIP. Maybe
> > > only list the ones you propose to deprecate and the new ones you want
> to
> > > add?
> > >
> > > (Or mark all existing methods clearly -- atm, I need to got back to the
> > > code to read the KIP and to extract what changes are proposed).
> > >
> > >
> > > (2) `TopologyTestDriver#createInputTopic`: might it be worth to add
> > > overload to initialize the timetamp and auto-advance feature directly?
> > > Otherwise, uses always need to call `configureTiming` as an extra call?
> > >
> > >
> > > (3) `TestInputTopic#configureTiming()`: maybe rename to
> > >