Re: [VOTE] KIP-345: Introduce static membership protocol to reduce consumer rebalances

2018-12-10 Thread Boyang Chen
Thanks Stanislav!

Get Outlook for iOS


From: Stanislav Kozlovski 
Sent: Monday, December 10, 2018 11:28 PM
To: dev@kafka.apache.org
Subject: Re: [VOTE] KIP-345: Introduce static membership protocol to reduce 
consumer rebalances

This is great work, Boyang. Thank you very much.

+1 (non-binding)

On Mon, Dec 10, 2018 at 6:09 PM Boyang Chen  wrote:

> Hey there, could I get more votes on this thread?
>
> Thanks for the vote from Mayuresh and Mike :)
>
> Best,
> Boyang
> 
> From: Mayuresh Gharat 
> Sent: Thursday, December 6, 2018 10:53 AM
> To: dev@kafka.apache.org
> Subject: Re: [VOTE] KIP-345: Introduce static membership protocol to
> reduce consumer rebalances
>
> +1 (non-binding)
>
> Thanks,
>
> Mayuresh
>
> On Tue, Dec 4, 2018 at 6:58 AM Mike Freyberger 
> wrote:
>
> > +1 (non binding)
> >
> > On 12/4/18, 9:43 AM, "Patrick Williams"  >
> > wrote:
> >
> > Pls take me off this VOTE list
> >
> > Best,
> >
> > Patrick Williams
> >
> > Sales Manager, UK & Ireland, Nordics & Israel
> > StorageOS
> > +44 (0)7549 676279
> > patrick.willi...@storageos.com
> >
> > 20 Midtown
> > 20 Proctor Street
> > Holborn
> > London WC1V 6NX
> >
> > Twitter: @patch37
> > LinkedIn: linkedin.com/in/patrickwilliams4
> >
> > https://slack.storageos.com/
> >
> >
> >
> > On 03/12/2018, 17:34, "Guozhang Wang"  wrote:
> >
> > Hello Boyang,
> >
> > I've browsed through the new wiki and there are still a couple of
> > minor
> > things to notice:
> >
> > 1. RemoveMemberFromGroupOptions seems not defined anywhere.
> >
> > 2. LeaveGroupRequest added a list of group instance ids, but still
> > keeps the member id as a singleton; is that intentional? I think to
> > make the protocol consistent, both member ids and instance ids could
> > be plural.
> >
> > 3. About the *kafka-remove-member-from-group.sh* tool, I'm wondering
> > if we can defer adding it and just add the corresponding
> > LeaveGroupRequest calls inside Streams until we have used it in
> > production, and hence have a better understanding of how flexible or
> > extensible any cmd tools we add should be. The rationale is that if we
> > do not necessarily need it now, we can always add it later with a more
> > thought-through API design, but if we add the tool in a rush, we may
> > need to extend or modify it soon after we realize its limits in
> > operations.
> >
> > Otherwise, I'm +1 on the proposal.
> >
> > Guozhang
> >
> >
> > On Mon, Dec 3, 2018 at 9:14 AM Boyang Chen 
> > wrote:
> >
> > > Hey community friends,
> > >
> > > after another month of polishing, KIP-345 <
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
> > > >
> > > design is ready for vote. Feel free to add your comment on the
> > discussion
> > > thread or here.
> > >
> > > Thanks for your time!
> > >
> > > Boyang
> > > 
> > > From: Boyang Chen 
> > > Sent: Friday, November 9, 2018 6:35 AM
> > > To: dev@kafka.apache.org
> > > Subject: [VOTE] KIP-345: Introduce static membership protocol
> to
> > reduce
> > > consumer rebalances
> > >
> > > Hey all,
> > >
> > >
> > > thanks so much for all the inputs on KIP-345 so far. The original
> > > proposal has been enhanced a lot with your help. To make sure the
> > > implementation goes smoothly without back and forth, I would like to
> > > start a vote on the final design agreement now:
> > >
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
> > >
> >
> 

Re: [DISCUSS] KIP-373: Allow users to create delegation tokens for other users

2018-12-10 Thread Manikumar
Hi Harsha,

Thanks for the review.

With this KIP, a designated superuser can create tokens without requiring
individual user credentials, and any client can then authenticate to the
brokers using the created tokens. We may not call this impersonation,
since the clients' API calls still execute on their own authenticated
connections.
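For illustration, a hedged sketch of what creating a token for another user
might look like under this KIP. The owner(...) option is the proposed
addition and does not exist yet; the rest is the existing AdminClient API,
and the broker address and principal name are illustrative:

```
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.token.delegation.DelegationToken;

// Inside a method that declares "throws Exception":
Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");
// ...plus SASL settings that authenticate the designated superuser...

try (AdminClient admin = AdminClient.create(props)) {
    // owner(...) is the proposed addition: the token is created for "alice"
    // using only the superuser's own credentials.
    DelegationToken token = admin
        .createDelegationToken(new CreateDelegationTokenOptions()
            .owner(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice")))
        .delegationToken()
        .get();
}
```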

Thanks,
Manikumar

On Fri, Dec 7, 2018 at 11:56 PM Harsha  wrote:

> Hi Mani,
>  Overall KIP looks good to me. Can we call this Impersonation
> support, which is what the KIP is doing?
> Also instead of using super.uses as the config which essentially giving
> cluster-wide support to the users, we can introduce impersonation.users as
> a config and users listed in the config are allowed to impersonate other
> users.
>
> Thanks,
> Harsha
>
>
> On Fri, Dec 7, 2018, at 3:58 AM, Manikumar wrote:
> > Bump up! to get some attention.
> >
> > BTW, recently Apache Spark added support for Kafka delegation tokens.
> > https://issues.apache.org/jira/browse/SPARK-25501
> >
> > On Fri, Dec 7, 2018 at 5:27 PM Manikumar 
> wrote:
> >
> > > Bump up! to get some attention.
> > >
> > > BTW, recently Apache Spark added support for Kafka delegation tokens.
> > > https://issues.apache.org/jira/browse/SPARK-25501
> > >
> > > On Tue, Sep 25, 2018 at 9:56 PM Manikumar 
> > > wrote:
> > >
> > >> Hi all,
> > >>
> > >> I have created a KIP that proposes to allow users to create delegation
> > >> tokens for other users.
> > >>
> > >>
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-373%3A+Allow+users+to+create+delegation+tokens+for+other+users
> > >>
> > >> Please take a look when you get a chance.
> > >>
> > >> Thanks,
> > >> Manikumar
> > >>
> > >
>


Re: [VOTE] KIP-345: Introduce static membership protocol to reduce consumer rebalances

2018-12-10 Thread Stanislav Kozlovski
This is great work, Boyang. Thank you very much.

+1 (non-binding)

On Mon, Dec 10, 2018 at 6:09 PM Boyang Chen  wrote:

> Hey there, could I get more votes on this thread?
>
> Thanks for the vote from Mayuresh and Mike :)
>
> Best,
> Boyang
> 
> From: Mayuresh Gharat 
> Sent: Thursday, December 6, 2018 10:53 AM
> To: dev@kafka.apache.org
> Subject: Re: [VOTE] KIP-345: Introduce static membership protocol to
> reduce consumer rebalances
>
> +1 (non-binding)
>
> Thanks,
>
> Mayuresh
>
> On Tue, Dec 4, 2018 at 6:58 AM Mike Freyberger 
> wrote:
>
> > +1 (non binding)
> >
> > On 12/4/18, 9:43 AM, "Patrick Williams"  >
> > wrote:
> >
> > Pls take me off this VOTE list
> >
> > Best,
> >
> > Patrick Williams
> >
> > Sales Manager, UK & Ireland, Nordics & Israel
> > StorageOS
> > +44 (0)7549 676279
> > patrick.willi...@storageos.com
> >
> > 20 Midtown
> > 20 Proctor Street
> > Holborn
> > London WC1V 6NX
> >
> > Twitter: @patch37
> > LinkedIn: linkedin.com/in/patrickwilliams4
> >
> > https://slack.storageos.com/
> >
> >
> >
> > On 03/12/2018, 17:34, "Guozhang Wang"  wrote:
> >
> > Hello Boyang,
> >
> > I've browsed through the new wiki and there are still a couple of
> > minor
> > things to notice:
> >
> > 1. RemoveMemberFromGroupOptions seems not defined anywhere.
> >
> > 2. LeaveGroupRequest added a list of group instance ids, but still
> > keeps the member id as a singleton; is that intentional? I think to
> > make the protocol consistent, both member ids and instance ids could
> > be plural.
> >
> > 3. About the *kafka-remove-member-from-group.sh* tool, I'm wondering
> > if we can defer adding it and just add the corresponding
> > LeaveGroupRequest calls inside Streams until we have used it in
> > production, and hence have a better understanding of how flexible or
> > extensible any cmd tools we add should be. The rationale is that if we
> > do not necessarily need it now, we can always add it later with a more
> > thought-through API design, but if we add the tool in a rush, we may
> > need to extend or modify it soon after we realize its limits in
> > operations.
> >
> > Otherwise, I'm +1 on the proposal.
> >
> > Guozhang
> >
> >
> > On Mon, Dec 3, 2018 at 9:14 AM Boyang Chen 
> > wrote:
> >
> > > Hey community friends,
> > >
> > > after another month of polishing, KIP-345 <
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
> > > >
> > > design is ready for vote. Feel free to add your comment on the
> > discussion
> > > thread or here.
> > >
> > > Thanks for your time!
> > >
> > > Boyang
> > > 
> > > From: Boyang Chen 
> > > Sent: Friday, November 9, 2018 6:35 AM
> > > To: dev@kafka.apache.org
> > > Subject: [VOTE] KIP-345: Introduce static membership protocol
> to
> > reduce
> > > consumer rebalances
> > >
> > > Hey all,
> > >
> > >
> > > thanks so much for all the inputs on KIP-345 so far. The original
> > > proposal has been enhanced a lot with your help. To make sure the
> > > implementation goes smoothly without back and forth, I would like to
> > > start a vote on the final design agreement now:
> > >
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
> > >
> >
> 

Re: [DISCUSS] KIP-395: Encypt-then-MAC Delegation token metadata

2018-12-10 Thread Manikumar
Hi,

Thanks for the KIP.

Currently, the master/secret key is stored as plain text in the
server.properties config file, and using the master secret key as a shared
secret is again a security risk. We have raised KAFKA-7694 to implement
ZooKeeper-based master/secret key management and automate secret key
rotation.

As you mentioned in the alternatives section, it is good to have a
pluggable mechanism for token storage and master key generation. We can
implement pluggable interfaces for both as part of KAFKA-7694. This will
provide an out-of-the-box implementation using ZooKeeper, plus pluggable
interfaces for custom implementations.
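For readers less familiar with the construction: encrypt-then-MAC means
encrypting first, then MACing the ciphertext (and IV), and verifying the MAC
before ever decrypting. A minimal sketch with JDK primitives, not the KIP's
actual implementation; deriving encKey and macKey from the master secret is
out of scope here:

```
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import javax.crypto.Cipher;
import javax.crypto.Mac;
import javax.crypto.SecretKey;

// Encrypt token metadata, then MAC iv||ciphertext; store all three parts.
static byte[] encryptThenMac(byte[] plaintext, SecretKey encKey, SecretKey macKey)
        throws GeneralSecurityException {
    Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
    cipher.init(Cipher.ENCRYPT_MODE, encKey);  // a random IV is generated here
    byte[] iv = cipher.getIV();
    byte[] ciphertext = cipher.doFinal(plaintext);

    Mac mac = Mac.getInstance("HmacSHA256");
    mac.init(macKey);
    mac.update(iv);
    byte[] tag = mac.doFinal(ciphertext);      // MAC covers iv + ciphertext

    return ByteBuffer.allocate(iv.length + ciphertext.length + tag.length)
            .put(iv).put(ciphertext).put(tag).array();
}
// On read: recompute the HMAC over iv||ciphertext and compare it
// (constant-time) against the stored tag before decrypting.
```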

What do you think?

Thanks,
Manikumar

On Sat, Dec 1, 2018 at 9:37 PM Attila Sasvári  wrote:

> Hi All,
>
> I have a proposal to allow Kafka brokers to encrypt sensitive metadata
> information about delegation tokens.
>
> As of now, delegation token metadata is stored in an unencrypted format in
> Zookeeper. Having the possibility to encrypt-then-MAC token information
> would be beneficial in Kafka installations where Zookeeper is not on a
> private network.
>
> Please take a look at
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-395%3A+Encypt-then-MAC+Delegation+token+metadata
> and let me know what you think.
>
> - Attila
>


Re: [DISCUSS] KIP-382: MirrorMaker 2.0

2018-12-10 Thread Dong Lin
Hey Ryanne,

Thanks much for the KIP!

Though I don't have time to review this KIP in detail at this stage, I
think it will be very useful to Apache Kafka users (particularly global
enterprise users) who need geo replication capability. Currently, Kafka
users have to set up and manage MM clusters for geo replication, which has
many problems as described in the motivation section of this KIP. The fact
that many companies have built in-house projects to simplify Kafka geo
replication management shows the need for better built-in geo replication
support in Apache Kafka. It will require a lot of design discussion and
work in the community. It will be really great to see progress on this KIP.

Thanks!
Dong

On Mon, Oct 15, 2018 at 9:17 AM Ryanne Dolan  wrote:

> Hey y'all!
>
> Please take a look at KIP-382:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
>
> Thanks for your feedback and support.
>
> Ryanne
>


Re: Vote for KIP-393 (Fix time windowed serde to deserialize changelog topic)

2018-12-10 Thread Shawn Nguyen
Thanks for the feedback Guozhang! I updated the KIP.

In the meantime, could I ask for additional binding votes/approval on this
KIP proposal?

Shawn
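
For reference, a hypothetical usage sketch of the API under vote, based on
the signature quoted in the thread below (exact names are per the KIP):

```
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

// A time-windowed serde that can also deserialize changelog topics, where
// the window end time is not encoded in the serialized key (60s windows).
Serde<Windowed<String>> changelogSerde =
    WindowedSerdes.timeWindowedChangelogSerdeFrom(String.class, 60_000L);
```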

On Thu, Dec 6, 2018 at 1:22 PM Liquan Pei  wrote:

> +1 (non-binding)
>
> On Wed, Dec 5, 2018 at 4:51 PM Guozhang Wang  wrote:
>
>> Hello Shawn,
>>
>> Thanks for the writeup. I've made a pass over it and here are some minor
>> comments:
>>
>> 1) As we discussed in the PR: https://github.com/apache/kafka/pull/5307,
>> the public APIs that we will add is
>>
>> In WindowedSerdes:
>> ```
>> static public <T> Serde<Windowed<T>> timeWindowedChangelogSerdeFrom(final
>> Class<T> type, final long windowSize)
>> ```
>>
>> In TimeWindowedSerde
>> ```
>> TimeWindowedSerde<T> forChangelog(final boolean isChangelog);
>> ```
>>
>> Other classes such as WindowedKeySchema are internal classes for
>> implementation details and hence do not need to be listed in the wiki as
>> public APIs.
>>
>>
>> 2) The wiki doc may be a bit confusing for readers who are not familiar
>> with the PR, since we mention the "forChangelog()" function and the
>> "isChangelog" parameter without clear definitions, and only explain what
>> they are later in the docs as Java code examples. I think it would be
>> better to rephrase the early paragraphs to explain a bit more why we will
>> add a new internal field along with a setter, and its semantics (its
>> default value and how deserialization differs depending on it).
>>
>> Otherwise, I'm +1 on the KIP, thanks!
>>
>>
>> Guozhang
>>
>>
>> On Wed, Dec 5, 2018 at 8:18 AM Shawn Nguyen 
>> wrote:
>>
>> > Hey all,
>> >
>> > I wanted to start a vote on approval of KIP-393
>> > <
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-393%3A+Time+windowed+serde+to+properly+deserialize+changelog+input+topic
>> > >
>> > to
>> > fix the current time windowed serde for properly deserializing changelog
>> > input topics. Let me know what you guys think.
>> >
>> > Thanks,
>> > Shawn
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
>
> --
> Liquan Pei
> Software Engineer, Confluent Inc
>


Build failed in Jenkins: kafka-2.1-jdk8 #79

2018-12-10 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7610; Proactively timeout new group members if rebalance is

--
[...truncated 912.63 KB...]

kafka.zk.ReassignPartitionsZNodeTest > testDecodeInvalidJson STARTED

kafka.zk.ReassignPartitionsZNodeTest > testDecodeInvalidJson PASSED

kafka.zk.ReassignPartitionsZNodeTest > testEncode STARTED

kafka.zk.ReassignPartitionsZNodeTest > testEncode PASSED

kafka.zk.ReassignPartitionsZNodeTest > testDecodeValidJson STARTED

kafka.zk.ReassignPartitionsZNodeTest > testDecodeValidJson PASSED

kafka.zk.KafkaZkClientTest > testZNodeChangeHandlerForDataChange STARTED

kafka.zk.KafkaZkClientTest > testZNodeChangeHandlerForDataChange PASSED

kafka.zk.KafkaZkClientTest > testCreateAndGetTopicPartitionStatesRaw STARTED

kafka.zk.KafkaZkClientTest > testCreateAndGetTopicPartitionStatesRaw PASSED

kafka.zk.KafkaZkClientTest > testLogDirGetters STARTED

kafka.zk.KafkaZkClientTest > testLogDirGetters PASSED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment STARTED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment PASSED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationsDeletion STARTED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationsDeletion PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion PASSED

kafka.zk.KafkaZkClientTest > testGetChildren STARTED

kafka.zk.KafkaZkClientTest > testGetChildren PASSED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset STARTED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset PASSED

kafka.zk.KafkaZkClientTest > testClusterIdMethods STARTED

kafka.zk.KafkaZkClientTest > testClusterIdMethods PASSED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testUpdateLeaderAndIsr STARTED

kafka.zk.KafkaZkClientTest > testUpdateLeaderAndIsr PASSED

kafka.zk.KafkaZkClientTest > testUpdateBrokerInfo STARTED

kafka.zk.KafkaZkClientTest > testUpdateBrokerInfo PASSED

kafka.zk.KafkaZkClientTest > testCreateRecursive STARTED

kafka.zk.KafkaZkClientTest > testCreateRecursive PASSED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData STARTED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods PASSED

kafka.zk.KafkaZkClientTest > testSetTopicPartitionStatesRaw STARTED

kafka.zk.KafkaZkClientTest > testSetTopicPartitionStatesRaw PASSED

kafka.zk.KafkaZkClientTest > testAclManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testAclManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods STARTED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods PASSED

kafka.zk.KafkaZkClientTest > testPropagateLogDir STARTED

kafka.zk.KafkaZkClientTest > testPropagateLogDir PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndStat STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndStat PASSED

kafka.zk.KafkaZkClientTest > testReassignPartitionsInProgress STARTED

kafka.zk.KafkaZkClientTest > testReassignPartitionsInProgress PASSED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths STARTED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths PASSED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationGetters STARTED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationGetters PASSED

kafka.zk.KafkaZkClientTest > testLogDirEventNotificationsDeletion STARTED

kafka.zk.KafkaZkClientTest > testLogDirEventNotificationsDeletion PASSED

kafka.zk.KafkaZkClientTest > testGetLogConfigs STARTED

kafka.zk.KafkaZkClientTest > testGetLogConfigs PASSED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods STARTED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath STARTED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath PASSED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath STARTED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicZNode STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicZNode PASSED

kafka.zk.KafkaZkClientTest > testDeletePath STARTED

kafka.zk.KafkaZkClientTest > testDeletePath PASSED

kafka.zk.KafkaZkClientTest > testGetBrokerMethods STARTED

kafka.zk.KafkaZkClientTest > testGetBrokerMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateTokenChangeNotification STARTED

kafka.zk.KafkaZkClientTest > testCreateTokenChangeNotification PASSED

kafka.zk.KafkaZkClientTest > testGetTopicsAndPartitions STARTED

kafka.zk.KafkaZkClientTest > testGetTopicsAndPartitions PASSED

kafka.zk.KafkaZkClientTest > testRegisterBrokerInfo STARTED

kafka.zk.KafkaZkClientTest > 

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-10 Thread Paul Whalen
Ah yes of course, this was an oversight, I completely ignored the multiple
processors sharing the same state store when writing up the KIP.  Which is
funny, because I've actually done this (different processors sharing state
stores) a fair amount myself, and I've settled on a pattern where I group
the Processors in an enclosing class, and that enclosing class handles as
much as possible.  Here's a gist showing the rough structure, just for
context: https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
. Note how it adds the stores to the topology, as well as providing a
public method with the store names.
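
Roughly, the shape of that pattern, condensed from the gist (class and store
names here are hypothetical):

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.Stores;

// One enclosing class owns the store: it registers the StoreBuilder once
// and exposes the store name(s) for every transform() call sharing it.
public final class DedupeTransformers {
    public static final String STORE_NAME = "dedupe-store";

    public static void addStateStores(final StreamsBuilder builder) {
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STORE_NAME),
            Serdes.String(), Serdes.String()));
    }

    public static String[] storeNames() {
        return new String[] { STORE_NAME };
    }

    // ...Transformer implementations that all read/write STORE_NAME...
}
```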

I don't think my proposal completely conflicts with the multiple processors
sharing state stores use case, since you can create a supplier that
provides the store name you want, somewhat independently of your actual
Processor logic.  The issue I do see, though, is that
topology.addStateStore() can only be called once for a given store.  So for
your example, if there was a single TransformerSupplier that was passed
into both transform() calls, "store1" would be added (under the hood) to
the topology twice, which is no good.

Perhaps this suggests that one of my alternatives on the KIP might be
desirable: either not having the suppliers return StoreBuilders (just store
names), or not deprecating the old methods that take "String...
stateStoreNames". I'll have to think about it a bit.

Paul

On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang  wrote:

> Hello Paul,
>
> Thanks for the great writeup (very detailed and crystal motivation
> sections!).
>
> This is quite an interesting idea and I do like the API cleanness you
> proposed. The original motivation of letting StreamsTopology to add state
> stores though, is to allow different processors to share the state store.
> For example:
>
> builder.addStore("store1");
>
> // a path of stream transformations that leads to KStream stream1.
> stream1.transform(..., "store1");
>
> // another path that generates a KStream stream2.
> stream2.transform(..., "store1");
>
> Behind the scenes, Streams will make sure the stream1 / stream2
> transformations are always grouped together as a single group of tasks, each
> of which is executed by a single thread, and hence there are no concurrency
> issues when accessing the store from different operators within the same
> task. I'm not sure how common this use case is, but I'd like to hear if you
> have any thoughts on maintaining it, since the current proposal seems to
> exclude this possibility.
>
>
> Guozhang
>
>
> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen  wrote:
>
> > Here's KIP-401 for discussion, a minor Kafka Streams API change that I
> > think could greatly increase the usability of the low-level processor
> API.
> > I have some code written but will wait to see if there is buy in before
> > going all out and creating a pull request.  It seems like most of the
> work
> > would be in updating documentation and tests.
> >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> >
> > Thanks!
> > Paul
> >
>
>
> --
> -- Guozhang
>


Jenkins build is back to normal : kafka-trunk-jdk8 #3253

2018-12-10 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk11 #150

2018-12-10 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7610; Proactively timeout new group members if rebalance is

--
[...truncated 2.24 MB...]

org.apache.kafka.connect.runtime.WorkerConfigTransformerTest > 
testReplaceVariableWithTTL PASSED

org.apache.kafka.connect.runtime.WorkerConfigTransformerTest > 
testReplaceVariableWithTTLAndScheduleRestart STARTED

org.apache.kafka.connect.runtime.WorkerConfigTransformerTest > 
testReplaceVariableWithTTLAndScheduleRestart PASSED

org.apache.kafka.connect.runtime.WorkerConfigTransformerTest > 
testReplaceVariableWithTTLFirstCancelThenScheduleRestart STARTED

org.apache.kafka.connect.runtime.WorkerConfigTransformerTest > 
testReplaceVariableWithTTLFirstCancelThenScheduleRestart PASSED

org.apache.kafka.connect.runtime.WorkerConfigTransformerTest > 
testTransformNullConfiguration STARTED

org.apache.kafka.connect.runtime.WorkerConfigTransformerTest > 
testTransformNullConfiguration PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testPollsInBackground STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testPollsInBackground PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommit STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommit PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitFailure 
STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitFailure 
PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitSuccessFollowedByFailure STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitSuccessFollowedByFailure PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitConsumerFailure STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitConsumerFailure PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitTimeout 
STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitTimeout 
PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testAssignmentPauseResume STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testAssignmentPauseResume PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testRewind STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testRewind PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testRewindOnRebalanceDuringPoll STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testRewindOnRebalanceDuringPoll PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedBasicValidation STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedBasicValidation PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedCustomValidation STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedCustomValidation PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorAlreadyExists STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorAlreadyExists PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testDestroyConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testDestroyConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartTask STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartTask PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testPutConnectorConfig STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testPutConnectorConfig PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testAccessors STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testAccessors PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinAssignment STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinAssignment PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalance STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalance PASSED


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-10 Thread Guozhang Wang
I had one meta comment on the PR:
https://github.com/apache/kafka/pull/5909#discussion_r240447153

On Mon, Dec 10, 2018 at 5:22 PM John Roesler  wrote:

> Hi Florian,
>
> I hope it's ok if I ask a few questions at this late stage...
>
> Comment 1 ==
>
> It seems like the proposal is to add a new "Named" interface that is
> intended to be mixed in with the existing API objects at various points.
>
> Just to preface some of my comments, it looks like your KIP was created
> quite a while ago, so the API may have changed somewhat since you started.
>
> As I see the API, there are a few different kinds of DSL method arguments:
> * functions: things like Initializer, Aggregator, ValueJoiner,
> ForEachAction... All of these are essentially Streams-flavored Function
> interfaces with different arities, type bounds, and semantics.
> * config objects: things like Produced, Consumed, Joined, Grouped... These
> are containers for configurations, where the target of the configuration is
> the operation itself
> * raw configurations: things like a raw topic-name string and Materialized:
> These are configurations for operations that have no config object, and for
> various reasons, we didn't make one. The distinguishing feature is that the
> target of the configuration is not the operation itself, but some aspect of
> it. For example, in Materialized, we are not setting the caching behavior
> of, for example, an aggregation; we're setting the caching behavior of a
> materialized state store attached to the aggregation.
>
> It seems like choosing to mix the Named interface in with the functions has
> a couple of unfortunate side-effects:
> * Aggregator is not the only function passed to any of the relevant
> aggregate methods, so it seems a little arbitrary to pick that function
> over Initializer or Merger.
> * As you noted, branch() takes an array of Predicate, so we just ignore the
> provided name(s), even though Predicate names are used elsewhere.
> * Not all things that we want to name have function arguments, notably
> source and sink, so we'd switch paradigms and use the config object
> instead.
> * Adding an extra method to the function interfaces means that those are no
> longer SAM interfaces. You proposed to add a default implementation, so we
> could still pass a lambda if we don't want to set the name, but if we *do*
> want to set the name, we can no longer use lambdas.
>
> I think the obvious other choice would be to mix Named in with the config
> objects instead, but this has one main downside of its own...
> * not every operator we wish to name has a config object. I don't know if
> everyone involved is comfortable with adding a config object to every
> operator that's missing one.
>
> Personally, I favor moving toward a more consistent state that's forward
> compatible with any further changes we wish to make. I *think* that giving
> every operator two forms (one with no config and one with a config object)
> would be such an API.
>
> Comment 2 =
>
> Finally, just a minor comment: the static method in Named wouldn't work
> properly as defined. Assuming that we mix Named in with Produced, for
> example, we'd need to be able to use it like:
> >  kStream.to("out", Produced.with("myOut"))
> This doesn't work because with() returns a Named, but we need a Produced.
>
> We can pull off a builder method in the interface, but not a static method.
> To define a builder method in the interface that returns an instance of the
> concrete subtype, you have to use the "curiously recurring generic"
> pattern.
>
> It would look like:
>
> public interface Named<N extends Named<N>> {
>   String name();
>   N withName(String name);
> }
>
> You can see where the name of the pattern comes from ;)
> An implementation would then look like:
>
> public class Produced<K, V> implements Named<Produced<K, V>> {
>   String name() { return name; }
>   Produced<K, V> withName(final String name) { this.name = name; return this; }
> }
>
> Note that the generic parameter gets filled in properly in the implementing
> class, so that you get the right return type out.
>
> It doesn't work at all with a static factory method at the interface level,
> so it would be up to Produced to define a static factory if it wants to
> present one.
>
> ==
>
> Those are my two feedbacks!
>
> I hope you find this helpful, rather than frustrating. I'm sorry I didn't
> get a chance to comment sooner.
>
> Thanks for the KIP, I think it will be much nicer to be able to name the
> processor nodes.
>
> -John
>
> On Tue, Nov 27, 2018 at 6:34 PM Guozhang Wang  wrote:
>
> > Hi Florian,
> >
> > I've made a pass over the PR. There are some comments that are related to
> > the function names which may be affecting the KIP wiki page, but overall
> I
> > think it looks good already.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Nov 16, 2018 at 4:21 PM Guozhang Wang 
> wrote:
> >
> > > Thanks Florian! I will take a look at the PR.
> > >
> > >
> > >
> > > On Mon, Nov 12, 2018 at 2:44 PM Florian Hussonnois <
> 

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-10 Thread John Roesler
Hi Florian,

I hope it's ok if I ask a few questions at this late stage...

Comment 1 ==

It seems like the proposal is to add a new "Named" interface that is
intended to be mixed in with the existing API objects at various points.

Just to preface some of my comments, it looks like your KIP was created
quite a while ago, so the API may have changed somewhat since you started.

As I see the API, there are a few different kinds of DSL method arguments:
* functions: things like Initializer, Aggregator, ValueJoiner,
ForEachAction... All of these are essentially Streams-flavored Function
interfaces with different arities, type bounds, and semantics.
* config objects: things like Produced, Consumed, Joined, Grouped... These
are containers for configurations, where the target of the configuration is
the operation itself
* raw configurations: things like a raw topic-name string and Materialized:
These are configurations for operations that have no config object, and for
various reasons, we didn't make one. The distinguishing feature is that the
target of the configuration is not the operation itself, but some aspect of
it. For example, in Materialized, we are not setting the caching behavior
of, for example, an aggregation; we're setting the caching behavior of a
materialized state store attached to the aggregation.

It seems like choosing to mix the Named interface in with the functions has
a couple of unfortunate side-effects:
* Aggregator is not the only function passed to any of the relevant
aggregate methods, so it seems a little arbitrary to pick that function
over Initializer or Merger.
* As you noted, branch() takes an array of Predicate, so we just ignore the
provided name(s), even though Predicate names are used elsewhere.
* Not all things that we want to name have function arguments, notably
source and sink, so we'd switch paradigms and use the config object instead.
* Adding an extra method to the function interfaces means that those are no
longer SAM interfaces. You proposed to add a default implementation, so we
could still pass a lambda if we don't want to set the name, but if we *do*
want to set the name, we can no longer use lambdas.

I think the obvious other choice would be to mix Named in with the config
objects instead, but this has one main downside of its own...
* not every operator we wish to name has a config object. I don't know if
everyone involved is comfortable with adding a config object to every
operator that's missing one.

Personally, I favor moving toward a more consistent state that's forward
compatible with any further changes we wish to make. I *think* that giving
every operator two forms (one with no config and one with a config object)
would be such an API.

Comment 2 =

Finally, just a minor comment: the static method in Named wouldn't work
properly as defined. Assuming that we mix Named in with Produced, for
example, we'd need to be able to use it like:
>  kStream.to("out", Produced.with("myOut"))
This doesn't work because with() returns a Named, but we need a Produced.

We can pull off a builder method in the interface, but not a static method.
To define a builder method in the interface that returns an instance of the
concrete subtype, you have to use the "curiously recurring generic" pattern.

It would look like:

public interface Named<N extends Named<N>> {
  String name();
  N withName(String name);
}

You can see where the name of the pattern comes from ;)
An implementation would then look like:

public class Produced<K, V> implements Named<Produced<K, V>> {
  String name() { return name; }
  Produced<K, V> withName(final String name) { this.name = name; return this; }
}

Note that the generic parameter gets filled in properly in the implementing
class, so that you get the right return type out.

It doesn't work at all with a static factory method at the interface level,
so it would be up to Produced to define a static factory if it wants to
present one.
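
To make that concrete, with the recurring generic the builder keeps its
concrete type, so a call like the following would still type-check
(illustrative only; withName() is the method under discussion):

```
// withName() returns Produced<String, String> rather than a bare Named,
// so the result is still acceptable to KStream#to.
kStream.to("out", Produced.with(Serdes.String(), Serdes.String()).withName("myOut"));
```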

==

Those are my two feedbacks!

I hope you find this helpful, rather than frustrating. I'm sorry I didn't
get a chance to comment sooner.

Thanks for the KIP, I think it will be much nicer to be able to name the
processor nodes.

-John

On Tue, Nov 27, 2018 at 6:34 PM Guozhang Wang  wrote:

> Hi Florian,
>
> I've made a pass over the PR. There are some comments that are related to
> the function names which may be affecting the KIP wiki page, but overall I
> think it looks good already.
>
>
> Guozhang
>
>
> On Fri, Nov 16, 2018 at 4:21 PM Guozhang Wang  wrote:
>
> > Thanks Florian! I will take a look at the PR.
> >
> >
> >
> > On Mon, Nov 12, 2018 at 2:44 PM Florian Hussonnois <
> fhussonn...@gmail.com>
> > wrote:
> >
> >> Hi Matthias,
> >>
> >> Sorry I was absent for a while. I have started a new PR for this KIP. It
> >> is
> >> still in progress for now. I'm working on it.
> >> https://github.com/apache/kafka/pull/5909
> >>
> >> Le ven. 19 oct. 2018 à 20:13, Matthias J. Sax  a
> >> écrit :
> >>
> >> > What is the status of this KIP?
> >> >
> 

[jira] [Resolved] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-12-10 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7610.
--
    Resolution: Fixed
      Assignee: Jason Gustafson  (was: Boyang Chen)
Fix Version/s: 2.1.1, 2.2.0

> Detect consumer failures in initial JoinGroup
> ---------------------------------------------
>
> Key: KAFKA-7610
> URL: https://issues.apache.org/jira/browse/KAFKA-7610
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).





Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2018-12-10 Thread Adam Bellemare
1) I believe that the resolution mechanism John has proposed is sufficient
- it is clean and easy and doesn't require additional RocksDB stores, which
reduces the footprint greatly. I don't think we need to resolve based on
timestamp or offset anymore, but if we decide to do so, that would be within
the bounds of the existing API.

2) Is the current API sufficient, or does it need to be altered to go back
to vote?

3) A KScatteredTable implementation can always be added in a future revision.
This API does not rule it out. The implementation of this function would
simply be replaced with `KScatteredTable.resolve()` while still maintaining
the existing API, thereby giving both features as Jan outlined earlier.
Would this work?
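
For readers joining late, the call under discussion looks roughly like this
(a sketch assuming hypothetical Order/Product types and a signature along
the lines currently in the KIP):

```
// Each left-hand record extracts a foreign key pointing into the products
// table; the join output stays keyed by the original (order) key.
KTable<String, EnrichedOrder> enriched = orders.join(
    products,                                   // KTable<ProductId, Product>
    order -> order.getProductId(),              // foreign-key extractor
    (order, product) -> new EnrichedOrder(order, product));
```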


Thanks Guozhang, John and Jan




On Mon, Dec 10, 2018 at 10:39 AM John Roesler  wrote:

> Hi, all,
>
> >> In fact, we
> >> can just keep a single final-result store with timestamps and reject
> values
> >> that have a smaller timestamp, is that right?
>
> > Which is the correct output should at least be decided on the offset of
> > the original message.
>
> Thanks for this point, Jan.
>
> KIP-258 is merely to allow embedding the record timestamp  in the k/v
> store,
> as well as providing a storage-format upgrade path.
>
> I might have missed it, but I think we have yet to discuss whether it's
> safe or desirable just to swap topic-ordering out for timestamp-ordering.
> This is a very deep topic, and I think it would only pollute the current
> discussion.
>
> What Adam has proposed is safe, given the *current* ordering semantics
> of the system. If we can agree on his proposal, I think we can merge the
> feature well before the conversation about timestamp ordering even takes
> place, much less reaches a conclusion. In the mean time, it would seem to
> be unfortunate to have one join operator with different ordering semantics
> from every other KTable operator.
>
> If and when that timestamp discussion takes place, many (all?) KTable
> operations
> will need to be updated, rendering the many:one join a small marginal cost.
>
> And, just to plug it again, I proposed an algorithm above that I believe
> provides
> correct ordering without any additional metadata, and regardless of the
> ordering semantics. I didn't bring it up further, because I felt the KIP
> only needs
> to agree on the public API, and we can discuss the implementation at
> leisure in
> a PR...
>
> Thanks,
> -John
>
>
> On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak 
> wrote:
>
> >
> >
> > On 10.12.2018 07:42, Guozhang Wang wrote:
> > > Hello Adam / Jan / John,
> > >
> > > Sorry for being late on this thread! I've finally got some time this
> > > weekend to cleanup a load of tasks on my queue (actually I've also
> > realized
> > > there are a bunch of other things I need to enqueue while cleaning them
> > up
> > > --- sth I need to improve on my side). So here are my thoughts:
> > >
> > > Regarding the APIs: I like the currently written API in the KIP. More
> > > generally, I'd prefer to keep 1) the one-to-many join functionality as
> > > well as 2) join types other than inner as separate KIPs, since 1) may
> > > be worth a general API refactoring that can benefit not only foreign-key
> > > joins but co-located joins as well (e.g. an extended proposal of
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup),
> > > and I'm not sure if other join types would actually be needed (maybe
> > > left join still makes sense), so it's better to
> > > wait-for-people-to-ask-and-add than add-sth-that-no-one-uses.
> > >
> > > Regarding whether we enforce step 3) / 4) v.s. introducing a
> > > KScatteredTable for users to inject their own optimization: I'd prefer
> > > to do the current option as-is, and my main rationale is the
> > > optimization room inside the Streams internals and the API succinctness.
> > > For advanced users who may indeed prefer KScatteredTable and doing their
> > > own optimization, while it is too much work to use the Processor API
> > > directly, I think we can still extend the current API to support it in
> > > the future if it becomes necessary.
> >
> > no internal optimization potential. it's a myth
> >
> > ¯\_(ツ)_/¯
> >
> > :-)
> >
> > >
> > > Another note about step 4) resolving out-of-order data: as I mentioned
> > > before, I think with KIP-258 (embedded timestamp with key-value store)
> > > we can actually make this step simpler than the current proposal. In
> > > fact, we can just keep a single final-result store with timestamps and
> > > reject values that have a smaller timestamp, is that right?
> >
> > Which is the correct output should at least be decided on the offset of
> > the original message.
> >
> > >
> > >
> > > That's all I have in mind now. Again, great appreciation to Adam for
> > > making such HUGE progress on this KIP!
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak 

[jira] [Created] (KAFKA-7718) Allow customized header inheritance for stateful operators in DSL

2018-12-10 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7718:


 Summary: Allow customized header inheritance for stateful 
operators in DSL
 Key: KAFKA-7718
 URL: https://issues.apache.org/jira/browse/KAFKA-7718
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


As a follow-up to
https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API,
we want to allow users to customize how record headers are inherited while
traversing the topology at the DSL layer (at the lower-level Processor API
layer, users are already capable of customizing and inheriting the headers as
they forward records to the next processor nodes).

Today, headers are implicitly inherited throughout the topology without any
modifications within the Streams library. For stateless operators (filter,
map, etc.) this default inheritance policy should be sufficient. For stateful
operators, where multiple input records may generate a single output record
(i.e. an n:1 transformation rather than a 1:1 mapping), we only inherit from
the triggering record, which would seem to be a "random" choice to users, and
the other records' headers are lost.

I'd propose we extend the DSL to allow users to customize the header
inheritance policy for stateful operators, namely Joins and Aggregations. It
would contain two parts:

1) On the DSL layer, I'd suggest we extend the `Joined` and `Grouped` control
objects with an additional function that allows users to pass in a lambda
(let's say it's called HeadersMerger, but the name is subject to discussion
over the KIP) that takes two Headers objects and generates a single Headers
object as the return value.

2) On the implementation layer, we need to actually store the headers in the
materialized state store so that they can be retrieved along with the record
by the join / aggregation processor. This would change the state store value
bytes organization and hence should be considered carefully. Then, when the
join / aggregate processor is triggered, the Headers of both records will be
retrieved (one from the triggering record, one read from the materialized
state store) and passed to the HeadersMerger. Some low-hanging optimizations
can be considered, though: e.g. if users have not overridden this interface,
we can consider not reading the headers from the other side at all to save
IO cost.
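
A sketch of the interface described in 1), with names subject to the KIP
discussion:

```
import org.apache.kafka.common.header.Headers;

// Combines the triggering record's headers with the headers stored
// alongside the other side's materialized record.
public interface HeadersMerger {
    Headers merge(Headers triggeringRecordHeaders, Headers otherRecordHeaders);
}

// Hypothetical DSL hook, e.g.:
//   Joined.with(keySerde, leftSerde, rightSerde)
//         .withHeadersMerger((left, right) -> left);  // keep triggering side only
```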





Re: [DISCUSS] KIP-382: MirrorMaker 2.0

2018-12-10 Thread Ryanne Dolan
Jun, thanks for your time reviewing the KIP.

> In a MirrorSourceConnector, it seems that the offsets of the source will
be stored in a different cluster from the target cluster?

Jan Filipiak raised this issue as well, and suggested that no state be
tracked in the source cluster. I've since implemented MirrorSourceConnector
accordingly. And actually, this issue coincides with another major weakness
of legacy MirrorMaker: "rebalance storm". In both cases, the problem is due
to MirrorMaker using high-level consumer groups for replication.

MM2 does not use consumer groups at all, but instead manages its own
partition assignments and offsets. MirrorSourceConnector monitors
topic-partitions and assigns them to MirrorSourceTasks directly -- there
are no high-level subscriptions and therefore no rebalances. Likewise,
MirrorSourceConnector stores its own offsets in the target cluster, so no
state information is lost if the source cluster disappears. Both of these
features are facilitated by the Connect framework and were inspired by
Uber's uReplicator.

> If the single connect cluster model is indeed useful, it seems that we
should support it in the general connect framework since it can be useful
for managing other types of connectors.

Sönke Liebau suggested this as well. I've spent some time looking into
this, and I do believe it would be possible to bring these features to
Connect in general without breaking the existing APIs. For example, maybe a
connector config could specify which worker to use as a property like
worker.name=foo, and otherwise a default worker would be used. In this
case, a "MirrorMaker cluster" would just be a Connect cluster with a
pre-configured set of workers.

My plan is to contribute MM2 and then help pull features from MM2 into
Connect. I don't think it would make sense to prime Connect first, nor do I
want to propose a bunch of changes to Connect in this one KIP. If the
concern is primarily around the co-existence of a MM2 REST API and the
nearly identical Connect API, perhaps it would make sense to split off the
"MirrorMaker clusters" section of this KIP into a separate KIP aimed at
Connect in general? Would love to hear your thoughts on this.

> Could you provide a bit more detail on the content of the heartbeat
topic?

At present the heartbeat is just a timestamp and the alias of the cluster
of origin. This is more powerful than existing Connector-level metrics, as
these heartbeats are themselves replicated and can be traced across
multiple hops in the replication topology. I'll add this to the KIP.
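
In code form, a heartbeat record is roughly the following (field layout
illustrative; the final schema will be in the KIP):

```
// Emitted periodically into a heartbeats topic and replicated hop-by-hop
// like any other record, so downstream clusters can trace the path.
public class Heartbeat {
    public final String sourceClusterAlias;  // alias of the cluster of origin
    public final long timestampMs;           // emission time

    public Heartbeat(String sourceClusterAlias, long timestampMs) {
        this.sourceClusterAlias = sourceClusterAlias;
        this.timestampMs = timestampMs;
    }
}
```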

> Also, if this is useful, should we just add it in the connect
framework, instead of just mirror maker?

Same deal, I'd love to see this, but I don't think we should try to prime
Connect before adopting MM2.

> RemoteClusterUtils. Since this is part of the public interface, could you
document the public APIs?

Will do, thanks.
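
As a preview, usage would look something like this (a sketch; the package
and exact signatures are part of what I'll document):

```
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Translate a group's committed offsets from the "primary" cluster into the
// target cluster's offset space, then resume consuming there.
Map<TopicPartition, OffsetAndMetadata> offsets = RemoteClusterUtils.translateOffsets(
    targetClusterProps, "primary", "my-group", Duration.ofMinutes(1));
offsets.forEach((tp, om) -> consumer.seek(tp, om.offset()));
```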

> source.cluster.bootstrap.servers/target.cluster.bootstrap.servers: Does a
Source/Sink connect need both?

Sort of. I'm using this to construct an AdminClient for topic ACL and
configuration sync, since the Connect framework doesn't expose it. I intend
to follow up KIP-382 with a proposal to expose this info to Connectors.
There's also KIP-158, but it deals with topic creation only.

Thanks again for the feedback!

Ryanne



On Fri, Dec 7, 2018 at 6:22 PM Jun Rao  wrote:

> Hi, Ryanne,
>
> Thanks for the KIP. At the high level, this looks like a reasonable
> proposal. A few comments below.
>
> 1. About using a single connector cluster to manage connectors accessing
> multiple Kafka clusters. It's good that you brought this up.  The following
> are the tradeoffs that I see. The benefit of using a single connect cluster
> is that it simplifies the management. There are a couple of potential
> downsides.
> (a) In a MirrorSourceConnector, it seems that the offsets of the source
> will be stored in a different cluster from the target cluster? If the data
> in the target Kafka cluster is lost (say the whole cluster is wiped out),
> one has to manually reset the offset to re-mirror the missing data. (b) If
> the offsets are stored in a separate cluster from the produced data, it
> prevents the connector from running features such as EOS since currently
> EOS doesn't span Kafka clusters. If the single connect cluster model is
> indeed useful, it seems that we should support it in the general connect
> framework since it can be useful for managing other types of connectors. This
> could be related to KIP-296 since it allows connector level
> producer/consumer customization.
>
> 2. The heartbeats topic. Could you provide a bit more detail on the
> content of the heartbeat topic? I am not sure how that's different from the
> connector-level metrics. Also, if this is useful, should we just add it
> in the connect framework, instead of just mirror maker?
>
> 3. RemoteClusterUtils. Since this is part of the public interface, could
> you document the public APIs?
>

Re: [VOTE] KIP-394: Require member.id for initial join group request

2018-12-10 Thread Boyang Chen
Thanks a lot folks! I will start the implementation right away 

Boyang

From: Harsha 
Sent: Tuesday, December 11, 2018 3:28 AM
To: dev@kafka.apache.org
Subject: Re: [VOTE] KIP-394: Require member.id for initial join group request

+1 . Thanks for the KIP. This is very much needed.

-Harsha

On Mon, Dec 10, 2018, at 11:00 AM, Guozhang Wang wrote:
> +1. Thanks Boyang!
>
>
> Guozhang
>
> On Mon, Dec 10, 2018 at 10:29 AM Jason Gustafson  wrote:
>
> > +1 Thanks for the KIP, Boyang!
> >
> > -Jason
> >
> > On Mon, Dec 10, 2018 at 10:07 AM Boyang Chen  wrote:
> >
> > > Thanks for voting my friends. Could someone give one more binding vote
> > > here?
> > >
> > > Best,
> > > Boyang
> > > 
> > > From: Bill Bejeck 
> > > Sent: Thursday, December 6, 2018 2:45 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [VOTE] KIP-394: Require member.id for initial join group
> > > request
> > >
> > > +1
> > > Thanks for the KIP.
> > >
> > > -Bill
> > >
> > > On Wed, Dec 5, 2018 at 1:43 PM Matthias J. Sax 
> > > wrote:
> > >
> > > > Thanks for the KIP.
> > > >
> > > > +1 (binding)
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 12/5/18 7:53 AM, Mayuresh Gharat wrote:
> > > > > +1 (non-binding)
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Mayuresh
> > > > >
> > > > >
> > > > > On Wed, Dec 5, 2018 at 3:59 AM Boyang Chen 
> > > wrote:
> > > > >
> > > > >> Hey friends,
> > > > >>
> > > > >> I would like to start a vote for KIP-394<
> > > > >>
> > > >
> > >
> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request
> > > > >.
> > > > >> The goal of this KIP is to improve broker stability by fencing
> > invalid
> > > > join
> > > > >> group requests.
> > > > >>
> > > > >> Best,
> > > > >> Boyang
> > > > >>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
>
>
> --
> -- Guozhang


Jenkins build is back to normal : kafka-trunk-jdk11 #149

2018-12-10 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-392: Allow consumers to fetch from the closest replica

2018-12-10 Thread Jason Gustafson
Hey Mickael,

Thanks for the comments. Responses below:

> - I'm guessing the selector will be invoked after each rebalance so
> every time the consumer is assigned a partition it will be able to
> select it. Is that true?


I'm not sure it is necessary to do it after every rebalance, but certainly
the selector would be invoked after the partition is first assigned. Was
there a specific concern you had in mind?

> - From the selector API, I'm not sure how the consumer will be able to
> address some of the choices mentioned in "Finding the preferred
> follower". Especially the available bandwidth and the load balancing.
> By only having the list of Nodes, a consumer can pick the nearest
> replica (assuming the rack field means anything to users) or balance
> its own bandwidth but that might not necessarily mean improved
> performance or a balanced load on the brokers.


The intent is to provide a minimal extension point. Users would have to
rely on external sources for their own custom selection logic. It is
similar to other interfaces exposed in the clients, such as Partitioner and
PartitionAssignor. The interface exposes only metadata about the
replication state, but nothing stops users from leveraging other
information to make better decisions. Does that seem reasonable?

Thanks,
Jason
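
For illustration, the kind of minimal extension point described above could look roughly like the following. The names and signature here are a hypothetical sketch, not the final KIP-392 interface:

import java.io.Closeable;
import java.util.List;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of a pluggable replica selector (not the final KIP-392 API).
interface ReplicaSelector extends Closeable {
    // Pick the replica to fetch from, using only client-side metadata.
    Node select(TopicPartition partition, Node leader, List<Node> replicas, String clientRack);

    @Override
    default void close() {}
}

// Example policy: prefer a replica in the consumer's own rack, else the leader.
class RackAwareSelector implements ReplicaSelector {
    @Override
    public Node select(TopicPartition partition, Node leader, List<Node> replicas, String clientRack) {
        for (Node replica : replicas) {
            if (clientRack != null && clientRack.equals(replica.rack())) {
                return replica;
            }
        }
        return leader;
    }
}

External inputs such as bandwidth measurements or load statistics would be consulted inside select(), just as custom Partitioner implementations can consult external state.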



On Mon, Dec 10, 2018 at 11:41 AM Jason Gustafson  wrote:

> Hey Eno,
>
> Thanks for the comments. However, I'm a bit confused. I'm not suggesting
> we change Produce semantics in any way. All writes still go through the
> partition leader and nothing changes with respect to committing to the ISR.
> The main issue, as I've mentioned in the KIP, is the increased latency
> before a committed offset is exposed on followers.
>
> Perhaps I have misunderstood your question?
>
> Thanks,
> Jason
>
> On Mon, Dec 3, 2018 at 9:18 AM Eno Thereska 
> wrote:
>
>> Hi Jason,
>>
>> This is an interesting KIP. This will have massive implications for
>> consistency and serialization, since currently the leader for a partition
>> serializes requests. A few questions for now:
>>
>> - before we deal with the complexity, it'd be great to see a crisp example
>> in the motivation as to when this will have the most benefit for a
>> customer. In particular, although the customer might have a multi-DC
>> deployment, the DCs could still be close by in a region, so what is the
>> expected best-case scenario for a performance gain? E.g., if all DCs are
>> on
>> the east coast, say. Right now it's not clear to me.
>> - perhaps performance is not the right metric. Is the metric you are
>> optimizing for latency, throughput or cross-DC cost? (I believe it is
>> cross-DC cost from the KIP). Just wanted to double-check since I'm not
>> sure
>> latency would improve. Throughput could really improve from parallelism
>> (especially in cases when there is mostly consuming going on). So it could
>> be throughput as well.
>> - the proposal would probably lead to choosing a more complex consistency.
>> I tend to like the description Doug Terry has in his paper "Replicated
>> Data
>> Consistency Explained Through Baseball"
>>
>> https://www.microsoft.com/en-us/research/wp-content/uploads/2011/10/ConsistencyAndBaseballReport.pdf
>> .
>> To start with, could we get into scenarios where a client that has both a
>> producer and a consumer (e.g., Kafka streams) produces a record, then
>> attempts to consume it back and the consume() comes back with "record does
>> not exist"? That's fine, but could complicate application handling of such
>> scenarios.
>>
>> Thanks,
>> Eno
>>
>> On Mon, Dec 3, 2018 at 12:24 PM Mickael Maison 
>> wrote:
>>
>> > Hi Jason,
>> >
>> > Very cool KIP!
>> > A couple of questions:
>> > - I'm guessing the selector will be invoked after each rebalance so
>> > every time the consumer is assigned a partition it will be able to
>> > select it. Is that true?
>> >
>> > - From the selector API, I'm not sure how the consumer will be able to
>> > address some of the choices mentioned in "Finding the preferred
>> > follower". Especially the available bandwidth and the load balancing.
>> > By only having the list of Nodes, a consumer can pick the nearest
>> > replica (assuming the rack field means anything to users) or balance
>> > its own bandwidth but that might not necessarily mean improved
>> > performance or a balanced load on the brokers.
>> >
>> > Thanks
>> > On Mon, Dec 3, 2018 at 11:35 AM Stanislav Kozlovski
>> >  wrote:
>> > >
>> > > Hey Jason,
>> > >
>> > > This is certainly a very exciting KIP.
>> > > I assume that no changes will be made to the offset commits and they will
>> > > continue to be sent to the group coordinator?
>> > >
>> > > I also wanted to address metrics - have we considered any changes there?
>> > > I imagine that it would be valuable for users to be able to differentiate
>> > > between which consumers' partitions are fetched from replicas and which
>> > > aren't. I guess that would need to be addressed both in the server's
>> > > fetcher lag metrics and in the consumers.

[jira] [Resolved] (KAFKA-6036) Enable logical materialization to physical materialization

2018-12-10 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6036.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

> Enable logical materialization to physical materialization
> --
>
> Key: KAFKA-6036
> URL: https://issues.apache.org/jira/browse/KAFKA-6036
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.2.0
>
>
> Today, whenever users specify a queryable store name for a KTable, we 
> always add a physical state store in the translated processor topology.
> For some scenarios, we should consider not physically materializing the 
> KTable but only "logically" materializing it: when simple transformation 
> operations, or even join operations, generate new KTables that need to be 
> materialized with a state store, we can use the changelog topic of the 
> previous KTable and apply the transformation logic upon restoration, 
> instead of creating a new changelog topic. For example:
> {code}
> table1 = builder.table("topic1");
> table2 = table1.filter(..).join(table3); // table2 needs to be materialized for joining
> {code}
> We can actually set the {{getter}} function of table2's materialized store, 
> say {{state2}}, to read from {{topic1}} and then apply the filter operator, 
> instead of creating a new {{state2-changelog}} topic in this case.
> We can come up with a general internal implementation optimization to 
> determine when to logically materialize, and decide whether we should allow 
> users of the DSL to "hint" whether to materialize or not (that may then need 
> a KIP).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7717) Enable security configs in kafka.tools.EndToEndLatency

2018-12-10 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7717:
--

 Summary: Enable security configs in kafka.tools.EndToEndLatency
 Key: KAFKA-7717
 URL: https://issues.apache.org/jira/browse/KAFKA-7717
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski


The [end to end latency 
tool|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/EndToEndLatency.scala] 
does not support security configurations for authenticating to a secured 
broker. It only accepts `bootstrap.servers`, rendering it useless against 
SASL-secured clusters.
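
For context, these are the standard Kafka client security settings such a tool would need to pass through to its producer and consumer; the snippet below is a sketch with a placeholder broker address and credentials, not part of the tool today:

{code}
import java.util.Properties;

public class EndToEndLatencySecurityExample {
    // Sketch: standard client security configs the tool would need to accept.
    // The broker address, mechanism, and credentials are placeholders.
    public static Properties secureProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"alice\" password=\"secret\";");
        return props;
    }
}
{code}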



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #3252

2018-12-10 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-7549; Old ProduceRequest with zstd compression does not return

--
[...truncated 2.24 MB...]
[~120 lines of org.apache.kafka.streams.test.OutputVerifierTest STARTED/PASSED output elided]


Re: [DISCUSS] KIP-392: Allow consumers to fetch from the closest replica

2018-12-10 Thread Jason Gustafson
Hey Eno,

Thanks for the comments. However, I'm a bit confused. I'm not suggesting we
change Produce semantics in any way. All writes still go through the
partition leader and nothing changes with respect to committing to the ISR.
The main issue, as I've mentioned in the KIP, is the increased latency
before a committed offset is exposed on followers.

Perhaps I have misunderstood your question?

Thanks,
Jason

On Mon, Dec 3, 2018 at 9:18 AM Eno Thereska  wrote:

> Hi Jason,
>
> This is an interesting KIP. This will have massive implications for
> consistency and serialization, since currently the leader for a partition
> serializes requests. A few questions for now:
>
> - before we deal with the complexity, it'd be great to see a crisp example
> in the motivation as to when this will have the most benefit for a
> customer. In particular, although the customer might have a multi-DC
> deployment, the DCs could still be close by in a region, so what is the
> expected best-case scenario for a performance gain? E.g., if all DCs are on
> the east coast, say. Right now it's not clear to me.
> - perhaps performance is not the right metric. Is the metric you are
> optimizing for latency, throughput or cross-DC cost? (I believe it is
> cross-DC cost from the KIP). Just wanted to double-check since I'm not sure
> latency would improve. Throughput could really improve from parallelism
> (especially in cases when there is mostly consuming going on). So it could
> be throughput as well.
> - the proposal would probably lead to choosing a more complex consistency.
> I tend to like the description Doug Terry has in his paper "Replicated Data
> Consistency Explained Through Baseball"
>
> https://www.microsoft.com/en-us/research/wp-content/uploads/2011/10/ConsistencyAndBaseballReport.pdf
> .
> To start with, could we get into scenarios where a client that has both a
> producer and a consumer (e.g., Kafka streams) produces a record, then
> attempts to consume it back and the consume() comes back with "record does
> not exist"? That's fine, but could complicate application handling of such
> scenarios.
>
> Thanks,
> Eno
>
> On Mon, Dec 3, 2018 at 12:24 PM Mickael Maison 
> wrote:
>
> > Hi Jason,
> >
> > Very cool KIP!
> > A couple of questions:
> > - I'm guessing the selector will be invoked after each rebalance so
> > every time the consumer is assigned a partition it will be able to
> > select it. Is that true?
> >
> > - From the selector API, I'm not sure how the consumer will be able to
> > address some of the choices mentioned in "Finding the preferred
> > follower". Especially the available bandwidth and the load balancing.
> > By only having the list of Nodes, a consumer can pick the nearest
> > replica (assuming the rack field means anything to users) or balance
> > its own bandwidth but that might not necessarily mean improved
> > performance or a balanced load on the brokers.
> >
> > Thanks
> > On Mon, Dec 3, 2018 at 11:35 AM Stanislav Kozlovski
> >  wrote:
> > >
> > > Hey Jason,
> > >
> > > This is certainly a very exciting KIP.
> > > I assume that no changes will be made to the offset commits and they will
> > > continue to be sent to the group coordinator?
> > >
> > > I also wanted to address metrics - have we considered any changes there?
> > > I
> > > imagine that it would be valuable for users to be able to differentiate
> > > between which consumers' partitions are fetched from replicas and which
> > > aren't. I guess that would need to be addressed both in the server's
> > > fetcher lag metrics and in the consumers.
> > >
> > > Thanks,
> > > Stanislav
> > >
> > > On Wed, Nov 28, 2018 at 10:08 PM Jun Rao  wrote:
> > >
> > > > Hi, Jason,
> > > >
> > > > Thanks for the KIP. Looks good overall. A few minor comments below.
> > > >
> > > > 1. The section on handling FETCH_OFFSET_TOO_LARGE error says "Use the
> > > > OffsetForLeaderEpoch API to verify the current position with the leader".
> > > > The OffsetForLeaderEpoch request returns the log end offset if the request
> > > > leader epoch is the latest. So, we won't know the true high watermark from
> > > > that request. It seems that the consumer still needs to send a ListOffset
> > > > request to the leader to obtain the high watermark?
> > > >
> > > > 2. If a non in-sync replica receives a fetch request from a consumer,
> > > > should it return a new type of error like ReplicaNotInSync?
> > > >
> > > > 3. Could ReplicaSelector be closable?
> > > >
> > > > 4. Currently, the ISR propagation from the leader to the controller can be
> > > > delayed up to 60 secs through ReplicaManager.IsrChangePropagationInterval.
> > > > In that window, the consumer could still be consuming from a non in-sync
> > > > replica. The relatively large delay is mostly for reducing the ZK writes
> > > > and the watcher overhead. Not sure what's the best way to address this. We
> > > > could potentially make this configurable.
> > > >
> > > > 5. It may be worth 

Re: [VOTE] KIP-394: Require member.id for initial join group request

2018-12-10 Thread Harsha
+1. Thanks for the KIP. This is very much needed.

-Harsha

On Mon, Dec 10, 2018, at 11:00 AM, Guozhang Wang wrote:
> +1. Thanks Boyang!
> 
> 
> Guozhang
> 
> On Mon, Dec 10, 2018 at 10:29 AM Jason Gustafson  wrote:
> 
> > +1 Thanks for the KIP, Boyang!
> >
> > -Jason
> >
> > On Mon, Dec 10, 2018 at 10:07 AM Boyang Chen  wrote:
> >
> > > Thanks for voting, my friends. Could someone give one more binding vote
> > > here?
> > >
> > > Best,
> > > Boyang
> > > 
> > > From: Bill Bejeck 
> > > Sent: Thursday, December 6, 2018 2:45 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [VOTE] KIP-394: Require member.id for initial join group
> > > request
> > >
> > > +1
> > > Thanks for the KIP.
> > >
> > > -Bill
> > >
> > > On Wed, Dec 5, 2018 at 1:43 PM Matthias J. Sax 
> > > wrote:
> > >
> > > > Thanks for the KIP.
> > > >
> > > > +1 (binding)
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 12/5/18 7:53 AM, Mayuresh Gharat wrote:
> > > > > +1 (non-binding)
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Mayuresh
> > > > >
> > > > >
> > > > > On Wed, Dec 5, 2018 at 3:59 AM Boyang Chen 
> > > wrote:
> > > > >
> > > > >> Hey friends,
> > > > >>
> > > > >> I would like to start a vote for KIP-394<
> > > > >>
> > > >
> > >
> > https://nam02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-394%253A%2BRequire%2Bmember.id%2Bfor%2Binitial%2Bjoin%2Bgroup%2Brequestdata=02%7C01%7C%7C78066780826b40ed5e7d08d65ae1eda1%7C84df9e7fe9f640afb435%7C1%7C0%7C636796323725452670sdata=Gp%2FVlUuezVVck81fMXH7yaQ7zKd0WaJ9Kc7GhtJW2Qo%3Dreserved=0
> > > > >.
> > > > >> The goal of this KIP is to improve broker stability by fencing
> > invalid
> > > > join
> > > > >> group requests.
> > > > >>
> > > > >> Best,
> > > > >> Boyang
> > > > >>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
> 
> 
> -- 
> -- Guozhang


Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata growth

2018-12-10 Thread Jason Gustafson
Hey Stanislav,

Just to clarify, I think what you're suggesting is something like this in
order to gracefully shrink the group:

1. Transition the group to PREPARING_REBALANCE. No members are kicked out.
2. Continue to allow offset commits and heartbeats for all current members.
3. Allow the first n members that send JoinGroup to stay in the group, but
wait for the JoinGroup (or session timeout) from all active members before
finishing the rebalance.

So basically we try to give the current members an opportunity to finish
work, but we prevent some of them from rejoining after the rebalance
completes. It sounds reasonable if I've understood correctly.

Thanks,
Jason
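
As a toy illustration of the three steps above (hypothetical names, not broker code): the first maxSize members to re-join keep their slot, the rest are shed, and the rebalance can only complete once every previously active member has re-joined or timed out.

import java.util.LinkedHashSet;
import java.util.Set;

class GroupShrinkSimulation {
    private final Set<String> activeMembers;                        // members before the shrink
    private final Set<String> admitted = new LinkedHashSet<>();     // members kept after the shrink
    private final Set<String> responded = new LinkedHashSet<>();    // members accounted for
    private final int maxSize;

    GroupShrinkSimulation(Set<String> activeMembers, int maxSize) {
        this.activeMembers = activeMembers;
        this.maxSize = maxSize;
    }

    // Returns true if the member keeps its slot, false if it is shed.
    boolean onJoinGroup(String memberId) {
        responded.add(memberId);
        if (admitted.size() < maxSize) {
            admitted.add(memberId);
            return true;
        }
        return false;
    }

    // A timed-out member counts as accounted for, so it cannot block completion.
    void onSessionTimeout(String memberId) {
        responded.add(memberId);
    }

    // The rebalance finishes once all previously active members are accounted for.
    boolean canCompleteRebalance() {
        return responded.containsAll(activeMembers);
    }
}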



On Fri, Dec 7, 2018 at 6:47 AM Boyang Chen  wrote:

> Yep, LGTM on my side. Thanks Stanislav!
> 
> From: Stanislav Kozlovski 
> Sent: Friday, December 7, 2018 8:51 PM
> To: dev@kafka.apache.org
> Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> metadata growth
>
> Hi,
>
> We discussed this offline with Boyang and figured that it's best to not
> wait on the Cooperative Rebalancing proposal. Our thinking is that we can
> just force a rebalance from the broker, allowing consumers to commit
> offsets if their rebalanceListener is configured correctly.
> When rebalancing improvements are implemented, we assume that they would
> improve KIP-389's behavior as well as the normal rebalance scenarios.
>
> On Wed, Dec 5, 2018 at 12:09 PM Boyang Chen  wrote:
>
> > Hey Stanislav,
> >
> > thanks for the question! `Trivial rebalance` means "we don't start
> > reassignment right now, but you need to know it's coming soon
> > and you should start preparation".
> >
> > An example KStream use case is that before actually starting to shrink the
> > consumer group, we need to
> > 1. partition the consumer group into two subgroups, where one will be
> > offline soon and the other will keep serving;
> > 2. make sure the states associated with near-future offline consumers are
> > successfully replicated on the serving ones.
> >
> > As I have mentioned, shrinking the consumer group is pretty much equivalent
> > to group scaling down, so we could think of this
> > as an add-on use case for cluster scaling. So my understanding is that the
> > KIP-389 could be sequenced within our cooperative rebalancing<
> >
> https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FIncremental%2BCooperative%2BRebalancing%253A%2BSupport%2Band%2BPoliciesdata=02%7C01%7C%7C9b6fddd2f2be41ce39c308d65c42c821%7C84df9e7fe9f640afb435%7C1%7C0%7C636797839221710310sdata=N%2BVJsEYYwTx6k0uz8%2BKvL9tt3jLECokyAA%2B2mWyyOyA%3Dreserved=0
> > >
> > proposal.
> >
> > Let me know if this makes sense.
> >
> > Best,
> > Boyang
> > 
> > From: Stanislav Kozlovski 
> > Sent: Wednesday, December 5, 2018 5:52 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> > metadata growth
> >
> > Hey Boyang,
> >
> > I think we still need to take care of group shrinkage because even if users
> > change the config value we cannot guarantee that all consumer groups would
> > have been manually shrunk.
> >
> > Regarding 2., I agree that forcefully triggering a rebalance might be the
> > most intuitive way to handle the situation.
> > What does a "trivial rebalance" mean? Sorry, I'm not familiar with the
> > term.
> > I was thinking that maybe we could force a rebalance, which would cause
> > consumers to commit their offsets (given their rebalanceListener is
> > configured correctly) and subsequently reject some of the incoming
> > `joinGroup` requests. Does that sound like it would work?
> >
> > On Wed, Dec 5, 2018 at 1:13 AM Boyang Chen  wrote:
> >
> > > Hey Stanislav,
> > >
> > > I read the latest KIP and saw that we already changed the default value to
> > > -1. Do we still need to take care of the consumer group shrinking when
> > > doing the upgrade?
> > >
> > > However, this is an interesting topic that is worth discussing. Although
> > > rolling upgrade is fine, `consumer.group.max.size` could always conflict with
> > > the current consumer group size, which means we need to adhere to one
> > > source of truth.
> > >
> > > 1. Choose the current group size, which means we never interrupt the
> > > consumer group until it transitions to PREPARE_REBALANCE. And we keep
> > > track of how many join group requests we have seen so far during
> > > PREPARE_REBALANCE. After reaching the consumer cap, we start to inform
> > > over-provisioned consumers that they should send LeaveGroupRequest and
> > > fail themselves. Or, with what Mayuresh proposed in KIP-345, we could
> > > mark extra members as hot backups and rebalance without them.
> > >
> > > 2. Choose the `consumer.group.max.size`. I feel incremental rebalancing
> > > (you proposed) could be of help here.
> > > 

Re: Kafka client Metadata update on demand?

2018-12-10 Thread Mayuresh Gharat
Hi Ming,

Kafka clients do update their metadata on NotLeaderForPartitionException.
The metadata update happens asynchronously.

Also, if you are getting this exception for a longer time, it might mean
that your client is fetching metadata from a broker whose metadata cache is
not updated with the latest metadata (the broker has not yet processed the
UpdateMetadataRequest from the controller).

Thanks,

Mayuresh
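
As a toy sketch of that asynchronous pattern (the shape of the behaviour, not the actual client internals): the error handler only marks the cached metadata as stale, and the refresh itself happens on the client's background network loop.

import java.util.concurrent.atomic.AtomicBoolean;

class CachedMetadata {
    private final AtomicBoolean updateRequested = new AtomicBoolean(false);

    // Called by request-completion handlers on errors such as
    // NotLeaderForPartitionException; cheap and non-blocking.
    void requestUpdate() {
        updateRequested.set(true);
    }

    // Called periodically from the client's network loop; performs the
    // actual (asynchronous) refresh at most once per request.
    void maybeRefresh() {
        if (updateRequested.compareAndSet(true, false)) {
            // fetch fresh metadata from the least-loaded broker (elided)
        }
    }
}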

On Mon, Dec 10, 2018 at 10:39 AM Ming Liu  wrote:

> Hey community,
> It seems Kafka Metadata update only happens at the pre-configured
> Metadata update interval.  During upgrade, when leader changes, the client
> will fail with NotLeaderForPartitionException until next Metadata update
> happened.  I wonder why we don't have the on-demand Metadata update (that
> is, have Metadata update when there is exception like
> NotLeaderForPartitionException)?
>
> Thanks!
> Ming
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: [VOTE] KIP-394: Require member.id for initial join group request

2018-12-10 Thread Guozhang Wang
+1. Thanks Boyang!


Guozhang

On Mon, Dec 10, 2018 at 10:29 AM Jason Gustafson  wrote:

> +1 Thanks for the KIP, Boyang!
>
> -Jason
>
> On Mon, Dec 10, 2018 at 10:07 AM Boyang Chen  wrote:
>
> > Thanks for voting, my friends. Could someone give one more binding vote
> > here?
> >
> > Best,
> > Boyang
> > 
> > From: Bill Bejeck 
> > Sent: Thursday, December 6, 2018 2:45 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [VOTE] KIP-394: Require member.id for initial join group
> > request
> >
> > +1
> > Thanks for the KIP.
> >
> > -Bill
> >
> > On Wed, Dec 5, 2018 at 1:43 PM Matthias J. Sax 
> > wrote:
> >
> > > Thanks for the KIP.
> > >
> > > +1 (binding)
> > >
> > > -Matthias
> > >
> > >
> > > On 12/5/18 7:53 AM, Mayuresh Gharat wrote:
> > > > +1 (non-binding)
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > >
> > > > On Wed, Dec 5, 2018 at 3:59 AM Boyang Chen 
> > wrote:
> > > >
> > > >> Hey friends,
> > > >>
> > > >> I would like to start a vote for KIP-394<
> > > >>
> > >
> >
> https://nam02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-394%253A%2BRequire%2Bmember.id%2Bfor%2Binitial%2Bjoin%2Bgroup%2Brequestdata=02%7C01%7C%7C78066780826b40ed5e7d08d65ae1eda1%7C84df9e7fe9f640afb435%7C1%7C0%7C636796323725452670sdata=Gp%2FVlUuezVVck81fMXH7yaQ7zKd0WaJ9Kc7GhtJW2Qo%3Dreserved=0
> > > >.
> > > >> The goal of this KIP is to improve broker stability by fencing
> invalid
> > > join
> > > >> group requests.
> > > >>
> > > >> Best,
> > > >> Boyang
> > > >>
> > > >>
> > > >
> > >
> > >
> >
>


-- 
-- Guozhang


Finding reviewers for some fixes for sequenceId overflow

2018-12-10 Thread Ming Liu
Hi, community,
   Here is the simple fix for two serious problems about sequenceId
overflow (one in client, one at broker). It has been there for a week.
   Can somebody review it?

https://issues.apache.org/jira/browse/KAFKA-7693
https://issues.apache.org/jira/browse/KAFKA-7692

Thanks!
Ming


Kafka client Metadata update on demand?

2018-12-10 Thread Ming Liu
Hey community,
It seems Kafka Metadata update only happens at the pre-configured
Metadata update interval.  During upgrade, when the leader changes, the client
will fail with NotLeaderForPartitionException until the next Metadata update
happens.  I wonder why we don't have an on-demand Metadata update (that
is, have a Metadata update when there is an exception like
NotLeaderForPartitionException)?

Thanks!
Ming


Re: [VOTE] KIP-394: Require member.id for initial join group request

2018-12-10 Thread Jason Gustafson
+1 Thanks for the KIP, Boyang!

-Jason

On Mon, Dec 10, 2018 at 10:07 AM Boyang Chen  wrote:

> Thanks for voting, my friends. Could someone give one more binding vote
> here?
>
> Best,
> Boyang
> 
> From: Bill Bejeck 
> Sent: Thursday, December 6, 2018 2:45 AM
> To: dev@kafka.apache.org
> Subject: Re: [VOTE] KIP-394: Require member.id for initial join group
> request
>
> +1
> Thanks for the KIP.
>
> -Bill
>
> On Wed, Dec 5, 2018 at 1:43 PM Matthias J. Sax 
> wrote:
>
> > Thanks for the KIP.
> >
> > +1 (binding)
> >
> > -Matthias
> >
> >
> > On 12/5/18 7:53 AM, Mayuresh Gharat wrote:
> > > +1 (non-binding)
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > >
> > > On Wed, Dec 5, 2018 at 3:59 AM Boyang Chen 
> wrote:
> > >
> > >> Hey friends,
> > >>
> > >> I would like to start a vote for KIP-394<
> > >>
> >
> https://nam02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-394%253A%2BRequire%2Bmember.id%2Bfor%2Binitial%2Bjoin%2Bgroup%2Brequestdata=02%7C01%7C%7C78066780826b40ed5e7d08d65ae1eda1%7C84df9e7fe9f640afb435%7C1%7C0%7C636796323725452670sdata=Gp%2FVlUuezVVck81fMXH7yaQ7zKd0WaJ9Kc7GhtJW2Qo%3Dreserved=0
> > >.
> > >> The goal of this KIP is to improve broker stability by fencing invalid
> > join
> > >> group requests.
> > >>
> > >> Best,
> > >> Boyang
> > >>
> > >>
> > >
> >
> >
>


Re: [VOTE] KIP-345: Introduce static membership protocol to reduce consumer rebalances

2018-12-10 Thread Boyang Chen
Hey there, could I get more votes on this thread?

Thanks for the vote from Mayuresh and Mike :)

Best,
Boyang

From: Mayuresh Gharat 
Sent: Thursday, December 6, 2018 10:53 AM
To: dev@kafka.apache.org
Subject: Re: [VOTE] KIP-345: Introduce static membership protocol to reduce 
consumer rebalances

+1 (non-binding)

Thanks,

Mayuresh

On Tue, Dec 4, 2018 at 6:58 AM Mike Freyberger 
wrote:

> +1 (non binding)
>
> On 12/4/18, 9:43 AM, "Patrick Williams" 
> wrote:
>
> Pls take me off this VOTE list
>
> Best,
>
> Patrick Williams
>
> Sales Manager, UK & Ireland, Nordics & Israel
> StorageOS
> +44 (0)7549 676279
> patrick.willi...@storageos.com
>
> 20 Midtown
> 20 Proctor Street
> Holborn
> London WC1V 6NX
>
> Twitter: @patch37
> LinkedIn: linkedin.com/in/patrickwilliams4 <
> https://nam02.safelinks.protection.outlook.com/?url=http%3A%2F%2Flinkedin.com%2Fin%2Fpatrickwilliams4data=02%7C01%7C%7Ca9e7b092d1024e90c1d708d65b261e1e%7C84df9e7fe9f640afb435%7C1%7C0%7C636796616598061118sdata=QIZ7s9HoutiaKs4bAg68oNsUDZ9ertfwlHd%2FRWKRFOg%3Dreserved=0>
>
> 
> https://nam02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fslack.storageos.com%2Fdata=02%7C01%7C%7Ca9e7b092d1024e90c1d708d65b261e1e%7C84df9e7fe9f640afb435%7C1%7C0%7C636796616598061118sdata=lthlUwYKvWgxquV%2FJE%2FQF9pFYrMYPV1QK72I1mu8E%2BY%3Dreserved=0
>
>
>
> On 03/12/2018, 17:34, "Guozhang Wang"  wrote:
>
> Hello Boyang,
>
> I've browsed through the new wiki and there are still a couple of
> minor
> things to notice:
>
> 1. RemoveMemberFromGroupOptions seems not defined anywhere.
>
> 2. LeaveGroupRequest added a list of group instance id, but still
> keep the
> member id as a singleton; is that intentional? I think to make the
> protocol
> consistent both member id and instance ids could be plural.
>
> 3. About the *kafka-remove-member-from-group.sh *tool, I'm
> wondering if we
> can defer adding this while just add the corresponding calls of the
> LeaveGroupRequest inside Streams until we have used it in
> production and
> hence have a better understanding on how flexible or extensible if
> we want
> to add any cmd tools. The rationale is that if we do not
> necessarily need
> it now, we can always add it later with a more think-through API
> design,
> but if we add the tool in a rush, we may need to extend or modify
> it soon
> after we realize its limits in operations.
>
> Otherwise, I'm +1 on the proposal.
>
> Guozhang
>
>
> On Mon, Dec 3, 2018 at 9:14 AM Boyang Chen 
> wrote:
>
> > Hey community friends,
> >
> > after another month of polishing, KIP-345<
> >
> https://nam02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-345%253A%2BIntroduce%2Bstatic%2Bmembership%2Bprotocol%2Bto%2Breduce%2Bconsumer%2Brebalancesdata=02%7C01%7C%7Ca9e7b092d1024e90c1d708d65b261e1e%7C84df9e7fe9f640afb435%7C1%7C0%7C636796616598061118sdata=L0X1z8hE%2FebB0KGbUWttz4lvsy%2FkcB49MRc8KZd8I0Y%3Dreserved=0
> >
> > design is ready for vote. Feel free to add your comment on the
> discussion
> > thread or here.
> >
> > Thanks for your time!
> >
> > Boyang
> > 
> > From: Boyang Chen 
> > Sent: Friday, November 9, 2018 6:35 AM
> > To: dev@kafka.apache.org
> > Subject: [VOTE] KIP-345: Introduce static membership protocol to
> reduce
> > consumer rebalances
> >
> > Hey all,
> >
> >
> > thanks so much for all the inputs on KIP-345 so far. The
> original proposal
> > has enhanced a lot with your help. To make sure the
> implementation go
> > smoothly without back and forth, I would like to start a vote on
> the final
> > design agreement now:
> >
> > 
> https://nam02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-data=02%7C01%7C%7Ca9e7b092d1024e90c1d708d65b261e1e%7C84df9e7fe9f640afb435%7C1%7C0%7C636796616598061118sdata=Ld0DpCbOmH0Gmu%2FVfkRS5lWA0vBcgi9WmHDvYz4L3b8%3Dreserved=0<
> >
> https://nam02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-345%253A%2BIntroduce%2Bstatic%2Bmembership%2Bprotocol%2Bto%2Breduce%2Bconsumer%2Brebalancesdata=02%7C01%7C%7Ca9e7b092d1024e90c1d708d65b261e1e%7C84df9e7fe9f640afb435%7C1%7C0%7C636796616598061118sdata=L0X1z8hE%2FebB0KGbUWttz4lvsy%2FkcB49MRc8KZd8I0Y%3Dreserved=0
> > >
> >
> >
> 345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances<
> >
> 

Re: [VOTE] KIP-394: Require member.id for initial join group request

2018-12-10 Thread Boyang Chen
Thanks for voting, my friends. Could someone give one more binding vote here?

Best,
Boyang

From: Bill Bejeck 
Sent: Thursday, December 6, 2018 2:45 AM
To: dev@kafka.apache.org
Subject: Re: [VOTE] KIP-394: Require member.id for initial join group request

+1
Thanks for the KIP.

-Bill

On Wed, Dec 5, 2018 at 1:43 PM Matthias J. Sax 
wrote:

> Thanks for the KIP.
>
> +1 (binding)
>
> -Matthias
>
>
> On 12/5/18 7:53 AM, Mayuresh Gharat wrote:
> > +1 (non-binding)
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Wed, Dec 5, 2018 at 3:59 AM Boyang Chen  wrote:
> >
> >> Hey friends,
> >>
> >> I would like to start a vote for KIP-394<
> >>
> https://nam02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-394%253A%2BRequire%2Bmember.id%2Bfor%2Binitial%2Bjoin%2Bgroup%2Brequestdata=02%7C01%7C%7C78066780826b40ed5e7d08d65ae1eda1%7C84df9e7fe9f640afb435%7C1%7C0%7C636796323725452670sdata=Gp%2FVlUuezVVck81fMXH7yaQ7zKd0WaJ9Kc7GhtJW2Qo%3Dreserved=0
> >.
> >> The goal of this KIP is to improve broker stability by fencing invalid
> join
> >> group requests.
> >>
> >> Best,
> >> Boyang
> >>
> >>
> >
>
>


[jira] [Resolved] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-12-10 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7549.

Resolution: Fixed

> Old ProduceRequest with zstd compression does not return error to client
> 
>
> Key: KAFKA-7549
> URL: https://issues.apache.org/jira/browse/KAFKA-7549
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Magnus Edenhill
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2018-12-10 Thread John Roesler
Hi, all,

>> In fact, we
>> can just keep a single final-result store with timestamps and reject
values
>> that have a smaller timestamp, is that right?

> Which is the correct output should at least be decided on the offset of
> the original message.

Thanks for this point, Jan.

KIP-258 is merely to allow embedding the record timestamp in the k/v store,
as well as providing a storage-format upgrade path.

I might have missed it, but I think we have yet to discuss whether it's safe
or desirable just to swap topic-ordering out for timestamp-ordering. This is
a very deep topic, and I think it would only pollute the current discussion.

What Adam has proposed is safe, given the *current* ordering semantics
of the system. If we can agree on his proposal, I think we can merge the
feature well before the conversation about timestamp ordering even takes
place, much less reaches a conclusion. In the meantime, it would seem to
be unfortunate to have one join operator with different ordering semantics
from every other KTable operator.

If and when that timestamp discussion takes place, many (all?) KTable
operations
will need to be updated, rendering the many:one join a small marginal cost.

And, just to plug it again, I proposed an algorithm above that I believe
provides
correct ordering without any additional metadata, and regardless of the
ordering semantics. I didn't bring it up further, because I felt the KIP
only needs
to agree on the public API, and we can discuss the implementation at
leisure in
a PR...

Thanks,
-John
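
For concreteness, the "reject smaller timestamps" resolution quoted at the top amounts to roughly the following (a toy sketch, not the proposed Streams implementation):

import java.util.HashMap;
import java.util.Map;

// Toy final-result store: per key, keep only the value with the newest
// timestamp and reject out-of-order (older) updates.
class TimestampedFinalResultStore<K, V> {
    private static class Entry<T> {
        final T value;
        final long timestamp;
        Entry(T value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    private final Map<K, Entry<V>> store = new HashMap<>();

    // Returns true if accepted, false if the update was older than the stored one.
    boolean putIfNewer(K key, V value, long timestamp) {
        Entry<V> current = store.get(key);
        if (current != null && current.timestamp > timestamp) {
            return false; // reject out-of-order update
        }
        store.put(key, new Entry<>(value, timestamp));
        return true;
    }
}

Whether timestamp or offset is the right tie-breaker is exactly the open question Jan raises above.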


On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak 
wrote:

>
>
> On 10.12.2018 07:42, Guozhang Wang wrote:
> > Hello Adam / Jan / John,
> >
> > Sorry for being late on this thread! I've finally got some time this
> > weekend to cleanup a load of tasks on my queue (actually I've also
> realized
> > there are a bunch of other things I need to enqueue while cleaning them
> up
> > --- sth I need to improve on my side). So here are my thoughts:
> >
> > Regarding the APIs: I like the current written API in the KIP. More
> > generally I'd prefer to keep the 1) one-to-many join functionalities as
> > well as 2) other join types than inner as separate KIPs since 1) may
> worth
> > a general API refactoring that can benefit not only foreignkey joins but
> > collocate joins as well (e.g. an extended proposal of
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> ),
> > and I'm not sure if other join types would actually be needed (maybe left
> > join still makes sense), so it's better to wait-for-people-to-ask-and-add
> > than add-sth-that-no-one-uses.
> >
> > Regarding whether we enforce step 3) / 4) v.s. introducing a
> > KScatteredTable for users to inject their own optimization: I'd prefer to
> > do the current option as-is, and my main rationale is for optimization
> > rooms inside the Streams internals and the API succinctness. For advanced
> > users who may indeed prefer KScatteredTable and do their own
> optimization,
> > while it is too much of the work to use Processor API directly, I think
> we
> > can still extend the current API to support it in the future if it
> becomes
> > necessary.
>
> no internal optimization potential. it's a myth
>
> ¯\_(ツ)_/¯
>
> :-)
>
> >
> > Another note about step 4) resolving out-of-ordering data, as I mentioned
> > before I think with KIP-258 (embedded timestamp with key-value store) we
> > can actually make this step simpler than the current proposal. In fact,
> we
> > can just keep a single final-result store with timestamps and reject
> values
> > that have a smaller timestamp, is that right?
>
> Which is the correct output should at least be decided on the offset of
> the original message.
>
> >
> >
> > That's all I have in mind now. Again, great appreciation to Adam to make
> > such HUGE progress on this KIP!
> >
> >
> > Guozhang
> >
> > On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak 
> > wrote:
> >
> >> If they don't find the time:
> >> They usually take the opposite path from me :D
> >> so the answer would be clear.
> >>
> >> hence my suggestion to vote.
> >>
> >>
> >> On 04.12.2018 21:06, Adam Bellemare wrote:
> >>> Hi Guozhang and Matthias
> >>>
> >>> I know both of you are quite busy, but we've gotten this KIP to a point
> >>> where we need more guidance on the API (perhaps a bit of a tie-breaker,
> >> if
> >>> you will). If you have anyone else you may think should look at this,
> >>> please tag them accordingly.
> >>>
> >>> The scenario is as such:
> >>>
> >>> Current Option:
> >>> API:
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
> >>> 1) Rekey the data to CombinedKey, and shuffles it to the partition with
> >> the
> >>> foreignKey (repartition 1)
> >>> 2) Join the data
> >>> 3) Shuffle the data back to the original node (repartition 2)
> >>> 4) Resolve out-of-order arrival / race condition due to 

Re: [DISCUSS] KIP-398: Support reading trust store from classpath

2018-12-10 Thread Noa Resare
Thank you for your comments, see replies inline.

> On 9 Dec 2018, at 01:33, Harsha  wrote:
> 
> Hi Noa,
>   Based on KIP"s motivation section
> "If we had the ability to load a trust store from the classpath as well as 
> from a file, the trust store could be shipped in a jar that could be declared 
> as a dependency and piggyback on the distribution infrastructure already in 
> place."
> 
> It looks like you are making the assumption that distributing a jar is better 
> than the file. I am not sure why one is better than the other. There are 
> other use-cases where one can make a call to a local "daemon" over a Unix socket to 
> fetch a certificate as well.

It was not my intention to convey that loading the trust store from the 
classpath is inherently better in all cases. The proposed change simply brings 
more choice. That said, I do believe that maven central and the transitive 
dependency features of maven, gradle and ivy make for a smoother user 
experience in many cases. Validating broker certificates against an 
organisation-wide private CA cert has benefits in that it means that the person setting up 
the kafka cluster(s) doesn’t need to bother with purchasing or obtaining 
publicly trusted certs for every broker while still providing strong 
cryptographic validation that a client is connecting to a legitimate endpoint. 
If there was a way to distribute a private trust store that is as easy as 
declaring an additional maven style dependency, I imagine that this would be a 
more appealing proposition than it is today. I would assume that many 
organisations opt to disable strict host checking in certificates to sidestep 
the CA cert distribution problem. I think it would be valuable to make it 
slightly easier to do the right thing.

> 
> Just supporting a "classpath" option might work for a few users but it's not 
> generic enough to support a wide variety of other infrastructures. My 
> suggestion: if the KIP's motivation is to make the certificate/truststore 
> available through different mechanisms, let's make an interface that allows users 
> to roll their own based on their infra, and support File as the default 
> mechanism so that we can support existing users.

I agree that this is a fairly small change that doesn’t aim to support all 
possible mechanisms that one might conceive of. I believe that KIP-383 
(Pluggable interface for SSL Factory) might be a good vehicle to provide this 
level of flexibility, but even if that proposal is accepted I still think that 
there is room for this KIP to provide an easy-to-use solution to this problem.

cheers
noa

> -Harsha
> 
> On Sat, Dec 8, 2018, at 7:03 AM, Noa Resare wrote:
>> 
>> 
>>> On 6 Dec 2018, at 20:16, Rajini Sivaram  wrote:
>>> 
>>> Hi Noa,
>>> 
>>> Thanks for the KIP. A few comments/questions:
>>> 
>>> 1. If we support filenames starting with `classpath:` by requiring
>>> `file:` prefix,
>>> then we are presumably not supporting files starting with `file:`. Not
>>> necessarily an issue, but we do need to document any restrictions.
>> 
>> I think that it would be trivial to support ‘file:’ as a prefix in a 
>> filesystem path
>> by just asking the user who really wants that to add it twice:
>> 
>> The config value "file:file:my_weird_file_name" would map to the 
>> filesystem path "file:my_weird_file_name”
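
A minimal sketch of that prefix handling, assuming the `classpath:`/`file:` convention discussed in this thread (illustrative, not the final implementation):

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

class TrustStoreLocator {
    static InputStream open(String location) throws IOException {
        if (location.startsWith("classpath:")) {
            String resource = location.substring("classpath:".length());
            InputStream in = Thread.currentThread()
                .getContextClassLoader()
                .getResourceAsStream(resource);
            if (in == null) {
                throw new IOException("classpath resource not found: " + resource);
            }
            return in;
        }
        // A single leading "file:" prefix is stripped once, so
        // "file:file:my_weird_file_name" resolves to the path "file:my_weird_file_name".
        String path = location.startsWith("file:")
            ? location.substring("file:".length())
            : location;
        return new FileInputStream(path);
    }
}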
>> 
>> 
>>> 2. On the broker-side, trust stores are dynamically updatable. And we use
>>> file modification time to decide whether trust store needs to be reloaded.
>>> This is less of an issue once we implement
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API,
>>> but at the moment, we are relying on actual files on the file system for
>>> which we can compare modification times.
>> 
>>> 3. On the client-side, trust stores are not currently updatable. And we
>>> don't have an API to make them updatable. By using class path, we preclude
>>> the use of file modification times in future to detect key or trust store
>>> updates for clients. It will be good to get feedback from the community on
>>> whether this is a reasonable longer-term restriction.
>> 
>> Interesting. I think that it is a reasonable graceful degradation to 
>> simply not pick up on changed truststores
>> read from the classpath as long as it is documented, but if we really 
>> want we could save a checksum of
>> the truststore, re-read and compare to determine any changes.
>> 
>>> 4. It will be good to get more feedback from the community on whether
>>> loading trust stores from CLASSPATH is a feature that is likely to be
>>> widely adopted. If not, perhaps
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-383%3A++Pluggable+interface+for+SSL+Factory
>>> will be sufficient to enable custom factories that do load trust store from
>>> the CLASSPATH.
>> 
>> While this generic extension point would make it possible to do 

Re: [DISCUSS] KIP-391: Allow Producing with Offsets for Cluster Replication

2018-12-10 Thread Edoardo Comar
(shameless bump) any additional feedback is welcome ... thanks!


Edoardo Comar  wrote on 27/11/2018 15:35:09:

> From: Edoardo Comar 
> To: dev@kafka.apache.org
> Date: 27/11/2018 15:35
> Subject: Re: [DISCUSS] KIP-391: Allow Producing with Offsets for 
> Cluster Replication
> 
> Hi Jason
> 
> we envisioned the replicator to replicate the __consumer_offsets topic too 
> (although without producing-with-offsets to it!).
> 
> As there is no client-side implementation yet using the leader epoch, 
> we could not yet see the impact of writing to the destination cluster 
> __consumer_offsets records with an invalid leader epoch.
> 
> Also, applications might still use an external storage mechanism for consumer 
> offsets where the leader_epoch is missing.
> 
> Perhaps the replicator could - for the __consumer_offsets topic - just 
> omit the leader_epoch field in the data sent to destination.
> 
> What do you think ?
> 
> 
> Jason Gustafson  wrote on 27/11/2018 00:09:56:
> 
> > Another wrinkle to consider is KIP-320. If you are planning to replicate
> > __consumer_offsets directly, then you will have to account for leader epoch
> > information which is stored with the committed offsets. But I cannot think
> > how it would be possible to replicate the leader epoch information in
> > messages even if you can preserve offsets.
> > 
> > -Jason
> > 
> > On Mon, Nov 26, 2018 at 1:16 PM Mayuresh Gharat wrote:
> > 
> > > Hi Edoardo,
> > >
> > > Thanks a lot for the KIP.
> > > I have a few questions/suggestions in addition to what Radai has mentioned
> > > above:
> > >
> > >    1. Is this meant only for 1:1 replication, for example one Kafka cluster
> > >    replicating to another, instead of having multiple Kafka clusters
> > >    mirroring into one Kafka cluster?
> > >    2. Are we relying on exactly-once produce in the replicator? If not, how
> > >    are retries handled in the replicator?
> > >    3. What is the recommended value for inflight requests here? Is it
> > >    supposed to be strictly 1? If yes, it would be great to mention that in
> > >    the KIP.
> > >    4. How is unclean leader election between the source cluster and the
> > >    destination cluster handled?
> > >    5. How are offset resets in the case of the replicator's consumer handled?
> > >    6. It would be good to explain the workflow in the KIP, with an
> > >    example, regarding how this KIP will change the replication scenario and
> > >    how it will benefit the consumer apps.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Mon, Nov 26, 2018 at 8:08 AM radai wrote:
> > >
> > > > a few questions:
> > > >
> > > > 1. how do you handle possible duplications caused by the "special"
> > > > producer timing-out/retrying? are you explicitly relying on the
> > > > "exactly once" sequencing?
> > > > 2. what about the combination of log compacted topics + replicator
> > > > downtime? by the time the replicator comes back up there might be
> > > > "holes" in the source offsets (some msgs might have been compacted
> > > > out)? how is that recoverable?
> > > > 3. similarly, what if you try and fire up replication on a non-empty
> > > > source topic? does the kip allow for offsets starting at some
> > > > arbitrary X > 0 ? or would this have to be designed from the start.
> > > >
> > > > and lastly, since this KIP seems to be designed for active-passive
> > > > failover (there can be no produce traffic except the replicator)
> > > > wouldn't a solution based on seeking to a time offset be more generic?
> > > > your producers could checkpoint the last (say log append) timestamp of
> > > > records they've seen, and when restoring in the remote site seek to
> > > > those timestamps (which will be metadata in their committed offsets) -
> > > > assuming replication takes > 0 time you'd need to handle some dups,
> > > > but every kafka consumer setup needs to know how to handle those
> > > > anyway.
> > > > On Fri, Nov 23, 2018 at 2:27 AM Edoardo Comar wrote:
> > > > >
> > > > > Hi Stanislav
> > > > >
> > > > > > > The flag is needed to distinguish a batch with a desired base offset of 0,
> > > > > > > from a regular batch for which offsets need to be generated.
> > > > > > > If the producer can provide offsets, why not provide a base offset of 0?
> > > > >
> > > > > a regular batch (for which offsets are generated by the broker on write)
> > > > > is sent with a base offset of 0.
> > > > > How could you distinguish it from a batch where you *want* the first
> > > > > record to be written at offset 0 (i.e. be the first in the partition and
> > > > > be rejected if there are records on the log already) ?
> > > > > We wanted to avoid a "deep" inspection (and potentially decompression) of
> > > > > the records.
> > > > >
> > > > > For the replicator use case, a single produce request where all the data
> > > > > 

[jira] [Created] (KAFKA-7716) Unprocessed messages when Broker fails

2018-12-10 Thread Finbarr Naughton (JIRA)
Finbarr Naughton created KAFKA-7716:
---

 Summary: Unprocessed messages when Broker fails
 Key: KAFKA-7716
 URL: https://issues.apache.org/jira/browse/KAFKA-7716
 Project: Kafka
  Issue Type: Bug
  Components: core, streams
Affects Versions: 2.0.1, 1.0.0
Reporter: Finbarr Naughton


This occurs when running on Kubernetes on bare metal.

A Streams application with a single topology listening to two input topics A 
and B. A is read as a GlobalKTable, B as a KStream. The topology joins the 
stream to the GKTable and writes an updated message to topic A. The application 
is configured to use exactly_once processing.

There are three worker nodes. Kafka brokers are deployed as a statefulset on 
the three nodes using the helm chart from 
[https://github.com/helm/charts/tree/master/incubator/kafka].

The application has three instances spread across the three nodes.

During a test, topic A is pre-populated with 50k messages over 5 minutes. Then 
50k messages with the same key-set are sent to topic B over 5 minutes. The 
expected behaviour is that Topic A will contain 50k updated messages 
afterwards. While all brokers are available this is the case, even when one of 
the application pods is deleted.

When a broker fails, however, a few expected updated messages fail to appear on 
topic A despite their existence on topic B.

 

More complete description here - 
[https://stackoverflow.com/questions/53557247/some-events-unprocessed-by-kafka-streams-application-on-kubernetes-on-bare-metal]

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2018-12-10 Thread Jan Filipiak


On 10.12.2018 07:42, Guozhang Wang wrote:
> Hello Adam / Jan / John,
>
> Sorry for being late on this thread! I've finally got some time this
> weekend to cleanup a load of tasks on my queue (actually I've also realized
> there are a bunch of other things I need to enqueue while cleaning them up
> --- sth I need to improve on my side). So here are my thoughts:
>
> Regarding the APIs: I like the current written API in the KIP. More
> generally I'd prefer to keep the 1) one-to-many join functionalities as
> well as 2) other join types than inner as separate KIPs since 1) may be worth
> a general API refactoring that can benefit not only foreignkey joins but
> collocate joins as well (e.g. an extended proposal of
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup),
> and I'm not sure if other join types would actually be needed (maybe left
> join still makes sense), so it's better to wait-for-people-to-ask-and-add
> than add-sth-that-no-one-uses.
>
> Regarding whether we enforce step 3) / 4) v.s. introducing a
> KScatteredTable for users to inject their own optimization: I'd prefer to
> do the current option as-is, and my main rationale is for optimization
> rooms inside the Streams internals and the API succinctness. For advanced
> users who may indeed prefer KScatteredTable and do their own optimization,
> while it is too much of the work to use Processor API directly, I think we
> can still extend the current API to support it in the future if it becomes
> necessary.

no internal optimization potential. it's a myth

¯\_(ツ)_/¯

:-)

>
> Another note about step 4) resolving out-of-ordering data, as I mentioned
> before I think with KIP-258 (embedded timestamp with key-value store) we
> can actually make this step simpler than the current proposal. In fact, we
> can just keep a single final-result store with timestamps and reject values
> that have a smaller timestamp, is that right?

Which is the correct output should at least be decided on the offset of 
the original message.

>
>
> That's all I have in mind now. Again, great appreciation to Adam to make
> such HUGE progress on this KIP!
>
>
> Guozhang
>
> On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak 
> wrote:
>
>> If they don't find the time:
>> They usually take the opposite path from me :D
>> so the answer would be clear.
>>
>> hence my suggestion to vote.
>>
>>
>> On 04.12.2018 21:06, Adam Bellemare wrote:
>>> Hi Guozhang and Matthias
>>>
>>> I know both of you are quite busy, but we've gotten this KIP to a point
>>> where we need more guidance on the API (perhaps a bit of a tie-breaker,
>> if
>>> you will). If you have anyone else you may think should look at this,
>>> please tag them accordingly.
>>>
>>> The scenario is as such:
>>>
>>> Current Option:
>>> API:
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
>>> 1) Rekey the data to CombinedKey, and shuffles it to the partition with
>> the
>>> foreignKey (repartition 1)
>>> 2) Join the data
>>> 3) Shuffle the data back to the original node (repartition 2)
>>> 4) Resolve out-of-order arrival / race condition due to foreign-key
>> changes.
>>>
>>> Alternate Option:
>>> Perform #1 and #2 above, and return a KScatteredTable.
>>> - It would be keyed on a wrapped key function: , VR>
>> (KO
>>> = Other Table Key, K = This Table Key, VR = Joined Result)
>>> - KScatteredTable.resolve() would perform #3 and #4 but otherwise a user
>>> would be able to perform additional functions directly from the
>>> KScatteredTable (TBD - currently out of scope).
>>> - John's analysis 2-emails up is accurate as to the tradeoffs.
>>>
>>> Current Option is coded as-is. Alternate option is possible, but will
>>> require for implementation details to be made in the API and some
>> exposure
>>> of new data structures into the API (ie: CombinedKey).
>>>
>>> I appreciate any insight into this.
>>>
>>> Thanks.
>>>
>>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare 
>>> wrote:
>>>
 Hi John

 Thanks for your feedback and assistance. I think your summary is
>> accurate
 from my perspective. Additionally, I would like to add that there is a
>> risk
 of inconsistent final states without performing the resolution. This is
>> a
 major concern for me as most of the data I have dealt with is produced
>> by
 relational databases. We have seen a number of cases where a user in the
 Rails UI has modified the field (foreign key), realized they made a
 mistake, and then updated the field again with a new key. The events are
 propagated out as they are produced, and as such we have had real-world
 cases where these inconsistencies were propagated downstream as the
>> final
 values due to the race conditions in the fanout of the data.

 This solution that I propose values correctness of the final result over
 other factors.

 We could always move