Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-05-06 Thread Magesh Nandakumar
Randall,

Thanks a lot for the suggestions. I have incorporated the comments in the
KIP.

Thanks,
Magesh

On Mon, May 6, 2019 at 6:52 PM Randall Hauch  wrote:

> Thanks, Magesh. I do have a few pretty minor suggestions.
>
> 1) Define a bit more clearly in the "Proposed Changes" whether the configs
> passed to the validate method via the ConnectorClientConfigRequest object
> have or do not have the prefixes.
> 2) Specify more clearly in (or around) the table which is the default
> policy. Currently the Ignore policy "Behavior" just mentions that it's the
> current behavior, but I think it would help that it is described as the
> default for the property.
>
> Otherwise, this looks good to me.
>
> Best regards,
>
> Randall
>
> On Mon, May 6, 2019 at 8:12 PM Magesh Nandakumar 
> wrote:
>
> > Konstantine,
> >
> > Thanks a lot for your feedback on the KIP. I have incorporated the
> feedback
> > using generics for Class. I have also updated the KIP to handle the
> default
> > value per Randall's suggestion. Let me know if you have any questions.
> >
> > Thanks,
> > Magesh
> >
> >
> > On Mon, May 6, 2019 at 1:58 PM Konstantine Karantasis <
> > konstant...@confluent.io> wrote:
> >
> > > Thanks for the KIP Magesh, it's quite useful towards the goals for more
> > > general multi-tenancy in Connect.
> > >
> > > Couple of comments from me too:
> > >
> > > I think the fact that the default policy is 'null' (no implementation)
> > > should be mentioned on the table next to the available implementations.
> > > Currently the KIP says: 'In addition to the default implementation,
> ..."
> > > but this is not very accurate because there is no concrete default
> > > implementation. Just special handling of 'null' in
> > > 'connector.client.config.policy'
> > >
> > > Regarding passing the overrides to the connector 'configure' method, I
> > feel
> > > it wouldn't hurt to pass them, but I also agree that leaving this out
> at
> > > the moment is the safest option.
> > >
> > > Since the interfaces and classes are listed in the KIP, I'd like to
> note
> > > that Class is used as a raw type in field and return value
> declarations.
> > We
> > > should use the generic type instead.
> > >
> > > Thanks for this improvement proposal!
> > > Konstantine
> > >
> > > On Mon, May 6, 2019 at 11:11 AM Magesh Nandakumar <
> mage...@confluent.io>
> > > wrote:
> > >
> > > > Randall,
> > > >
> > > > I was wondering if you had any thoughts on the above alternatives to
> > deal
> > > > with a default policy.  If it's possible, I would like to finalize
> the
> > > > discussions and start a vote.
> > > > Let me know your thoughts.
> > > >
> > > > Thanks,
> > > > Magesh
> > > >
> > > > On Tue, Apr 30, 2019 at 8:46 PM Magesh Nandakumar <
> > mage...@confluent.io>
> > > > wrote:
> > > >
> > > > > Randall,
> > > > >
> > > > > The approach to return the to override configs could possibly make
> it
> > > > > cumbersome to implement a custom policy. This is a new
> configuration
> > > and
> > > > if
> > > > > you don't explicitly set it the existing behavior remains as-is.
> Like
> > > > > Chris, I also preferred this approach for the sake of simplicity.
> If
> > > not
> > > > > for the default `null` I would prefer to fall back to using
> `Ignore`
> > > > which
> > > > > is a misnomer to the interface spec but still gets the job done via
> > > > > instanceOf checks. The other options I could think of are as
> below:-
> > > > >
> > > > >- have an enforcePolicy() method in the interface which by
> default
> > > > >returns true and the Ignore implementation could return false
> > > > >- introduce another worker config
> allow.connector.config.overrides
> > > > >with a default value of false and the default policy can be None
> > > > >
> > > > > Let me know what you think.
> > > > >
> > > > > Thanks
> > > > > Magesh
> > > > >
> > > > > On Tue, Apr 30, 2019 at 6:52 PM Randall Hauch 
> > > wrote:
> > > > >
> > > > >> Thanks, Chris. I still think it's strange to have a non-policy,
> > since
> > > > >> there's now special behavior for when the policy is not specified.
> > > > >>
> > > > >> Perhaps the inability for a policy implementation to represent the
> > > > >> existing
> > > > >> behavior suggests that the policy interface isn't quite right.
> Could
> > > the
> > > > >> policy's "validate" method take the overrides that were supplied
> and
> > > > >> return
> > > > >> the overrides that should be passed to the connector, yet still
> > > throwing
> > > > >> an
> > > > >> exception if any supplied overrides are not allowed. Then the
> > > different
> > > > >> policy implementations might be:
> > > > >>
> > > > >>- Ignore (default) - returns all supplied override properties
> > > > >>- None - throws exception if any override properties are
> > supplied;
> > > > >>always returns empty map if no overrides are provided
> > > > >>- Principal - throws exception if other override properties are
> > > > >>provided, but returns an 

[jira] [Created] (KAFKA-8327) Connection to Node2 was disconnected

2019-05-06 Thread suseendramani (JIRA)
suseendramani created KAFKA-8327:


 Summary: Connection to Node2 was disconnected 
 Key: KAFKA-8327
 URL: https://issues.apache.org/jira/browse/KAFKA-8327
 Project: Kafka
  Issue Type: New Feature
  Components: consumer
Affects Versions: 1.1.0
Reporter: suseendramani


Hi Team,

 

We are seeing the below errors in the Kafka logs. We are using Kafka version
1.1.0. Could you please let us know if there are any fixes that can be applied
to this?

 

[2019-05-06 20:40:23,212] INFO [ReplicaFetcher replicaId=3, leaderId=2, 
fetcherId=0] Error sending fetch request (sessionId=896515694, epoch=738904) to 
node 2: java.io.IOException: Connection to 2 was disconnected before the 
response was read. (org.apache.kafka.clients.FetchSessionHandler)

[2019-05-06 20:40:23,422] WARN [ReplicaFetcher replicaId=3, leaderId=2, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, 
fetchData={xxx-12=(offset=38826573, logStartOffset=38549032, 
maxBytes=1048576), xx-14=(offset=49033, logStartOffset=0, 
maxBytes=1048576), rWithSubscription-Cas-3=(offset=40752457, 
logStartOffset=40198369, maxBytes=1048576), xx-8=(offset=39543295, 
logStartOffset=39032103, maxBytes=1048576)}, isolationLevel=READ_UNCOMMITTED, 
toForget=, metadata=(sessionId=896515694, epoch=738904)) 
(kafka.server.ReplicaFetcherThread)

java.io.IOException: Connection to 2 was disconnected before the response was 
read.

 

We are also seeing the below errors..

 

[2019-05-07 01:39:57,310] INFO [ReplicaFetcher replicaId=0, leaderId=4, 
fetcherId=0] Retrying leaderEpoch request for partition __consumer_offsets-31 
as the leader reported an error: NOT_LEADER_FOR_PARTITION 
(kafka.server.ReplicaFetcherThread)

 

Please let us know the reason for the same.

 

Thanks,

Mani

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-466: Add support for List serialization and deserialization

2019-05-06 Thread Development
Hi John,

I'm really sorry for the confusion. I cloned that JIRA ticket from an old one
about introducing a UUID Serde, and I guess I was too hasty while editing the
copy to notice the mistake. I have just edited the ticket. Sorry for any
inconvenience.

As for the comparator, I agree. Let's make the user responsible for
implementing the Comparable interface. I was just thinking of making the serde
a little more flexible (i.e. letting the user decide in which order records are
going to be inserted into a changelog topic).
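
For readers following the PR, here is a small usage sketch of the constructor
shape currently proposed there (Serdes.ListSerde is not a released API, so the
names and signatures below are assumptions that may still change):

// Sketch only: assumes the Serdes.ListSerde proposed in the KIP-466 PR.
import java.util.Comparator;
import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class ListSerdeUsageSketch {
    public static void main(final String[] args) {
        // inner serde for the element type plus the (possibly optional) comparator
        final Serde<List<String>> listSerde =
                new Serdes.ListSerde<>(Serdes.String(), Comparator.comparing(String::length));

        // the serde could then be passed anywhere a Serde is expected, e.g.
        // Consumed.with(Serdes.String(), listSerde) in a Streams topology
        System.out.println(listSerde);
    }
}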

Thank you!

Best,
Daniyar Yeralin


> On May 6, 2019, at 5:37 PM, John Roesler  wrote:
> 
> Hi Daniyar,
> 
> Thanks for the proposal!
> 
> If I understand the point about the comparator, is it just to capture the
> generic type parameter? If so, then anything that implements a known
> interface would work just as well, right? I've been considering adding
> something like the Jackson TypeReference (or similar classes in many other
> projects). Would this be a good time to do it?
> 
> Note that it's not necessary to actually require that the captured type is
> Comparable (as this proposal currently does), it's just a way to make sure
> there is some method that makes use of the generic type parameter, to force
> the compiler to capture the type.
> 
> Just to make sure I understand the motivation... You expressed a desire to
> be able to serialize UUIDs, which I didn't follow, since there is a
> built-in UUID serde: org.apache.kafka.common.serialization.Serdes#UUID, and
> also, a UUID isn't a List. Did you mean that you need to use *lists of*
> UUIDs?
> 
> Thanks,
> -John
> 
> On Mon, May 6, 2019 at 11:49 AM Development  wrote:
> 
>> Hello,
>> 
>> Starting a discussion for KIP-466 adding support for List Serde. PR is
>> created under https://github.com/apache/kafka/pull/6592 <
>> https://github.com/apache/kafka/pull/6592>
>> 
>> There are two topics I would like to discuss:
>> 1. Since the type for the List serde needs to be declared beforehand, I could
>> not create a static method for the List Serde under
>> org.apache.kafka.common.serialization.Serdes. I addressed it in the KIP:
>> P.S. A static method corresponding to ListSerde (something like static public
>> Serde<List<T>> List() {...}) cannot be added to the
>> org.apache.kafka.common.serialization.Serdes class because the type needs to
>> be defined beforehand. That's why one needs to create the List Serde in the
>> following fashion:
>> new Serdes.ListSerde(Serdes.String(),
>> Comparator.comparing(String::length));
>> (this can possibly be simplified by declaring import static
>> org.apache.kafka.common.serialization.Serdes.ListSerde)
>> 
>> 2. @miguno Michael G. Noll  is questioning
>> whether I need to pass a comparator to ListDeserializer. This certainly is
>> not required. Feel free to add your input:
>> https://github.com/apache/kafka/pull/6592#discussion_r281152067
>> 
>> Thank you!
>> 
>> Best,
>> Daniyar Yeralin
>> 
>>> On May 6, 2019, at 11:59 AM, Daniyar Yeralin (JIRA) 
>> wrote:
>>> 
>>> Daniyar Yeralin created KAFKA-8326:
>>> --
>>> 
>>>Summary: Add List Serde
>>>Key: KAFKA-8326
>>>URL: https://issues.apache.org/jira/browse/KAFKA-8326
>>>Project: Kafka
>>> Issue Type: Improvement
>>> Components: clients, streams
>>>   Reporter: Daniyar Yeralin
>>> 
>>> 
>>> I propose adding serializers and deserializers for the java.util.List
>> class.
>>> 
>>> I have many use cases where I want to set the key of a Kafka message to
>> be a UUID. Currently, I need to turn UUIDs into strings or byte arrays and
>> use their associated Serdes, but it would be more convenient to serialize
>> and deserialize UUIDs directly.
>>> 
>>> I believe there are many use cases where one would want to have a List
>> serde. Ex. [
>> https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows],
>> [
>> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
>> ]
>>> 
>>> 
>>> 
>>> KIP Link: [
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization
>> ]
>>> 
>>> 
>>> 
>>> --
>>> This message was sent by Atlassian JIRA
>>> (v7.6.3#76005)
>> 
>> 



Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-05-06 Thread Randall Hauch
Thanks, Magesh. I do have a few pretty minor suggestions.

1) Define a bit more clearly in the "Proposed Changes" whether the configs
passed to the validate method via the ConnectorClientConfigRequest object
have or do not have the prefixes.
2) Specify more clearly in (or around) the table which policy is the default.
Currently the Ignore policy's "Behavior" just mentions that it's the current
behavior, but I think it would help if it were described as the default for
the property.

Otherwise, this looks good to me.

Best regards,

Randall

On Mon, May 6, 2019 at 8:12 PM Magesh Nandakumar 
wrote:

> Konstantine,
>
> Thanks a lot for your feedback on the KIP. I have incorporated the feedback
> using generics for Class. I have also updated the KIP to handle the default
> value per Randall's suggestion. Let me know if you have any questions.
>
> Thanks,
> Magesh
>
>
> On Mon, May 6, 2019 at 1:58 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> > Thanks for the KIP Magesh, it's quite useful towards the goals for more
> > general multi-tenancy in Connect.
> >
> > Couple of comments from me too:
> >
> > I think the fact that the default policy is 'null' (no implementation)
> > should be mentioned on the table next to the available implementations.
> > Currently the KIP says: 'In addition to the default implementation, ..."
> > but this is not very accurate because there is no concrete default
> > implementation. Just special handling of 'null' in
> > 'connector.client.config.policy'
> >
> > Regarding passing the overrides to the connector 'configure' method, I
> feel
> > it wouldn't hurt to pass them, but I also agree that leaving this out at
> > the moment is the safest option.
> >
> > Since the interfaces and classes are listed in the KIP, I'd like to note
> > that Class is used as a raw type in field and return value declarations.
> We
> > should use the generic type instead.
> >
> > Thanks for this improvement proposal!
> > Konstantine
> >
> > On Mon, May 6, 2019 at 11:11 AM Magesh Nandakumar 
> > wrote:
> >
> > > Randall,
> > >
> > > I was wondering if you had any thoughts on the above alternatives to
> deal
> > > with a default policy.  If it's possible, I would like to finalize the
> > > discussions and start a vote.
> > > Let me know your thoughts.
> > >
> > > Thanks,
> > > Magesh
> > >
> > > On Tue, Apr 30, 2019 at 8:46 PM Magesh Nandakumar <
> mage...@confluent.io>
> > > wrote:
> > >
> > > > Randall,
> > > >
> > > > The approach to return the to override configs could possibly make it
> > > > cumbersome to implement a custom policy. This is a new configuration
> > and
> > > if
> > > > you don't explicitly set it the existing behavior remains as-is. Like
> > > > Chris, I also preferred this approach for the sake of simplicity.  If
> > not
> > > > for the default `null` I would prefer to fall back to using `Ignore`
> > > which
> > > > is a misnomer to the interface spec but still gets the job done via
> > > > instanceOf checks. The other options I could think of are as below:-
> > > >
> > > >- have an enforcePolicy() method in the interface which by default
> > > >returns true and the Ignore implementation could return false
> > > >- introduce another worker config allow.connector.config.overrides
> > > >with a default value of false and the default policy can be None
> > > >
> > > > Let me know what you think.
> > > >
> > > > Thanks
> > > > Magesh
> > > >
> > > > On Tue, Apr 30, 2019 at 6:52 PM Randall Hauch 
> > wrote:
> > > >
> > > >> Thanks, Chris. I still think it's strange to have a non-policy,
> since
> > > >> there's now special behavior for when the policy is not specified.
> > > >>
> > > >> Perhaps the inability for a policy implementation to represent the
> > > >> existing
> > > >> behavior suggests that the policy interface isn't quite right. Could
> > the
> > > >> policy's "validate" method take the overrides that were supplied and
> > > >> return
> > > >> the overrides that should be passed to the connector, yet still
> > throwing
> > > >> an
> > > >> exception if any supplied overrides are not allowed. Then the
> > different
> > > >> policy implementations might be:
> > > >>
> > > >>- Ignore (default) - returns all supplied override properties
> > > >>- None - throws exception if any override properties are
> supplied;
> > > >>always returns empty map if no overrides are provided
> > > >>- Principal - throws exception if other override properties are
> > > >>provided, but returns an empty map (since no properties should be
> > > >> passed to
> > > >>the connector)
> > > >>- All - returns all provided override properties
> > > >>
> > > >> All override properties defined on the connector configuration would
> > be
> > > >> passed to the policy for validation, and assuming there's no error
> all
> > > of
> > > >> these overrides would be used in the producer/consumer/admin client.
> > The
> > > >> result of the policy call, however, 

Re: [VOTE] KIP-453: Add close() method to RocksDBConfigSetter

2019-05-06 Thread Kamal Chandraprakash
+1 (non-binding). Thanks for the KIP.

On Tue, May 7, 2019 at 3:50 AM Bill Bejeck  wrote:

> Thanks for the KIP Sophie.
>
> +1(binding)
>
> On Mon, May 6, 2019 at 4:51 PM John Roesler  wrote:
>
> > Thanks, Sophie, I reviewed the KIP, and I agree this is the best /
> > only-practical approach.
> >
> > +1 (non-binding)
> >
> > On Mon, May 6, 2019 at 2:23 PM Matthias J. Sax 
> > wrote:
> >
> > > +1 (binding)
> > >
> > >
> > >
> > > On 5/6/19 6:28 PM, Sophie Blee-Goldman wrote:
> > > > Hi all,
> > > >
> > > > I'd like to call for a vote on a minor KIP that adds a close() method
> > to
> > > > the RocksDBConfigSetter interface of Streams.
> > > >
> > > > Link:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-453%3A+Add+close%28%29+method+to+RocksDBConfigSetter
> > > >
> > > > This is important for users who have created RocksObjects in
> > > > RocksDBConfigSetter#setConfig to avoid leaking off-heap memory.
> > > >
> > > > Thanks!
> > > > Sophie
> > > >
> > >
> > >
> >
>
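
For context, here is a minimal sketch of the pattern this KIP enables, assuming
the close() signature proposed in KIP-453: a config setter that allocates a
RocksObject in setConfig() and releases it in close() so the off-heap memory is
not leaked.

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class BoundedCacheConfigSetter implements RocksDBConfigSetter {

    // RocksObjects such as LRUCache allocate native (off-heap) memory
    private Cache cache;

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        cache = new LRUCache(16 * 1024 * 1024L); // 16 MB block cache
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(cache);
        options.setTableFormatConfig(tableConfig);
    }

    // The method proposed by KIP-453; without it the native memory allocated
    // above is leaked when the store is closed.
    @Override
    public void close(final String storeName, final Options options) {
        cache.close();
    }
}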


Jenkins build is back to normal : kafka-trunk-jdk11 #490

2019-05-06 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-05-06 Thread Magesh Nandakumar
Hi Randall,

I have incorporated your comments and updated the KIP with the following


   1. update the config key to be `connector.client.config.override.policy`
   2. update the interface name to be
   `ConnectorClientConfigOverridePolicy`. Updated the implementation names
   also to match the new interface name
   3. introduce useOverrides() in the interface
   4. make `Ignore` the default implementation for the policy.
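
For reference, here is a rough sketch of the interface shape described in items
2-4 above (names are taken from this discussion and the sketch includes a
stand-in request class, so the final KIP-458 API may well differ):

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigValue;

public interface ConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable {

    // Item 3 above: whether client config overrides should be honored at all.
    // The default `Ignore` implementation would override this to return false.
    default boolean useOverrides() {
        return true;
    }

    // Validates the requested overrides, reporting any that are not permitted.
    List<ConfigValue> validate(ConnectorClientConfigRequest request);

    // Stand-in for the request class proposed in the KIP, included only to keep
    // this sketch self-contained; the real class would carry more context
    // (connector name, client type, etc.).
    class ConnectorClientConfigRequest {
        private final Map<String, Object> clientProps;

        public ConnectorClientConfigRequest(final Map<String, Object> clientProps) {
            this.clientProps = clientProps;
        }

        public Map<String, Object> clientProps() {
            return clientProps;
        }
    }
}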


Let me know if you have any questions.

Thanks
Magesh

On Mon, May 6, 2019 at 5:45 PM Randall Hauch  wrote:

> That's fine. I don't think there's much difference between the two option
> anyway. I'm looking forward to the updated KIP.
>
> On Mon, May 6, 2019 at 7:38 PM Magesh Nandakumar 
> wrote:
>
> > Randall,
> >
> > Thanks a lot for your feedback.
> >
> > If I understand it correctly, we could do one of the following, right?
> >
> > 1. introduce a new config `allow.client.config.overrides` with a default
> > value of false. The default value for the policy would be `None`. So by
> > default, we will still preserve the current behavior.
> > 2. introduce `useOverride()` default method that returns true in the
> > interface. The `Ignore` policy would then override it to return false.
> > `Ignore` will also be the default policy.
> >
> > I personally prefer option #2, since it involves one less configuration
> but
> > then I'm also open to the other option.
> >
> > Thanks,
> > Magesh
> >
> > On Mon, May 6, 2019 at 5:29 PM Randall Hauch  wrote:
> >
> > > I actually like a separate config for whether to pass or filter client
> > > override properties to the connector. I generally dislike adding more
> > > properties, but in this case it keeps the two orthogonal behaviors
> > > independent and reduces the need to implement policies that cover all
> > > permutations.
> > >
> > > However, I still find it strange to have a "non-policy" be the default.
> > So
> > > either of these modifications to the current KIP would be fine with me:
> > > 1) Add a `useOverride()` default method that returns true, but which
> the
> > > None policy could override and return false; and keep the
> `validate(...)`
> > > method as it is.
> > > 2) Change the `validate(Map<...>) method to support a filtering
> pattern,
> > > such as `Map<...> filterOverrides(Map<...> connectorClientOverrides)`
> > >
> > > The point is that the default is the name of a built-in policy.
> > >
> > > Also, one minor suggestion is to use the term "override" in the config
> > > property (e.g., `connector.client.override.policy`) since that term is
> > used
> > > prevalently and matches the `producer.override`, `consumer.override`,
> and
> > > `admin.override` prefixes.
> > >
> > > Thanks for working through this, Magesh.
> > >
> > > Randall
> > >
> > > On Mon, May 6, 2019 at 1:11 PM Magesh Nandakumar  >
> > > wrote:
> > >
> > > > Randall,
> > > >
> > > > I was wondering if you had any thoughts on the above alternatives to
> > deal
> > > > with a default policy.  If it's possible, I would like to finalize
> the
> > > > discussions and start a vote.
> > > > Let me know your thoughts.
> > > >
> > > > Thanks,
> > > > Magesh
> > > >
> > > > On Tue, Apr 30, 2019 at 8:46 PM Magesh Nandakumar <
> > mage...@confluent.io>
> > > > wrote:
> > > >
> > > > > Randall,
> > > > >
> > > > > The approach to return the to override configs could possibly make
> it
> > > > > cumbersome to implement a custom policy. This is a new
> configuration
> > > and
> > > > if
> > > > > you don't explicitly set it the existing behavior remains as-is.
> Like
> > > > > Chris, I also preferred this approach for the sake of simplicity.
> If
> > > not
> > > > > for the default `null` I would prefer to fall back to using
> `Ignore`
> > > > which
> > > > > is a misnomer to the interface spec but still gets the job done via
> > > > > instanceOf checks. The other options I could think of are as
> below:-
> > > > >
> > > > >- have an enforcePolicy() method in the interface which by
> default
> > > > >returns true and the Ignore implementation could return false
> > > > >- introduce another worker config
> allow.connector.config.overrides
> > > > >with a default value of false and the default policy can be None
> > > > >
> > > > > Let me know what you think.
> > > > >
> > > > > Thanks
> > > > > Magesh
> > > > >
> > > > > On Tue, Apr 30, 2019 at 6:52 PM Randall Hauch 
> > > wrote:
> > > > >
> > > > >> Thanks, Chris. I still think it's strange to have a non-policy,
> > since
> > > > >> there's now special behavior for when the policy is not specified.
> > > > >>
> > > > >> Perhaps the inability for a policy implementation to represent the
> > > > >> existing
> > > > >> behavior suggests that the policy interface isn't quite right.
> Could
> > > the
> > > > >> policy's "validate" method take the overrides that were supplied
> and
> > > > >> return
> > > > >> the overrides that should be passed to the connector, yet still
> > > throwing
> > > > >> an
> > > > >> 

Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-05-06 Thread Magesh Nandakumar
Konstantine,

Thanks a lot for your feedback on the KIP. I have incorporated the feedback
using generics for Class. I have also updated the KIP to handle the default
value per Randall's suggestion. Let me know if you have any questions.

Thanks,
Magesh


On Mon, May 6, 2019 at 1:58 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Thanks for the KIP Magesh, it's quite useful towards the goals for more
> general multi-tenancy in Connect.
>
> Couple of comments from me too:
>
> I think the fact that the default policy is 'null' (no implementation)
> should be mentioned on the table next to the available implementations.
> Currently the KIP says: 'In addition to the default implementation, ..."
> but this is not very accurate because there is no concrete default
> implementation. Just special handling of 'null' in
> 'connector.client.config.policy'
>
> Regarding passing the overrides to the connector 'configure' method, I feel
> it wouldn't hurt to pass them, but I also agree that leaving this out at
> the moment is the safest option.
>
> Since the interfaces and classes are listed in the KIP, I'd like to note
> that Class is used as a raw type in field and return value declarations. We
> should use the generic type instead.
>
> Thanks for this improvement proposal!
> Konstantine
>
> On Mon, May 6, 2019 at 11:11 AM Magesh Nandakumar 
> wrote:
>
> > Randall,
> >
> > I was wondering if you had any thoughts on the above alternatives to deal
> > with a default policy.  If it's possible, I would like to finalize the
> > discussions and start a vote.
> > Let me know your thoughts.
> >
> > Thanks,
> > Magesh
> >
> > On Tue, Apr 30, 2019 at 8:46 PM Magesh Nandakumar 
> > wrote:
> >
> > > Randall,
> > >
> > > The approach to return the to override configs could possibly make it
> > > cumbersome to implement a custom policy. This is a new configuration
> and
> > if
> > > you don't explicitly set it the existing behavior remains as-is. Like
> > > Chris, I also preferred this approach for the sake of simplicity.  If
> not
> > > for the default `null` I would prefer to fall back to using `Ignore`
> > which
> > > is a misnomer to the interface spec but still gets the job done via
> > > instanceOf checks. The other options I could think of are as below:-
> > >
> > >- have an enforcePolicy() method in the interface which by default
> > >returns true and the Ignore implementation could return false
> > >- introduce another worker config allow.connector.config.overrides
> > >with a default value of false and the default policy can be None
> > >
> > > Let me know what you think.
> > >
> > > Thanks
> > > Magesh
> > >
> > > On Tue, Apr 30, 2019 at 6:52 PM Randall Hauch 
> wrote:
> > >
> > >> Thanks, Chris. I still think it's strange to have a non-policy, since
> > >> there's now special behavior for when the policy is not specified.
> > >>
> > >> Perhaps the inability for a policy implementation to represent the
> > >> existing
> > >> behavior suggests that the policy interface isn't quite right. Could
> the
> > >> policy's "validate" method take the overrides that were supplied and
> > >> return
> > >> the overrides that should be passed to the connector, yet still
> throwing
> > >> an
> > >> exception if any supplied overrides are not allowed. Then the
> different
> > >> policy implementations might be:
> > >>
> > >>- Ignore (default) - returns all supplied override properties
> > >>- None - throws exception if any override properties are supplied;
> > >>always returns empty map if no overrides are provided
> > >>- Principal - throws exception if other override properties are
> > >>provided, but returns an empty map (since no properties should be
> > >> passed to
> > >>the connector)
> > >>- All - returns all provided override properties
> > >>
> > >> All override properties defined on the connector configuration would
> be
> > >> passed to the policy for validation, and assuming there's no error all
> > of
> > >> these overrides would be used in the producer/consumer/admin client.
> The
> > >> result of the policy call, however, is used to determine which of
> these
> > >> overrides are passed to the connector.
> > >>
> > >> This approach means that all behaviors can be implemented through a
> > policy
> > >> class, including the defaults. It also gives a bit more control to
> > custom
> > >> policies, should that be warranted. For example, validating the
> provided
> > >> client overrides but passing all such override properties to the
> > >> connector,
> > >> which as I stated earlier is something I think connectors likely don't
> > >> look
> > >> for.
> > >>
> > >> Thoughts?
> > >>
> > >> Randall
> > >>
> > >> On Tue, Apr 30, 2019 at 6:07 PM Chris Egerton 
> > >> wrote:
> > >>
> > >> > Randall,
> > >> >
> > >> > The special behavior for null was my suggestion. There is no
> > >> implementation
> > >> > of the proposed interface that causes client overrides to be
> 

Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-05-06 Thread Randall Hauch
That's fine. I don't think there's much difference between the two options
anyway. I'm looking forward to the updated KIP.

On Mon, May 6, 2019 at 7:38 PM Magesh Nandakumar 
wrote:

> Randall,
>
> Thanks a lot for your feedback.
>
> If I understand it correctly, we could do one of the following, right?
>
> 1. introduce a new config `allow.client.config.overrides` with a default
> value of false. The default value for the policy would be `None`. So by
> default, we will still preserve the current behavior.
> 2. introduce `useOverride()` default method that returns true in the
> interface. The `Ignore` policy would then override it to return false.
> `Ignore` will also be the default policy.
>
> I personally prefer option #2, since it involves one less configuration but
> then I'm also open to the other option.
>
> Thanks,
> Magesh
>
> On Mon, May 6, 2019 at 5:29 PM Randall Hauch  wrote:
>
> > I actually like a separate config for whether to pass or filter client
> > override properties to the connector. I generally dislike adding more
> > properties, but in this case it keeps the two orthogonal behaviors
> > independent and reduces the need to implement policies that cover all
> > permutations.
> >
> > However, I still find it strange to have a "non-policy" be the default.
> So
> > either of these modifications to the current KIP would be fine with me:
> > 1) Add a `useOverride()` default method that returns true, but which the
> > None policy could override and return false; and keep the `validate(...)`
> > method as it is.
> > 2) Change the `validate(Map<...>) method to support a filtering pattern,
> > such as `Map<...> filterOverrides(Map<...> connectorClientOverrides)`
> >
> > The point is that the default is the name of a built-in policy.
> >
> > Also, one minor suggestion is to use the term "override" in the config
> > property (e.g., `connector.client.override.policy`) since that term is
> used
> > prevalently and matches the `producer.override`, `consumer.override`, and
> > `admin.override` prefixes.
> >
> > Thanks for working through this, Magesh.
> >
> > Randall
> >
> > On Mon, May 6, 2019 at 1:11 PM Magesh Nandakumar 
> > wrote:
> >
> > > Randall,
> > >
> > > I was wondering if you had any thoughts on the above alternatives to
> deal
> > > with a default policy.  If it's possible, I would like to finalize the
> > > discussions and start a vote.
> > > Let me know your thoughts.
> > >
> > > Thanks,
> > > Magesh
> > >
> > > On Tue, Apr 30, 2019 at 8:46 PM Magesh Nandakumar <
> mage...@confluent.io>
> > > wrote:
> > >
> > > > Randall,
> > > >
> > > > The approach to return the to override configs could possibly make it
> > > > cumbersome to implement a custom policy. This is a new configuration
> > and
> > > if
> > > > you don't explicitly set it the existing behavior remains as-is. Like
> > > > Chris, I also preferred this approach for the sake of simplicity.  If
> > not
> > > > for the default `null` I would prefer to fall back to using `Ignore`
> > > which
> > > > is a misnomer to the interface spec but still gets the job done via
> > > > instanceOf checks. The other options I could think of are as below:-
> > > >
> > > >- have an enforcePolicy() method in the interface which by default
> > > >returns true and the Ignore implementation could return false
> > > >- introduce another worker config allow.connector.config.overrides
> > > >with a default value of false and the default policy can be None
> > > >
> > > > Let me know what you think.
> > > >
> > > > Thanks
> > > > Magesh
> > > >
> > > > On Tue, Apr 30, 2019 at 6:52 PM Randall Hauch 
> > wrote:
> > > >
> > > >> Thanks, Chris. I still think it's strange to have a non-policy,
> since
> > > >> there's now special behavior for when the policy is not specified.
> > > >>
> > > >> Perhaps the inability for a policy implementation to represent the
> > > >> existing
> > > >> behavior suggests that the policy interface isn't quite right. Could
> > the
> > > >> policy's "validate" method take the overrides that were supplied and
> > > >> return
> > > >> the overrides that should be passed to the connector, yet still
> > throwing
> > > >> an
> > > >> exception if any supplied overrides are not allowed. Then the
> > different
> > > >> policy implementations might be:
> > > >>
> > > >>- Ignore (default) - returns all supplied override properties
> > > >>- None - throws exception if any override properties are
> supplied;
> > > >>always returns empty map if no overrides are provided
> > > >>- Principal - throws exception if other override properties are
> > > >>provided, but returns an empty map (since no properties should be
> > > >> passed to
> > > >>the connector)
> > > >>- All - returns all provided override properties
> > > >>
> > > >> All override properties defined on the connector configuration would
> > be
> > > >> passed to the policy for validation, and assuming there's no error
> all
> > > of
> > > >> 

Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-05-06 Thread Magesh Nandakumar
Randall,

Thanks a lot for your feedback.

If I understand it correctly, we could do one of the following, right?

1. introduce a new config `allow.client.config.overrides` with a default
value of false. The default value for the policy would be `None`. So by
default, we will still preserve the current behavior.
2. introduce `useOverride()` default method that returns true in the
interface. The `Ignore` policy would then override it to return false.
`Ignore` will also be the default policy.

I personally prefer option #2, since it involves one less configuration but
then I'm also open to the other option.

Thanks,
Magesh

On Mon, May 6, 2019 at 5:29 PM Randall Hauch  wrote:

> I actually like a separate config for whether to pass or filter client
> override properties to the connector. I generally dislike adding more
> properties, but in this case it keeps the two orthogonal behaviors
> independent and reduces the need to implement policies that cover all
> permutations.
>
> However, I still find it strange to have a "non-policy" be the default. So
> either of these modifications to the current KIP would be fine with me:
> 1) Add a `useOverride()` default method that returns true, but which the
> None policy could override and return false; and keep the `validate(...)`
> method as it is.
> 2) Change the `validate(Map<...>) method to support a filtering pattern,
> such as `Map<...> filterOverrides(Map<...> connectorClientOverrides)`
>
> The point is that the default is the name of a built-in policy.
>
> Also, one minor suggestion is to use the term "override" in the config
> property (e.g., `connector.client.override.policy`) since that term is used
> prevalently and matches the `producer.override`, `consumer.override`, and
> `admin.override` prefixes.
>
> Thanks for working through this, Magesh.
>
> Randall
>
> On Mon, May 6, 2019 at 1:11 PM Magesh Nandakumar 
> wrote:
>
> > Randall,
> >
> > I was wondering if you had any thoughts on the above alternatives to deal
> > with a default policy.  If it's possible, I would like to finalize the
> > discussions and start a vote.
> > Let me know your thoughts.
> >
> > Thanks,
> > Magesh
> >
> > On Tue, Apr 30, 2019 at 8:46 PM Magesh Nandakumar 
> > wrote:
> >
> > > Randall,
> > >
> > > The approach to return the to override configs could possibly make it
> > > cumbersome to implement a custom policy. This is a new configuration
> and
> > if
> > > you don't explicitly set it the existing behavior remains as-is. Like
> > > Chris, I also preferred this approach for the sake of simplicity.  If
> not
> > > for the default `null` I would prefer to fall back to using `Ignore`
> > which
> > > is a misnomer to the interface spec but still gets the job done via
> > > instanceOf checks. The other options I could think of are as below:-
> > >
> > >- have an enforcePolicy() method in the interface which by default
> > >returns true and the Ignore implementation could return false
> > >- introduce another worker config allow.connector.config.overrides
> > >with a default value of false and the default policy can be None
> > >
> > > Let me know what you think.
> > >
> > > Thanks
> > > Magesh
> > >
> > > On Tue, Apr 30, 2019 at 6:52 PM Randall Hauch 
> wrote:
> > >
> > >> Thanks, Chris. I still think it's strange to have a non-policy, since
> > >> there's now special behavior for when the policy is not specified.
> > >>
> > >> Perhaps the inability for a policy implementation to represent the
> > >> existing
> > >> behavior suggests that the policy interface isn't quite right. Could
> the
> > >> policy's "validate" method take the overrides that were supplied and
> > >> return
> > >> the overrides that should be passed to the connector, yet still
> throwing
> > >> an
> > >> exception if any supplied overrides are not allowed. Then the
> different
> > >> policy implementations might be:
> > >>
> > >>- Ignore (default) - returns all supplied override properties
> > >>- None - throws exception if any override properties are supplied;
> > >>always returns empty map if no overrides are provided
> > >>- Principal - throws exception if other override properties are
> > >>provided, but returns an empty map (since no properties should be
> > >> passed to
> > >>the connector)
> > >>- All - returns all provided override properties
> > >>
> > >> All override properties defined on the connector configuration would
> be
> > >> passed to the policy for validation, and assuming there's no error all
> > of
> > >> these overrides would be used in the producer/consumer/admin client.
> The
> > >> result of the policy call, however, is used to determine which of
> these
> > >> overrides are passed to the connector.
> > >>
> > >> This approach means that all behaviors can be implemented through a
> > policy
> > >> class, including the defaults. It also gives a bit more control to
> > custom
> > >> policies, should that be warranted. For example, validating 

Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-05-06 Thread Randall Hauch
I actually like a separate config for whether to pass or filter client
override properties to the connector. I generally dislike adding more
properties, but in this case it keeps the two orthogonal behaviors
independent and reduces the need to implement policies that cover all
permutations.

However, I still find it strange to have a "non-policy" be the default. So
either of these modifications to the current KIP would be fine with me:
1) Add a `useOverride()` default method that returns true, but which the
None policy could override and return false; and keep the `validate(...)`
method as it is.
2) Change the `validate(Map<...>)` method to support a filtering pattern,
such as `Map<...> filterOverrides(Map<...> connectorClientOverrides)`

The point is that the default is the name of a built-in policy.
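
To make option 2 concrete, here is a rough sketch of the filtering pattern
(the type names and the two example policies are illustrative only; they follow
the behaviors listed in the earlier message quoted below, not a finished API):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;

// Illustrative sketch of the filtering pattern, not the actual KIP-458 API.
public interface ClientConfigOverrideFilterPolicy {

    // Validates the supplied overrides (throwing if any are not allowed) and
    // returns the subset that should be passed on to the connector's configure().
    Map<String, Object> filterOverrides(Map<String, Object> connectorClientOverrides);

    // An "Ignore"-style policy: all overrides are allowed and passed through.
    class Ignore implements ClientConfigOverrideFilterPolicy {
        @Override
        public Map<String, Object> filterOverrides(final Map<String, Object> overrides) {
            return overrides;
        }
    }

    // A "None"-style policy: no overrides are allowed at all.
    class None implements ClientConfigOverrideFilterPolicy {
        @Override
        public Map<String, Object> filterOverrides(final Map<String, Object> overrides) {
            if (!overrides.isEmpty()) {
                throw new ConfigException("Client config overrides are not allowed: " + overrides.keySet());
            }
            return Collections.emptyMap();
        }
    }
}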

Also, one minor suggestion is to use the term "override" in the config
property (e.g., `connector.client.override.policy`) since that term is used
prevalently and matches the `producer.override`, `consumer.override`, and
`admin.override` prefixes.

Thanks for working through this, Magesh.

Randall

On Mon, May 6, 2019 at 1:11 PM Magesh Nandakumar 
wrote:

> Randall,
>
> I was wondering if you had any thoughts on the above alternatives to deal
> with a default policy.  If it's possible, I would like to finalize the
> discussions and start a vote.
> Let me know your thoughts.
>
> Thanks,
> Magesh
>
> On Tue, Apr 30, 2019 at 8:46 PM Magesh Nandakumar 
> wrote:
>
> > Randall,
> >
> > The approach to return the to override configs could possibly make it
> > cumbersome to implement a custom policy. This is a new configuration and
> if
> > you don't explicitly set it the existing behavior remains as-is. Like
> > Chris, I also preferred this approach for the sake of simplicity.  If not
> > for the default `null` I would prefer to fall back to using `Ignore`
> which
> > is a misnomer to the interface spec but still gets the job done via
> > instanceOf checks. The other options I could think of are as below:-
> >
> >- have an enforcePolicy() method in the interface which by default
> >returns true and the Ignore implementation could return false
> >- introduce another worker config allow.connector.config.overrides
> >with a default value of false and the default policy can be None
> >
> > Let me know what you think.
> >
> > Thanks
> > Magesh
> >
> > On Tue, Apr 30, 2019 at 6:52 PM Randall Hauch  wrote:
> >
> >> Thanks, Chris. I still think it's strange to have a non-policy, since
> >> there's now special behavior for when the policy is not specified.
> >>
> >> Perhaps the inability for a policy implementation to represent the
> >> existing
> >> behavior suggests that the policy interface isn't quite right. Could the
> >> policy's "validate" method take the overrides that were supplied and
> >> return
> >> the overrides that should be passed to the connector, yet still throwing
> >> an
> >> exception if any supplied overrides are not allowed. Then the different
> >> policy implementations might be:
> >>
> >>- Ignore (default) - returns all supplied override properties
> >>- None - throws exception if any override properties are supplied;
> >>always returns empty map if no overrides are provided
> >>- Principal - throws exception if other override properties are
> >>provided, but returns an empty map (since no properties should be
> >> passed to
> >>the connector)
> >>- All - returns all provided override properties
> >>
> >> All override properties defined on the connector configuration would be
> >> passed to the policy for validation, and assuming there's no error all
> of
> >> these overrides would be used in the producer/consumer/admin client. The
> >> result of the policy call, however, is used to determine which of these
> >> overrides are passed to the connector.
> >>
> >> This approach means that all behaviors can be implemented through a
> policy
> >> class, including the defaults. It also gives a bit more control to
> custom
> >> policies, should that be warranted. For example, validating the provided
> >> client overrides but passing all such override properties to the
> >> connector,
> >> which as I stated earlier is something I think connectors likely don't
> >> look
> >> for.
> >>
> >> Thoughts?
> >>
> >> Randall
> >>
> >> On Tue, Apr 30, 2019 at 6:07 PM Chris Egerton 
> >> wrote:
> >>
> >> > Randall,
> >> >
> >> > The special behavior for null was my suggestion. There is no
> >> implementation
> >> > of the proposed interface that causes client overrides to be ignored,
> so
> >> > the original idea was to have a special implementation that would be
> >> > checked for by the Connect framework (probably via the instanceof
> >> operator)
> >> > and, if present, cause all would-be overrides to be ignored.
> >> >
> >> > I thought this may be confusing to people who may see that behavior
> and
> >> > wonder how to recreate it themselves, so I suggested leaving that
> 

Re: [VOTE] KIP-460: Admin Leader Election RPC

2019-05-06 Thread Gwen Shapira
All committer votes are binding, so you have binding +1 from Colin, Jason
and myself - which is just the 3 you need for the KIP to be accepted.
Mickael added non-binding community support, which is a great signal as well.

On Mon, May 6, 2019 at 4:39 PM Jose Armando Garcia Sancio <
jsan...@confluent.io> wrote:

> I am closing the voting. KIP is accepted with:
>
> +1 (binding): Colin McCabe
> +1 (non-binding): Jason Gustafson, Gwen Shapira, Mickael Maison
>
> Thanks!
>
> On Fri, May 3, 2019 at 9:59 AM Mickael Maison 
> wrote:
>
> > +1 (non binding)
> > Thanks for the KIP
> >
> > On Thu, May 2, 2019 at 11:02 PM Colin McCabe  wrote:
> > >
> > > +1 (binding)
> > >
> > > thanks, Jose.
> > >
> > > best,
> > > Colin
> > >
> > > On Wed, May 1, 2019, at 14:44, Jose Armando Garcia Sancio wrote:
> > > > Hi all,
> > > >
> > > > I would like to start the voting for KIP-460:
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-460%3A+Admin+Leader+Election+RPC
> > > >
> > > > The thread discussion is here:
> > > > https://www.mail-archive.com/dev@kafka.apache.org/msg97226.html
> > > >
> > > > Thanks!
> > > > -Jose
> > > >
> >
>
>
> --
> -Jose
>


-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap



Re: [VOTE] KIP-460: Admin Leader Election RPC

2019-05-06 Thread Jose Armando Garcia Sancio
I am closing the voting. KIP is accepted with:

+1 (binding): Colin McCabe
+1 (non-binding): Jason Gustafson, Gwen Shapira, Mickael Maison

Thanks!

On Fri, May 3, 2019 at 9:59 AM Mickael Maison 
wrote:

> +1 (non binding)
> Thanks for the KIP
>
> On Thu, May 2, 2019 at 11:02 PM Colin McCabe  wrote:
> >
> > +1 (binding)
> >
> > thanks, Jose.
> >
> > best,
> > Colin
> >
> > On Wed, May 1, 2019, at 14:44, Jose Armando Garcia Sancio wrote:
> > > Hi all,
> > >
> > > I would like to start the voting for KIP-460:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-460%3A+Admin+Leader+Election+RPC
> > >
> > > The thread discussion is here:
> > > https://www.mail-archive.com/dev@kafka.apache.org/msg97226.html
> > >
> > > Thanks!
> > > -Jose
> > >
>


-- 
-Jose


Re: [VOTE] KIP-258: Allow to Store Record Timestamps in RocksDB

2019-05-06 Thread Matthias J. Sax
Another small change to the KIP:

We want to add two more public static helper methods `make()` and
`getValueOrNull` to simplify `null` handling. Also, the constructor of
`ValueAndTimestamp` should be private.


> public class ValueAndTimestamp<V> {
>     private ValueAndTimestamp(final V value, final long timestamp); // use `make()` instead
>     public V value();
>     public long timestamp();
>     public static <V> ValueAndTimestamp<V> make(final V value, final long timestamp); // returns `null` if `value==null`
>     public static <V> V getValueOrNull(final ValueAndTimestamp<V> valueAndTimestamp); // returns `null` if `valueAndTimestamp==null`
> }
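
To illustrate how these helpers simplify null handling, here is a short usage
sketch based on the semantics described above (the class is part of KIP-258 and
may still change before release):

import org.apache.kafka.streams.state.ValueAndTimestamp;

public class ValueAndTimestampSketch {
    public static void main(final String[] args) {
        final ValueAndTimestamp<Long> present = ValueAndTimestamp.make(42L, 1557187200000L);

        // make() returns null if the value is null, so a wrapper never holds a null value
        final ValueAndTimestamp<Long> absent = ValueAndTimestamp.make(null, 1557187200000L);

        // getValueOrNull() saves the explicit null check on the wrapper itself
        final Long value = ValueAndTimestamp.getValueOrNull(absent); // null
        System.out.println(present + " / " + value);
    }
}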

I don't think there is a need to revote. I updated the KIP accordingly.
Please let me know if there are any concerns about this minor change.


-Matthias


On 4/19/19 1:27 AM, Matthias J. Sax wrote:
> Quick update to the KIP. While working on
> 
> https://github.com/apache/kafka/pull/6601
> 
> I realized that I forgot to list the following three new
> methods that we need to add in addition:
> 
>> public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name);
>>
>> public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
>>                                                                          final Duration retentionPeriod,
>>                                                                          final Duration windowSize,
>>                                                                          final boolean retainDuplicates);
>>
>> public static SessionBytesStoreSupplier persistentTimestampedSessionStore(final String name,
>>                                                                            final Duration retentionPeriod);
> 
> I updated the KIP accordingly.
> 
> 
> I don't think there is any need to revote, because this is a minor and
> straight forward change to the KIP.
> 
> 
> -Matthias
> 
> 
> On 1/28/19 6:32 PM, Matthias J. Sax wrote:
>> Hi,
>>
>> during PR reviews, we discovered a couple of opportunities to simply and
>> improve the KIP and code. Thus, the following minor changes to the
>> public API are done (the KIP is already updated). I revote is not
>> necessary as the changes are minor.
>>
>>  - interface `ValueAndTimestamp` is going to be a class
>>
>>  - interface `RecordConverter` is renamed to `TimestampedBytesStore` and
>> we add a static method that converts values from old to new format
>>
>>  - the three new interfaces `TimestampedXxxStore` don't add any new methods
>>
>>
>>
>> Let us know if there are any objections. I can also provide more details
>> why those changes make sense.
>>
>> Thanks a lot!
>>
>>
>> -Matthias
>>
>>
>> On 1/18/19 10:00 PM, Matthias J. Sax wrote:
>>> +1 from myself.
>>>
>>>
>>> I am also closing this vote. The KIP is accepted with
>>>
>>> - 3 binding votes (Damian, Guozhang, Matthias)
>>> - 3 non-binding votes (Bill, Patrik, John)
>>>
>>>
>>> Thanks for the discussion and voting.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 1/16/19 10:35 AM, John Roesler wrote:
 +1 (nonbinding) from me.

 Thanks for the KIP, Matthias.

 -John

 On Wed, Jan 16, 2019 at 12:01 PM Guozhang Wang  wrote:

> Thanks Matthias, I left some minor comments but since they do not involve
> in any major architectural changes and I did not feel strong about the
> naming etc as well. I'd +1 on the proposal as well.
>
> Feel free to reply / accept or reject my suggestions on the other DISCUSS
> thread.
>
>
> Guozhang
>
> On Wed, Jan 16, 2019 at 6:38 AM Damian Guy  wrote:
>
>> +1
>>
>> On Wed, 16 Jan 2019 at 05:09, Patrik Kleindl  wrote:
>>
>>> +1 (non-binding)
>>> Thanks too
>>> Best regards
>>> Patrik
>>>
 Am 16.01.2019 um 03:30 schrieb Bill Bejeck :

 Thanks for the KIP Matthias.

 +1

 -Bill

 On Tue, Jan 15, 2019 at 7:33 PM Matthias J. Sax <
> matth...@confluent.io
>>>
 wrote:

> Hi,
>
> I would like to start the vote for KIP-258:
>
>
>
>>>
>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>
> The KIP adds new stores that allow to store record timestamps next
> to
> key and value. Additionally, we will allow to upgrade exiting stores
>> to
> the new stores; this will allow us to use the new stores in the DSL
>> with
> a smooth upgrade path.
>
>
> -Matthias
>
>
>>>
>>
>
>
> --
> -- Guozhang
>

>>>
>>
> 
> 
> 





Re: [VOTE] KIP-453: Add close() method to RocksDBConfigSetter

2019-05-06 Thread Bill Bejeck
Thanks for the KIP Sophie.

+1(binding)

On Mon, May 6, 2019 at 4:51 PM John Roesler  wrote:

> Thanks, Sophie, I reviewed the KIP, and I agree this is the best /
> only-practical approach.
>
> +1 (non-binding)
>
> On Mon, May 6, 2019 at 2:23 PM Matthias J. Sax 
> wrote:
>
> > +1 (binding)
> >
> >
> >
> > On 5/6/19 6:28 PM, Sophie Blee-Goldman wrote:
> > > Hi all,
> > >
> > > I'd like to call for a vote on a minor KIP that adds a close() method
> to
> > > the RocksDBConfigSetter interface of Streams.
> > >
> > > Link:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-453%3A+Add+close%28%29+method+to+RocksDBConfigSetter
> > >
> > > This is important for users who have created RocksObjects in
> > > RocksDBConfigSetter#setConfig to avoid leaking off-heap memory.
> > >
> > > Thanks!
> > > Sophie
> > >
> >
> >
>


[jira] [Resolved] (KAFKA-8255) Replica fetcher thread exits with OffsetOutOfRangeException

2019-05-06 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8255.

Resolution: Resolved

I'm going to resolve this. We believe the root cause was KAFKA-8306, which has 
been fixed. We can reopen if we find information indicating otherwise.

> Replica fetcher thread exits with OffsetOutOfRangeException
> ---
>
> Key: KAFKA-8255
> URL: https://issues.apache.org/jira/browse/KAFKA-8255
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Colin P. McCabe
>Priority: Major
>
> Replica fetcher threads can exits with OffsetOutOfRangeException when the log 
> start offset has advanced beyond the high water mark on the fetching broker.
> Example stack trace:
> {code}
> org.apache.kafka.common.KafkaException: Error processing data for partition 
> __consumer_offsets-46 offset 18761
> at 
> kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2$$anonfun$apply$mcV$sp$3$$anonfun$apply$10.apply(AbstractFetcherThread.scala:335)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2$$anonfun$apply$mcV$sp$3$$anonfun$apply$10.apply(AbstractFetcherThread.scala:294)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2$$anonfun$apply$mcV$sp$3.apply(AbstractFetcherThread.scala:294)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2$$anonfun$apply$mcV$sp$3.apply(AbstractFetcherThread.scala:293)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:293)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2.apply(AbstractFetcherThread.scala:293)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$kafka$server$AbstractFetcherThread$$processFetchRequest$2.apply(AbstractFetcherThread.scala:293)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:292)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:132)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:131)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 4808819 of partition __consumer_offsets-46 
> since it is larger than the high watermark 18761
> [2019-04-16 14:16:42,257] INFO [ReplicaFetcher replicaId=1001, leaderId=1003, 
> fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
> {code}
> It seems that we should not terminate the replica fetcher thread in this case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Error connecting to kafka machine

2019-05-06 Thread Naveen V
Hello Team,

I have installed Kafka on a CentOS machine and am trying to connect to that
machine to send data to my broker, but I am getting an error message as follows:

"Message: pyembed:
ConnectionError(: Failed to establish a new connection: [Errno 111]
Connection refused) caused by:
NewConnectionError(: Failed to establish a new connection: [Errno 111]
Connection refused)  "

Am I missing anything? Please do help. Thanks in advance!

Regards,
Naveen Velumani.


Re: Cleaning up command line tools argument parsing a little

2019-05-06 Thread Colin McCabe
On Mon, May 6, 2019, at 10:21, Sönke Liebau wrote:
> Hi Colin,
> 
> it was my intention to keep the structure of the commands mostly intact
> while doing the refactoring - if that is possible, have not really checked
> yet to be honest.
> 
> But what I wanted to try and do is recreate the current parsing with
> argparse as much as possible. And in the process simply adding synonyms,
> for example make the kafka-console-producer understand a
> bootstrap-parameter in addition to broker-list.
> There is a bit of custom logic about which parameters go together etc. in
> the current classes, so output may look different here and there, but in
> principle I do believe that it should be possible to recreate the current
> structure.

Sounds like a good idea.  Thanks for the clarification.

> 
> If there is an appetite for a new, hadoop-like entrypoint anyway, then all
> of this might be "wasted" effort, or rather effort better spent though, you
> are right.

I don't think anyone is working on a new entry point right now -- or if they 
are, they haven't said anything yet :)

I just wanted to mention it as a possible approach in case you wanted to do a 
bigger project.

best,
Colin
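
As a concrete illustration of the argparse4j subcommand idea discussed in the
quoted messages below (a sketch only, not an actual Kafka tool; the flag names
are made up for the example):

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;

public class TopicsCommandSketch {
    public static void main(final String[] args) throws Exception {
        final ArgumentParser parser = ArgumentParsers.newArgumentParser("kafka-topics");
        parser.addArgument("--bootstrap-server").required(true)
              .help("The Kafka brokers to connect to");

        final Subparsers subparsers = parser.addSubparsers().dest("command");

        // `list` only exposes the flags relevant to listing topics ...
        subparsers.addParser("list").help("List all topics");

        // ... while `create` gets its own, separate set of flags.
        final Subparser create = subparsers.addParser("create").help("Create a topic");
        create.addArgument("--topic").required(true);
        create.addArgument("--partitions").type(Integer.class).setDefault(1);

        final Namespace ns = parser.parseArgs(args);
        System.out.println("command = " + ns.getString("command"));
    }
}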

> 
> Best regards,
> Sönke
> 
> 
> 
> On Mon, May 6, 2019 at 7:13 PM Colin McCabe  wrote:
> 
> > Hi Sönke,
> >
> > #2 is a bit tough because people have come to rely on the way the commands
> > are structured right now.
> >
> > If we want to make big changes, it might be easier just to create a
> > separate tool and deprecate the old one(s).  One thing we've talked about
> > doing in the past is creating a single entry point for all the tool
> > functionality, kind of like hadoop did with the "hadoop" command  Or git
> > with the "git" command, etc.  Then we could deprecate the standalone
> > commands and remove them after enough time had passed-- kind of like the
> > old consumer.
> >
> > On the other hand, a more incremental change would be standardizing flags
> > a bit.  So for example, at least setting it up so that there is a standard
> > way of supplying bootstrap brokers, etc.  We could keep the old flags
> > around for a while as variants to ease the transition.
> >
> > best,
> > Colin
> >
> >
> > On Sun, May 5, 2019, at 00:54, Sönke Liebau wrote:
> > > Hi Colin,
> > >
> > > I totally agree! Especially the differently named bootstrap server
> > options
> > > have been annoying me a long time.
> > >
> > > I'd propose a two-step approach:
> > > 1. Add new default options objects similar to CommandLineUtils and
> > > CommandDefaultOptions (based on argparse4j) but in the clients project,
> > as
> > > this is referenced by all command line tools as far as I can tell
> > > 2. Refactor tools one by one to use these new helper classes (and thus
> > > argparse) and add standardized synonyms for parameters as necessary
> > >
> > > I think for step 1 we can get away with no KIP, as this doesn't change
> > any
> > > public interfaces or behavior.
> > > Step 2 probably needs a KIP as we are adding new parameters? We can pick
> > up
> > > KIP-14 again for that I think. A lot of work has been done on that
> > already.
> > >
> > > Does that sound useful to everybody?
> > >
> > > Best regards,
> > > Sönke
> > >
> > >
> > > On Thu, Apr 18, 2019 at 1:44 AM Colin McCabe  wrote:
> > >
> > > > If we are going to standardize on one argument parsing library, it
> > should
> > > > certainly be argparse4j, I think.
> > > >  argparse4j is simply a better argument parsing library with support
> > for
> > > > more features.  One example is mutually exclusive options.  argparse4j
> > > > supports this with MutuallyExclusiveGroup.  jopt doesn't support this,
> > so
> > > > when it is needed, we have to add extra code to manually check that
> > > > mutually exclusive options are not set.
> > > >
> > > > argparse4j also has subcommands.  If you want something like "git add"
> > > > with some set of flags, and "git remove" with another, you can do this
> > with
> > > > argparse4j, but not with jopt.  This would be very helpful for
> > clearing up
> > > > confusion in a lot of our shell scripts which have accumulated dozens
> > of
> > > > arguments, most of which are only relevant to a very specific
> > operation.
> > > > But you just can't do it with jopt.
> > > >
> > > > Just to give an example, argparse4j with subcommands would allow you to
> > > > run something like ./kafka-topics.sh list --help and get just options
> > that
> > > > were relevant for listing topics, not the full dozens of options that
> > might
> > > > relate to adding topics, removing them, etc.
> > > >
> > > > To be honest, though, what would help users the most is standardizing
> > the
> > > > option flags across tools.  We should have a standard way of specifying
> > > > bootstrap brokers, for example.  (We can continue to support the old
> > > > synonyms for a while, of course.)
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Wed, Apr 17, 2019, at 08:56, Guozhang Wang 

Not Able to connect to kafka

2019-05-06 Thread Naveen V
Hello Team,

I have installed Kafka on a CentOS machine and am trying to connect to that
machine to send data to my broker and am getting an error message as


Re: [DISCUSS] KIP-466: Add support for List serialization and deserialization

2019-05-06 Thread John Roesler
Hi Daniyar,

Thanks for the proposal!

If I understand the point about the comparator, is it just to capture the
generic type parameter? If so, then anything that implements a known
interface would work just as well, right? I've been considering adding
something like the Jackson TypeReference (or similar classes in many other
projects). Would this be a good time to do it?

Note that it's not necessary to actually require that the captured type is
Comparable (as this proposal currently does), it's just a way to make sure
there is some method that makes use of the generic type parameter, to force
the compiler to capture the type.
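
As a concrete illustration of that type-capture idea, here is a minimal sketch in the spirit of Jackson's TypeReference; the TypeRef name and its use are purely illustrative assumptions, not an existing Kafka API or part of the KIP-466 proposal:

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public abstract class TypeRef<T> {
    private final Type type;

    protected TypeRef() {
        // The anonymous subclass created at the use site records its generic superclass,
        // so the type argument survives erasure and can be read back reflectively.
        final ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }

    public Type capturedType() {
        return type;
    }

    public static void main(final String[] args) {
        final TypeRef<List<String>> ref = new TypeRef<List<String>>() { };
        System.out.println(ref.capturedType()); // prints java.util.List<java.lang.String>
    }
}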

Just to make sure I understand the motivation... You expressed a desire to
be able to serialize UUIDs, which I didn't follow, since there is a
built-in UUID serde: org.apache.kafka.common.serialization.Serdes#UUID, and
also, a UUID isn't a List. Did you mean that you need to use *lists of*
UUIDs?

Thanks,
-John

On Mon, May 6, 2019 at 11:49 AM Development  wrote:

> Hello,
>
> Starting a discussion for KIP-466 adding support for List Serde. PR is
> created under https://github.com/apache/kafka/pull/6592 <
> https://github.com/apache/kafka/pull/6592>
>
> There are two topics I would like to discuss:
> 1. Since the type for the List serde needs to be declared beforehand, I could not
> create a static method for List Serde under
> org.apache.kafka.common.serialization.Serdes. I addressed it in the KIP:
> P.S. Static method corresponding to ListSerde under
> org.apache.kafka.common.serialization.Serdes (something like static public
> Serde<List<T>> List() {...} in org.apache.kafka.common.serialization.Serdes)
> class cannot be added because type needs to be defined beforehand. That's
> why one needs to create List Serde in the following fashion:
> new Serdes.ListSerde(Serdes.String(),
> Comparator.comparing(String::length));
> (can possibly be simplified by declaring import static
> org.apache.kafka.common.serialization.Serdes.ListSerde)
>
> 2. @miguno Michael G. Noll  is questioning
> whether I need to pass a comparator to ListDeserializer. This certainly is
> not required. Feel free to add your input:
> https://github.com/apache/kafka/pull/6592#discussion_r281152067
>
> Thank you!
>
> Best,
> Daniyar Yeralin
>
> > On May 6, 2019, at 11:59 AM, Daniyar Yeralin (JIRA) 
> wrote:
> >
> > Daniyar Yeralin created KAFKA-8326:
> > --
> >
> > Summary: Add List Serde
> > Key: KAFKA-8326
> > URL: https://issues.apache.org/jira/browse/KAFKA-8326
> > Project: Kafka
> >  Issue Type: Improvement
> >  Components: clients, streams
> >Reporter: Daniyar Yeralin
> >
> >
> > I propose adding serializers and deserializers for the java.util.List
> class.
> >
> > I have many use cases where I want to set the key of a Kafka message to
> be a UUID. Currently, I need to turn UUIDs into strings or byte arrays and
> use their associated Serdes, but it would be more convenient to serialize
> and deserialize UUIDs directly.
> >
> > I believe there are many use cases where one would want to have a List
> serde. Ex. [
> https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows],
> [
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> ]
> >
> >
> >
> > KIP Link: [
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization
> ]
> >
> >
> >
> > --
> > This message was sent by Atlassian JIRA
> > (v7.6.3#76005)
>
>


Re: [VOTE] KIP-454: Expansion of the ConnectClusterState interface

2019-05-06 Thread Konstantine Karantasis
Nice and useful KIP. Thanks Chris!

+1 (non-binding)

Konstantine


On Fri, May 3, 2019 at 5:14 PM Randall Hauch  wrote:

> Nice job, Chris!
>
> +1 (binding)
>
> On Thu, May 2, 2019 at 8:16 PM Magesh Nandakumar 
> wrote:
>
> > Thanks a lot for the work on this KIP Chris.
> >
> > +1(non-binding)
> >
> > On Thu, May 2, 2019, 4:56 PM Chris Egerton  wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a vote for KIP-454:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface
> > >
> > > The discussion thread can be found at
> > > https://www.mail-archive.com/dev@kafka.apache.org/msg96911.html
> > >
> > > Thanks to Konstantine Karantasis and Magesh Nandakumar for their
> > thoughtful
> > > feedback!
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> >
>


Re: [VOTE] KIP-437: Custom replacement for MaskField SMT

2019-05-06 Thread Konstantine Karantasis
I think this is a useful improvement proposal. Thanks Valeria!

I'm +1 (non-binding) on this KIP

Konstantine

On Mon, Apr 15, 2019 at 2:04 AM Valeria Vasylieva <
valeria.vasyli...@gmail.com> wrote:

> Hi all,
>
> Since there are no more objections/proposals I would like to start the
> vote on KIP-437.
>
>
> See: KIP-437 <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-437%3A+Custom+replacement+for+MaskField+SMT
> >
> and related PR 
>
> I will be grateful to hear your opinion!
>
> Valeria
>


Kafka 2.3 KIP freeze on May 10th (Friday)

2019-05-06 Thread Colin McCabe
Hi all,

As per the release plan for 2.3, we're going to have a KIP freeze this week.  
Let's have it on Friday (May 10th).

For more details about the 2.3 release, check out 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=112820648

cheers,
Colin


[jira] [Resolved] (KAFKA-8056) Replace FindCoordinator request/response with automated protocol

2019-05-06 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8056.

Resolution: Fixed

> Replace FindCoordinator request/response with automated protocol
> 
>
> Key: KAFKA-8056
> URL: https://issues.apache.org/jira/browse/KAFKA-8056
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-05-06 Thread Konstantine Karantasis
Thanks for the KIP Magesh, it's quite useful towards the goals for more
general multi-tenancy in Connect.

Couple of comments from me too:

I think the fact that the default policy is 'null' (no implementation)
should be mentioned on the table next to the available implementations.
Currently the KIP says: 'In addition to the default implementation, ..."
but this is not very accurate because there is no concrete default
implementation. Just special handling of 'null' in
'connector.client.config.policy'

Regarding passing the overrides to the connector 'configure' method, I feel
it wouldn't hurt to pass them, but I also agree that leaving this out at
the moment is the safest option.

Since the interfaces and classes are listed in the KIP, I'd like to note
that Class is used as a raw type in field and return value declarations. We
should use the generic type instead.

Thanks for this improvement proposal!
Konstantine

On Mon, May 6, 2019 at 11:11 AM Magesh Nandakumar 
wrote:

> Randall,
>
> I was wondering if you had any thoughts on the above alternatives to deal
> with a default policy.  If it's possible, I would like to finalize the
> discussions and start a vote.
> Let me know your thoughts.
>
> Thanks,
> Magesh
>
> On Tue, Apr 30, 2019 at 8:46 PM Magesh Nandakumar 
> wrote:
>
> > Randall,
> >
> > The approach to return the to override configs could possibly make it
> > cumbersome to implement a custom policy. This is a new configuration and
> if
> > you don't explicitly set it the existing behavior remains as-is. Like
> > Chris, I also preferred this approach for the sake of simplicity.  If not
> > for the default `null` I would prefer to fall back to using `Ignore`
> which
> > is a misnomer to the interface spec but still gets the job done via
> > instanceOf checks. The other options I could think of are as below:-
> >
> >- have an enforcePolicy() method in the interface which by default
> >returns true and the Ignore implementation could return false
> >- introduce another worker config allow.connector.config.overrides
> >with a default value of false and the default policy can be None
> >
> > Let me know what you think.
> >
> > Thanks
> > Magesh
> >
> > On Tue, Apr 30, 2019 at 6:52 PM Randall Hauch  wrote:
> >
> >> Thanks, Chris. I still think it's strange to have a non-policy, since
> >> there's now special behavior for when the policy is not specified.
> >>
> >> Perhaps the inability for a policy implementation to represent the
> >> existing
> >> behavior suggests that the policy interface isn't quite right. Could the
> >> policy's "validate" method take the overrides that were supplied and
> >> return
> >> the overrides that should be passed to the connector, yet still throwing
> >> an
> >> exception if any supplied overrides are not allowed. Then the different
> >> policy implementations might be:
> >>
> >>- Ignore (default) - returns all supplied override properties
> >>- None - throws exception if any override properties are supplied;
> >>always returns empty map if no overrides are provided
> >>- Principal - throws exception if other override properties are
> >>provided, but returns an empty map (since no properties should be
> >> passed to
> >>the connector)
> >>- All - returns all provided override properties
> >>
> >> All override properties defined on the connector configuration would be
> >> passed to the policy for validation, and assuming there's no error all
> of
> >> these overrides would be used in the producer/consumer/admin client. The
> >> result of the policy call, however, is used to determine which of these
> >> overrides are passed to the connector.
> >>
> >> This approach means that all behaviors can be implemented through a
> policy
> >> class, including the defaults. It also gives a bit more control to
> custom
> >> policies, should that be warranted. For example, validating the provided
> >> client overrides but passing all such override properties to the
> >> connector,
> >> which as I stated earlier is something I think connectors likely don't
> >> look
> >> for.
> >>
> >> Thoughts?
> >>
> >> Randall
> >>
> >> On Tue, Apr 30, 2019 at 6:07 PM Chris Egerton 
> >> wrote:
> >>
> >> > Randall,
> >> >
> >> > The special behavior for null was my suggestion. There is no
> >> implementation
> >> > of the proposed interface that causes client overrides to be ignored,
> so
> >> > the original idea was to have a special implementation that would be
> >> > checked for by the Connect framework (probably via the instanceof
> >> operator)
> >> > and, if present, cause all would-be overrides to be ignored.
> >> >
> >> > I thought this may be confusing to people who may see that behavior
> and
> >> > wonder how to recreate it themselves, so I suggested leaving that
> policy
> >> > out and replace it with a check to see if a policy was specified at
> all.
> >> >
> >> > Would be interested in your thoughts on this, especially if there's an
> >> > 

Re: [VOTE] KIP-449: Add connector contexts to Connect worker logs (vote thread)

2019-05-06 Thread Konstantine Karantasis
Great improvement for multi-tenancy.
Thanks Randall!

+1 (non-binding)

Konstantine

On Tue, Apr 30, 2019 at 9:18 PM Chris Egerton  wrote:

> +1 (non-binding)
>
> Really looking forward to this. Thanks, Randall!
>
> On Tue, Apr 30, 2019, 20:47 Magesh Nandakumar 
> wrote:
>
> > This will make connect debugging so much easier. Thanks a lot for driving
> > this Randall.
> >
> > +1 (non-binding)
> >
> > Thanks,
> > Magesh
> >
> > On Tue, Apr 30, 2019 at 7:19 PM Jeremy Custenborder <
> > jcustenbor...@gmail.com>
> > wrote:
> >
> > > +1 non binding
> > >
> > > On Mon, Apr 29, 2019 at 5:34 PM Randall Hauch 
> wrote:
> > > >
> > > > I would like to start the vote for KIP-449:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs
> > > >
> > > > The KIP uses the Mapped Diagnostic Contexts (MDC) feature of SLF4J
> API
> > to
> > > > add more context to log messages from within Connect workers and
> > > connector
> > > > implementations. This would not be enabled by default, though it
> would
> > be
> > > > easy to enable within the Connect Log4J configuration.
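
For readers unfamiliar with MDC, the following minimal sketch shows the underlying SLF4J mechanism the KIP builds on (not the KIP's actual implementation); the key name and context string are illustrative assumptions, and a Log4j pattern containing %X{connector.context} would render the prefix:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MdcSketch {
    private static final Logger log = LoggerFactory.getLogger(MdcSketch.class);

    public static void main(final String[] args) {
        MDC.put("connector.context", "[my-connector|task-0] ");
        try {
            log.info("Starting task"); // prefixed with the connector context when %X{connector.context} is in the layout
        } finally {
            MDC.remove("connector.context"); // MDC is thread-local; always clean up
        }
    }
}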
> > > >
> > > > Thanks!
> > > >
> > > > Randall
> > >
> >
>


Re: [VOTE] KIP-453: Add close() method to RocksDBConfigSetter

2019-05-06 Thread John Roesler
Thanks, Sophie, I reviewed the KIP, and I agree this is the best /
only-practical approach.

+1 (non-binding)

On Mon, May 6, 2019 at 2:23 PM Matthias J. Sax 
wrote:

> +1 (binding)
>
>
>
> On 5/6/19 6:28 PM, Sophie Blee-Goldman wrote:
> > Hi all,
> >
> > I'd like to call for a vote on a minor KIP that adds a close() method to
> > the RocksDBConfigSetter interface of Streams.
> >
> > Link:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-453%3A+Add+close%28%29+method+to+RocksDBConfigSetter
> >
> > This is important for users who have created RocksObjects in
> > RocksDBConfigSetter#setConfig to avoid leaking off-heap memory
> >
> > Thanks!
> > Sophie
> >
>
>


[DISCUSS] KIP-461 - Improving replica fetcher behavior in case of partition failures

2019-05-06 Thread Aishwarya Gune
Hey All!

I have created a KIP to improve the behavior of replica fetcher when
partition failure occurs. Please do have a look at it and let me know what
you think.
KIP 461 -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure

-- 
Thank you,
Aishwarya


Re: [VOTE] KIP-453: Add close() method to RocksDBConfigSetter

2019-05-06 Thread Matthias J. Sax
+1 (binding)



On 5/6/19 6:28 PM, Sophie Blee-Goldman wrote:
> Hi all,
> 
> I'd like to call for a vote on a minor KIP that adds a close() method to
> the RocksDBConfigSetter interface of Streams.
> 
> Link:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-453%3A+Add+close%28%29+method+to+RocksDBConfigSetter
> 
> This is important for users who have created RocksObjects in
> RocksDBConfigSetter#setConfig to avoid leaking off-heap memory
> 
> Thanks!
> Sophie
> 





Re: [VOTE] KIP-465: Add Consolidated Connector Endpoint to Connect REST API

2019-05-06 Thread Konstantine Karantasis
Useful and concise KIP

+1 (non-binding)

Konstantine

On Mon, May 6, 2019 at 10:43 AM Randall Hauch  wrote:

> Thanks, Dan. As mentioned on the discussion, this is really a nice little
> addition that was always missing from the API.
>
> +1 (binding)
>
> Randall
>
> On Mon, May 6, 2019 at 9:23 AM dan  wrote:
>
> > I would like to start voting for
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-465%3A+Add+Consolidated+Connector+Endpoint+to+Connect+REST+API
> >
> > thanks
> > dan
> >
>


Re: [DISCUSS] KIP-465: Add Consolidated Connector Endpoint to Connect REST API

2019-05-06 Thread Konstantine Karantasis
This is a nice improvement, both from an operational standpoint as well as
for testing purposes, as we scale the number of connectors that a connect
cluster can run.

LGTM, thanks for the KIP Dan!


On Fri, May 3, 2019 at 2:50 PM Alex Liu  wrote:

> Good question,
>
> `info` is probably the best name for it. The updated output on the wiki
> looks reasonable to me.
>
> Alex
>
> On Fri, May 3, 2019 at 2:24 PM dan  wrote:
>
> > thanks. i think this makes sense.
> >
> > i'm thinking we should just use repeated queryparams for this, so
> > `?expand=status&expand=config`
> >
> > another thing is what do you think we should use for the `/` endpoint?
> was
> > thinking `?expand=info`
> >
> > output could look like
> >
> > w:kafka norwood$ curl -s 'http://localhost:8083/connectors?expand=status&expand=config' | jq
> >
> > {
> >   "blah": {
> >     "config": {
> >       "name": "blah",
> >       "config": {
> >         "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
> >         "file": "/tmp/lol",
> >         "tasks.max": "10",
> >         "name": "blah",
> >         "topic": "test-topic"
> >       },
> >       "tasks": [
> >         {
> >           "connector": "blah",
> >           "task": 0
> >         }
> >       ],
> >       "type": "source"
> >     },
> >     "status": {
> >       "name": "blah",
> >       "connector": {
> >         "state": "RUNNING",
> >         "worker_id": "10.200.25.241:8083"
> >       },
> >       "tasks": [
> >         {
> >           "id": 0,
> >           "state": "RUNNING",
> >           "worker_id": "10.200.25.241:8083"
> >         }
> >       ],
> >       "type": "source"
> >     }
> >   }
> > }
> >
> >
> > will update the wiki with this info
> >
> > thanks
> > dan
> >
> > On Thu, May 2, 2019 at 4:43 PM Alex Liu  wrote:
> >
> > > Good idea, Dan. One thing I might suggest is to have the query
> parameters
> > > reflect the fact that there are multiple resources under each
> connector.
> > > There is `connectors//`, `connectors//config`, and
> > > `connectors//status`.
> > > Each of them returns a slightly different set of information, so it
> would
> > > be useful to allow the query parameter be a string instead of a
> > true/false
> > > flag. In this case, `expand=status,config` would specify expanding both
> > the
> > > /status and /config subresources into the response objects.
> > >
> > > Other than this detail, I think this is a useful addition to the
> Connect
> > > REST API.
> > >
> > > Alex
> > >
> >
>


[VOTE] KIP-453: Add close() method to RocksDBConfigSetter

2019-05-06 Thread Sophie Blee-Goldman
Hi all,

I'd like to call for a vote on a minor KIP that adds a close() method to
the RocksDBConfigSetter interface of Streams.

Link:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-453%3A+Add+close%28%29+method+to+RocksDBConfigSetter

This is important for users who have created RocksObjects in
RocksDBConfigSetter#setConfig to avoid leaking off-heap memory

Thanks!
Sophie
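
To illustrate the problem the new method solves, here is a hedged sketch of a config setter that creates a RocksObject (a Bloom filter) and releases it in the proposed close() hook; the close() signature follows the KIP under vote and may differ from the final API:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Filter;
import org.rocksdb.Options;

public class BloomFilterConfigSetter implements RocksDBConfigSetter {
    private Filter filter;

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        filter = new BloomFilter(); // off-heap RocksObject created per store
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setFilter(filter);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        filter.close(); // without this hook the off-heap memory would leak
    }
}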


Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-05-06 Thread Magesh Nandakumar
Randall,

I was wondering if you had any thoughts on the above alternatives to deal
with a default policy.  If it's possible, I would like to finalize the
discussions and start a vote.
Let me know your thoughts.

Thanks,
Magesh

On Tue, Apr 30, 2019 at 8:46 PM Magesh Nandakumar 
wrote:

> Randall,
>
> The approach to return the to override configs could possibly make it
> cumbersome to implement a custom policy. This is a new configuration and if
> you don't explicitly set it the existing behavior remains as-is. Like
> Chris, I also preferred this approach for the sake of simplicity.  If not
> for the default `null` I would prefer to fall back to using `Ignore` which
> is a misnomer to the interface spec but still gets the job done via
> instanceOf checks. The other options I could think of are as below:-
>
>- have an enforcePolicy() method in the interface which by default
>returns true and the Ignore implementation could return false
>- introduce another worker config allow.connector.config.overrides
>with a default value of false and the default policy can be None
>
> Let me know what you think.
>
> Thanks
> Magesh
>
> On Tue, Apr 30, 2019 at 6:52 PM Randall Hauch  wrote:
>
>> Thanks, Chris. I still think it's strange to have a non-policy, since
>> there's now special behavior for when the policy is not specified.
>>
>> Perhaps the inability for a policy implementation to represent the
>> existing
>> behavior suggests that the policy interface isn't quite right. Could the
>> policy's "validate" method take the overrides that were supplied and
>> return
>> the overrides that should be passed to the connector, yet still throwing
>> an
>> exception if any supplied overrides are not allowed. Then the different
>> policy implementations might be:
>>
>>- Ignore (default) - returns all supplied override properties
>>- None - throws exception if any override properties are supplied;
>>always returns empty map if no overrides are provided
>>- Principal - throws exception if other override properties are
>>provided, but returns an empty map (since no properties should be
>> passed to
>>the connector)
>>- All - returns all provided override properties
>>
>> All override properties defined on the connector configuration would be
>> passed to the policy for validation, and assuming there's no error all of
>> these overrides would be used in the producer/consumer/admin client. The
>> result of the policy call, however, is used to determine which of these
>> overrides are passed to the connector.
>>
>> This approach means that all behaviors can be implemented through a policy
>> class, including the defaults. It also gives a bit more control to custom
>> policies, should that be warranted. For example, validating the provided
>> client overrides but passing all such override properties to the
>> connector,
>> which as I stated earlier is something I think connectors likely don't
>> look
>> for.
>>
>> Thoughts?
>>
>> Randall
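
To make the alternative described just above concrete, here is a hedged sketch of a validate() method that both checks the supplied client overrides and returns the subset that should be visible to the connector; the interface and class names are illustrative assumptions rather than the final KIP-458 API:

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;

public interface ConnectorClientConfigOverridePolicy {

    // Throws if any supplied override is not allowed; returns the overrides that
    // should also be passed along to the connector's own configuration.
    Map<String, Object> validate(Map<String, Object> requestedOverrides);
}

class IgnorePolicy implements ConnectorClientConfigOverridePolicy {
    @Override
    public Map<String, Object> validate(final Map<String, Object> requestedOverrides) {
        // Legacy/default behavior: nothing is rejected and everything reaches the connector.
        return requestedOverrides;
    }
}

class NonePolicy implements ConnectorClientConfigOverridePolicy {
    @Override
    public Map<String, Object> validate(final Map<String, Object> requestedOverrides) {
        if (!requestedOverrides.isEmpty()) {
            throw new ConnectException("Client overrides are not permitted: " + requestedOverrides.keySet());
        }
        return Collections.emptyMap();
    }
}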
>>
>> On Tue, Apr 30, 2019 at 6:07 PM Chris Egerton 
>> wrote:
>>
>> > Randall,
>> >
>> > The special behavior for null was my suggestion. There is no
>> implementation
>> > of the proposed interface that causes client overrides to be ignored, so
>> > the original idea was to have a special implementation that would be
>> > checked for by the Connect framework (probably via the instanceof
>> operator)
>> > and, if present, cause all would-be overrides to be ignored.
>> >
>> > I thought this may be confusing to people who may see that behavior and
>> > wonder how to recreate it themselves, so I suggested leaving that policy
>> > out and replace it with a check to see if a policy was specified at all.
>> >
>> > Would be interested in your thoughts on this, especially if there's an
>> > alternative that hasn't been proposed yet.
>> >
>> > Cheers,
>> >
>> > Chris
>> >
>> > On Tue, Apr 30, 2019, 18:01 Randall Hauch  wrote:
>> >
>> > > On Tue, Apr 30, 2019 at 4:20 PM Magesh Nandakumar <
>> mage...@confluent.io>
>> > > wrote:
>> > >
>> > > > Randall,
>> > > >
>> > > > Thanks a lot for your feedback.
>> > > >
>> > > > You bring up an interesting point regarding the overrides being
>> > available
>> > > > to the connectors. Today everything that is specified in the config
>> > while
>> > > > creating is available for the connector. But this is a specific case
>> > and
>> > > we
>> > > > could do either of the following
>> > > >
>> > > >
>> > > >- don't pass any configs with these prefixes to the
>> ConnectorConfig
>> > > >instance that's passed in the startConnector
>> > > >- allow policies as to whether the configurations with the
>> prefixes
>> > > >should be made available to the connector or not. Should this
>> also
>> > > > define a
>> > > >list of configurations?
>> > > >
>> > > > I personally prefer not passing the configs to Connector since
>> that's
>> > > > simple, straight forward and don't see a reason for the connector to
>> > > 

Re: [DISCUSS] KIP-440: Extend Connect Converter to support headers

2019-05-06 Thread Randall Hauch
Have we started a vote on this? I don't see the separate "[VOTE]" thread.

On Mon, Apr 29, 2019 at 6:19 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Thanks Yaroslav, this KIP LGTM now too!
>
> To give some context regarding my previous comment: headers in Connect
> would probably have followed a similar approach if default methods in
> interfaces could be used at the time. But we could not have assumed java 8
> or later yet in the AK version that Connect headers were added, so, I
> believe, that led to two different converter interfaces.
>
> Thanks for the nicely written KIP!
> Konstantine
>
>
>
> On Mon, Apr 29, 2019 at 3:39 PM Randall Hauch  wrote:
>
> > Thanks for the update. Yes, IMO this KIP is ready for a vote.
> >
> > On Fri, Apr 26, 2019 at 12:15 AM sapie...@gmail.com 
> > wrote:
> >
> > > Hi Randall, Konstantine,
> > >
> > > I've updated the KIP to reflect the details we discussed here. Let me
> > know
> > > if it looks good and we can go to the voting phase.
> > >
> > > Thanks!
> > >
> > > On 2019/04/22 21:07:31, Randall Hauch  wrote:
> > > > I think it would be helpful to clarify this in the KIP, just so that
> > > > readers are aware that the headers will be the raw header bytes that
> > are
> > > > the same as what is in the Kafka record.
> > > >
> > > > The alternative I was referring to is exposing the Connect `Headers`
> > > > interface, which is different.
> > > >
> > > > On Mon, Apr 22, 2019 at 1:45 PM sapie...@gmail.com <
> sapie...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi Konstantine, Randall,
> > > > >
> > > > > As you can see in the updated Converter interface, it always
> operates
> > > on
> > > > > `org.apache.kafka.common.header.Headers`.
> > > > >
> > > > > WorkerSinkTask simply uses Kafka message headers and passes them to
> > the
> > > > > `toConnectData` method.
> > > > >
> > > > > WorkerSourceTask leverages header converter to extract
> RecordHeaders,
> > > > > which implements Headers interface. Then RecordHeaders are passed
> to
> > > the
> > > > > `fromConnectData` method.
> > > > >
> > > > > So header converter is used as a way to get headers when converting
> > > data
> > > > > from internal Connect format to Kafka messages (cause there is no
> > > other way
> > > > > to get the headers in this case).
> > > > >
> > > > > I can add this to the KIP if it's helpful.
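
For context, a hedged sketch of the interface shape being discussed: header-aware overloads on the existing Converter that default to the current methods, so converters written against the old interface keep working. This mirrors the proposal as quoted above and is not necessarily the final KIP-440 API:

import java.util.Map;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

public interface Converter {

    void configure(Map<String, ?> configs, boolean isKey);

    byte[] fromConnectData(String topic, Schema schema, Object value);

    SchemaAndValue toConnectData(String topic, byte[] value);

    // New overloads carrying the record's raw Kafka headers; the defaults ignore
    // them, preserving the behavior of existing converter implementations.
    default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
        return fromConnectData(topic, schema, value);
    }

    default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
        return toConnectData(topic, value);
    }
}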
> > > > >
> > > > > Randall, what is the alternative approach you're referring to?
> > > > >
> > > > > On 2019/04/22 18:09:24, Randall Hauch  wrote:
> > > > > > Konstantine raises a good point. Which `Headers` is being
> > referenced
> > > in
> > > > > the
> > > > > > API? The Connect `org.apache.kafka.connect.header.Headers` would
> > > > > correspond
> > > > > > to what was already deserialized by the `HeaderConverter` or what
> > > will
> > > > > yet
> > > > > > be serialized by the `HeaderConverter`. Alternatively, the
> common `
> > > > > > org.apache.kafka.common.header.Headers` would correspond to the
> raw
> > > > > header
> > > > > > pairs from the underlying Kafka record.
> > > > > >
> > > > > > So, we probably want to be a bit more specific, and also mention
> > > why. And
> > > > > > we probably want to mention the other approach in the rejected
> > > > > alternatives.
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Randall
> > > > > >
> > > > > > On Mon, Apr 22, 2019 at 11:59 AM Konstantine Karantasis <
> > > > > > konstant...@confluent.io> wrote:
> > > > > >
> > > > > > > Thanks for the KIP Yaroslav!
> > > > > > >
> > > > > > > Apologies for the late comment. However, after reading the KIP
> > it's
> > > > > still
> > > > > > > not very clear to me what happens to the existing
> > > > > > > HeaderConverter interface and what's the expectation for
> existing
> > > code
> > > > > > > implementing this interface out there.
> > > > > > >
> > > > > > > Looking at the PR I see that the existing code is leaving the
> > > existing
> > > > > > > connect headers conversion unaffected. I'd expect by reading
> the
> > > KIP to
> > > > > > > understand what's the interplay of the current proposal with
> the
> > > > > existing
> > > > > > > implementation of KIP-145 that introduced headers in Connect.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Konstantine
> > > > > > >
> > > > > > > On Mon, Apr 22, 2019 at 9:07 AM Randall Hauch <
> rha...@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Thanks for updating the KIP. It looks good to me, and since
> > there
> > > > > haven't
> > > > > > > > been any other issue mentioned in this month-long thread,
> it's
> > > > > probably
> > > > > > > > fine to start a vote.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > >
> > > > > > > > Randall
> > > > > > > >
> > > > > > > > On Tue, Apr 2, 2019 at 3:12 PM Randall Hauch <
> rha...@gmail.com
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks for the submission, Yaroslav -- and for building on
> > the
> > > > > > > suggestion
> > > > > > > > > 

Re: [VOTE] KIP-465: Add Consolidated Connector Endpoint to Connect REST API

2019-05-06 Thread Randall Hauch
Thanks, Dan. As mentioned on the discussion, this is really a nice little
addition that was always missing from the API.

+1 (binding)

Randall

On Mon, May 6, 2019 at 9:23 AM dan  wrote:

> I would like to start voting for
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-465%3A+Add+Consolidated+Connector+Endpoint+to+Connect+REST+API
>
> thanks
> dan
>


Re: [VOTE] KIP-422: Use the default value of max.poll.interval in Streams

2019-05-06 Thread John Roesler
You're absolutely right. I realized that at some point after the initial
message and just tried to play it cool.

Sorry about that.
-John

On Mon, Apr 15, 2019 at 11:59 AM Jun Rao  wrote:

> The voting thread seems to be referring to the wrong KIP number. It should
> be KIP-442 instead of KIP-422.
>
> Thanks,
>
> Jun
>
> On Wed, Apr 3, 2019 at 7:03 PM John Roesler  wrote:
>
> > Thanks all. The KIP-442 vote has passed with 3 binding votes (Guozhang,
> > Bill, and Damian) and one non-binding vote (me) in favor and none
> against.
> >
> > I'll update the KIP page.
> >
> > -John
> >
> > On Fri, Mar 29, 2019 at 10:29 AM Damian Guy 
> wrote:
> >
> > > +1
> > >
> > > On Wed, 27 Mar 2019 at 21:38, John Roesler  wrote:
> > >
> > > > Ah, good point, Guozhang. I'll remove that mention from the KIP.
> > > >
> > > > On Wed, Mar 27, 2019 at 3:30 PM Bill Bejeck 
> wrote:
> > > >
> > > > > +1 for me,
> > > > >
> > > > > Thanks,
> > > > > Bill
> > > > >
> > > > > On Wed, Mar 27, 2019 at 4:13 PM Guozhang Wang 
> > > > wrote:
> > > > >
> > > > > > +1 from me.
> > > > > >
> > > > > > Though note that we cannot make such changes in older versions
> > since
> > > > even
> > > > > > if we release new versions out of those branches they are
> > considered
> > > > > > bug-fix only and hence should not have any interface impacting
> > > changes.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Wed, Mar 27, 2019 at 12:55 PM John Roesler  >
> > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Since the KIP is so small, I'm going to optimistically start
> the
> > > vote
> > > > > for
> > > > > > > KIP-422 to remove our "max int" default max.poll.interval.ms
> in
> > > > > Streams
> > > > > > > and
> > > > > > > fall back to the Consumer default of five minutes.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-442%3A+Return+to+default+max+poll+interval+in+Streams
> > > > > > >
> > > > > > > Permalink: https://cwiki.apache.org/confluence/x/1COGBg
> > > > > > >
> > > > > > > See also: https://issues.apache.org/jira/browse/KAFKA-6399
> > > > > > >
> > > > > > > Please let me know if you have any objections and wish to
> return
> > to
> > > > the
> > > > > > > discussion phase!
> > > > > > >
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] KIP-411: Make default Kafka Connect worker task client IDs distinct

2019-05-06 Thread Paul Davidson
Thanks Arjun. I've updated the KIP using your suggestion - just a few
slight changes.

On Fri, May 3, 2019 at 4:48 PM Arjun Satish  wrote:

> Maybe we can say something like:
>
> This change can have an indirect impact on resource usage by a Connector.
> For example, systems that were enforcing quotas using a "consumer-[id]"
> client id will now have to update their configs to enforce quota on
> "connector-consumer-[id]". For systems that were not enforcing any
> limitations or using default quotas, there should be no change expected.
>
> Best,
>
> On Fri, May 3, 2019 at 1:38 PM Paul Davidson
>  wrote:
>
> > Thanks Arjun. I updated the KIP to mention the impact on quotas. Please
> let
> > me know if you think I need more detail. The paragraph I added was:
> >
> > Since the default client.id values are changing, this will also affect
> any
> > > user that has quotas defined against the current defaults. The current
> > > default client.id values are of the form: consumer-{count}  and
> > >  producer-{count}.
> >
> >
> > Thanks,
> >
> > Paul
> >
> > On Thu, May 2, 2019 at 5:36 PM Arjun Satish 
> > wrote:
> >
> > > Paul,
> > >
> > > You might want to make a note on the KIP regarding the impact on
> quotas.
> > >
> > > Thanks,
> > >
> > > On Thu, May 2, 2019 at 9:48 AM Paul Davidson
> > >  wrote:
> > >
> > > > Thanks for the votes everyone! KIP-411 is now accepted with:
> > > >
> > > > +3 binding votes (Randall, Jason, Gwen) , and
> > > > +3 non-binding votes (Ryanne, Arjun, Magesh)
> > > >
> > > > Regards,
> > > >
> > > > Paul
> > > >
> > > > On Wed, May 1, 2019 at 10:07 PM Arjun Satish  >
> > > > wrote:
> > > >
> > > > > Good point, Gwen. We always set a non empty value for client id:
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/2.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L668
> > > > > .
> > > > >
> > > > > But more importantly, connect client ids (for consumers, for
> example)
> > > > were
> > > > > already of the form "consumer-[0-9]+", and from now on they will be
> > > > > "connector-consumer-[connector_name]-[0-9]+". So, at least for
> > connect
> > > > > consumers/producers, we would have already been hitting the default
> > > quota
> > > > > limits and nothing changes for them. You can correct me if I'm
> > missing
> > > > > something, but seems like this doesn't *break* backward
> > compatibility?
> > > > >
> > > > > I suppose this change only gives us a better way to manage that
> quota
> > > > > limit.
> > > > >
> > > > > Best,
> > > > >
> > > > > On Wed, May 1, 2019 at 9:16 PM Gwen Shapira 
> > wrote:
> > > > >
> > > > > > I'm confused. Surely the default quota applies on empty client
> IDs
> > > too?
> > > > > > otherwise it will be very difficult to enforce?
> > > > > > So setting the client name will only change something if there's
> > > > already
> > > > > a
> > > > > > quota for that client?
> > > > > >
> > > > > > On the other hand, I fully support switching to
> "easy-to-wildcard"
> > > > > template
> > > > > > for the client id.
> > > > > >
> > > > > > On Wed, May 1, 2019 at 8:50 PM Arjun Satish <
> > arjun.sat...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > I just realized that setting the client.id on the will now
> > trigger
> > > > any
> > > > > > > quota restrictions (
> > > > > > > https://kafka.apache.org/documentation/#design_quotasconfig)
> on
> > > the
> > > > > > > broker.
> > > > > > > It seems like this PR will enforce quota policies that will
> > either
> > > > > > require
> > > > > > > admins to set limits for each task (since the chosen format is
> > > > > > > connector-*-id), or fallback to some default value.
> > > > > > >
> > > > > > > Maybe we should mention this in the backward compatibility
> > section
> > > > for
> > > > > > the
> > > > > > > KIP. At the same time, since there is no way atm to turn off
> this
> > > > > > feature,
> > > > > > > should this feature be merged and released in the upcoming
> v2.3?
> > > This
> > > > > is
> > > > > > > something the committers can comment better.
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > > >
> > > > > > > On Wed, May 1, 2019 at 5:13 PM Gwen Shapira  >
> > > > wrote:
> > > > > > >
> > > > > > > > hell yeah!
> > > > > > > > +1
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Apr 5, 2019 at 9:08 AM Paul Davidson
> > > > > > > >  wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > Since we seem to have agreement in the discussion I would
> > like
> > > to
> > > > > > start
> > > > > > > > the
> > > > > > > > > vote on KIP-411.
> > > > > > > > >
> > > > > > > > > See:
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Make+default+Kafka+Connect+worker+task+client+IDs+distinct
> > > > > > > > >
> > > > > > > > > Also see the related PR:
> > > > 

Re: Cleaning up command line tools argument parsing a little

2019-05-06 Thread Sönke Liebau
Hi Colin,

it was my intention to keep the structure of the commands mostly intact
while doing the refactoring - if that is possible, have not really checked
yet to be honest.

But what I wanted to try and do is recreate the current parsing with
argparse as much as possible. And in the process simply adding synonyms,
for example make the kafka-console-producer understand a
bootstrap-parameter in addition to broker-list.
There is a bit of custom logic about which parameters go together etc. in
the current classes, so output may look different here and there, but in
principle I do believe that it should be possible to recreate the current
structure.
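
A minimal sketch of what that synonym handling could look like with argparse4j; the tool name, destination, and help text here are illustrative assumptions rather than the actual console producer code:

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;

public class ConsoleProducerArgs {
    public static void main(final String[] args) {
        final ArgumentParser parser = ArgumentParsers.newArgumentParser("kafka-console-producer");
        // One argument, two option strings: both spellings land in the same destination.
        parser.addArgument("--bootstrap-server", "--broker-list")
              .dest("bootstrapServer")
              .required(true)
              .metavar("HOST:PORT[,HOST:PORT,...]")
              .help("The Kafka broker(s) to connect to");
        try {
            final Namespace ns = parser.parseArgs(args);
            System.out.println("Connecting to " + ns.getString("bootstrapServer"));
        } catch (final ArgumentParserException e) {
            parser.handleError(e);
            System.exit(1);
        }
    }
}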

If there is an appetite for a new, hadoop-like entrypoint anyway, then all
of this might be "wasted" effort, or rather effort better spent though, you
are right.

Best regards,
Sönke



On Mon, May 6, 2019 at 7:13 PM Colin McCabe  wrote:

> Hi Sönke,
>
> #2 is a bit tough because people have come to rely on the way the commands
> are structured right now.
>
> If we want to make big changes, it might be easier just to create a
> separate tool and deprecate the old one(s).  One thing we've talked about
> doing in the past is creating a single entry point for all the tool
> functionality, kind of like hadoop did with the "hadoop" command  Or git
> with the "git" command, etc.  Then we could deprecate the standalone
> commands and remove them after enough time had passed-- kind of like the
> old consumer.
>
> On the other hand, a more incremental change would be standardizing flags
> a bit.  So for example, at least setting it up so that there is a standard
> way of supplying bootstrap brokers, etc.  We could keep the old flags
> around for a while as variants to ease the transition.
>
> best,
> Colin
>
>
> On Sun, May 5, 2019, at 00:54, Sönke Liebau wrote:
> > Hi Colin,
> >
> > I totally agree! Especially the differently named bootstrap server
> options
> > have been annoying me a long time.
> >
> > I'd propose a two-step approach:
> > 1. Add new default options objects similar to CommandLineUtils and
> > CommandDefaultOptions (based on argparse4j) but in the clients project,
> as
> > this is referenced by all command line tools as far as I can tell
> > 2. Refactor tools one by one to use these new helper classes (and thus
> > argparse) and add standardized synonyms for parameters as necessary
> >
> > I think for step 1 we can get away with no KIP, as this doesn't change
> any
> > public interfaces or behavior.
> > Step 2 probably needs a KIP as we are adding new parameters? We can pick
> up
> > KIP-14 again for that I think. A lot of work has been done on that
> already.
> >
> > Does that sound useful to everybody?
> >
> > Best regards,
> > Sönke
> >
> >
> > On Thu, Apr 18, 2019 at 1:44 AM Colin McCabe  wrote:
> >
> > > If we are going to standardize on one argument parsing library, it
> should
> > > certainly be argparse4j, I think.
> > >  argparse4j is simply a better argument parsing library with support
> for
> > > more features.  One example is mutually exclusive options.  argparse4j
> > > supports this with MutuallyExclusiveGroup.  jopt doesn't support this,
> so
> > > when it is needed, we have to add extra code to manually check that
> > > mutually exclusive options are not set.
> > >
> > > argparse4j also has subcommands.  If you want something like "git add"
> > > with some set of flags, and "git remove" with another, you can do this
> with
> > > argparse4j, but not with jopt.  This would be very helpful for
> clearing up
> > > confusion in a lot of our shell scripts which have accumulated dozens
> of
> > > arguments, most of which are only relevant to a very specific
> operation.
> > > But you just can't do it with jopt.
> > >
> > > Just to give an example, argparse4j with subcommands would allow you to
> > > run something like ./kafka-topics.sh list --help and get just options
> that
> > > were relevant for listing topics, not the full dozens of options that
> might
> > > relate to adding topics, removing them, etc.
> > >
> > > To be honest, though, what would help users the most is standardizing
> the
> > > option flags across tools.  We should have a standard way of specifying
> > > bootstrap brokers, for example.  (We can continue to support the old
> > > synonyms for a while, of course.)
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Wed, Apr 17, 2019, at 08:56, Guozhang Wang wrote:
> > > > I took another look at the PR itself and I think it would be great to
> > > have
> > > > this cleanup too -- I cannot remember at the beginning why we
> gradually
> > > > moved to different mechanism (argparse4j) for different cmds, if
> there's
> > > no
> > > > rationales behind it we should just make them consistent.
> > > >
> > > > Thanks for driving this!
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Apr 17, 2019 at 7:19 AM Ryanne Dolan 
> > > wrote:
> > > >
> > > > > Sönke, I'd find this very helpful. It's annoying to keep track of
> which
> > > > > 

Re: Cleaning up command line tools argument parsing a little

2019-05-06 Thread Colin McCabe
Hi Sönke,

#2 is a bit tough because people have come to rely on the way the commands are 
structured right now.

If we want to make big changes, it might be easier just to create a separate 
tool and deprecate the old one(s).  One thing we've talked about doing in the 
past is creating a single entry point for all the tool functionality, kind of 
like hadoop did with the "hadoop" command.  Or git with the "git" command, etc.  
Then we could deprecate the standalone commands and remove them after enough 
time had passed-- kind of like the old consumer.
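
As a hedged sketch of what such an entry point could look like with argparse4j subcommands, the mechanism behind "git add" / "git remove" style interfaces; the command and flag names are illustrative assumptions, not an existing Kafka tool:

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;

public class KafkaCli {
    public static void main(final String[] args) {
        final ArgumentParser parser = ArgumentParsers.newArgumentParser("kafka");
        final Subparsers subparsers = parser.addSubparsers().dest("command");

        final Subparser listTopics = subparsers.addParser("list-topics").help("List topics");
        listTopics.addArgument("--bootstrap-server").required(true);

        final Subparser createTopic = subparsers.addParser("create-topic").help("Create a topic");
        createTopic.addArgument("--bootstrap-server").required(true);
        createTopic.addArgument("--partitions").type(Integer.class).setDefault(1);

        try {
            final Namespace ns = parser.parseArgs(args);
            // "kafka create-topic --help" now shows only the flags relevant to creating topics.
            System.out.println("Dispatching command: " + ns.getString("command"));
        } catch (final ArgumentParserException e) {
            parser.handleError(e);
            System.exit(1);
        }
    }
}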

On the other hand, a more incremental change would be standardizing flags a 
bit.  So for example, at least setting it up so that there is a standard way of 
supplying bootstrap brokers, etc.  We could keep the old flags around for a 
while as variants to ease the transition.

best,
Colin


On Sun, May 5, 2019, at 00:54, Sönke Liebau wrote:
> Hi Colin,
> 
> I totally agree! Especially the differently named bootstrap server options
> have been annoying me a long time.
> 
> I'd propose a two-step approach:
> 1. Add new default options objects similar to CommandLineUtils and
> CommandDefaultOptions (based on argparse4j) but in the clients project, as
> this is referenced by all command line tools as far as I can tell
> 2. Refactor tools one by one to use these new helper classes (and thus
> argparse) and add standardized synonyms for parameters as necessary
> 
> I think for step 1 we can get away with no KIP, as this doesn't change any
> public interfaces or behavior.
> Step 2 probably needs a KIP as we are adding new parameters? We can pick up
> KIP-14 again for that I think. A lot of work has been done on that already.
> 
> Does that sound useful to everybody?
> 
> Best regards,
> Sönke
> 
> 
> On Thu, Apr 18, 2019 at 1:44 AM Colin McCabe  wrote:
> 
> > If we are going to standardize on one argument parsing library, it should
> > certainly be argparse4j, I think.
> >  argparse4j is simply a better argument parsing library with support for
> > more features.  One example is mutually exclusive options.  argparse4j
> > supports this with MutuallyExclusiveGroup.  jopt doesn't support this, so
> > when it is needed, we have to add extra code to manually check that
> > mutually exclusive options are not set.
> >
> > argparse4j also has subcommands.  If you want something like "git add"
> > with some set of flags, and "git remove" with another, you can do this with
> > argparse4j, but not with jopt.  This would be very helpful for clearing up
> > confusion in a lot of our shell scripts which have accumulated dozens of
> > arguments, most of which are only relevant to a very specific operation.
> > But you just can't do it with jopt.
> >
> > Just to give an example, argparse4j with subcommands would allow you to
> > run something like ./kafka-topics.sh list --help and get just options that
> > were relevant for listing topics, not the full dozens of options that might
> > relate to adding topics, removing them, etc.
> >
> > To be honest, though, what would help users the most is standardizing the
> > option flags across tools.  We should have a standard way of specifying
> > bootstrap brokers, for example.  (We can continue to support the old
> > synonyms for a while, of course.)
> >
> > best,
> > Colin
> >
> >
> > On Wed, Apr 17, 2019, at 08:56, Guozhang Wang wrote:
> > > I took another look at the PR itself and I think it would be great to
> > have
> > > this cleanup too -- I cannot remember at the beginning why we gradually
> > > moved to different mechanism (argparse4j) for different cmds, if there's
> > no
> > > rationales behind it we should just make them consistent.
> > >
> > > Thanks for driving this!
> > >
> > > Guozhang
> > >
> > > On Wed, Apr 17, 2019 at 7:19 AM Ryanne Dolan 
> > wrote:
> > >
> > > > Sönke, I'd find this very helpful. It's annoying to keep track of which
> > > > commands use which form -- I always seem to guess wrong.
> > > >
> > > > Though I don't think there is any reason to deprecate existing forms,
> > e.g.
> > > > consumer.config vs consumer-config. I think it's perfectly reasonable
> > to
> > > > have multiple spellings of the same arguments. I don't really see a
> > > > downside to keeping the aliases around indefinitely.
> > > >
> > > > Ryanne
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Apr 17, 2019, 7:07 AM Sönke Liebau
> > > >  wrote:
> > > >
> > > > > Hi everybody,
> > > > >
> > > > > Jason and I were recently discussing command line argument parsing on
> > > > > KAFKA-8131 (or rather the related pull request) [1].
> > > > >
> > > > > Command line tools and their arguments are somewhat diverse at the
> > > > moment.
> > > > > Most of the tools use joptsimple for argument parsing, some newer
> > java
> > > > > tools use argparse4j instead and some tools use nothing at all.
> > > > > I've looked for a reason as to why there are two libraries being
> > used,
> > > > but
> > > > > couldn't really find anything. Paolo brought 

[DISCUSS] KIP-466: Add support for List serialization and deserialization

2019-05-06 Thread Development
Hello,

Starting a discussion for KIP-466 adding support for List Serde. PR is created 
under https://github.com/apache/kafka/pull/6592 


There are two topics I would like to discuss:
1. Since type for List serve needs to be declared before hand, I could not 
create a static method for List Serde under 
org.apache.kafka.common.serialization.Serdes. I addressed it in the KIP: 
P.S. Static method corresponding to ListSerde under 
org.apache.kafka.common.serialization.Serdes (something like static public 
Serde> List() {...} inorg.apache.kafka.common.serialization.Serdes) 
class cannot be added because type needs to be defined beforehand. That's why 
one needs to create List Serde in the following fashion:
new Serdes.ListSerde(Serdes.String(), 
Comparator.comparing(String::length));
(can possibly be simplified by declaring import static 
org.apache.kafka.common.serialization.Serdes.ListSerde)

2. @miguno Michael G. Noll  is questioning whether I 
need to pass a comparator to ListDeserializer. This certainly is not required. 
Feel free to add your input:
https://github.com/apache/kafka/pull/6592#discussion_r281152067

Thank you!

Best,
Daniyar Yeralin
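
As a hedged usage sketch of the proposal described above (Serdes.ListSerde and its inner-serde/comparator constructor come from the KIP and PR under discussion, not a released Kafka API):

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class ListSerdeExample {
    public static void main(final String[] args) {
        final Serde<List<String>> listSerde =
            new Serdes.ListSerde<>(Serdes.String(), Comparator.comparing(String::length));

        final List<String> original = Arrays.asList("kafka", "list", "serde");
        final byte[] bytes = listSerde.serializer().serialize("my-topic", original);
        final List<String> roundTripped = listSerde.deserializer().deserialize("my-topic", bytes);
        // roundTripped is expected to equal the original list
    }
}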

> On May 6, 2019, at 11:59 AM, Daniyar Yeralin (JIRA)  wrote:
> 
> Daniyar Yeralin created KAFKA-8326:
> --
> 
> Summary: Add List Serde
> Key: KAFKA-8326
> URL: https://issues.apache.org/jira/browse/KAFKA-8326
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Daniyar Yeralin
> 
> 
> I propose adding serializers and deserializers for the java.util.List class.
> 
> I have many use cases where I want to set the key of a Kafka message to be a 
> UUID. Currently, I need to turn UUIDs into strings or byte arrays and use 
> their associated Serdes, but it would be more convenient to serialize and 
> deserialize UUIDs directly.
> 
> I believe there are many use cases where one would want to have a List serde. 
> Ex. 
> [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows],
>  
> [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api]
> 
>  
> 
> KIP Link: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization]
> 
> 
> 
> --
> This message was sent by Atlassian JIRA
> (v7.6.3#76005)



Re: [DISCUSS] KIP-455: Create an Administrative API for Replica Reassignment

2019-05-06 Thread Colin McCabe
On Mon, May 6, 2019, at 07:39, Ismael Juma wrote:
> Hi Colin,
> 
> A quick comment.
> 
> On Sat, May 4, 2019 at 11:18 PM Colin McCabe  wrote:
> 
> > The big advantage of doing batching on the controller is that the
> > controller has more information about what is going on in the cluster.  So
> > it can schedule reassignments in a more optimal way.  For instance, it can
> > schedule reassignments so that the load is distributed evenly across
> > nodes.  This advantage is lost if we have to adhere to a rigid ordering
> > that is set up in advance.  We don't know exactly when anything will
> > complete in any case.  Just because one partition reassignment was started
> > before another doesn't mean it will finish before another.
> 
> 
> This is not quite true, right? The Controller doesn't know about partition
> sizes, throughput per partition and other such information that external
> tools like Cruise Control track.

Hi Ismael,

That's a good point, and one I should have included.

I guess when I think about "do batching in the controller" versus "do batching 
in an external system" I tend to think about the information the controller 
could theoretically collect, rather than what it actually does :)  But 
certainly, adding this information to the controller would be a significant 
change, and maybe one we don't want to do if the external systems work well 
enough.

Thinking about this a little bit more, I can see three advantages to 
controller-side batching.  Firstly, doing batching in the controller saves 
memory because we don't use a separate JVM, and don't duplicate the in-memory 
map of all the partitions.  Secondly, the information we're acting on would 
also be more up-to-date.  (I'm not sure how important this would be.)  Finally, 
it's one less thing to deploy.  I don't know if those are really enough to 
motivate switching now, but in a greenfield system I would probably choose 
controller-side rebalancing.

In any case, this KIP is orthogonal to controller-side rebalancing versus 
external rebalancing.  That's why the KIP states that we will continue to 
perform all the given partition rebalances immediately.  I was just responding 
to the idea that maybe we should have an "ordering" of rebalancing partitions.  
I don't think we want that, for controller-side rebalancing or externally 
batched rebalancing.

best,
Colin


Re: [DISCUSS] KIP-456: Helper classes to make it simpler to write test logic with TopologyTestDriver

2019-05-06 Thread John Roesler
Thanks for the KIP, Jukka! Sorry it took so long for me to review it.

Just a few questions, in no particular order:

InputTopic:

1. Have you considered adding a ProducerRecord input method to the input
topic? This might be unnecessary, given the overloads you provide. I'm
wondering if it decreases the domain mismatch between TopologyTestDriver
and KafkaStreams, though, since in production code, you send input to the
app as a ProducerRecord, via a topic. Also, this might let you drop some of
the less mainstream overloads, like the ones for headers.

2. On the "pipeList" methods, it seems like the "start,advance" timestamp
approach forces me to do a little mental math, if I actually want to
achieve some specific timestamp per record, or to be able to verify the
result, given specific timestamps as input. Did you consider a
KeyValueTimestamp value type instead? Alternatively, if you like the
ProducerRecord approach, above, you could lean on that instead.

3. I wasn't clear on the semantics of the constructors that take a start
timestamp, but no advance time. I also wasn't clear on the semantics when
the constructor specifies start/advance, but then we also call the input
methods that specify timestamps, or start/advance timestamps. Also related,
what's the "default" timestamp, if no start is specified, "zero" or "now"
both seem reasonable. Similar with the advance, "1ms" and "0ms" both seem
reasonable defaults.

OutputTopic:

4. Tentatively, ProducerRecord seems like a strange output type, since as a
user, I'm "consuming" the results. How about using ConsumerRecord instead?

5. We have methods above for producing with specific timestamps, but none
for observing the timestamps. How can we strengthen the symmetry?

6. (just a comment) I like the readToMap method. Thanks!

7. I know it clashes somewhat with the Kafka semantics, but I'm a little
concerned about null semantics in the output topic. In particular,
"readValue()" returning null ambiguously indicates both "there is no value"
and "there is a null value present". Can we consider throwing a
NoSuchElementException when you attempt to read, but there is nothing there?

7.5. This would necessitate adding a boolean method to query the presence
of output records, which can be a handy way to cap off a test:
`assertFalse(outputTopic.hasRecords(),
outputTopic.readKeyValuesToList().toString())` would fail and print out the
remaining records if there are any.
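
A rough sketch of the semantics I have in mind for 7 and 7.5 (method names are
placeholders, not settled API):

String value = outputTopic.readValue(); // a record carrying a null value still returns null explicitly
// ...
if (!outputTopic.hasRecords()) {
    // a further readValue() would throw NoSuchElementException
    // instead of ambiguously returning null
}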

General:

8. Can the TTD, input, and output topics implement AutoCloseable, to
facilitate try-with-resources in tests?

Example:
try (TopologyTestDriver driver = new TopologyTestDriver(...);
     TestInputTopic input = new TestInputTopic(...);
     TestOutputTopic output = new TestOutputTopic(...)) {
  ...
} // ensures everything is closed

9. Should we add some assertion that all the output is consumed when you
call testDriver.close() ?

10. Should we consider deprecating (some or all) of the overlapping
mechanisms in TopologyTestDriver? It seems like this might improve
usability by reducing ambiguity.

11. Can you give us some idea of the javadoc for each of the new methods
you're proposing? The documentation is also part of the public API, and it
also helps us understand the semantics of the operations you're proposing.

That's all! Thanks so much for this awesome proposal,
-John

On Mon, May 6, 2019 at 6:58 AM Jukka Karvanen 
wrote:

> Hi,
>
> Now everything what I planned to add including tests, samples and document
> changes are in my branch:
>
> https://github.com/jukkakarvanen/kafka/compare/trunk...jukkakarvanen:KAFKA-8233_InputOutputTopics
>
>
> So I can create PR as soon as this KIP is getting green light to proceed.
>
> Jukka
>
> la 4. toukok. 2019 klo 9.05 Jukka Karvanen (jukka.karva...@jukinimi.com)
> kirjoitti:
>
> > Hi,
> >
> > New TestInputTopic and TestOutputTopic are included in the Developer Guide
> > testing page as an alternative;
> > the old way with ConsumerRecordFactory and OutputVerifier is not removed.
> >
> > You can see the proposal here in my branch:
> >
> >
> https://github.com/jukkakarvanen/kafka/compare/trunk...jukkakarvanen:KAFKA-8233_InputOutputTopics
> >
> >
> > I can create a work-in-progress pull request if that makes commenting on the
> > proposal easier.
> > Still planning to add full coverage unit test and sample
> WordCountDemoTest to
> >
> streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount,
> > if this KIP is accepted.
> >
> > Jukka
> >
> >
> > ti 30. huhtik. 2019 klo 13.59 Matthias J. Sax (matth...@confluent.io)
> > kirjoitti:
> >
> >> KIP-451 was discarded in favor of this KIP. So it seems we are all on
> >> the same page.
> >>
> >>
> >> >> Relating to KIP-448.
> >> >> What kind of alignment did you think about?
> >>
> >> Nothing in particular. Was more or less a random thought. Maybe there is
> >> nothing to be aligned. Just wanted to bring it up. :)
> >>
> >>
> >> >> Some thoughts after reading also the comments in KAFKA-6460:
> >> >> To my understanding these KIP-448 mock classes need to be fed somehow
> into
> >> >> TopologyTestDriver.
> >> 

Re: [DISCUSS] KIP-460: Admin Leader Election RPC

2019-05-06 Thread Jose Armando Garcia Sancio
On Sun, May 5, 2019 at 12:35 PM Stanislav Kozlovski 
wrote:

> Hey there Jose, thanks for the KIP!
>
> I have one small nit regarding the `kafka-leader-election.sh` tool. I agree
> with Jason that it is probably better to force users be explicit in their
> desired election type. I was wondering whether it makes sense to support
> only "preferred" and "unclean" for the "--election-type" flag, not the
> numeric values.
>

Thanks for the feedback Stanislav. I went ahead and updated KIP-460 to only
allow "preferred" and "unclean" for the "--election-type" flag.

Thanks!
-Jose


Re: [DISCUSS] KIP-439: Deprecate Interface WindowStoreIterator

2019-05-06 Thread John Roesler
Thanks all (or range? ;) ) for the discussion. Good points all around.

Although I see the allure of naming the metrics the same as the things
they're measuring, it seems not to be perfect. Seconding Matthias's latter
thought, I think it's likely you'd want to measure the method calls
independently, since the different range variants would have wildly
different characteristics, which could then lead you to want to orient the
storage differently to support particular use cases.

Pointing out some structural characteristics (I know you all know this
stuff, I'm just constructing a table for analysis):
* Java supports method-name overloading. *Different* methods can have the
same names, distinguished by arg lists; it doesn't change the fact that
they are different methods.
* Metrics does not support metric-name overloading, but metric names do
have some structure we could exploit, if you consider the tags.

It seems to me that there's actually more domain mismatch if we just have
one metric named "range", since (e.g., in the SessionStore proposal above)
the Java API has *four* methods named "range".

Two potential solutions I see:
* hierarchical metric names: "range-single-key-all-time",
"range-key-range-all-time", "range-single-key-time-range",
"range-key-range-time-range", maybe with better names... I'm not the best
at this stuff. Hopefully, you see the point, though... they all start with
"range", which provides the association to the method, and all have a
suffix which identifies the overload being measured.
* tagged metric names: "range" {"variant": "single-key-all-time"}, "range"
{"variant": "key-range-all-time"}, "range" {"variant":
"single-key-time-range"}, "range" {"variant": "key-range-time-range"} . Or
you could even split the tags up semantically, but my instinct says that
that would just make it harder to digest the metrics later on.
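
For illustration only, the tagged option could lean on the existing MetricName
type; the group, description, and tag values below are placeholders rather than
a proposed naming scheme:

Map<String, String> tags = new HashMap<>();
tags.put("variant", "single-key-time-range");
MetricName rangeMetric = new MetricName("range", "stream-state-metrics",
    "rate of range() calls", tags); // all four range() variants share the name, differing only by tag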

Just some ideas.
-John

On Fri, Apr 26, 2019 at 3:51 AM Matthias J. Sax 
wrote:

> Thanks for the input Guozhang. I was not aware of those dependencies.
>
> It might be good to align this KIP with the metrics cleanup. Not sure
> atm, if we should use different metric names for different overloads,
> even if those have the same method name?
>
> If we rename all methods to `range()` and use the same metric name for
> all, one could argue that this is still fine, because the metric
> collects how often a range query is executed (regardless of the range
> itself).
>
> On the other hand, this would basically be a "roll up". It could still
> be valuable to distinguish between single-key-time-range,
> key-range-time-range, and all-range queries. Users could still aggregate
> those later if they are not interested in the details, while it's not
> possible for users to split a pre-aggregated metric into its components.
>
>
> Input from others might be helpful here, too.
>
>
> -Matthias
>
> On 4/11/19 6:00 PM, Guozhang Wang wrote:
> > While working at KIP-444 (https://github.com/apache/kafka/pull/6498) I
> > realized there are a bunch of issues on metric names v.s. function names,
> > e.g. some function named `fetchAll` are actually measure with `fetch`,
> etc.
> > So in that KIP I proposed to make the function name aligned with metrics
> > name. So suppose we rename the functions from `fetch` to `range` I'd
> > suggest we make this change as part of KIP-444 as well. Note that it
> means
> > different functions with the same name `range` will be measured under a
> > single metric then.
> >
> > But still for function named `all` it will be measured under a separate
> > metric named `all`, so I'm just clarifying with you if that's the
> intention.
> >
> >
> > Guozhang
> >
> > On Thu, Apr 11, 2019 at 2:04 PM Matthias J. Sax 
> > wrote:
> >
> >> I did not see a reason to rename `all()` to `range()`. `all()` does not
> >> take any parameters to limit a range and is a good name IMHO. But I am
> >> not married to keep `all()` and if we think we should rename it, too, I
> >> am fine with it.
> >>
> >> Not sure what connection you make to metrics though. Can you elaborate?
> >>
> >>
> >> Would be interested to hear others opinions on this, too.
> >>
> >>
> >> -Matthias
> >>
> >> On 4/11/19 8:38 AM, Guozhang Wang wrote:
> >>> I like the renaming, since it also aligns with our metrics cleanup
> >>> (KIP-444) which touches upon the store level metrics as well.
> >>>
> >>> One question: you seems still suggesting to keep "all" with the current
> >>> name (and also using a separate metric for it), what's the difference
> >>> between this one and other "range" functions?
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Thu, Apr 11, 2019 at 2:26 AM Matthias J. Sax  >
> >>> wrote:
> >>>
>  Thanks for the input.
> 
> >> Just to clarify the naming conflicts is between the newly added
> >> function
> >> and the old functions that we want to deprecate / remove right? The
> 
>  Yes, the conflict is just for the existing `fetch()` methods for
> which
>  we want to change the 

[jira] [Resolved] (KAFKA-8319) Flaky Test KafkaStreamsTest.statefulTopologyShouldCreateStateDirectory

2019-05-06 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-8319.

Resolution: Cannot Reproduce

> Flaky Test KafkaStreamsTest.statefulTopologyShouldCreateStateDirectory
> --
>
> Key: KAFKA-8319
> URL: https://issues.apache.org/jira/browse/KAFKA-8319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>  Labels: flaky-test
>






[jira] [Created] (KAFKA-8326) Add List Serde

2019-05-06 Thread Daniyar Yeralin (JIRA)
Daniyar Yeralin created KAFKA-8326:
--

 Summary: Add List Serde
 Key: KAFKA-8326
 URL: https://issues.apache.org/jira/browse/KAFKA-8326
 Project: Kafka
  Issue Type: Improvement
  Components: clients, streams
Reporter: Daniyar Yeralin


I propose adding serializers and deserializers for the java.util.List class.

I have many use cases where I want to set the key or value of a Kafka message to be a 
list of values. Currently, I need to turn lists into strings or byte arrays and handle the 
serialization myself, but it would be more convenient to serialize and deserialize 
lists directly.

I believe there are many use cases where one would want to have a List serde. 
Ex. 
[https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows],
 
[https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api]

 

KIP Link: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization]
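
A hypothetical usage sketch (the factory method below does not exist yet; the
shape simply follows the KIP-466 draft):

Serde<List<String>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.String());
KStream<String, List<String>> stream =
    builder.stream("list-topic", Consumed.with(Serdes.String(), listSerde));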





Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-06 Thread John Roesler
Ivan,

That's a very good point about the "start" operator in the dynamic case. I
had no problem with "split()"; I was just questioning the necessity. Since
you've provided a proof of necessity, I'm in favor of the "split()" start
operator. Thanks!

Separately, I'm interested to see where the present discussion leads. I've
written enough Javascript code in my life to be suspicious of nested
closures. You have a good point about using method references (or indeed
function literals also work). It should be validating that this was also
the JS community's first approach to flattening the logic when their nested
closure situation got out of hand. Unfortunately, it's replacing nesting
with redirection, both of which disrupt code readability (but in different
ways for different reasons). In other words, I agree that function
references are *the* first-order solution if the nested code does indeed
become a problem.
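
To make that concrete, here is a sketch in the style the KIP currently proposes
(operator names follow the KIP-418 draft and may still change; the handler and
class names are invented); extracting each handler into a method keeps the
branching logic flat:

stream.split()
      .branch(this::isError,   this::handleErrors)    // Predicate plus a handler per branch
      .branch(this::isSpecial, this::handleSpecial)
      .defaultBranch(this::handleRest);

// versus inlining the handlers, which nests the downstream topology definitions:
stream.split()
      .branch((key, value) -> value.isError(),
              errors -> errors.mapValues(ErrorReport::summarize).to("error-topic"))
      .defaultBranch(rest -> rest.to("main-topic"));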

However, the history of JS also tells us that function references aren't
the end of the story either, and you can see that by observing that there
have been two follow-on eras, as they continue trying to cope with the
consequences of living in such a callback-heavy language. First, you have
Futures/Promises, which essentially let you convert nested code to
method-chained code (Observables/FP is a popular variation on this). Most
lately, you have async/await, which is an effort to apply language (not
just API) syntax to the problem, and offer the "flattest" possible
programming style to solve the problem (because you get back to just one
code block per functional unit).

Stream-processing is a different domain, and Java+KStreams is nowhere near
as callback heavy as JS, so I don't think we have to take the JS story for
granted, but then again, I think we can derive some valuable lessons by
looking sideways to adjacent domains. I'm just bringing this up to inspire
further/deeper discussion. At the same time, just like JS, we can afford to
take an iterative approach to the problem.

Separately again, I'm interested in the post-branch merge (and I'd also add
join) problem that Paul brought up. We can clearly punt on it, by
terminating the nested branches with sink operators. But is there a DSL way
to do it?

Thanks again for your driving this,
-John

On Thu, May 2, 2019 at 7:39 PM Paul Whalen  wrote:

> Ivan, I’ll definitely forfeit my point on the clumsiness of the
> branch(predicate, consumer) solution, I don’t see any real drawbacks for
> the dynamic case.
>
> IMO the one trade off to consider at this point is the scope question. I
> don’t know if I totally agree that “we rarely need them in the same scope”
> since merging the branches back together later seems like a perfectly
> plausible use case that can be a lot nicer when the branched streams are in
> the same scope. That being said, for the reasons Ivan listed, I think it is
> overall the better solution - working around the scope thing is easy enough
> if you need to.
>
> > On May 2, 2019, at 7:00 PM, Ivan Ponomarev 
> wrote:
> >
> > Hello everyone, thank you all for joining the discussion!
> >
> > Well, I don't think the idea of named branches, be it a LinkedHashMap
> (no other Map will do, because order of definition matters) or a `branch`
> method taking a name and a Consumer, has more advantages than drawbacks.
> >
> > In my opinion, the only real positive outcome from Michael's proposal is
> that all the returned branches are in the same scope. But 1) we rarely need
> them in the same scope 2) there is a workaround for the scope problem,
> described in the KIP.
> >
> > 'Inlining the complex logic' is not a problem, because we can use method
> references instead of lambdas. In real world scenarios you tend to split
> the complex logic to methods anyway, so the code is going to be clean.
> >
> > The drawbacks are strong. The cohesion between predicates and handlers
> is lost. We have to define predicates in one place, and handlers in
> another. This opens the door for bugs:
> >
> > - what if we forget to define a handler for a name? or a name for a
> handler?
> > - what if we misspell a name?
> > - what if we copy-paste and duplicate a name?
> >
> > What Michael proposes would have been totally OK if we had been writing
> the API in Lua, Ruby or Python. In those languages the "dynamic naming"
> approach would have looked most concise and beautiful. But in Java we
> expect all the problems related to identifiers to be eliminated in compile
> time.
> >
> > Do we have to invent duck-typing for the Java API?
> >
> > And if we do, what advantage are we supposed to get besides having all
> the branches in the same scope? Michael, maybe I'm missing your point?
> >
> > ---
> >
> > Earlier in this discussion John Roesler also proposed to do without
> "start branching" operator, and later Paul mentioned that in the case when
> we have to add a dynamic number of branches, the current KIP is 'clumsier'
> compared to Michael's 'Map' solution. Let me address both 

Re: [DISCUSS] KIP-315: Stream Join Sticky Assignor

2019-05-06 Thread Mike Freyberger
Hi Matthias,

Once KIP-429 is released, non-sticky assignors will not be as useful. Any user 
that wants to take advantage of KIP-429 needs to use a sticky assignor. 
Currently there is only one sticky assignor in the Kafka project, which is 
similar to RoundRobinAssignor, but a sticky version. I imagine there will be 
users who currently use RangeAssignor but want to take advantage of KIP-429. 
So, having more directly accessible sticky assignors will allow more users to 
take advantage of KIP-429, without being forced to use Kafka Streams. Maybe I 
should reframe the KIP as essentially a sticky version of RangeAssignor?
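
For a plain consumer, picking up such an assignor would just be a config change
through the existing pluggable-strategy mechanism (the assignor class name below
is made up):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "join-workers");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          "com.example.StickyRangeAssignor"); // hypothetical sticky RangeAssignor variant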

Regarding how I am using a KV store instead of a kafka compacted topic: I 
simply prepend my keys with the incoming kafka partition, so on partition 
assignment I can scan the KV store for all keys within the assigned partition.
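
A minimal sketch of that key layout (illustrative only, not Kafka code): keys are
stored as "<partition>:<key>", so all state for a newly assigned partition can be
recovered with a single prefix/range scan.

NavigableMap<String, byte[]> store = new TreeMap<>();

void put(int partition, String key, byte[] value) {
    store.put(partition + ":" + key, value);
}

SortedMap<String, byte[]> entriesFor(int partition) {
    // ';' is the character immediately after ':', so this bounds the prefix scan
    return store.subMap(partition + ":", partition + ";");
}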

Mike

On 4/30/19, 6:49 AM, "Matthias J. Sax"  wrote:

Mike,

I am still not sure, why we need to add this assignor to the project.
Even after you pointed out that you cannot use Kafka Streams, the idea
of the consumer to make the `PartitionAssignor` interface public and
plugable is, that the project does not need to add strategies for all
kind of use cases, but that people can customize the assignors to their
needs.

My main question is: how generic is this use case (especially with Kafka
Streams offering joins out-of-the-box) and do we really need to add it?
So far, it seems ok to me, if you just write a custom assignor and plug
it into the consumer. I don't see a strong need to add it to the Kafka
code base. Basically, it breaks down to

- How many people use joins?
- How many people can or can't use Kafka Streams joins?
- To what extend can Kafka Streams be improved to increase the use-case
coverage?
- How many people would benefit? (ie, even with adding this assignor,
they might still be users who need to customize their own assignors
because their join-use-case is still different to yours.)


Also note, that in Kafka Streams you could still provide a custom state
store implementation (with or without using a compacted changelog) and a
`Processor` or `Transformer` to implement a custom join. Even if this
might not work for your specific case, it might work for many other
people who want to customer a join instead of using Kafka Streams'
out-of-the-box join.


Can you elaborate why you think it needs to be part of Kafka directly?


One last question:

> - Our state has a high eviction rate, so kafka compacted topics are not 
ideal for storing the changelog. The compaction cannot keep up and the topic 
will be majority tombstones when it is read on partition reassignment. We are 
using a KV store the "change log" instead.

What do you mean by 'We are using a KV store the "change log" instead.'?
How to you handle reassignment and state movement? Curious to see if we
could improve Kafka Streams :)


-Matthias


On 4/30/19 3:09 AM, Mike Freyberger wrote:
> In light of KIP-429, I think there will be an increased demand for sticky 
assignors. So, I'd like to restart the conversation about adding the sticky 
streams assignor, 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-315%3A+Stream+Join+Sticky+Assignor.
 
> 
> It’d be great to get feedback on the overall idea and the proposed 
implementation.
> 
> Thanks,
> 
> Mike
> 
> 
> On 6/20/18, 5:47 PM, "Mike Freyberger"  wrote:
> 
> Matthias, 
> 
> Thanks for the feedback. For our use case, we have some complexities 
that make using the existing Streams API more complicated than using the Kafka 
Consumer directly. 
> 
> - We are doing async processing, which I don't think is currently 
available (KIP-311 is handling this). 
> 
> - Our state has a high eviction rate, so kafka compacted topics are 
not ideal for storing the changelog. The compaction cannot keep up and the 
topic will be majority tombstones when it is read on partition reassignment. We 
are using a KV store the "change log" instead.
> 
> - We wanted to separate consumer threads from worker threads to 
maximize parallelization while keeping consumer TCP connections down.
> 
> Ultimately, it was much simpler to use the KafkaConsumer directly 
rather than peel away a lot of what Streams API does for you. I think we should 
continue to add support for more complex use cases and processing to the 
Streams API. However, I think there will remain streaming join use cases that 
can benefit from the flexibility that comes from using the KafkaConsumer 
directly. 
> 
> Mike
> 
> On 6/20/18, 5:08 PM, "Matthias J. Sax"  wrote:
> 
> Mike,
> 
> thanks a lot for 

Re: [DISCUSS] KIP-455: Create an Administrative API for Replica Reassignment

2019-05-06 Thread Ismael Juma
Hi Colin,

A quick comment.

On Sat, May 4, 2019 at 11:18 PM Colin McCabe  wrote:

> The big advantage of doing batching on the controller is that the
> controller has more information about what is going on in the cluster.  So
> it can schedule reassignments in a more optimal way.  For instance, it can
> schedule reassignments so that the load is distributed evenly across
> nodes.  This advantage is lost if we have to adhere to a rigid ordering
> that is set up in advance.  We don't know exactly when anything will
> complete in any case.  Just because one partition reassignment was started
> before another doesn't mean it will finish before another.


This is not quite true, right? The Controller doesn't know about partition
sizes, throughput per partition and other such information that external
tools like Cruise Control track.

Ismael


Build failed in Jenkins: kafka-trunk-jdk11 #489

2019-05-06 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Remove workarounds for lz4-java bug affecting byte buffers

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H38 (ubuntu xenial) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision 90043d5f7e1b09a959080c2064f758d5890fc454 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 90043d5f7e1b09a959080c2064f758d5890fc454
Commit message: "MINOR: Remove workarounds for lz4-java bug affecting byte 
buffers (#6679)"
 > git rev-list --no-walk 0c62f5e664e30e44af341cc38605bd5cfb498272 # timeout=10
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins8392606633094019124.sh
+ rm -rf 
+ /home/jenkins/tools/gradle/4.10.2/bin/gradle
/tmp/jenkins8392606633094019124.sh: line 4: 
/home/jenkins/tools/gradle/4.10.2/bin/gradle: No such file or directory
Build step 'Execute shell' marked build as failure
Recording test results
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2


[VOTE] KIP-465: Add Consolidated Connector Endpoint to Connect REST API

2019-05-06 Thread dan
I would like to start voting for
https://cwiki.apache.org/confluence/display/KAFKA/KIP-465%3A+Add+Consolidated+Connector+Endpoint+to+Connect+REST+API

thanks
dan


[jira] [Created] (KAFKA-8325) Remove from the incomplete set failed. This should be impossible

2019-05-06 Thread Mattia Barbon (JIRA)
Mattia Barbon created KAFKA-8325:


 Summary: Remove from the incomplete set failed. This should be 
impossible
 Key: KAFKA-8325
 URL: https://issues.apache.org/jira/browse/KAFKA-8325
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 2.1.0
Reporter: Mattia Barbon


I got this error when using the Kafka producer. So far it happened twice, with 
an interval of about 1 week.


ERROR [2019-05-05 08:43:07,505] org.apache.kafka.clients.producer.internals.Sender: 
[Producer clientId=, transactionalId=] Uncaught error in kafka producer I/O thread:
! java.lang.IllegalStateException: Remove from the incomplete set failed. This should be impossible.
! at org.apache.kafka.clients.producer.internals.IncompleteBatches.remove(IncompleteBatches.java:44)
! at org.apache.kafka.clients.producer.internals.RecordAccumulator.deallocate(RecordAccumulator.java:645)
! at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:717)
! at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:365)
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308)
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
! at java.lang.Thread.run(Thread.java:748)





Build failed in Jenkins: kafka-trunk-jdk11 #488

2019-05-06 Thread Apache Jenkins Server
See 


Changes:

[manikumar.reddy] KAFKA-7455: Support JmxTool to connect to a secured RMI port. 
(#5968)

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H29 (ubuntu xenial) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision 0c62f5e664e30e44af341cc38605bd5cfb498272 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 0c62f5e664e30e44af341cc38605bd5cfb498272
Commit message: "KAFKA-7455: Support JmxTool to connect to a secured RMI port. 
(#5968)"
 > git rev-list --no-walk 3322439d9895a7b599104eeb957939a23fef69cf # timeout=10
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins2065348272255371508.sh
+ rm -rf 
+ /home/jenkins/tools/gradle/4.10.2/bin/gradle
/tmp/jenkins2065348272255371508.sh: line 4: 
/home/jenkins/tools/gradle/4.10.2/bin/gradle: No such file or directory
Build step 'Execute shell' marked build as failure
Recording test results
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2


[jira] [Resolved] (KAFKA-7455) JmxTool cannot connect to an SSL-enabled JMX RMI port

2019-05-06 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-7455.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

> JmxTool cannot connect to an SSL-enabled JMX RMI port
> -
>
> Key: KAFKA-7455
> URL: https://issues.apache.org/jira/browse/KAFKA-7455
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Attila Sasvari
>Priority: Major
> Fix For: 2.3.0
>
>
> When JmxTool tries to connect to an SSL-enabled JMX RMI port with 
> JMXConnectorFactory's connect(), the connection attempt results in a 
> "java.rmi.ConnectIOException: non-JRMP server at remote endpoint":
> {code}
> $ export 
> KAFKA_OPTS="-Djavax.net.ssl.trustStore=/tmp/kafka.server.truststore.jks 
> -Djavax.net.ssl.trustStorePassword=test"
> $ bin/kafka-run-class.sh kafka.tools.JmxTool --object-name 
> "kafka.server:type=kafka-metrics-count"  --jmx-url 
> service:jmx:rmi:///jndi/rmi://localhost:9393/jmxrmi
> ConnectIOException: non-JRMP server at remote endpoint].
> java.io.IOException: Failed to retrieve RMIServer stub: 
> javax.naming.CommunicationException [Root exception is 
> java.rmi.ConnectIOException: non-JRMP server at remote endpoint]
> at 
> javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:369)
> at 
> javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:270)
> at kafka.tools.JmxTool$.main(JmxTool.scala:120)
> at kafka.tools.JmxTool.main(JmxTool.scala)
> {code}
> The problem is that {{JmxTool}} does not specify 
> {{SslRMIClientSocketFactory}} when it tries to connect
> https://github.com/apache/kafka/blob/70d90c371833b09cf934c8c2358171433892a085/core/src/main/scala/kafka/tools/JmxTool.scala#L120
> {code}  
>   jmxc = JMXConnectorFactory.connect(url, null)
> {code}
> To connect to a secured RMI port, it should pass an environment map that 
> contains a {{("com.sun.jndi.rmi.factory.socket", new 
> SslRMIClientSocketFactory)}} entry.
> More info:
> - https://docs.oracle.com/cd/E19698-01/816-7609/security-35/index.html
> - https://docs.oracle.com/javase/8/docs/technotes/guides/management/agent.html
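
A sketch of the suggested fix (shown in Java for brevity; the actual change would
be at the Scala call site in JmxTool):

Map<String, Object> env = new HashMap<>();
env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
JMXConnector jmxc = JMXConnectorFactory.connect(url, env);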





Re: [DISCUSS] KIP-455: Create an Administrative API for Replica Reassignment

2019-05-06 Thread Viktor Somogyi-Vass
Hi Colin,

Thanks for explaining all this, it makes sense.

Viktor

On Sun, May 5, 2019 at 8:18 AM Colin McCabe  wrote:

> On Thu, May 2, 2019, at 09:35, Viktor Somogyi-Vass wrote:
> > Hey Colin & George,
> >
> > Thinking on George's points I was wondering if it's feasible to submit a
> > big reassignment to the controller and thus Zookeeper as frequent writes
> > are slow as the quorum has to synchronize. Perhaps it should be the
> > responsibility of KIP-435 
> but
> > I'd like to note it here as we're changing the current znode layout in
> this
> > KIP.
>
> Hi Viktor,
>
> This is similar conceptually to if we lose a broker from the cluster.  In
> that case, we have to remove that node from the ISR of all the partitions
> it has, which means updating O(partitions_on_node) znodes.  It's also
> similar to completing a reassignment in the existing Kafka version, and
> updating the partition znodes to reflect new nodes joining the ISR for
> various partitions.  While you are right that ZK is a low-bandwidth system,
> in general, writing to a few thousand ZNodes over the course of a second or
> two is OK.
>
> The existing reassignment znode requires the whole plan to fit within a
> single znode.  The maximum znode size is 1 megabyte by default, and almost
> nobody reconfigures this.  Assuming about 100 bytes per reassignment, we
> can't get many more than about 10,000 partitions in a reassignment today in
> any case.  The current scalability bottleneck is much more on the side of
> "can kafka actually handle a huge amount of extra traffic due to ongoing
> reassignments"?
>
> That does bring up a good point, though-- we may want to have a "maximum
> concurrent reassignments" to avoid a common scenario that happens now,
> where people accidentally submit a plan that's way too big.  But this is
> not to protect ZooKeeper-- it is to protect the brokers.
>
> > I think ideally we should add these writes in batches to zookeeper and
> > otherwise store it in a replicated internal topic
> > (__partition_reassignments). That would solve the scalability problem as
> > the failover controller would be able to read it up very quickly and also
> > we would spread the writes in Zookeeper over time. Just the current,
> > actively replicated partitions should be present under
> > /brokers/topics/[topic]/partitions/[partitionId]/state, so those
> partitions
> > will know if they have to do reassignment (even in case of a broker
> > bounce). The controller on the other hand could regain its state by
> reading
> > up the last produced message from this __partition_reassignments topic
> and
> > reading up the Zookeeper state to figure out which batch its currently
> > doing (supposing it goes sequentially in the given reassignment).
>
> As I wrote in my reply to the other email, this is not needed because
> we're not adding any controller startup overhead beyond what already
> exists.  We do have some plans to optimize this, but it's outside the scope
> of this KIP.
>
> > I'll think a little bit more about this to fill out any gaps there are
> and
> > perhaps add it to my KIP. That being said probably we'll need to make
> some
> > benchmarking first if this bulk read-write causes a problem at all to
> avoid
> > premature optimisation. I generally don't really worry about reading up
> > this new information as the controller would read up the assignment
> anyway
> > in initializeControllerContext().
>
> Right, the controller will read those znodes on startup anyway.
>
> >
> > A question on SubmitPartitionReassignmentsRequest and its connection with
> > KIP-435 .
> Would
> > the list of topic-partitions have the same ordering on the client side as
> > well as the broker side? I think it would be an advantage as the user
> would
> > know in which order the reassignment would be performed. I think it's
> > useful when it comes to incrementalization as they'd be able to figure
> out
> > what replicas will be in one batch (given they know about the batch
> size).
>
> The big advantage of doing batching on the controller is that the
> controller has more information about what is going on in the cluster.  So
> it can schedule reassignments in a more optimal way.  For instance, it can
> schedule reassignments so that the load is distributed evenly across
> nodes.  This advantage is lost if we have to adhere to a rigid ordering
> that is set up in advance.  We don't know exactly when anything will
> complete in any case.  Just because one partition reassignment was started
> before another doesn't mean it will finish before another.
>
> Additionally, there may be multiple clients submitting assignments and
> multiple clients querying them.  So I don't think ordering makes sense here.
>
> best,
> Colin
>
> >
> > Viktor
> >
> > On Wed, May 1, 2019 at 8:33 AM George Li  .invalid>
> > wrote:
> >
> > >  Hi Colin,
> > >
> > > Thanks for 

Re: [DISCUSS] KIP-456: Helper classes to make it simpler to write test logic with TopologyTestDriver

2019-05-06 Thread Jukka Karvanen
Hi,

Now everything what I planned to add including tests, samples and document
changes are in my branch:
https://github.com/jukkakarvanen/kafka/compare/trunk...jukkakarvanen:KAFKA-8233_InputOutputTopics


So I can create PR as soon as this KIP is getting green light to proceed.

Jukka

la 4. toukok. 2019 klo 9.05 Jukka Karvanen (jukka.karva...@jukinimi.com)
kirjoitti:

> Hi,
>
> New TestInputTopic and TestOutputTopic are included in the Developer Guide testing
> page as an alternative;
> the old way with ConsumerRecordFactory and OutputVerifier is not removed.
>
> You can see the proposal here in my branch:
>
> https://github.com/jukkakarvanen/kafka/compare/trunk...jukkakarvanen:KAFKA-8233_InputOutputTopics
>
>
> I can create a work-in-progress pull request if that makes commenting on the
> proposal easier.
> Still planning to add full coverage unit test and sample WordCountDemoTest to
> streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount,
> if this KIP is accepted.
>
> Jukka
>
>
> ti 30. huhtik. 2019 klo 13.59 Matthias J. Sax (matth...@confluent.io)
> kirjoitti:
>
>> KIP-451 was discarded in favor of this KIP. So it seems we are all on
>> the same page.
>>
>>
>> >> Relating to KIP-448.
>> >> What kind of alignment did you think about?
>>
>> Nothing in particular. Was more or less a random thought. Maybe there is
>> nothing to be aligned. Just wanted to bring it up. :)
>>
>>
>> >> Some thoughts after reading also the comments in KAFKA-6460:
>> >> To my understanding these KIP-448 mock classes need to be fed somehow into
>> >> TopologyTestDriver.
>> >> I don't know how those KIP-448 mock are planned to be set to
>> >> TopologyTestDriver.
>>
>> KIP-448 is still quite early, and it's a little unclear... Maybe we
>> should just ignore it for now. Sorry for the distraction with my comment
>> about it.
>>
>>
>> Please give me some more time to review this KIP in detail and I will
>> follow up later again.
>>
>>
>> -Matthias
>>
>> On 4/26/19 2:25 PM, Jukka Karvanen wrote:
>> > Yes, my understanding was also that this KIP covers all the requirements
>> of
>> > KIP-451.
>> >
>> > Relating to KIP-448.
>> > What kind of alignment did you think about?
>> >
>> > Some thoughts after reading also the comments in KAFKA-6460:
>> > To my understanding these KIP-448 mock classes need to be fed somehow into
>> > TopologyTestDriver.
>> > I don't know how those KIP-448 mock are planned to be set to
>> > TopologyTestDriver.
>> >
>> > In contrast, KIP-456 was planned to be on top of an unmodified
>> > TopologyTestDriver, and now the driver is set on TestInputTopic and
>> > TestOutputTopic in the constructor.
>> > There is also an alternative where these Topic objects could be obtained from
>> > TopologyTestDriver, but it would require duplicating the constructors
>> > of the Topics as methods on TopologyTestDriver.
>> >
>> > Also, related to those Store objects: when going through the methods in
>> > TopologyTestDriver I noticed that accessing the state stores could be the
>> > third candidate for a helper class or a group of classes.
>> > So in addition to TestInputTopic and TestOutputTopic, there could also be
>> > TestKeyValueStore, TestWindowStore, ... following the logic of this
>> > KIP-456, but
>> > they would not add any functionality on top of the already existing
>> > *Store classes. So that's why I did not include those.
>> >
>> > Jukka
>> > -
>> >
>> >
>> >
>> >
>> >
>> > Not
>> >
>> > pe 26. huhtik. 2019 klo 12.03 Matthias J. Sax (matth...@confluent.io)
>> > kirjoitti:
>> >
>> >> Btw: there is also KIP-448. I was wondering if it might be required or
>> >> helpful to align the design of both with each other. Thoughts?
>> >>
>> >>
>> >>
>> >> On 4/25/19 11:22 PM, Matthias J. Sax wrote:
>> >>> Thanks for the KIP!
>> >>>
>> >>> I was just comparing this KIP with KIP-451 (even if I did not yet
>> make a
>> >>> thorough read over this KIP), and I agree that there is a big overlap.
>> It
>> >>> seems that KIP-456 might subsume KIP-451.
>> >>>
>> >>> Let's wait on Patrick's input to see how to proceed.
>> >>>
>> >>>
>> >>> -Matthias
>> >>>
>> >>> On 4/25/19 12:03 AM, Jukka Karvanen wrote:
>>  Hi,
>> 
>>  If you want to see or test the my current idea of the implementation
>> of
>>  this KIP, you can check it out in my repo:
>> 
>> >>
>> https://github.com/jukkakarvanen/kafka/compare/trunk...jukkakarvanen:KAFKA-8233_InputOutputTopics
>> 
>> 
>>  After my test with KIP-451 I do not see a need to add methods for
>>  Iterables, but I am waiting for Patrick's clarification of the use case.
>> 
>>  Jukka
>> 
>> 
>>  ti 23. huhtik. 2019 klo 15.37 Jukka Karvanen (
>> >> jukka.karva...@jukinimi.com)
>>  kirjoitti:
>> 
>> > Hi All,
>> >
>> > I would like to start the discussion on KIP-456: Helper classes to
>> >> make it
>> > simpler to write test logic with TopologyTestDriver:
>> >
>> >
>> >
>> >>
>> 

Build failed in Jenkins: kafka-trunk-jdk11 #487

2019-05-06 Thread Apache Jenkins Server
See 


Changes:

[manikumar.reddy] MINOR: Fix ThrottledReplicaListValidator doc error. (#6537)

[manikumar.reddy] MINOR: Document improvement (#6682)

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H30 (ubuntu xenial) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision 3322439d9895a7b599104eeb957939a23fef69cf 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 3322439d9895a7b599104eeb957939a23fef69cf
Commit message: "MINOR: Document improvement (#6682)"
 > git rev-list --no-walk 56b92a550454765b09047be9ad561f3395c614e4 # timeout=10
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins6621665161257423451.sh
+ rm -rf 
+ /home/jenkins/tools/gradle/4.10.2/bin/gradle
/tmp/jenkins6621665161257423451.sh: line 4: 
/home/jenkins/tools/gradle/4.10.2/bin/gradle: No such file or directory
Build step 'Execute shell' marked build as failure
Recording test results
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2


Re: [VOTE] KIP-334 Include partitions in exceptions raised during consumer record deserialization/validation

2019-05-06 Thread Stanislav Kozlovski
Hey there Kamal,

I'm sincerely sorry for missing your earlier message. As I open this thread
up, I see I have an unsent draft message about resuming discussion from
some time ago.

In retrospect, I think I may have been too pedantic with the exception
naming and hierarchy.
I now believe a single exception type of `RecordDeserializationException`
is enough. Let's go with that.
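
Roughly, the intended consumer-side usage would then look like this (the accessor
names below are placeholders until the KIP text is finalized):

while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        process(records);
    } catch (RecordDeserializationException e) {
        // skip past the record that cannot be deserialized and keep consuming
        consumer.seek(e.topicPartition(), e.offset() + 1);
    }
}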

On Mon, May 6, 2019 at 6:40 AM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Matthias,
>
> We already have CorruptRecordException which doesn't extend the
> SerializationException. So, we need an alternate
> name suggestion for the corrupted record error if we decide to extend the
> FaultyRecordException class.
>
> Stanislav,
>
> Our users are also facing this error. Could we bump up this discussion?
>
> I think we can have a single exception type
> FaultyRecordException/RecordDeserializationException to capture both
> errors. We can add an additional enum field to differentiate the errors
> if required.
>
> Thanks,
> Kamal Chandraprakash
>
> On Wed, Apr 24, 2019 at 1:49 PM Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Stanislav,
> >
> > Any updates on this KIP? We have internal users who want to skip the
> > corrupted message while consuming the records.
> >
> >
> > On Fri, Oct 19, 2018 at 11:34 PM Matthias J. Sax 
> > wrote:
> >
> >> I am not 100% familiar with the details of the consumer code, however I
> >> tend to disagree with:
> >>
> >> > There's no difference between the two cases -- if (and only if) the
> >> message is corrupt, it can't be deserialized.  If (and only if) it
> can't be
> >> deserialized, it is corrupt.
> >>
> >> Assume that a user configures a JSON deserializer but a faulty upstream
> >> producer writes an Avro message. For this case, the message is not
> >> corrupted, but still can't be deserialized. And I can imaging that users
> >> want to handle both cases differently.
> >>
> >> Thus, I think it makes sense to have two different exceptions
> >> `RecordDeserializationException` and `CorruptedRecordException` that can
> >> both extend `FaultyRecordException` (don't like this name too much
> >> honestly, but don't have a better idea for it anyway).
> >>
> >> Side remark. If we introduce class `RecordDeserializationException` and
> >> `CorruptedRecordException`, we can also add an interface that both
> >> implement to return partition/offset information and let both extend
> >> `SerializationException` directly without an intermediate class in the
> >> exception hierarchy.
> >>
> >>
> >> -Matthias
> >>
> >> On 8/8/18 2:57 AM, Stanislav Kozlovski wrote:
> >> >> If you are inheriting from SerializationException, your derived class
> >> > should also be a kind of serialization exception.  Not something more
> >> > general.
> >> > Yeah, the reason for inheriting it would be for
> backwards-compatibility.
> >> >
> >> >> Hmm.  Can you think of any new scenarios that would make Kafka force
> >> the
> >> > user need to skip a specific record?  Perhaps one scenario is if
> records
> >> > are lost but we don't know how many.
> >> > Not on the spot, but I do wonder how likely a new scenario is to
> >> surface in
> >> > the future and how we'd handle the exceptions' class hierarchy then.
> >> >
> >> >> Which offset were we planning to use in the
> >> > exception?
> >> > The offset of the record which caused the exception. In the case of
> >> > batches, we use the last offset of the batch. In both cases, users
> >> should
> >> > have to seek +1 from the given offset. You can review the PR to ensure
> >> its
> >> > accurate
> >> >
> >> >
> >> > If both of you prefer `RecordDeserializationException`, we can go with
> >> > that. Please do confirm that is okay
> >> >
> >> > On Tue, Aug 7, 2018 at 11:35 PM Jason Gustafson 
> >> wrote:
> >> >
> >> >> One difference between the two cases is that we can't generally trust
> >> the
> >> >> offset of a corrupt message. Which offset were we planning to use in
> >> the
> >> >> exception? Maybe it should be either the fetch offset or one plus the
> >> last
> >> >> consumed offset? I think I'm with Colin in preferring
> >> >> RecordDeserializationException for both cases if possible. For one
> >> thing,
> >> >> that makes the behavior consistent whether or not `check.crcs` is
> >> enabled.
> >> >>
> >> >> -Jason
> >> >>
> >> >> On Tue, Aug 7, 2018 at 11:17 AM, Colin McCabe 
> >> wrote:
> >> >>
> >> >>> Hi Stanislav,
> >> >>>
> >> >>> On Sat, Aug 4, 2018, at 10:44, Stanislav Kozlovski wrote:
> >>  Hey Colin,
> >> 
> >>  It may be a bit vague but keep in mind we also raise the exception
> in
> >> >> the
> >>  case where the record is corrupted.
> >>  We discussed with Jason offline that message corruption most often
> >> >>> prevents
> >>  deserialization itself and that may be enough of an argument to
> raise
> >>  `RecordDeserializationException` in the case of a corrupt record. I
> >>  personally find that