Re: [DISCUSS] KIP-454: Expansion of the ConnectClusterState interface

2019-04-25 Thread Magesh Nandakumar
Thanks a lot, Chris. The KIP looks good to me.

On Thu, Apr 25, 2019 at 7:35 PM Chris Egerton  wrote:

> Hi Magesh,
>
> Sounds good; I've updated the KIP to make ConnectClusterDetails an
> interface. If we want to leave the door open to expand it in the future it
> definitely makes sense to treat it similarly to how we're treating the
> ConnectClusterState interface now.
>
> Thanks,
>
> Chris
>
> On Thu, Apr 25, 2019 at 4:18 PM Magesh Nandakumar 
> wrote:
>
> > Hi Chris,
> >
> > Overall it looks good to me. Just one last comment - I was wondering if
> > ConnectClusterDetails should be an interface just like
> ConnectClusterState.
> >
> > Thanks,
> > Magesh
> >
> > On Thu, Apr 25, 2019 at 3:54 PM Chris Egerton 
> wrote:
> >
> > > Hi Magesh,
> > >
> > > Expanding the type we use to convey cluster metadata from just a Kafka
> > > cluster ID string to its own class seems like a good idea for the sake
> of
> > > forwards compatibility, but I'm still not sure what the gains of
> > including
> > > the cluster group ID would be--it's a simple map lookup away in the
> REST
> > > extension's configure(...) method. Including information on whether the
> > > cluster is distributed or standalone definitely seems convenient; as
> far
> > as
> > > I can tell there's no easy way to do that from within a REST extension
> at
> > > the moment, and relying on something like the presence of a group.id
> > > property to identify a distributed cluster could result in false
> > positives.
> > > However, is there a use case for it? If not, we can note that as a
> > possible
> > > addition to the ConnectClusterDetails class for later but leave it out
> > for
> > > now.
> > >
> > > I've updated the KIP to include the new ConnectClusterDetails class but
> > > left out the cluster type information for now; let me know what you
> > think.
> > >
> > > Thanks again for your thoughts!
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Wed, Apr 24, 2019 at 4:49 PM Magesh Nandakumar <
> mage...@confluent.io>
> > > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > Instead of calling it ConnectClusterId, perhaps call it
> > > > ConnectClusterDetails which can include things like groupid,
> underlying
> > > > kafkaclusterId, standalone or distributed, etc. This will help expose
> > any
> > > > cluster related information in the future.
> > > > Let me know if that would work?
> > > >
> > > > Thanks,
> > > > Magesh
> > > >
> > > > On Wed, Apr 24, 2019 at 4:26 PM Chris Egerton 
> > > wrote:
> > > >
> > > > > Hi Magesh,
> > > > >
> > > > > 1. After ruminating for a little while on the inclusion of a method
> > to
> > > > > retrieve task configurations I've tentatively decided to remove it
> > from
> > > > the
> > > > > proposal and place it in the rejected alternatives section. If
> anyone
> > > > > presents a reasonable use case for it I'll be happy to discuss
> > further
> > > > but
> > > > > right now I think this is the way to go. Thanks for your
> suggestion!
> > > > >
> > > > > 2. The idea of a Connect cluster ID method is certainly
> fascinating,
> > > but
> > > > > there are a few questions it raises. First off, what would the
> > > group.id
> > > > be
> > > > > for a standalone cluster? Second, why return a formatted string
> there
> > > > > instead of a new class such as a ConnectClusterId that provides the
> > two
> > > > in
> > > > > separate methods? And lastly, since REST extensions are configured
> > with
> > > > all
> > > > > of the properties available to the worker, wouldn't it be possible
> to
> > > > just
> > > > > get the group ID of the Connect cluster from there? The reason I'd
> > like
> > > > to
> > > > > see the Kafka cluster ID made available to REST extensions is that
> > > > > retrieving it isn't as simple as reading a configuration from a
> > > > properties
> > > > > map and instead involves creating an admin client from those
> > properties
> > > > and
> > > > > using it to perform a `describe cluster` call, which comes with its
> > own
> > > > > pitfalls as far as error handling, interruptions, and timeouts go.
> > > Since
> > > > > this information is available to the herder already, it seems like
> a
> > > good
> > > > > tradeoff to expose that information to REST extensions so that
> > > developers
> > > > > don't have to duplicate that logic themselves. I'm unsure that the
> > same
> > > > > arguments would apply to exposing a group.id to REST extensions
> > > through
> > > > > the
> > > > > ConnectClusterInterface. What do you think?
> > > > >
> > > > > Thanks again for your thoughts!
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Tue, Apr 23, 2019 at 4:18 PM Magesh Nandakumar <
> > > mage...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Chris,
> > > > > >
> > > > > > I certainly would love to hear others' thoughts on #1 but IMO, it
> > > might
> > > > > not
> > > > > > be as useful as ConnectorConfigs and as you mentioned, we could
> > > always
> > > > > add
> > > > > > it 
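The admin-client round trip Chris describes (build a client from the worker properties, issue a describe-cluster call, and handle interruption/timeout yourself) can be sketched as below. To keep the sketch runnable without a broker, `describeClusterId` is a local stand-in for the real `AdminClient.describeCluster().clusterId()` future; the error-handling shape is the point, not the API names:

```java
import java.util.concurrent.*;

// Self-contained sketch of the boilerplate a REST extension would need
// today to learn the Kafka cluster id: an async lookup plus explicit
// interruption/timeout handling. describeClusterId is a stand-in for
// the real admin-client future, so the snippet runs without a broker.
class ClusterIdLookup {
    // Stand-in for admin.describeCluster().clusterId() returning a future.
    static Future<String> describeClusterId(ExecutorService pool) {
        return pool.submit(() -> "test-cluster-id"); // pretend broker round trip
    }

    static String fetchClusterIdOrNull(long timeoutMs) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return describeClusterId(pool).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null; // broker unreachable or slow: each extension must decide
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        String id = fetchClusterIdOrNull(5_000);
        if (!"test-cluster-id".equals(id)) throw new AssertionError(id);
        System.out.println("ok");
    }
}
```

Every extension duplicating this try/catch ladder is the cost the KIP avoids by exposing the cluster id the herder already holds.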

Re: [DISCUSS] KIP-440: Extend Connect Converter to support headers

2019-04-25 Thread sapiensy
Hi Randall, Konstantine,

I've updated the KIP to reflect the details we discussed here. Let me know if 
it looks good and we can go to the voting phase.

Thanks!

On 2019/04/22 21:07:31, Randall Hauch  wrote: 
> I think it would be helpful to clarify this in the KIP, just so that
> readers are aware that the headers will be the raw header bytes that are
> the same as what is in the Kafka record.
> 
> The alternative I was referring to is exposing the Connect `Headers`
> interface, which is different.
> 
> On Mon, Apr 22, 2019 at 1:45 PM sapie...@gmail.com 
> wrote:
> 
> > Hi Konstantine, Randall,
> >
> > As you can see in the updated Converter interface, it always operates on
> > `org.apache.kafka.common.header.Headers`.
> >
> > WorkerSinkTask simply uses Kafka message headers and passes them to the
> > `toConnectData` method.
> >
> > WorkerSourceTask leverages header converter to extract RecordHeaders,
> > which implements Headers interface. Then RecordHeaders are passed to the
> > `fromConnectData` method.
> >
> > So the header converter is used as a way to get headers when converting data
> > from internal Connect format to Kafka messages (because there is no other way
> > to get the headers in this case).
> >
> > I can add this to the KIP if it's helpful.
> >
> > Randall, what is the alternative approach you're referring to?
> >
> > On 2019/04/22 18:09:24, Randall Hauch  wrote:
> > > Konstantine raises a good point. Which `Headers` is being referenced in
> > the
> > > API? The Connect `org.apache.kafka.connect.header.Headers` would
> > correspond
> > > to what was already deserialized by the `HeaderConverter` or what will
> > yet
> > > be serialized by the `HeaderConverter`. Alternatively, the common `
> > > org.apache.kafka.common.header.Headers` would correspond to the raw
> > header
> > > pairs from the underlying Kafka record.
> > >
> > > So, we probably want to be a bit more specific, and also mention why. And
> > > we probably want to mention the other approach in the rejected
> > alternatives.
> > >
> > > Best regards,
> > >
> > > Randall
> > >
> > > On Mon, Apr 22, 2019 at 11:59 AM Konstantine Karantasis <
> > > konstant...@confluent.io> wrote:
> > >
> > > > Thanks for the KIP Yaroslav!
> > > >
> > > > Apologies for the late comment. However, after reading the KIP it's
> > still
> > > > not very clear to me what happens to the existing
> > > > HeaderConverter interface and what's the expectation for existing code
> > > > implementing this interface out there.
> > > >
> > > > Looking at the PR I see that it leaves the existing Connect headers
> > > > conversion unaffected. I'd expect by reading the KIP to
> > > > understand what's the interplay of the current proposal with the
> > existing
> > > > implementation of KIP-145 that introduced headers in Connect.
> > > >
> > > > Thanks,
> > > > Konstantine
> > > >
> > > > On Mon, Apr 22, 2019 at 9:07 AM Randall Hauch 
> > wrote:
> > > >
> > > > > Thanks for updating the KIP. It looks good to me, and since there
> > haven't
> > > > > been any other issues mentioned in this month-long thread, it's
> > probably
> > > > > fine to start a vote.
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Randall
> > > > >
> > > > > On Tue, Apr 2, 2019 at 3:12 PM Randall Hauch 
> > wrote:
> > > > >
> > > > > > Thanks for the submission, Yaroslav -- and for building on the
> > > > suggestion
> > > > > > of Jeremy C in https://issues.apache.org/jira/browse/KAFKA-7273.
> > This
> > > > is
> > > > > > a nice and simple approach that is backward compatible.
> > > > > >
> > > > > > The KIP looks good so far, but I do have two specific suggestions
> > to
> > > > make
> > > > > > things just a bit more explicit. First, both the "Public API" and
> > > > > "Proposed
> > > > > > Changes" sections could be more explicit that the methods in the
> > > > proposal
> > > > > > are being added; as it's currently written a reader must infer
> > that.
> > > > > > Second, the "Proposed Changes" section needs to more clearly
> > specify
> > > > that
> > > > > > the WorkerSourceTask will now use the new fromConnectData method
> > with
> > > > the
> > > > > > headers instead of the existing method, and that the WorkerSinkTask
> > > > will
> > > > > > now use the toConnectData method with the headers instead of the
> > > > existing
> > > > > > method.
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Randall
> > > > > >
> > > > > >
> > > > > > On Mon, Mar 11, 2019 at 11:01 PM Yaroslav Tkachenko <
> > > > sapie...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > >> Hello,
> > > > > >>
> > > > > >> I'd like to propose a KIP that extends Kafka Connect Converter
> > > > > interface:
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-440%3A+Extend+Connect+Converter+to+support+headers
> > > > > >>
> > > > > >> Thanks for considering!
> > > > > >>
> > > > > >> --
> > > > > >> Yaroslav Tkachenko
> > > > > >> 
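The backward-compatible shape discussed in this thread (new `Converter` overloads taking the raw record headers, with defaults delegating to the existing methods) can be sketched as follows. `Headers`, `Schema`, and `SchemaAndValue` below are local stand-ins so the sketch compiles on its own; the exact signatures should be checked against the KIP-440 page:

```java
// Local stand-ins for the Kafka types referenced in the thread.
interface Headers {}
class Schema {}
class SchemaAndValue {
    final Schema schema;
    final Object value;
    SchemaAndValue(Schema schema, Object value) { this.schema = schema; this.value = value; }
}

// The discussed shape: new overloads accept the raw record headers and
// default to the existing header-less methods, so converters written
// before the change keep working unmodified.
interface Converter {
    byte[] fromConnectData(String topic, Schema schema, Object value);
    SchemaAndValue toConnectData(String topic, byte[] value);

    default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
        return fromConnectData(topic, schema, value); // headers ignored by default
    }

    default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
        return toConnectData(topic, value); // headers ignored by default
    }
}

class ConverterDemo {
    // A converter written before the change: only the old methods exist.
    static class LegacyStringConverter implements Converter {
        public byte[] fromConnectData(String topic, Schema schema, Object value) {
            return value == null ? new byte[0] : value.toString().getBytes();
        }
        public SchemaAndValue toConnectData(String topic, byte[] value) {
            return new SchemaAndValue(null, new String(value));
        }
    }

    public static void main(String[] args) {
        Converter c = new LegacyStringConverter();
        // A WorkerSourceTask-style call site can pass headers; the default
        // implementation quietly delegates for legacy converters.
        byte[] out = c.fromConnectData("topic", null, null, "payload");
        if (!"payload".equals(new String(out))) throw new AssertionError();
        System.out.println("ok");
    }
}
```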

Re:Re: [VOTE] KIP-417: Allow JmxTool to connect to a secured RMI port

2019-04-25 Thread Wanwan Sun
Hi All,
This KIP passes with three +1 (binding) votes. Thanks to Manikumar for 
guiding me through my first KIP!


Cheers,
Fangbin

At 2019-04-25 21:28:16, "Dongjin Lee"  wrote:
>+1 (non-binding).
>
>+2 (binding), +1 (non-binding) until now.
>
>Thanks,
>Dongjin
>
>On Thu, Apr 25, 2019 at 6:16 AM Gwen Shapira  wrote:
>
>> +1 (binding)
>>
>> On Fri, Feb 1, 2019, 8:20 AM Harsha Chintalapani  wrote:
>>
>> > +1 (binding).
>> >
>> > Thanks,
>> > Harsha
>> > On Jan 31, 2019, 8:08 PM -0800, Manikumar ,
>> > wrote:
>> > > Hi,
>> > >
>> > > +1 (binding). Thanks for the KIP.
>> > >
>> > > On Mon, Jan 28, 2019 at 5:37 PM Fangbin Sun 
>> > wrote:
>> > >
>> > > > Hi, All:
>> > > > I would like to start a vote on KIP-417 which aims at supporting
>> > JmxTool
>> > > > to connect to a secured RMI port.
>> > > >
>> > > >
>> > > > The KIP:
>> > > >
>> > > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-417%3A+Allow+JmxTool+to+connect+to+a+secured+RMI+port
>> > > >
>> > > >
>> > > > Thanks!
>> > > > Fangbin
>> >
>>
>
>
>-- 
>*Dongjin Lee*
>
>*A hitchhiker in the mathematical world.*
>*github:  github.com/dongjinleekr
>linkedin: kr.linkedin.com/in/dongjinleekr
>speakerdeck: speakerdeck.com/dongjin
>*


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2019-04-25 Thread Boyang Chen
Hey all,

There is a minor change to the stream-side logic for static membership.
Originally we chose to piggy-back on users supplying a unique `client.id`
config that we could use to construct a per-thread consumer
`group.instance.id`. This approach has several drawbacks:

  1.  We already have functionality relying on `client.id`, and it is not
always the case that users want to configure it differently for individual
instances. For example, users can currently throttle requests under the same
client.id, which is a solid use case where the `client.id` should be
duplicated.
  2.  Existing Streams users may unknowingly trigger static membership if
they have already set `client.id` in their Streams apps, which could lead to
unexpected fatal errors due to the `group.instance.id` fencing we are going
to introduce.

In conclusion, it is not good practice to overload an existing config that
users rely on unless there is no side effect. To provide a more
fault-tolerant upgrade path, we decided to let Streams users opt in to
static membership by setting `group.instance.id` explicitly.
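As a hedged illustration of this opt-in (property names as discussed in the KIP; the values and file layout are hypothetical), a Streams instance would enable static membership like this:

```properties
# Hypothetical per-instance Streams configuration.
application.id=my-streams-app
bootstrap.servers=broker:9092
# May be shared across instances, e.g. for request throttling by client.id:
client.id=my-streams-app
# Unique per instance; static membership is enabled only when this is set:
group.instance.id=payments-instance-1
```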

Thank you Guozhang and Matthias for the great discussions and enhancements for 
the KIP!

Best,
Boyang



From: Boyang Chen 
Sent: Saturday, March 9, 2019 2:28 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hi Mike,

Yes that's the plan!


From: Mike Freyberger 
Sent: Saturday, March 9, 2019 10:04 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hi Boyang,

Is this work targeted for Kafka 2.3? I am eager to use this new feature.

Thanks,

Mike Freyberger

On 12/21/18, 1:21 PM, "Mayuresh Gharat"  wrote:

Hi Boyang,

Regarding "However, we shall still attempt to remove the member static info
if the given `member.id` points to an existing `group.instance.id` upon
LeaveGroupRequest, because I could think of the possibility that in long
term we could want to add static membership leave group logic for more
fine-grained use cases."

> I think there is some confusion here. I am probably not putting it
> right.
>
I agree, If a static member sends LeaveGroupRequest, it should be removed
> from the group.
>
Now getting back to downgrade of static membership to Dynamic membership,
> with the example described earlier  (copying it again for ease of reading)
> :
>

>>1. Lets say we have 4 consumers :  c1, c2, c3, c4 in the static group.
>>2. The group.instance.id for each of these is as follows:
>>   - c1 -> gc1, c2 -> gc2, c3 -> gc3, c4 -> gc4
>>3. The mapping on the GroupCoordinator would be:
>>   - gc1 -> mc1, gc2 -> mc2, gc3 -> mc3, gc4 -> mc4, where mc1, mc2,
>>   mc3, mc4 are the randomly generated memberIds for c1, c2, c3, c4
>>   respectively, by the GroupCoordinator.
>>4. Now we do a restart to move the group to dynamic membership.
>>5. We bounce c1 first and it rejoins with UNKNOWN_MEMBERID (since we
>>don't persist the previously assigned memberId mc1 anywhere on the 
c1).
>>
> - We agree that there is no way to recognize that c1 was a part of the
> group, *earlier*.  If yes, the statement : "The dynamic member rejoins
> the group without `group.instance.id`. It will be accepted since it is a
> known member." is not necessarily true, right?
>


> - Now I *agree* with "However, we shall still attempt to remove the
> member static info if the given `member.id` points to an existing `
> group.instance.id` upon LeaveGroupRequest, because I could think of the
> possibility that in long term we could want to add static membership leave
> group logic for more fine-grained use cases."
>
But that would only happen if the GroupCoordinator allocates the same
> member.id (mc1) to the consumer c1, when it rejoins the group in step 5
> above as a dynamic member, which is very rare as it is randomly generated,
> but possible.
>


> - This raises another question, if the GroupCoordinator assigns a
> member.id (mc1~) to consumer c1 after step 5. It will join the group and
> rebalance and the group will become stable, eventually. Now the
> GroupCoordinator still maintains a mapping of "group.instance.id ->
> member.id" (gc1 -> mc1, gc2 -> mc2, gc3 -> mc3, gc4 -> mc4) internally and
> after some time, it realizes that it has not received heartbeat from the
> consumer with "group.instance.id" = gc1. In that case, it will trigger
> another rebalance assuming that a static member has left the group (when
> actually it (c1) has not left the group but moved to dynamic membership).
> 
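The downgrade scenario Mayuresh walks through can be simulated in a few lines. This is a hypothetical illustration using the thread's names (gc1, mc1), not GroupCoordinator code: the coordinator's static mapping keeps pointing at the old member.id after c1 rejoins dynamically, so the stale entry eventually looks like a departed static member and triggers a rebalance:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical simulation of the scenario above: a static member's
// group.instance.id -> member.id mapping goes stale when the consumer
// bounces and rejoins as a *dynamic* member.
class StaticMembershipDowngrade {
    final Map<String, String> staticMembers = new HashMap<>(); // group.instance.id -> member.id
    final Map<String, Long> lastHeartbeat = new HashMap<>();   // member.id -> last heartbeat ts

    String joinStatic(String groupInstanceId) {
        String memberId = "m-" + UUID.randomUUID();
        staticMembers.put(groupInstanceId, memberId);
        lastHeartbeat.put(memberId, System.currentTimeMillis());
        return memberId;
    }

    String joinDynamic() { // UNKNOWN_MEMBER_ID path: no group.instance.id supplied
        String memberId = "m-" + UUID.randomUUID();
        lastHeartbeat.put(memberId, System.currentTimeMillis());
        return memberId;
    }

    // A stale static entry looks like a departed static member and would
    // trigger a rebalance, even though the consumer never really left.
    boolean staticEntryIsStale(String groupInstanceId) {
        String memberId = staticMembers.get(groupInstanceId);
        return memberId != null && !lastHeartbeat.containsKey(memberId);
    }

    public static void main(String[] args) {
        StaticMembershipDowngrade coord = new StaticMembershipDowngrade();
        String mc1 = coord.joinStatic("gc1");
        // c1 bounces and rejoins dynamically; its old member.id stops heartbeating.
        coord.lastHeartbeat.remove(mc1);
        coord.joinDynamic();
        if (!coord.staticEntryIsStale("gc1")) throw new AssertionError();
        System.out.println("stale static entry for gc1 -> spurious rebalance");
    }
}
```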

Re: [DISCUSS] KIP-454: Expansion of the ConnectClusterState interface

2019-04-25 Thread Chris Egerton
Hi Magesh,

Sounds good; I've updated the KIP to make ConnectClusterDetails an
interface. If we want to leave the door open to expand it in the future it
definitely makes sense to treat it similarly to how we're treating the
ConnectClusterState interface now.

Thanks,

Chris

On Thu, Apr 25, 2019 at 4:18 PM Magesh Nandakumar 
wrote:

> Hi Chris,
>
> Overall it looks good to me. Just one last comment - I was wondering if
> ConnectClusterDetails should be an interface just like ConnectClusterState.
>
> Thanks,
> Magesh
>
> On Thu, Apr 25, 2019 at 3:54 PM Chris Egerton  wrote:
>
> > Hi Magesh,
> >
> > Expanding the type we use to convey cluster metadata from just a Kafka
> > cluster ID string to its own class seems like a good idea for the sake of
> > forwards compatibility, but I'm still not sure what the gains of
> including
> > the cluster group ID would be--it's a simple map lookup away in the REST
> > extension's configure(...) method. Including information on whether the
> > cluster is distributed or standalone definitely seems convenient; as far
> as
> > I can tell there's no easy way to do that from within a REST extension at
> > the moment, and relying on something like the presence of a group.id
> > property to identify a distributed cluster could result in false
> positives.
> > However, is there a use case for it? If not, we can note that as a
> possible
> > addition to the ConnectClusterDetails class for later but leave it out
> for
> > now.
> >
> > I've updated the KIP to include the new ConnectClusterDetails class but
> > left out the cluster type information for now; let me know what you
> think.
> >
> > Thanks again for your thoughts!
> >
> > Cheers,
> >
> > Chris
> >
> > On Wed, Apr 24, 2019 at 4:49 PM Magesh Nandakumar 
> > wrote:
> >
> > > Hi Chris,
> > >
> > > Instead of calling it ConnectClusterId, perhaps call it
> > > ConnectClusterDetails which can include things like groupid, underlying
> > > kafkaclusterId, standalone or distributed, etc. This will help expose
> any
> > > cluster related information in the future.
> > > Let me know if that would work?
> > >
> > > Thanks,
> > > Magesh
> > >
> > > On Wed, Apr 24, 2019 at 4:26 PM Chris Egerton 
> > wrote:
> > >
> > > > Hi Magesh,
> > > >
> > > > 1. After ruminating for a little while on the inclusion of a method
> to
> > > > retrieve task configurations I've tentatively decided to remove it
> from
> > > the
> > > > proposal and place it in the rejected alternatives section. If anyone
> > > > presents a reasonable use case for it I'll be happy to discuss
> further
> > > but
> > > > right now I think this is the way to go. Thanks for your suggestion!
> > > >
> > > > 2. The idea of a Connect cluster ID method is certainly fascinating,
> > but
> > > > there are a few questions it raises. First off, what would the
> > group.id
> > > be
> > > > for a standalone cluster? Second, why return a formatted string there
> > > > instead of a new class such as a ConnectClusterId that provides the
> two
> > > in
> > > > separate methods? And lastly, since REST extensions are configured
> with
> > > all
> > > > of the properties available to the worker, wouldn't it be possible to
> > > just
> > > > get the group ID of the Connect cluster from there? The reason I'd
> like
> > > to
> > > > see the Kafka cluster ID made available to REST extensions is that
> > > > retrieving it isn't as simple as reading a configuration from a
> > > properties
> > > > map and instead involves creating an admin client from those
> properties
> > > and
> > > > using it to perform a `describe cluster` call, which comes with its
> own
> > > > pitfalls as far as error handling, interruptions, and timeouts go.
> > Since
> > > > this information is available to the herder already, it seems like a
> > good
> > > > tradeoff to expose that information to REST extensions so that
> > developers
> > > > don't have to duplicate that logic themselves. I'm unsure that the
> same
> > > > arguments would apply to exposing a group.id to REST extensions
> > through
> > > > the
> > > > ConnectClusterInterface. What do you think?
> > > >
> > > > Thanks again for your thoughts!
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Tue, Apr 23, 2019 at 4:18 PM Magesh Nandakumar <
> > mage...@confluent.io>
> > > > wrote:
> > > >
> > > > > Chris,
> > > > >
> > > > > I certainly would love to hear others' thoughts on #1 but IMO, it
> > might
> > > > not
> > > > > be as useful as ConnectorConfigs and as you mentioned, we could
> > always
> > > > add
> > > > > it when the need arises.
> > > > > Thanks for clarifying the details on my concern #2 regarding the
> > > > > kafkaClusterId. While not a perfect fit in the interface, I'm not
> > > > > completely opposed to having it in the interface. The other
> option, I
> > > can
> > > > > think is to expose a connectClusterId() returning group.id +
> > > > > kafkaClusterId
> > > > > (with some delimiter) rather than returning the kafkaClusterId. 
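A hedged sketch of the shape this thread converged on: ConnectClusterDetails as an interface (so it can grow later), initially exposing only the Kafka cluster id, reachable from ConnectClusterState. Method names are inferred from the discussion, not copied from the KIP page:

```java
// Sketch of the discussed interfaces; names are inferred from the
// thread and should be checked against the KIP-454 page.
interface ConnectClusterDetails {
    String kafkaClusterId();
}

interface ConnectClusterState {
    // ...existing connector/task accessors elided...
    ConnectClusterDetails clusterDetails();
}

class ClusterDetailsDemo {
    public static void main(String[] args) {
        // A REST extension would receive an implementation from the
        // framework; a lambda stands in for it here.
        ConnectClusterDetails details = () -> "test-kafka-cluster-id";
        if (!"test-kafka-cluster-id".equals(details.kafkaClusterId())) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Making it an interface rather than a concrete class keeps the door open to add fields like cluster type later without breaking implementers.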

Jenkins build is back to normal : kafka-trunk-jdk11 #466

2019-04-25 Thread Apache Jenkins Server
See 




[DISCUSS] KIP-463: Auto-configure serdes passed alongside TopologyBuilder

2019-04-25 Thread Richard Yu
Hi all,

Due to issues that were discovered during the first attempt to implement a
solution for the KAFKA-3729 (
https://issues.apache.org/jira/browse/KAFKA-3729),
a KIP was thought to be necessary. There are a couple of alternatives by
which we can proceed, so it would be good if we could discern the pros and
cons of each approach.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-463%3A+Auto-configure+non-default+Serdes+passed+alongside+the+TopologyBuilder

Hope this helps,
Richard Yu


[jira] [Resolved] (KAFKA-8287) JVM global map to fence duplicate client id

2019-04-25 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8287.

Resolution: Invalid

We don't want to do this for now because there are existing use cases where 
`client.id` is expected to be duplicated across different stream instances for 
request-throttling purposes.

> JVM global map to fence duplicate client id
> ---
>
> Key: KAFKA-8287
> URL: https://issues.apache.org/jira/browse/KAFKA-8287
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> After the change in https://issues.apache.org/jira/browse/KAFKA-8285, two 
> stream instances scheduled on the same JVM will affect each other if they 
> are accidentally assigned the same client.id, since the thread id is now 
> local. The proposed solution was to build a JVM-global concurrent map to 
> resolve the conflict when two threads happen to have the same client.id.
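The rejected fencing idea can be sketched as a JVM-wide concurrent map keyed by client.id. This is a hypothetical illustration of the approach the ticket describes (and, per the resolution above, abandons, because duplicate client.id values are a legitimate throttling use case):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the rejected approach: a JVM-global map that
// fences a second Streams instance from claiming a client.id another
// instance in the same JVM has already registered.
class ClientIdFence {
    private static final ConcurrentMap<String, String> CLAIMED = new ConcurrentHashMap<>();

    // True if this instance won the claim; false if the client.id is taken.
    static boolean tryClaim(String clientId, String instanceName) {
        return CLAIMED.putIfAbsent(clientId, instanceName) == null;
    }

    static void release(String clientId) {
        CLAIMED.remove(clientId);
    }

    public static void main(String[] args) {
        if (!tryClaim("app-client", "instance-a")) throw new AssertionError();
        if (tryClaim("app-client", "instance-b")) throw new AssertionError(); // fenced
        release("app-client");
        if (!tryClaim("app-client", "instance-b")) throw new AssertionError();
        System.out.println("ok");
    }
}
```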



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk11 #465

2019-04-25 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-8291 : System test fix (#6637)

--
[...truncated 2.38 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 

Re: [DISCUSS] KIP-454: Expansion of the ConnectClusterState interface

2019-04-25 Thread Magesh Nandakumar
Hi Chris,

Overall it looks good to me. Just one last comment - I was wondering if
ConnectClusterDetails should be an interface just like ConnectClusterState.

Thanks,
Magesh

On Thu, Apr 25, 2019 at 3:54 PM Chris Egerton  wrote:

> Hi Magesh,
>
> Expanding the type we use to convey cluster metadata from just a Kafka
> cluster ID string to its own class seems like a good idea for the sake of
> forwards compatibility, but I'm still not sure what the gains of including
> the cluster group ID would be--it's a simple map lookup away in the REST
> extension's configure(...) method. Including information on whether the
> cluster is distributed or standalone definitely seems convenient; as far as
> I can tell there's no easy way to do that from within a REST extension at
> the moment, and relying on something like the presence of a group.id
> property to identify a distributed cluster could result in false positives.
> However, is there a use case for it? If not, we can note that as a possible
> addition to the ConnectClusterDetails class for later but leave it out for
> now.
>
> I've updated the KIP to include the new ConnectClusterDetails class but
> left out the cluster type information for now; let me know what you think.
>
> Thanks again for your thoughts!
>
> Cheers,
>
> Chris
>
> On Wed, Apr 24, 2019 at 4:49 PM Magesh Nandakumar 
> wrote:
>
> > Hi Chris,
> >
> > Instead of calling it ConnectClusterId, perhaps call it
> > ConnectClusterDetails which can include things like groupid, underlying
> > kafkaclusterId, standalone or distributed, etc. This will help expose any
> > cluster related information in the future.
> > Let me know if that would work?
> >
> > Thanks,
> > Magesh
> >
> > On Wed, Apr 24, 2019 at 4:26 PM Chris Egerton 
> wrote:
> >
> > > Hi Magesh,
> > >
> > > 1. After ruminating for a little while on the inclusion of a method to
> > > retrieve task configurations I've tentatively decided to remove it from
> > the
> > > proposal and place it in the rejected alternatives section. If anyone
> > > presents a reasonable use case for it I'll be happy to discuss further
> > but
> > > right now I think this is the way to go. Thanks for your suggestion!
> > >
> > > 2. The idea of a Connect cluster ID method is certainly fascinating,
> but
> > > there are a few questions it raises. First off, what would the
> group.id
> > be
> > > for a standalone cluster? Second, why return a formatted string there
> > > instead of a new class such as a ConnectClusterId that provides the two
> > in
> > > separate methods? And lastly, since REST extensions are configured with
> > all
> > > of the properties available to the worker, wouldn't it be possible to
> > just
> > > get the group ID of the Connect cluster from there? The reason I'd like
> > to
> > > see the Kafka cluster ID made available to REST extensions is that
> > > retrieving it isn't as simple as reading a configuration from a
> > properties
> > > map and instead involves creating an admin client from those properties
> > and
> > > using it to perform a `describe cluster` call, which comes with its own
> > > pitfalls as far as error handling, interruptions, and timeouts go.
> Since
> > > this information is available to the herder already, it seems like a
> good
> > > tradeoff to expose that information to REST extensions so that
> developers
> > > don't have to duplicate that logic themselves. I'm unsure that the same
> > > arguments would apply to exposing a group.id to REST extensions
> through
> > > the
> > > ConnectClusterInterface. What do you think?
> > >
> > > Thanks again for your thoughts!
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Tue, Apr 23, 2019 at 4:18 PM Magesh Nandakumar <
> mage...@confluent.io>
> > > wrote:
> > >
> > > > Chris,
> > > >
> > > > I certainly would love to hear others' thoughts on #1 but IMO, it
> might
> > > not
> > > > be as useful as ConnectorConfigs and as you mentioned, we could
> always
> > > add
> > > > it when the need arises.
> > > > Thanks for clarifying the details on my concern #2 regarding the
> > > > kafkaClusterId. While not a perfect fit in the interface, I'm not
> > > > completely opposed to having it in the interface. The other option, I
> > can
> > > > think is to expose a connectClusterId() returning group.id +
> > > > kafkaClusterId
> > > > (with some delimiter) rather than returning the kafkaClusterId. If we
> > > > choose to go this route, we can even make this a first-class citizen
> of
> > > the
> > > > Herder interface. Let me know what you think.
> > > >
> > > > Thanks
> > > > Magesh
> > > >
> > > > On Thu, Apr 18, 2019 at 2:45 PM Chris Egerton 
> > > wrote:
> > > >
> > > > > Hi Magesh,
> > > > >
> > > > > Thanks for your comments. I'll address them in the order you
> provided
> > > > them:
> > > > >
> > > > > 1 - Reason for exposing task configurations to REST extensions:
> > > > > Yes, the motivation is a little thin for exposing task configs to
> > REST
> > > > > extensions. I can think 

Jenkins build is back to normal : kafka-2.1-jdk8 #173

2019-04-25 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-454: Expansion of the ConnectClusterState interface

2019-04-25 Thread Chris Egerton
Hi Magesh,

Expanding the type we use to convey cluster metadata from just a Kafka
cluster ID string to its own class seems like a good idea for the sake of
forwards compatibility, but I'm still not sure what the gains of including
the cluster group ID would be--it's a simple map lookup away in the REST
extension's configure(...) method. Including information on whether the
cluster is distributed or standalone definitely seems convenient; as far as
I can tell there's no easy way to do that from within a REST extension at
the moment, and relying on something like the presence of a group.id
property to identify a distributed cluster could result in false positives.
However, is there a use case for it? If not, we can note that as a possible
addition to the ConnectClusterDetails class for later but leave it out for
now.

I've updated the KIP to include the new ConnectClusterDetails class but
left out the cluster type information for now; let me know what you think.

Thanks again for your thoughts!

Cheers,

Chris
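
For readers following along, here is a minimal sketch of what the proposed ConnectClusterDetails interface might look like. The method name mirrors this discussion, but treat the shape as illustrative rather than the final KIP-454 API:

```java
public class ClusterDetailsSketch {
    // Hypothetical shape of the type discussed above: an interface (rather
    // than a concrete class) so more cluster metadata can be added later.
    interface ConnectClusterDetails {
        String kafkaClusterId();
    }

    // Trivial in-memory implementation, for illustration only.
    static ConnectClusterDetails withClusterId(String clusterId) {
        return () -> clusterId;
    }
}
```

Making it an interface keeps the door open for later additions (cluster type, group ID) without breaking implementers.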

On Wed, Apr 24, 2019 at 4:49 PM Magesh Nandakumar 
wrote:

> Hi Chris,
>
> Instead of calling it ConnectClusterId, perhaps call it
> ConnectClusterDetails which can include things like groupid, underlying
> kafkaclusterId, standalone or distributed, etc. This will help expose any
> cluster related information in the future.
> Let me know if that would work?
>
> Thanks,
> Magesh
>
> On Wed, Apr 24, 2019 at 4:26 PM Chris Egerton  wrote:
>
> > Hi Magesh,
> >
> > 1. After ruminating for a little while on the inclusion of a method to
> > retrieve task configurations I've tentatively decided to remove it from
> the
> > proposal and place it in the rejected alternatives section. If anyone
> > presents a reasonable use case for it I'll be happy to discuss further
> but
> > right now I think this is the way to go. Thanks for your suggestion!
> >
> > 2. The idea of a Connect cluster ID method is certainly fascinating, but
> > there are a few questions it raises. First off, what would the group.id
> be
> > for a standalone cluster? Second, why return a formatted string there
> > instead of a new class such as a ConnectClusterId that provides the two
> in
> > separate methods? And lastly, since REST extensions are configured with
> all
> > of the properties available to the worker, wouldn't it be possible to
> just
> > get the group ID of the Connect cluster from there? The reason I'd like
> to
> > see the Kafka cluster ID made available to REST extensions is that
> > retrieving it isn't as simple as reading a configuration from a
> properties
> > map and instead involves creating an admin client from those properties
> and
> > using it to perform a `describe cluster` call, which comes with its own
> > pitfalls as far as error handling, interruptions, and timeouts go. Since
> > this information is available to the herder already, it seems like a good
> > tradeoff to expose that information to REST extensions so that developers
> > don't have to duplicate that logic themselves. I'm unsure that the same
> > arguments would apply to exposing a group.id to REST extensions through
> > the
> > ConnectClusterInterface. What do you think?
> >
> > Thanks again for your thoughts!
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Apr 23, 2019 at 4:18 PM Magesh Nandakumar 
> > wrote:
> >
> > > Chris,
> > >
> > > I certainly would love to hear others thoughts on #1 but IMO, it might
> > not
> > > be as useful as ConnectorConfigs and as you mentioned, we could always
> > add
> > > it when the need arises.
> > > Thanks for clarifying the details on my concern #2 regarding the
> > > kafkaClusterId. While not a perfect fit in the interface, I'm not
> > > completely opposed to having it in the interface. The other option, I
> can
> > > think is to expose a connectClusterId() returning group.id +
> > > kafkaClusterId
> > > (with some delimiter) rather than returning the kafkaClusterId. If we
> > > choose to go this route, we can even make this a first-class citizen of
> > the
> > > Herder interface. Let me know what you think.
> > >
> > > Thanks
> > > Magesh
> > >
> > > On Thu, Apr 18, 2019 at 2:45 PM Chris Egerton 
> > wrote:
> > >
> > > > Hi Magesh,
> > > >
> > > > Thanks for your comments. I'll address them in the order you provided
> > > them:
> > > >
> > > > 1 - Reason for exposing task configurations to REST extensions:
> > > > Yes, the motivation is a little thin for exposing task configs to
> REST
> > > > extensions. I can think of a few uses for this functionality, such as
> > > > attempting to infer problematic configurations by examining failed
> > tasks
> > > > and comparing their configurations to the configurations of running
> > > tasks,
> > > > but like you've indicated it's dubious that the best place for
> anything
> > > > like that belongs in a REST extension.
> > > > I'd be interested to hear others' thoughts, but right now I'm not too
> > > > opposed to erring on the side of caution and leaving it out. Worst
> > 

Build failed in Jenkins: kafka-2.2-jdk8 #93

2019-04-25 Thread Apache Jenkins Server
See 


Changes:

[jason] MINOR: Do not log retriable offset commit exceptions as errors (#5904)

--
[...truncated 2.70 MB...]


Re: [DISCUSS] KIP-460: Admin Leader Election RPC

2019-04-25 Thread Jose Armando Garcia Sancio
On Thu, Apr 25, 2019 at 11:47 AM Jason Gustafson  wrote:
>
> Hi Jose,
>
> This looks useful. One comment I had is whether we can improve the leader
> election tool. Needing to provide a json file is a bit annoying. Could we
> have a way to specify partitions directly through the command line? Often
> when we need to enable unclean leader election, it is just one or a small
> set of partitions.  I'd hope to be able to do something like this.
>
> bin/kafka-elect-leaders.sh --allow-unclean --topic foo --partition 1
> --bootstrap-server localhost:9092

Thanks for the feedback Jason. How about the following help output for
those flags:

--election-type 
Type of election to attempt. Possible values are 0 (or "preferred") for
preferred election or 1 (or "uncleaned") for uncleaned election. The
default value is 0 (or "preferred") if --topic and --partition are
specified. Not allowed if --path-to-json-file is specified.

--topic 
Name of topic for which to perform an election. REQUIRED if --partition is
specified. Not allowed if --path-to-json-file is specified.

--partition 
Partition id for which to perform an election. REQUIRED if --topic is
specified. Not allowed if --path-to-json-file is specified.

--path-to-json-file 
...
Defaults to preferred election for all existing partitions if --topic and
--partition flags are not specified.

>
> Also there's a comment if the json file is not provided, the help document
> says "Defaults to all existing partitions." I assume we would not keep
this
> behavior?

Unfortunately, this behaviour is at the protocol level. If the Kafka
controller receives a request with a null for "TopicPartitions" then it
assumes that the user is attempting to perform a preferred leader election
on all of the partitions. I am not sure if we can remove this functionality
at this point. We could remove this feature from the CLI/command while
keeping it at the protocol level. What do we think?

This is the code that handles this:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L2365-L2366

>
> The only other question I had is whether we ought to deprecate
> `AdminClient.electPreferredLeaders`?

Yes. We should deprecate this method. I'll update the KIP.

Thanks!
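
To make the 0/1 values in the help text above concrete, here is an illustrative sketch of how an election-type enum could map to those int8 wire values. The names are assumptions based on this thread, not the final API:

```java
public class ElectionTypeSketch {
    // Hypothetical mapping of election types to the int8 wire values
    // discussed in this thread: 0 = preferred, 1 = unclean.
    enum ElectionType {
        PREFERRED((byte) 0),
        UNCLEAN((byte) 1);

        final byte value;

        ElectionType(byte value) {
            this.value = value;
        }

        // Decode a wire value back into an election type.
        static ElectionType fromValue(byte value) {
            for (ElectionType type : values()) {
                if (type.value == value) {
                    return type;
                }
            }
            throw new IllegalArgumentException("Unknown election type: " + value);
        }
    }
}
```

An enum like this leaves room for the additional controller election algorithms mentioned elsewhere in the thread, which a boolean flag would not.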


[jira] [Resolved] (KAFKA-8291) System test consumer_test.py failed on trunk

2019-04-25 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8291.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

> System test consumer_test.py failed on trunk
> 
>
> Key: KAFKA-8291
> URL: https://issues.apache.org/jira/browse/KAFKA-8291
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.3.0
>
>
> Looks like trunk is failing as of now 
> [https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2537/]
> Potentially due to this PR: 
> [https://github.com/apache/kafka/commit/409fabc5610443f36574bdea2e2994b6c20e2829]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-04-25 Thread Chris Egerton
Hi Magesh,

Agreed that we should avoid `dlq.admin`. I also don't have a strong opinion
between `connector.` and `.override`, but I have a slight inclination
toward `.override` since `connector.` feels a little redundant given that
the whole configuration is for the connector and the use of "override" may
shed a little light on how the properties for these clients are computed
and help make the learning curve a little gentler on new devs and users.

Regardless, I think the larger issue of conflicts with existing properties
(both in MM2 and potentially other connectors) has been satisfactorily
addressed, so I'm happy.

Cheers,

Chris
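
For illustration, a connector config using the `.override` style discussed above might look like the following. The overridden client properties are examples only, not part of the proposal:

```properties
# Hypothetical sink connector config illustrating the ".override" naming.
name=example-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
topics=example-topic
# Overrides applied to the consumer this sink connector's tasks use:
consumer.override.max.poll.records=100
# Overrides applied to the admin client used for the connector's DLQ:
admin.override.request.timeout.ms=30000
```

Because every override carries the `.override` suffix, properties like MM2's plain `admin.sasl.jaas.config` remain unambiguous.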

On Wed, Apr 24, 2019 at 11:14 AM Magesh Nandakumar 
wrote:

> Hi Chris,
>
> You are right about the "admin." prefix creating conflicts. Here are a few
> options that I can think of
>
> 1. Use `dlq.admin` since admin client is used only for DLQ. But this might
> not really be the case in the future. So, we should possibly drop this idea
> :)
> 2.  Use `connector.producer`, `connector.consumer` and `connector.admin` -
> provides better context that its connector specific property
> 3.  Use `producer.override`, '`consumer.override` and `admin.override` -
> provides better clarity that these are overrides.
>
> I don't have a strong opinion in choosing between #2 and #3. Let me
> know what you think.
>
> Thanks
> Magesh
>
> On Wed, Apr 24, 2019 at 10:25 AM Chris Egerton 
> wrote:
>
> > Hi Magesh,
> >
> > Next round :)
> >
> > 1. It looks like MM2 will also support "admin." properties that affect
> > AdminClients it creates and uses, which IIUC is the same prefix name to
> be
> > used for managing the DLQ for sink connectors in this KIP. Doesn't that
> > still leave room for conflict? I'm imagining a scenario like this: a
> > Connect worker is configured to use the
> > PrincipalConnectorClientConfigPolicy, someone tries to start an instance
> of
> > an MM2 sink with "admin." properties beyond just
> "admin.sasl.jaas.config",
> > and gets rejected because those properties are then interpreted by the
> > worker as overrides for the AdminClient it uses to manage the DLQ.
> > 2. (LGTM)
> > 3. I'm convinced by this, as long as nobody else identifies a common use
> > case that would involve a similar client config policy implementation
> that
> > would be limited to a small set of whitelisted configs. For now keeping
> the
> > PrincipalConnectorClientConfigPolicy sounds fine to me.
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Apr 23, 2019 at 10:30 PM Magesh Nandakumar  >
> > wrote:
> >
> > > Hi all,
> > >
> > > I also have a draft implementation of the KIP
> > > https://github.com/apache/kafka/pull/6624. I would still need to
> include
> > > more tests and docs but I thought it would be useful to have this for
> the
> > > KIP discussion. Looking forward to all of your valuable feedback.
> > >
> > > Thanks
> > > Magesh
> > >
> > > On Tue, Apr 23, 2019 at 10:27 PM Magesh Nandakumar <
> mage...@confluent.io
> > >
> > > wrote:
> > >
> > > > Chris,
> > > >
> > > > Thanks a lot for your feedback. I will address them in order of your
> > > > questions/comments.
> > > >
> > > > 1. Thanks for bringing this to my attention about KIP-382. I had a
> > closer
> > > > look at the KIP and IIUC, the KIP allows `consumer.` prefix for
> > > SourceConnector
> > > > and `producer.` prefix for SinkConnector since those are additional
> > > > connector properties to help resolve the Kafka cluster other than the
> > one
> > > > Connect framework knows about. Whereas, the proposal in KIP-458
> applies
> > > > producer policies for SinkConnectors and consumer policies for
> > > > SourceConnectors.  So, from what I understand this new policy should
> > work
> > > > without any issues even for Mirror Maker 2.0.
> > > > 2. I have updated the KIP to use a default value of null and use that
> > to
> > > > determine if we need to ignore overrides.
> > > > 3. I would still prefer to keep the special
> > > PrincipalConnectorClientConfigPolicy
> > > > since that is one of the most common use cases one would choose to
> use
> > > this
> > > > feature. If we make it a general case, that would involve users
> > requiring
> > > > to add additional configuration and they might require well more than
> > > just
> > > > the list of configs but might also want some restriction on values.
> If
> > > the
> > > > concern is about users wanting principal and also other configs, it
> > would
> > > > still be possible by means of a custom implementation. As is, I would
> > > > prefer to keep the proposal to be the same for this. Let me know your
> > > > thoughts.
> > > >
> > > > Thanks,
> > > > Magesh
> > > >
> > > >
> > > > On Mon, Apr 22, 2019 at 3:44 PM Chris Egerton 
> > > wrote:
> > > >
> > > >> Hi Magesh,
> > > >>
> > > >> This is an exciting KIP! I have a few questions/comments but
> overall I
> > > >> like
> > > >> the direction it's headed in and hope to see it included in the
> > Connect
> > > >> framework soon.
> > > >>
> > > >> 
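
As a rough illustration of what the "principal only" policy discussed above conceptually enforces: allow only `sasl.jaas.config` overrides and report everything else. The class and method names here are hypothetical, not the KIP's actual API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PrincipalOnlyPolicySketch {
    // Only the SASL JAAS config (i.e. the principal) may be overridden.
    private static final List<String> ALLOWED = Arrays.asList("sasl.jaas.config");

    // Returns the override keys a principal-only policy would reject.
    static List<String> disallowedOverrides(Map<String, Object> overrides) {
        return overrides.keySet().stream()
                .filter(key -> !ALLOWED.contains(key))
                .sorted()
                .collect(Collectors.toList());
    }
}
```

A user wanting the principal plus other configs would, as noted above, supply a custom policy implementation instead.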

[jira] [Created] (KAFKA-8292) Add support for --version parameter to command line tools

2019-04-25 Thread JIRA
Sönke Liebau created KAFKA-8292:
---

 Summary: Add support for --version parameter to command line tools
 Key: KAFKA-8292
 URL: https://issues.apache.org/jira/browse/KAFKA-8292
 Project: Kafka
  Issue Type: Improvement
Reporter: Sönke Liebau


During the implementation of 
[KAFKA-8131|https://issues.apache.org/jira/browse/KAFKA-8131] we noticed that 
command line tools implement parsing of parameters in different ways.
For most of the tools the --version parameter was correctly implemented in that 
issue; for the following it still remains to be done:
* ConnectDistributed
* ConnectStandalone
* ProducerPerformance
* VerifiableConsumer
* VerifiableProducer





Jenkins build is back to normal : kafka-2.0-jdk8 #256

2019-04-25 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk8 #3582

2019-04-25 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-8288) KStream.through consumer does not use custom TimestampExtractor

2019-04-25 Thread Maarten (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maarten resolved KAFKA-8288.

Resolution: Invalid

> KStream.through consumer does not use custom TimestampExtractor
> ---
>
> Key: KAFKA-8288
> URL: https://issues.apache.org/jira/browse/KAFKA-8288
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Maarten
>Priority: Minor
>
> The Kafka consumer created by {{KStream.through}} does not seem to be using 
> the custom TimestampExtractor set in Kafka Streams properties.
> The documentation of {{through}} states the following
> {code:java}
> ...
> This is equivalent to calling to(someTopic, Produced.with(keySerde, 
> valueSerde) and StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, 
> valueSerde)).
> {code}
> However when I use the pattern above, the custom TimestampExtractor _is_ 
> called.
> I have verified that the streams app is reading from the specified topic and 
> that the timestamp extractor is called for other topics.





Re: [DISCUSS] KIP-460: Admin Leader Election RPC

2019-04-25 Thread Jason Gustafson
Hi Jose,

This looks useful. One comment I had is whether we can improve the leader
election tool. Needing to provide a json file is a bit annoying. Could we
have a way to specify partitions directly through the command line? Often
when we need to enable unclean leader election, it is just one or a small
set of partitions.  I'd hope to be able to do something like this.

bin/kafka-elect-leaders.sh --allow-unclean --topic foo --partition 1
--bootstrap-server localhost:9092

Also there's a comment if the json file is not provided, the help document
says "Defaults to all existing partitions." I assume we would not keep this
behavior?

The only other question I had is whether we ought to deprecate
`AdminClient.electPreferredLeaders`?

Thanks,
Jason

On Thu, Apr 25, 2019 at 11:23 AM Jose Armando Garcia Sancio <
jsan...@confluent.io> wrote:

> On Thu, Apr 25, 2019 at 8:22 AM Colin McCabe  wrote:
> >
> > On Wed, Apr 24, 2019, at 21:04, Jose Armando Garcia Sancio wrote:
> > > Thanks for the reply. Comments below.
> > >
> > > On Wed, Apr 24, 2019 at 6:07 PM Colin McCabe 
> wrote:
> > > > What's the rationale for using an int8 rather than just having a
> boolean
> > > > that is either true or false for "unclean"?  We only have two values
> now,
> > > > and it seems like we could modify the RPC schema in the future if
> needed.
> > > > Or is the intention to add more flags later?
> > > >
> > >
> > > There are two reasons:
> > >
> > >1. The controller supports 4 (5 technically) different election
> > >algorithms. We are only exposing "preferred" and "unclean" through
> the
> > >admin client because we only have use cases for those two election
> types.
> > >It is possible that in the future we may want to support more
> algorithms.
> > >This would allow us to make that change easier.
> > >2. I believe that an enum is more descriptive than a boolean flag
> as it
> > >is not a matter of "unclean" vs "clean" or "preferred" vs
> "non-preferred".
> > >   1. Preferred means that the controller will attempt to elect
> only the
> > >   first replica described in the partition assignment if it is
> > > online and it is
> > >   in-sync.
> > >   2. Unclean means that the controller will attempt to elect the
> first
> > >   in-sync and alive replica given the order of the partition
> assignment. If
> > >   this is not satisfied it will attempt to elect the first replica
> in the
> > >   assignment that is alive.
> > >
> >
> > OK, that makes sense.
> >
> > On an unrelated note, you can simplify your protocol definition to this,
> I believe:
> >
> > { "name": "Partitions", "type": "[]Partitions", "versions": "1+",
> >   "about": "The partitions of this topic whose leader should be
> elected.",
> >   "fields": [
> >   { "name": "PartitionId", "type": "int32", "versions": "0+",
> > "about": "The partition id." },
> >   { "name": "ElectionType", "type": "int8", "versions": "1+",
> > "about": "Type of elections to conduct for the
> partition. A value of '0' elects the preferred leader. A value of '1'
> elects an unclean leader if there are no in-sync leaders." }
> >   ]
> > }
>
> Great suggestion. I made one modification to the "versions" field for
> "Partitions". Let me know which one is correct. The KIP should have
> the final result.
>
> >
> > The reason is because the v0 array of ints is equivalent on the wire to
> an array of structures that only have an int inside.
> >
> > In other words, this:
> > { "type": "[]int32" }
> >
> > is just another way of saying this:
> > { "type": "[]MyArrayType", "fields": [
> >   { "name": "MyInt", "type": "int32",  } ] }
>
> Very cool. This is the definition of zero overhead abstraction.
>
> -Jose
>


Re: [DISCUSS] KIP-460: Admin Leader Election RPC

2019-04-25 Thread Jose Armando Garcia Sancio
On Thu, Apr 25, 2019 at 8:22 AM Colin McCabe  wrote:
>
> On Wed, Apr 24, 2019, at 21:04, Jose Armando Garcia Sancio wrote:
> > Thanks for the reply. Comments below.
> >
> > On Wed, Apr 24, 2019 at 6:07 PM Colin McCabe  wrote:
> > > What's the rationale for using an int8 rather than just having a boolean
> > > that is either true or false for "unclean"?  We only have two values now,
> > > and it seems like we could modify the RPC schema in the future if needed.
> > > Or is the intention to add more flags later?
> > >
> >
> > There are two reasons:
> >
> >1. The controller supports 4 (5 technically) different election
> >algorithms. We are only exposing "preferred" and "unclean" through the
> >admin client because we only have use cases for those two election types.
> >It is possible that in the future we may want to support more algorithms.
> >This would allow us to make that change easier.
> >2. I believe that an enum is more descriptive than a boolean flag as it
> >is not a matter of "unclean" vs "clean" or "preferred" vs 
> > "non-preferred".
> >   1. Preferred means that the controller will attempt to elect only the
> >   first replica described in the partition assignment if it is
> > online and it is
> >   in-sync.
> >   2. Unclean means that the controller will attempt to elect the first
> >   in-sync and alive replica given the order of the partition 
> > assignment. If
> >   this is not satisfied it will attempt to elect the first replica in 
> > the
> >   assignment that is alive.
> >
>
> OK, that makes sense.
>
> On an unrelated note, you can simplify your protocol definition to this, I 
> believe:
>
> { "name": "Partitions", "type": "[]Partitions", "versions": "1+",
>   "about": "The partitions of this topic whose leader should be 
> elected.",
>   "fields": [
>   { "name": "PartitionId", "type": "int32", "versions": "0+",
> "about": "The partition id." },
>   { "name": "ElectionType", "type": "int8", "versions": "1+",
> "about": "Type of elections to conduct for the partition. A 
> value of '0' elects the preferred leader. A value of '1' elects an unclean 
> leader if there are no in-sync leaders." }
>   ]
> }

Great suggestion. I made one modification to the "versions" field for
"Partitions". Let me know which one is correct. The KIP should have
the final result.

>
> The reason is because the v0 array of ints is equivalent on the wire to an 
> array of structures that only have an int inside.
>
> In other words, this:
> { "type": "[]int32" }
>
> is just another way of saying this:
> { "type": "[]MyArrayType", "fields": [
>   { "name": "MyInt", "type": "int32",  } ] }

Very cool. This is the definition of zero overhead abstraction.

-Jose
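
A small sketch of the wire-equivalence Colin describes, under the classic (pre-flexible-versions) Kafka encoding: an array is an int32 element count followed by the serialized elements, and a struct with a single int32 field serializes as just that int32, so both schemas produce identical bytes. This is an illustration of the encoding rule, not Kafka's actual serialization code:

```java
import java.nio.ByteBuffer;

public class WireEquivalenceSketch {
    // Encodes either "[]int32" or an array of structs each holding one
    // int32 field -- on the wire the two are byte-for-byte identical:
    // a 4-byte big-endian length prefix, then one int32 per element.
    static byte[] encodePartitionIds(int[] partitionIds) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 * partitionIds.length);
        buf.putInt(partitionIds.length); // array length prefix
        for (int id : partitionIds) {
            buf.putInt(id); // element == struct with one int32 field
        }
        return buf.array();
    }
}
```

This is why the v1 schema can wrap the v0 int array in a struct (to add ElectionType) without breaking v0 wire compatibility.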


[jira] [Created] (KAFKA-8291) System test consumer_test.py failed on trunk

2019-04-25 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8291:
--

 Summary: System test consumer_test.py failed on trunk
 Key: KAFKA-8291
 URL: https://issues.apache.org/jira/browse/KAFKA-8291
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen
Assignee: Boyang Chen


Looks like trunk is failing as of now 
[https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2537/]





Re: [VOTE] KIP-434: Dead replica fetcher and log cleaner metrics

2019-04-25 Thread Jason Gustafson
Hi Viktor,

This looks good. Just one question I had is whether we may as well cover
the log dir fetchers as well.

Thanks,
Jason


On Thu, Apr 25, 2019 at 7:46 AM Viktor Somogyi-Vass 
wrote:

> Hi Folks,
>
> This thread sunk a bit but I'd like to bump it hoping to get some feedback
> and/or votes.
>
> Thanks,
> Viktor
>
> On Thu, Mar 28, 2019 at 8:47 PM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com>
> wrote:
>
> > Sorry, the end of the message cut off.
> >
> > So I tried to be consistent with the convention in LogManager, hence the
> > hyphens, and in AbstractFetcherManager, hence the camel case. It would be
> > nice though to decide on one convention across the whole project,
> however
> > it requires a major refactor (especially for the components that leverage
> > metrics for monitoring).
> >
> > Thanks,
> > Viktor
> >
> > On Thu, Mar 28, 2019 at 8:44 PM Viktor Somogyi-Vass <
> > viktorsomo...@gmail.com> wrote:
> >
> >> Hi Dhruvil,
> >>
> >> Thanks for the feedback and the vote. I fixed the typo in the KIP.
> >> The naming is interesting though. Unfortunately kafka overall is not
> >> consistent in metric naming but at least I tried to be consistent among
> the
> >> other metrics used in LogManager
> >>
> >> On Thu, Mar 28, 2019 at 7:32 PM Dhruvil Shah 
> >> wrote:
> >>
> >>> Thanks for the KIP, Viktor! This is a useful addition. +1 overall.
> >>>
> >>> Minor nits:
> >>> > I propose to add three gauge: DeadFetcherThreadCount for the fetcher
> >>> threads, log-cleaner-dead-thread-count for the log cleaner.
> >>> I think you meant two instead of three.
> >>>
> >>> Also, would it make sense to name these metrics consistently, something
> >>> like
> >>> `log-cleaner-dead-thread-count` and
> `replica-fetcher-dead-thread-count`?
> >>>
> >>> Thanks,
> >>> Dhruvil
> >>>
> >>> On Thu, Mar 28, 2019 at 11:27 AM Viktor Somogyi-Vass <
> >>> viktorsomo...@gmail.com> wrote:
> >>>
> >>> > Hi All,
> >>> >
> >>> > I'd like to start a vote on KIP-434.
> >>> > This basically would add a metrics to count dead threads in
> >>> > ReplicaFetcherManager and LogCleaner to allow monitoring systems to
> >>> alert
> >>> > based on this.
> >>> >
> >>> > The KIP link:
> >>> >
> >>> >
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics
> >>> > The
> >>> > PR: https://github.com/apache/kafka/pull/6514
> >>> >
> >>> > I'd be happy to receive any votes or additional feedback/reviews too.
> >>> >
> >>> > Thanks,
> >>> > Viktor
> >>> >
> >>>
> >>
>
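
To illustrate what the proposed dead-thread-count gauges measure, here is a self-contained sketch. The method name is hypothetical; the KIP's actual gauges live in the replica fetcher manager and log cleaner:

```java
import java.util.List;

public class DeadThreadCountSketch {
    // A dead-thread gauge reports how many of the managed threads are no
    // longer alive (crashed or exited), so monitoring systems can alert.
    static long deadThreadCount(List<Thread> threads) {
        return threads.stream()
                .filter(thread -> !thread.isAlive())
                .count();
    }
}
```

A monitoring system would alert whenever this gauge rises above zero, since fetcher or cleaner threads are expected to run for the broker's lifetime.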


[jira] [Created] (KAFKA-8290) Streams Not Closing Fenced Producer On Task Close

2019-04-25 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-8290:
--

 Summary: Streams Not Closing Fenced Producer On Task Close
 Key: KAFKA-8290
 URL: https://issues.apache.org/jira/browse/KAFKA-8290
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Bill Bejeck
Assignee: Bill Bejeck


When a producer is fenced during processing and a rebalance is triggered, the 
task is closed, but the (zombie) producer is not.  When EOS is enabled and we 
close a task, the producer should always be closed regardless of whether it was 
fenced or not.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [ANNOUNCE] New Kafka PMC member: Sriharsh Chintalapan

2019-04-25 Thread Viktor Somogyi-Vass
Congrats Harsha!

On Tue, Apr 23, 2019 at 6:13 AM Becket Qin  wrote:

> Congrats, Harsh!
>
> On Tue, Apr 23, 2019 at 5:41 AM Colin McCabe  wrote:
>
> > Congratulations, Harsh!
> >
> > cheers,
> > Colin
> >
> >
> > On Thu, Apr 18, 2019, at 11:46, Jun Rao wrote:
> > > Hi, Everyone,
> > >
> > > Sriharsh Chintalapan has been active in the Kafka community since he
> > became
> > > a Kafka committer in 2015. I am glad to announce that Harsh is now a
> > member
> > > of the Kafka PMC.
> > >
> > > Congratulations, Harsh!
> > >
> > > Jun
> > >
> >
>


Re: [ANNOUNCE] New Kafka PMC member: Matthias J. Sax

2019-04-25 Thread Viktor Somogyi-Vass
Congrats Matthias!

On Tue, Apr 23, 2019 at 4:24 AM Becket Qin  wrote:

> Congrats, Matthias!
>
> On Sat, Apr 20, 2019 at 10:28 AM Matthias J. Sax 
> wrote:
>
> > Thank you all!
> >
> > -Matthias
> >
> >
> > On 4/19/19 3:58 PM, Lei Chen wrote:
> > > Congratulations Matthias! Well deserved!
> > >
> > > -Lei
> > >
> > > On Fri, Apr 19, 2019 at 2:55 PM James Cheng  > > > wrote:
> > >
> > > Congrats!!
> > >
> > > -James
> > >
> > > Sent from my iPhone
> > >
> > > > On Apr 18, 2019, at 2:35 PM, Guozhang Wang  > > > wrote:
> > > >
> > > > Hello Everyone,
> > > >
> > > > I'm glad to announce that Matthias J. Sax is now a member of
> Kafka
> > > PMC.
> > > >
> > > > Matthias has been a committer since Jan. 2018, and since then he
> > > continued
> > > > to be active in the community and made significant contributions
> > to the
> > > > project.
> > > >
> > > >
> > > > Congratulations to Matthias!
> > > >
> > > > -- Guozhang
> > >
> >
> >
>


Re: [DISCUSS] KIP-460: Admin Leader Election RPC

2019-04-25 Thread Colin McCabe
On Wed, Apr 24, 2019, at 21:04, Jose Armando Garcia Sancio wrote:
> Thanks for the reply. Comments below.
> 
> On Wed, Apr 24, 2019 at 6:07 PM Colin McCabe  wrote:
> 
> > Hi Jose,
> >
> > Thanks for the KIP, looks valuable.
> >
> > If I use a PreferredLeaderElection RPC to specifically request an unclean
> > leader election, will this take effect even if unclean leader elections are
> > disabled on the topic involved?  I assume that the answer is yes, but it
> > would be good to clarify this in the KIP.
> >
> 
> Yes. One of the motivations for this change is to allow the user to attempt
> unclean leader election without having to change the topic configuration. I
> will update the motivation and design section.

Hi Jose,

Sounds good.

> 
> What ACLs will be required to perform this action?  WRITE on the topic
> > resource?  Or ALTER on KafkaCluster?  Or perhaps ALTER on the topic would
> > be most appropriate, since we probably don't want ordinary producers
> > triggering unclean leader elections.
> >
> 
> I am not sure. Let me investigate what the current RPC requires and get
> back to you. This is not a new RPC. We are instead updating an existing RPC
> that already performs authorization. The RPC has API key 32.

It looks like the existing RPC requires ALTER on CLUSTER.  That's definitely 
safe, since it's essentially root for us.  We can probably just keep it the way 
it is, then.

> 
> What's the rationale for using an int8 rather than just having a boolean
> > that is either true or false for "unclean"?  We only have two values now,
> > and it seems like we could modify the RPC schema in the future if needed.
> > Or is the intention to add more flags later?
> >
> 
> There are two reasons:
> 
>    1. The controller supports 4 (technically 5) different election
>    algorithms. We are only exposing "preferred" and "unclean" through the
>    admin client because we only have use cases for those two election types.
>    It is possible that in the future we may want to support more algorithms.
>    This would make that change easier.
>    2. I believe that an enum is more descriptive than a boolean flag, as it
>    is not a matter of "unclean" vs "clean" or "preferred" vs "non-preferred".
>       1. Preferred means that the controller will attempt to elect only the
>       first replica described in the partition assignment, if it is online
>       and in-sync.
>       2. Unclean means that the controller will attempt to elect the first
>       in-sync and alive replica given the order of the partition assignment.
>       If this is not satisfied, it will attempt to elect the first replica
>       in the assignment that is alive.
>

OK, that makes sense.

On an unrelated note, you can simplify your protocol definition to this, I 
believe:

{ "name": "Partitions", "type": "[]Partitions", "versions": "1+",
  "about": "The partitions of this topic whose leader should be elected.",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+",
      "about": "The partition id." },
    { "name": "ElectionType", "type": "int8", "versions": "1+",
      "about": "Type of elections to conduct for the partition. A value of '0' elects the preferred leader. A value of '1' elects an unclean leader if there are no in-sync leaders." }
  ]
}

The reason is that the v0 array of ints is equivalent on the wire to an 
array of structures that each contain only a single int.

In other words, this:
{ "type": "[]int32" }

is just another way of saying this:
{ "type": "[]MyArrayType", "fields": [
  { "name": "MyInt", "type": "int32" } ] }

best,
Colin
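The preferred/unclean codes discussed above can be pictured as a small enum. The following is only an illustrative, self-contained sketch of the idea; the class name, constant names, and the `fromCode` decoding helper are assumptions for this example, not the KIP's final API:

```java
// Hypothetical sketch of the election types exposed through the admin RPC.
// Byte codes follow the discussion: 0 elects the preferred leader,
// 1 allows an unclean election when no in-sync replica is available.
public class ElectionTypes {
    public enum ElectionType {
        PREFERRED((byte) 0),
        UNCLEAN((byte) 1);

        public final byte code;

        ElectionType(byte code) {
            this.code = code;
        }

        // Decode the int8 carried in the RPC back into an enum constant.
        public static ElectionType fromCode(byte code) {
            for (ElectionType t : values()) {
                if (t.code == code) {
                    return t;
                }
            }
            throw new IllegalArgumentException("Unknown election type code: " + code);
        }
    }

    public static void main(String[] args) {
        System.out.println(ElectionType.fromCode((byte) 1)); // prints UNCLEAN
    }
}
```

An enum like this keeps room for the controller's other election algorithms, should they ever be exposed, without a wire-format change beyond new codes.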


Re: [VOTE] KIP-419: Safely notify Kafka Connect SourceTask is stopped

2019-04-25 Thread Andrew Schofield
I'd like to encourage some more votes on KIP-419. It's a pretty small KIP to 
make it easier to handle resource clean up in Kafka Connect SourceTasks.

Currently only +2 non-binding.

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped
PR: https://github.com/apache/kafka/pull/6551

Thanks,
Andrew Schofield
IBM Event Streams

On 15/04/2019, 15:59, "Edoardo Comar"  wrote:

Thanks Andrew.

+1 (non-binding)

--

Edoardo Comar

IBM Event Streams
IBM UK Ltd, Hursley Park, SO21 2JN




From:   Mickael Maison 
To: dev 
Date:   10/04/2019 10:14
Subject:Re: [VOTE] KIP-419: Safely notify Kafka Connect SourceTask 
is stopped



+1 (non-binding)
Thanks for the KIP!

On Mon, Apr 8, 2019 at 8:07 PM Andrew Schofield
 wrote:
>
> Hi,
> I’d like to begin the voting thread for KIP-419. This is a minor KIP to 
add a new stopped() method to the SourceTask interface in Kafka Connect. 
Its purpose is to give the task a safe opportunity to clean up its 
resources, in the knowledge that this is the final call to the task.
>
> KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped
> PR: https://github.com/apache/kafka/pull/6551
> JIRA: https://issues.apache.org/jira/browse/KAFKA-7841

>
> Thanks,
> Andrew Schofield
> IBM




Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU





Re: [VOTE] KIP-434: Dead replica fetcher and log cleaner metrics

2019-04-25 Thread Viktor Somogyi-Vass
Hi Folks,

This thread sunk a bit but I'd like to bump it hoping to get some feedback
and/or votes.

Thanks,
Viktor

On Thu, Mar 28, 2019 at 8:47 PM Viktor Somogyi-Vass 
wrote:

> Sorry, the end of the message cut off.
>
> So I tried to be consistent with the convention in LogManager, hence the
> hyphens, and in AbstractFetcherManager, hence the camel case. It would be
> nice though to decide on one convention across the whole project; however,
> that requires a major refactor (especially for the components that leverage
> metrics for monitoring).
>
> Thanks,
> Viktor
>
> On Thu, Mar 28, 2019 at 8:44 PM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com> wrote:
>
>> Hi Dhruvil,
>>
>> Thanks for the feedback and the vote. I fixed the typo in the KIP.
>> The naming is interesting though. Unfortunately Kafka overall is not
>> consistent in metric naming, but at least I tried to be consistent with the
>> other metrics used in LogManager.
>>
>> On Thu, Mar 28, 2019 at 7:32 PM Dhruvil Shah 
>> wrote:
>>
>>> Thanks for the KIP, Viktor! This is a useful addition. +1 overall.
>>>
>>> Minor nits:
>>> > I propose to add three gauge: DeadFetcherThreadCount for the fetcher
>>> threads, log-cleaner-dead-thread-count for the log cleaner.
>>> I think you meant two instead of three.
>>>
>>> Also, would it make sense to name these metrics consistently, something
>>> like
>>> `log-cleaner-dead-thread-count` and `replica-fetcher-dead-thread-count`?
>>>
>>> Thanks,
>>> Dhruvil
>>>
>>> On Thu, Mar 28, 2019 at 11:27 AM Viktor Somogyi-Vass <
>>> viktorsomo...@gmail.com> wrote:
>>>
>>> > Hi All,
>>> >
>>> > I'd like to start a vote on KIP-434.
>>> > This basically would add metrics to count dead threads in
>>> > ReplicaFetcherManager and LogCleaner to allow monitoring systems to
>>> alert
>>> > based on this.
>>> >
>>> > The KIP link:
>>> >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics
>>> > The
>>> > PR: https://github.com/apache/kafka/pull/6514
>>> >
>>> > I'd be happy to receive any votes or additional feedback/reviews too.
>>> >
>>> > Thanks,
>>> > Viktor
>>> >
>>>
>>
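As a rough, self-contained illustration of the kind of gauge KIP-434 proposes, a dead-thread count can be computed over a set of managed threads. This is only a sketch of the idea; it is not the actual ReplicaFetcherManager or LogCleaner code, and the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of a "dead thread count" gauge in the spirit of
// KIP-434: a thread counts as dead once it was started and has terminated.
public class DeadThreadGauge {
    private final List<Thread> threads = new ArrayList<>();

    public void register(Thread t) {
        threads.add(t);
    }

    // The value a monitoring system would scrape and alert on.
    public int deadThreadCount() {
        int dead = 0;
        for (Thread t : threads) {
            if (t.getState() == Thread.State.TERMINATED) {
                dead++;
            }
        }
        return dead;
    }

    public static void main(String[] args) throws InterruptedException {
        DeadThreadGauge gauge = new DeadThreadGauge();
        Thread worker = new Thread(() -> { });
        gauge.register(worker);
        worker.start();
        worker.join();                               // worker has now terminated
        System.out.println(gauge.deadThreadCount()); // prints 1
    }
}
```

In the broker this value would be registered as a metrics gauge (e.g. `DeadFetcherThreadCount`) rather than returned from a plain method, so an alerting system can fire whenever it becomes non-zero.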


Re: [VOTE] KIP-417: Allow JmxTool to connect to a secured RMI port

2019-04-25 Thread Dongjin Lee
+1 (non-binding).

+2 (binding), +1 (non-binding) until now.

Thanks,
Dongjin

On Thu, Apr 25, 2019 at 6:16 AM Gwen Shapira  wrote:

> +1 (binding)
>
> On Fri, Feb 1, 2019, 8:20 AM Harsha Chintalapani  wrote:
>
> > +1 (binding).
> >
> > Thanks,
> > Harsha
> > On Jan 31, 2019, 8:08 PM -0800, Manikumar ,
> > wrote:
> > > Hi,
> > >
> > > +1 (binding). Thanks for the KIP.
> > >
> > > On Mon, Jan 28, 2019 at 5:37 PM Fangbin Sun 
> > wrote:
> > >
> > > > Hi, All:
> > > > I would like to start a vote on KIP-417 which aims at supporting
> > JmxTool
> > > > to connect to a secured RMI port.
> > > >
> > > >
> > > > The KIP:
> > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-417%3A+Allow+JmxTool+to+connect+to+a+secured+RMI+port
> > > >
> > > >
> > > > Thanks!
> > > > Fangbin
> >
>


-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*
*github:  github.com/dongjinleekr
linkedin: kr.linkedin.com/in/dongjinleekr
speakerdeck: speakerdeck.com/dongjin
*


Re: [DISCUSS] 2.2.1 Bug Fix Release

2019-04-25 Thread Ismael Juma
Thanks Vahid!

On Wed, Apr 24, 2019 at 10:44 PM Vahid Hashemian 
wrote:

> Hi all,
>
> I'd like to volunteer for the release manager of the 2.2.1 bug fix release.
> Kafka 2.2.0 was released on March 22, 2019.
>
> At this point, there are 29 resolved JIRA issues scheduled for inclusion in
> 2.2.1:
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%202.2.1
>
> The release plan is documented here:
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+2.2.1
>
> Thanks!
> --Vahid
>


Re: [VOTE] KIP-417: Allow JmxTool to connect to a secured RMI port

2019-04-25 Thread Gwen Shapira
+1 (binding)

On Fri, Feb 1, 2019, 8:20 AM Harsha Chintalapani  wrote:

> +1 (binding).
>
> Thanks,
> Harsha
> On Jan 31, 2019, 8:08 PM -0800, Manikumar ,
> wrote:
> > Hi,
> >
> > +1 (binding). Thanks for the KIP.
> >
> > On Mon, Jan 28, 2019 at 5:37 PM Fangbin Sun 
> wrote:
> >
> > > Hi, All:
> > > I would like to start a vote on KIP-417 which aims at supporting
> JmxTool
> > > to connect to a secured RMI port.
> > >
> > >
> > > The KIP:
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-417%3A+Allow+JmxTool+to+connect+to+a+secured+RMI+port
> > >
> > >
> > > Thanks!
> > > Fangbin
>


Build failed in Jenkins: kafka-trunk-jdk8 #3581

2019-04-25 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-8237; Untangle TopicDeleteManager and add test cases (#6588)

[jason] MINOR: Remove implicit return statement (#6629)

--
[...truncated 4.77 MB...]
org.apache.kafka.connect.transforms.RegexRouterTest > identity PASSED

org.apache.kafka.connect.transforms.RegexRouterTest > addPrefix STARTED

org.apache.kafka.connect.transforms.RegexRouterTest > addPrefix PASSED

org.apache.kafka.connect.transforms.RegexRouterTest > addSuffix STARTED

org.apache.kafka.connect.transforms.RegexRouterTest > addSuffix PASSED

org.apache.kafka.connect.transforms.RegexRouterTest > slice STARTED

org.apache.kafka.connect.transforms.RegexRouterTest > slice PASSED

org.apache.kafka.connect.transforms.RegexRouterTest > staticReplacement STARTED

org.apache.kafka.connect.transforms.RegexRouterTest > staticReplacement PASSED

org.apache.kafka.connect.transforms.ValueToKeyTest > withSchema STARTED

org.apache.kafka.connect.transforms.ValueToKeyTest > withSchema PASSED

org.apache.kafka.connect.transforms.ValueToKeyTest > schemaless STARTED

org.apache.kafka.connect.transforms.ValueToKeyTest > schemaless PASSED

org.apache.kafka.connect.transforms.InsertFieldTest > 
schemalessInsertConfiguredFields STARTED

org.apache.kafka.connect.transforms.InsertFieldTest > 
schemalessInsertConfiguredFields PASSED

org.apache.kafka.connect.transforms.InsertFieldTest > topLevelStructRequired 
STARTED

org.apache.kafka.connect.transforms.InsertFieldTest > topLevelStructRequired 
PASSED

org.apache.kafka.connect.transforms.InsertFieldTest > 
copySchemaAndInsertConfiguredFields STARTED

org.apache.kafka.connect.transforms.InsertFieldTest > 
copySchemaAndInsertConfiguredFields PASSED

org.apache.kafka.connect.transforms.ReplaceFieldTest > withSchema STARTED

org.apache.kafka.connect.transforms.ReplaceFieldTest > withSchema PASSED

org.apache.kafka.connect.transforms.ReplaceFieldTest > schemaless STARTED

org.apache.kafka.connect.transforms.ReplaceFieldTest > schemaless PASSED

org.apache.kafka.connect.transforms.MaskFieldTest > withSchema STARTED

org.apache.kafka.connect.transforms.MaskFieldTest > withSchema PASSED

org.apache.kafka.connect.transforms.MaskFieldTest > schemaless STARTED

org.apache.kafka.connect.transforms.MaskFieldTest > schemaless PASSED

org.apache.kafka.connect.transforms.util.NonEmptyListValidatorTest > 
testNullList STARTED

org.apache.kafka.connect.transforms.util.NonEmptyListValidatorTest > 
testNullList PASSED

org.apache.kafka.connect.transforms.util.NonEmptyListValidatorTest > 
testEmptyList STARTED

org.apache.kafka.connect.transforms.util.NonEmptyListValidatorTest > 
testEmptyList PASSED

org.apache.kafka.connect.transforms.util.NonEmptyListValidatorTest > 
testValidList STARTED

org.apache.kafka.connect.transforms.util.NonEmptyListValidatorTest > 
testValidList PASSED

org.apache.kafka.connect.transforms.FlattenTest > testKey STARTED

org.apache.kafka.connect.transforms.FlattenTest > testKey PASSED

org.apache.kafka.connect.transforms.FlattenTest > 
testOptionalAndDefaultValuesNested STARTED

org.apache.kafka.connect.transforms.FlattenTest > 
testOptionalAndDefaultValuesNested PASSED

org.apache.kafka.connect.transforms.FlattenTest > topLevelMapRequired STARTED

org.apache.kafka.connect.transforms.FlattenTest > topLevelMapRequired PASSED

org.apache.kafka.connect.transforms.FlattenTest > topLevelStructRequired STARTED

org.apache.kafka.connect.transforms.FlattenTest > topLevelStructRequired PASSED

org.apache.kafka.connect.transforms.FlattenTest > testOptionalFieldStruct 
STARTED

org.apache.kafka.connect.transforms.FlattenTest > testOptionalFieldStruct PASSED

org.apache.kafka.connect.transforms.FlattenTest > testNestedMapWithDelimiter 
STARTED

org.apache.kafka.connect.transforms.FlattenTest > testNestedMapWithDelimiter 
PASSED

org.apache.kafka.connect.transforms.FlattenTest > testOptionalFieldMap STARTED

org.apache.kafka.connect.transforms.FlattenTest > testOptionalFieldMap PASSED

org.apache.kafka.connect.transforms.FlattenTest > testUnsupportedTypeInMap 
STARTED

org.apache.kafka.connect.transforms.FlattenTest > testUnsupportedTypeInMap 
PASSED

org.apache.kafka.connect.transforms.FlattenTest > testNestedStruct STARTED

org.apache.kafka.connect.transforms.FlattenTest > testNestedStruct PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > schemaNameUpdate 
STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > schemaNameUpdate 
PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
schemaNameAndVersionUpdateWithStruct STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
schemaNameAndVersionUpdateWithStruct PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
updateSchemaOfStruct STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
updateSchemaOfStruct PASSED


Re: [DISCUSS] KIP-451: Make TopologyTestDriver output iterable

2019-04-25 Thread Patrik Kleindl
Hi

As discussed, if the preferred option is to always consume the records, I
will change both methods in KIP-451 accordingly and also switch them to
return a List.
This would be a bit redundant with Jukka's proposal in KIP-456, so the
question is whether KIP-451 should be scrapped in favor of KIP-456, which has
a more powerful solution but will also need a few more changes in tests.
On the other hand, both are useful and wouldn't conflict as far as I can see.

Any opinions?

best regards

Patrik

On Thu, 25 Apr 2019 at 08:55, Jukka Karvanen 
wrote:

> Hi,
>
> I played around with Patrick's KAFKA-8200 branch and I tested it with
> combined with my draft version of KIP-456.
>
> Some comments:
> These two version of iterableOutput methods are working now differently, if
> you reuse same fetched Iterable object after piping in new inputs.
> Version without serde will see the new input, but version with serdes has
> streamed the converted items already to new list and that's why
> not seeing the new item. Maybe it is intended to to fetch new Iterable each
> time, but the implementation is not mandating it.
>
> See example:
>
> https://github.com/jukkakarvanen/kafka/blob/KAFKA-8200withKIP-456/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverIterableTest.java
>
>
> I have a lot of tests where I pipe a list of inputs and check the list of
> output items, then pipe more and check the new list.
> With this change, an Iterable that starts from the beginning is not very
> usable if you test like this in multiple batches: you need to re-iterate
> over the same items again.
>
> In KIP-456, readKeyValuesToList returns a List in the same way
> ConsumerRecordFactory does, and TestInputTopic likewise accepts a List as
> input. The collection methods in TestOutputTopic also consume the messages,
> so you can mix reading individual rows and collections.
> With a List it is also easier to get the number of outputs than with an
> Iterable.
>
> Please also check out the DISCUSS thread for KIP-456. I will post the link
> to the current version of the implementation there, and you can see whether
> it also fulfills your needs.
>
> Jukka
>
>
> la 20. huhtik. 2019 klo 1.11 Patrik Kleindl (pklei...@gmail.com)
> kirjoitti:
>
> > Hi Matthias
> > Seems I got a bit ahead of myself.
> > With option C my aim was a simple alternative which gives back all output
> > records that have happened up to this point (and which have not been
> > removed by calls to readOutput).
> > Based on that the user can decide how to step through or compare the
> > records.
> >
> > If you see it as more consistent if the new methods removed all returned
> > records then this can easily be done.
> >
> > But maybe the pick of Iterable was too narrow.
> > It would probably be a good fit to return a List or just a Collection
> >
> > Picking up John's naming suggestion this would make this:
> >
> > public Collection<ProducerRecord<byte[], byte[]>> readAllOutput(final String topic) {
> >     final Collection<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topic);
> >     if (outputRecords == null) {
> >         return Collections.emptyList();
> >     }
> >     outputRecordsByTopic.put(topic, new LinkedList<>());
> >     return outputRecords;
> > }
> >
> > With the semantics the same as readOutput = removing everything.
> >
> > Can be changed to a List if you think it matters that a user can query
> > some index directly.
> >
> > What do you think?
> >
> > best regards
> >
> > Patrik
> >
> >
> >
> > On Fri, 19 Apr 2019 at 03:04, Matthias J. Sax 
> > wrote:
> >
> > > I am not sure if (C) is the best option to pick.
> > >
> > > What is the reasoning to suggest (C) over the other options?
> > >
> > > It seems that users cannot clear buffered output using option (C). This
> > > might make it difficult to write tests.
> > >
> > > The original Jira tickets suggest:
> > >
> > > > which returns either an iterator or list over the records that are
> > > currently available in the topic
> > >
> > > This implies that the current buffer would be cleared when getting the
> > > iterator.
> > >
> > > Also, from my understanding, the idea of iterating in general, is to
> > > step through a finite collection of objects/elements. Hence, if
> > > `hasNext()` returns `false` it will never return `true` later on.
> > >
> > > As John mentioned, Java also has support for streams, which offer
> > > different semantics that would align with option (C). However, I am not
> > > sure whether this would be the best API for writing tests.
> > >
> > > Thoughts?
> > >
> > > In any way: whatever semantics we pick, the KIP should explain them.
> > > Atm, this part is missing in the KIP.
> > >
> > >
> > > -Matthias
> > >
> > > On 4/18/19 12:47 PM, Patrik Kleindl wrote:
> > > > Hi John
> > > >
> > > > Thanks for your feedback
> > > > It's C, it does not consume the messages in contrast to the
> readOutput.
> > > > Is it a requirement to do so?
> > > > That's why I picked a different name so the difference is more
> > > noticeable.
> > > > I will add that to the JavaDoc.
> > > >
> > > > 
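The read-and-clear ("consume on read") semantics settled on in the thread above can be demonstrated with a plain per-topic buffer. All names here are hypothetical simplifications of the TopologyTestDriver internals, not the real API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of "consume on read": readAllOutput drains the topic's
// buffer, so a second call returns nothing until new records are appended.
public class OutputBuffer {
    private final Map<String, List<String>> recordsByTopic = new HashMap<>();

    public void append(String topic, String record) {
        recordsByTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(record);
    }

    public List<String> readAllOutput(String topic) {
        List<String> records = recordsByTopic.get(topic);
        if (records == null) {
            return Collections.emptyList();
        }
        recordsByTopic.put(topic, new ArrayList<>());  // clear the buffer
        return records;
    }

    public static void main(String[] args) {
        OutputBuffer buffer = new OutputBuffer();
        buffer.append("topic", "r1");
        buffer.append("topic", "r2");
        System.out.println(buffer.readAllOutput("topic").size()); // prints 2
        System.out.println(buffer.readAllOutput("topic").size()); // prints 0
    }
}
```

Reading twice without new input returns an empty list the second time, matching the consuming behaviour of the existing readOutput method.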

[jira] [Created] (KAFKA-8289) KTable<Windowed<String>, Long> can't be suppressed

2019-04-25 Thread Xiaolin Jia (JIRA)
Xiaolin Jia created KAFKA-8289:
--

 Summary: KTable<Windowed<String>, Long> can't be suppressed
 Key: KAFKA-8289
 URL: https://issues.apache.org/jira/browse/KAFKA-8289
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.2.0
 Environment: Broker on a Linux, stream app on my win10 laptop. 
I add one row log.message.timestamp.type=LogAppendTime to my broker's 
server.properties. stream app all default config.
Reporter: Xiaolin Jia


I encountered a problem yesterday: I got more than one [Window Final Results|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#id31] from a session time window.

time ticker A -> (4, A) / 25s,

time ticker B -> (4, B) / 25s, all sent to the same topic 

below is my stream app code 
{code:java}
kstreams[0]
.peek((k, v) -> log.info("--> ping, k={},v={}", k, v))
.groupBy((k, v) -> v, Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(Duration.ofSeconds(100)).grace(Duration.ofMillis(20)))
.count()
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream().peek((k, v) -> log.info("window={},k={},v={}", k.window(), k.key(), 
v));
{code}
{{here is my log print}}
{noformat}
2019-04-24 20:00:26.142  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:00:47.070  INFO --- [-StreamThread-1] c.g.k.AppStreams
: window=Window{startMs=1556106587744, endMs=1556107129191},k=A,v=20
2019-04-24 20:00:51.071  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:01:16.065  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:01:41.066  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:02:06.069  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:02:31.066  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:02:56.208  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:03:21.070  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:03:46.078  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:04:04.684  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=A
2019-04-24 20:04:11.069  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:04:19.371  INFO --- [-StreamThread-1] c.g.k.AppStreams
: window=Window{startMs=1556107226473, endMs=1556107426409},k=B,v=9
2019-04-24 20:04:19.372  INFO --- [-StreamThread-1] c.g.k.AppStreams
: window=Window{startMs=1556107445012, endMs=1556107445012},k=A,v=1
2019-04-24 20:04:29.604  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=A
2019-04-24 20:04:36.067  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:04:49.715  INFO --- [-StreamThread-1] c.g.k.AppStreams
: window=Window{startMs=1556107226473, endMs=1556107451397},k=B,v=10
2019-04-24 20:04:49.716  INFO --- [-StreamThread-1] c.g.k.AppStreams
: window=Window{startMs=1556107445012, endMs=1556107469935},k=A,v=2
2019-04-24 20:04:54.593  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=A
2019-04-24 20:05:01.070  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:05:19.599  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=A
2019-04-24 20:05:20.045  INFO --- [-StreamThread-1] c.g.k.AppStreams
: window=Window{startMs=1556107226473, endMs=1556107476398},k=B,v=11
2019-04-24 20:05:20.047  INFO --- [-StreamThread-1] c.g.k.AppStreams
: window=Window{startMs=1556107226473, endMs=1556107501398},k=B,v=12
2019-04-24 20:05:26.075  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:05:44.598  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=A
2019-04-24 20:05:50.399  INFO --- [-StreamThread-1] c.g.k.AppStreams
: window=Window{startMs=1556107445012, endMs=1556107519930},k=A,v=4
2019-04-24 20:05:50.400  INFO --- [-StreamThread-1] c.g.k.AppStreams
: window=Window{startMs=1556107226473, endMs=1556107526405},k=B,v=13
2019-04-24 20:05:51.067  INFO --- [-StreamThread-1] c.g.k.AppStreams
: --> ping, k=4,v=B
2019-04-24 20:06:09.595  INFO --- [-StreamThread-1] c.g.k.AppStreams  

[jira] [Created] (KAFKA-8288) KStream.through consumer does not use custom TimestampExtractor

2019-04-25 Thread Maarten (JIRA)
Maarten created KAFKA-8288:
--

 Summary: KStream.through consumer does not use custom 
TimestampExtractor
 Key: KAFKA-8288
 URL: https://issues.apache.org/jira/browse/KAFKA-8288
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.2.0
Reporter: Maarten


The Kafka consumer created by {{KStream.through}} does not seem to be using the 
custom TimestampExtractor set in Kafka Streams properties.

The documentation of {{through}} states the following

{code:java}
...
This is equivalent to calling to(someTopic, Produced.with(keySerde, valueSerde))
and StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde)).
{code}

However when I use the pattern above, the custom TimestampExtractor _is_ called.

I have verified that the streams app is reading from the specified topic and 
that the timestamp extractor is called for other topics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-456: Helper classes to make it simpler to write test logic with TopologyTestDriver

2019-04-25 Thread Jukka Karvanen
Hi,

If you want to see or test my current idea of the implementation of
this KIP, you can check it out in my repo:
https://github.com/jukkakarvanen/kafka/compare/trunk...jukkakarvanen:KAFKA-8233_InputOutputTopics


After my test with KIP-451, I do not see a need to add methods for
Iterables, but I am waiting for Patrik's clarification of the use case.

Jukka


ti 23. huhtik. 2019 klo 15.37 Jukka Karvanen (jukka.karva...@jukinimi.com)
kirjoitti:

> Hi All,
>
> I would like to start the discussion on KIP-456: Helper classes to make it
> simpler to write test logic with TopologyTestDriver:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-456%3A+Helper+classes+to+make+it+simpler+to+write+test+logic+with+TopologyTestDriver
>
>
> There is also related KIP adding methods to TopologyTestDriver:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-451%3A+Make+TopologyTestDriver+output+iterable
>
>
> I added those new Iterable-based methods to this TestOutputTopic, even
> though I have not tested them myself yet.
> So this version contains both my original List- and Map-based methods and
> the new ones.
> Based on the discussion, some of these can be dropped if they are seen as
> redundant.
>
> Best Regards,
> Jukka
>
>


Re: [DISCUSS] KIP-451: Make TopologyTestDriver output iterable

2019-04-25 Thread Jukka Karvanen
Hi,

I played around with Patrick's KAFKA-8200 branch and I tested it with
combined with my draft version of KIP-456.

Some comments:
These two versions of the iterableOutput methods now behave differently if
you reuse the same fetched Iterable object after piping in new inputs.
The version without serdes will see the new input, but the version with
serdes has already streamed the converted items into a new list and
therefore does not see the new item. Maybe the intention is to fetch a new
Iterable each time, but the implementation does not mandate it.

See example:
https://github.com/jukkakarvanen/kafka/blob/KAFKA-8200withKIP-456/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverIterableTest.java


I have a lot of tests where I pipe a list of inputs and check the list of
output items, then pipe more and check the new list.
With this change, an Iterable that starts from the beginning is not very
usable if you test like this in multiple batches: you need to re-iterate
over the same items again.

In KIP-456, readKeyValuesToList returns a List in the same way
ConsumerRecordFactory does, and TestInputTopic likewise accepts a List as
input. The collection methods in TestOutputTopic also consume the messages,
so you can mix reading individual rows and collections.
With a List it is also easier to get the number of outputs than with an
Iterable.

Please also check out the DISCUSS thread for KIP-456. I will post the link
to the current version of the implementation there, and you can see whether
it also fulfills your needs.

Jukka


la 20. huhtik. 2019 klo 1.11 Patrik Kleindl (pklei...@gmail.com) kirjoitti:

> Hi Matthias
> Seems I got a bit ahead of myself.
> With option C my aim was a simple alternative which gives back all output
> records that have happened up to this point (and which have not been
> removed by calls to readOutput).
> Based on that the user can decide how to step through or compare the
> records.
>
> If you see it as more consistent if the new methods removed all returned
> records then this can easily be done.
>
> But maybe the pick of Iterable was too narrow.
> It would probably be a good fit to return a List or just a Collection
>
> Picking up John's naming suggestion this would make this:
>
> public Collection<ProducerRecord<byte[], byte[]>> readAllOutput(final
> String topic) {
>     final Collection<ProducerRecord<byte[], byte[]>> outputRecords =
>         outputRecordsByTopic.get(topic);
>     if (outputRecords == null) {
>         return Collections.emptyList();
>     }
>     outputRecordsByTopic.put(topic, new LinkedList<>());
>     return outputRecords;
> }
>
> The semantics are the same as readOutput: returned records are removed.
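
A minimal stand-in for those consuming semantics (plain Strings instead of
ProducerRecords, and a simple map instead of the driver's internals — not the
real TopologyTestDriver):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

public class ReadAllOutputDemo {

    // Stand-in for TopologyTestDriver's internal outputRecordsByTopic map.
    private final Map<String, Queue<String>> outputRecordsByTopic = new HashMap<>();

    public void produce(String topic, String record) {
        outputRecordsByTopic.computeIfAbsent(topic, t -> new LinkedList<>()).add(record);
    }

    // Consuming semantics as proposed: return everything buffered for the
    // topic and replace the buffer with an empty one.
    public Collection<String> readAllOutput(String topic) {
        Collection<String> outputRecords = outputRecordsByTopic.get(topic);
        if (outputRecords == null) {
            return Collections.emptyList();
        }
        outputRecordsByTopic.put(topic, new LinkedList<>());
        return outputRecords;
    }
}
```

A second call in a row returns an empty collection, since the first call
consumed everything buffered for the topic.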
>
> Can be changed to a List if you think it matters that a user can query
> some index directly.
>
> What do you think?
>
> best regards
>
> Patrik
>
>
>
> On Fri, 19 Apr 2019 at 03:04, Matthias J. Sax 
> wrote:
>
> > I am not sure if (C) is the best option to pick.
> >
> > What is the reasoning to suggest (C) over the other options?
> >
> > It seems that users cannot clear buffered output using option (C). This
> > might make it difficult to write tests.
> >
> > The original Jira tickets suggest:
> >
> > > which returns either an iterator or list over the records that are
> > currently available in the topic
> >
> > This implies that the current buffer would be cleared when getting the
> > iterator.
> >
> > Also, from my understanding, the idea of iterating is, in general, to
> > step through a finite collection of objects/elements. Hence, if
> > `hasNext()` returns `false`, it will never return `true` later on.
> >
> > As John mentioned, Java also has support for streams, which offer
> > different semantics that would align with option (C). However, I am not
> > sure this would be the right API for writing tests.
> >
> > Thoughts?
> >
> > In any way: whatever semantics we pick, the KIP should explain them.
> > Atm, this part is missing in the KIP.
> >
> >
> > -Matthias
> >
> > On 4/18/19 12:47 PM, Patrik Kleindl wrote:
> > > Hi John
> > >
> > > Thanks for your feedback
> > > It's C; it does not consume the messages, in contrast to readOutput.
> > > Is it a requirement to do so?
> > > That's why I picked a different name so the difference is more
> > noticeable.
> > > I will add that to the JavaDoc.
> > >
> > > I see your point regarding future changes; that's why I linked KIP-456,
> > > where such a method is proposed and which might allow deprecating my
> > > version in favor of a bigger solution.
> > >
> > > Hope that answers your questions
> > >
> > > best regards
> > > Patrik
> > >
> > >
> > > On Thu, 18 Apr 2019 at 19:46, John Roesler  wrote:
> > >
> > >> Hi, Patrik,
> > >>
> > >> Thanks for this proposal!
> > >>
> > >> I have one question, which I didn't see addressed by the KIP.
> Currently,
> > >> when you call `readOutput`, it consumes the result (removes it from
> the
> > >> test driver's output). Does your proposed method:
> > >> A: consume the whole output stream for that topic "atomically" when it
> > >> returns the iterable? (i.e., two calls in a row would guarantee