Re: [DISCUSS] KIP-462 : Use local thread id for KStreams

2019-04-30 Thread Boyang Chen
Thanks Bill!

Get Outlook for iOS

From: Bill Bejeck 
Sent: Tuesday, April 30, 2019 6:44:17 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams

Thanks for the KIP Boyang.  I have no additional comments from the ones
already presented.

+1(binding)

-Bill

On Tue, Apr 30, 2019 at 4:35 PM Boyang Chen  wrote:

> Thank you Guozhang!
>
> 
> From: Guozhang Wang 
> Sent: Wednesday, May 1, 2019 3:54 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
>
> +1 (binding)
>
> Guozhang
>
> On 2019/04/26 07:42:12, "Matthias J. Sax"  wrote:
> > Thanks for the KIP!
> >
> > I agree that the change makes sense, and not only for the static group
> > membership case.
> >
> > For example, if a user `closes()` a `KafkaStreams` client and creates a
> > new one (for example to recover failed threads), while the JVM is still
> > running, it is more intuitive that the thread names are numbered from 1
> > to X again, and not from X+1 to 2*X on restart.
> >
> > Also, the original idea of making thread names unique across
> > applications is non-intuitive itself. It might make sense if there are
> > two instances of the same application within one JVM -- however, this
> > seems to be a rather rare case. Also, the only pattern for this use case
> > seems to be dynamic scaling, and I believe we should actually avoid this
> > pattern by adding a `stopThread()` and `addThread()` method to
> > `KafkaStreams` directly.
> >
> >
> > -Matthias
> >
> >
> > On 4/25/19 11:13 PM, Boyang Chen wrote:
> > > Hey friends,
> > >
> > > I would like to start discussion for a very small KIP:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-462%3A+Use+local+thread+id+for+KStreams
> > >
> > > It is trying to avoid sharing the thread-id increment between multiple
> stream instances configured in one JVM. This is an important fix for static
> membership<
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances>
> to be effective for KStreams in edge cases like changing `group.instance.id`
> throughout restarts due to thread-id interleaving.
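The difference between the old JVM-wide counter and the proposed per-instance counter can be shown with a toy sketch (placeholder class names, no Kafka code; `Client` here merely stands in for a `KafkaStreams` instance):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadIdDemo {
    // JVM-wide counter: the behavior KIP-462 moves away from.
    static final AtomicInteger GLOBAL_IDS = new AtomicInteger(0);

    // Toy stand-in for a streams client that names its processing threads.
    static class Client {
        // Per-instance counter: the behavior KIP-462 proposes.
        final AtomicInteger localIds = new AtomicInteger(0);

        List<String> localNames(int threads) {
            List<String> names = new ArrayList<>();
            for (int i = 0; i < threads; i++)
                names.add("StreamThread-" + localIds.incrementAndGet());
            return names;
        }

        List<String> globalNames(int threads) {
            List<String> names = new ArrayList<>();
            for (int i = 0; i < threads; i++)
                names.add("StreamThread-" + GLOBAL_IDS.incrementAndGet());
            return names;
        }
    }

    public static void main(String[] args) {
        Client first = new Client();
        Client second = new Client(); // e.g. a replacement created after close()
        System.out.println(first.globalNames(2));  // [StreamThread-1, StreamThread-2]
        System.out.println(second.globalNames(2)); // [StreamThread-3, StreamThread-4]
        System.out.println(first.localNames(2));   // [StreamThread-1, StreamThread-2]
        System.out.println(second.localNames(2));  // [StreamThread-1, StreamThread-2]
    }
}
```

With the shared counter, a second instance in the same JVM picks up numbering at 3; with the local counter, both instances (and any restarted one) start from 1, which is what keeps `group.instance.id` stable across restarts.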
> > >
> > > I will open the vote thread in the meanwhile, since this is a very
> small fix. Feel free to continue the discussion on this thread, thank you!
> > >
> > > Boyang
> > >
> >
> >
>


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-04-30 Thread Paul Whalen
I definitely don't mind anyone jumping, Bruno, thanks for the comments!

1) I'm not totally sure I'm clear on your point, but I think we're on the
same page - if we're adding a method to the XSupplier interfaces (by making
them inherit from a super interface StateStoreConnector) then we definitely
need a default implementation to maintain compatibility.  Whether the
default implementation returns null or an empty list is somewhat of a
detail.

2) If stream.process() sees that StateStoreConnector#stateStores() returns
either null or an empty list, it would handle that case specifically and
not try to call addStateStore at all.  Or is this not what you're asking?
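A minimal sketch of that compatibility story (interface and method names are placeholders, not the final KIP API, and plain `String`s stand in for `StoreBuilder`s): the default implementation keeps old suppliers compiling, and the caller skips store registration for `null` or empty results.

```java
import java.util.List;

public class StoreProviderDemo {
    // Placeholder for the proposed super-interface; the real name was still
    // under discussion (StateStoreConnector vs. ConnectedStoreProvider).
    interface ConnectedStoreProvider {
        // Default keeps existing supplier implementations source-compatible.
        default List<String> stateStores() {
            return null;
        }
    }

    // An old-style supplier that has never heard of the new method.
    static class LegacySupplier implements ConnectedStoreProvider {}

    // A new-style supplier that opts in.
    static class StoreAwareSupplier implements ConnectedStoreProvider {
        @Override
        public List<String> stateStores() {
            return List.of("my-store");
        }
    }

    // What stream.process() would do: only register stores when some exist.
    static int registerStores(ConnectedStoreProvider supplier) {
        List<String> stores = supplier.stateStores();
        if (stores == null || stores.isEmpty()) {
            return 0; // nothing to add or connect; no NPE either way
        }
        return stores.size(); // stand-in for addStateStore() + connect calls
    }

    public static void main(String[] args) {
        System.out.println(registerStores(new LegacySupplier()));     // 0
        System.out.println(registerStores(new StoreAwareSupplier())); // 1
    }
}
```

Whether the default returns `null` or `List.of()` only changes the guard in `registerStores`; either way the legacy path never reaches `addStateStore`.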

Separately, I'm still hacking away at the details of the PR and will
continue to get something into a discussable state, but I'll share some
thoughts I've run into.

A) I'm tentatively going the separate interface route (Matthias's
suggestion) and naming it ConnectedStoreProvider.  Still don't love the
name, but there's something nice about the name indicating *why* this thing
is providing the store, not just that it is providing it.

B) It has occurred to me that topology.addProcessor() could also recognize
if ProcessorSupplier implements ConnectedStoreProvider and add and connect
stores appropriately.  This isn't in the KIP and I think the value-add is
lower (if you're reaching that low level, surely the "auto add/connect
store" isn't too important to you), but I think it would be confusing if
it didn't, and I don't see any real downside.

Paul

On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna  wrote:

> Hi,
>
> @Paul: Thank you for the KIP!
>
> I hope you do not mind that I jump in.
>
> I have the following comments:
>
> 1) `null` vs empty list in the default implementation
> IIUC, returning `null` in the default implementation should basically
> signal that the method `stateStores` was not overridden. Why then provide a
> default implementation in the first place? Without default implementation
> you would discover the missing implementation already at compile-time and
> not only at runtime. If you decide not to provide a default implementation,
> `XSupplier extends StateStoreConnector` would break existing code as
> Matthias has already pointed out.
>
> 2) `process` method adding the StoreBuilders to the topology
> If the default implementation returned `null` and `XSupplier extends
> StateStoreConnector`, then existing code would break, because
> `StreamsBuilder#addStateStore()` would throw a NPE.
>
> +1 for opening a WIP PR
>
> Best,
> Bruno
>
>
> On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax 
> wrote:
>
> > Thanks, Paul!
> >
> > I agree with all of that. If we think that the general design is good,
> > refactoring a PR if we want to pick a different name should not be too
> > much additional work (hopefully). Thus, if you want to open a WIP PR and
> > we use it to nail the open details, it might help to find a good
> > conclusion.
> >
> > >> 2) Default method vs new interface:
> >
> > This seems to be the hardest tradeoff. I see the point about
> > discoverability... Might be good to get input from others on which version
> > they would prefer.
> >
> > Just to make clear, my suggestion from the last email was, that
> > `Transformer` etc does not extend the new interface. Instead, a user
> > that wants to use this feature would need to implement both interfaces.
> >
> > If `Transformer extends StoreProvider` (just picking a name here)
> > without a default implementation, existing code would break, and thus it
> > is not an option because it breaks backward compatibility.
> >
> >
> > -Matthias
> >
> > On 4/28/19 8:37 PM, Paul Whalen wrote:
> > > Great thoughts Matthias, thanks! I think we're all agreed that naming
> and
> > > documentation/education are the biggest hurdles for this KIP, and in
> > light
> > > of that, I think it makes sense for me to just take a stab at a
> > > full-fledged PR with documentation to convince us that it's possible to do
> it
> > > with enough clarity.
> > >
> > > In response to your specific thoughts:
> > >
> > > 1) StateStoreConnector as a name: Really good point about defining the
> > > difference between "adding" and "connecting."  Guozhang suggested
> > > StateStoreConnector which was definitely an improvement over my
> > > StateStoresSupplier, but I think you're right that we need to be
> careful
> > to
> > > make it clear that it's really accomplishing both.  Thinking about it
> > now,
> > > one problem with Connector is that the implementer of the interface is
> > not
> > > really doing any connecting, it's providing/supplying the store that
> will
> > > be both added and connected.  StoreProvider seems reasonable to me and
> > > probably the best candidate at the moment, but it would be nice if the
> > name
> > > could convey that it's providing the store specifically so the caller
> can
> > > add it to the topology and connect it to the associated transformer.
> > >
> > > In general I think that really 

[jira] [Created] (KAFKA-8308) Update jetty for security vulnerability CVE-2019-10241

2019-04-30 Thread Di Shang (JIRA)
Di Shang created KAFKA-8308:
---

 Summary: Update jetty for security vulnerability CVE-2019-10241
 Key: KAFKA-8308
 URL: https://issues.apache.org/jira/browse/KAFKA-8308
 Project: Kafka
  Issue Type: Task
  Components: core
Affects Versions: 2.2.0
Reporter: Di Shang


Kafka 2.2 uses jetty-*-9.4.14.v20181114 which is marked vulnerable

[https://github.com/apache/kafka/blob/2.2/gradle/dependencies.gradle#L58]

 

[https://nvd.nist.gov/vuln/detail/CVE-2019-10241]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOT] KIP-449: Add connector contexts to Connect worker logs (vote thread)

2019-04-30 Thread Chris Egerton
+1 (non-binding)

Really looking forward to this. Thanks, Randall!

On Tue, Apr 30, 2019, 20:47 Magesh Nandakumar  wrote:

> This will make connect debugging so much easier. Thanks a lot for driving
> this Randall.
>
> +1 (non-binding)
>
> Thanks,
> Magesh
>
> On Tue, Apr 30, 2019 at 7:19 PM Jeremy Custenborder <
> jcustenbor...@gmail.com>
> wrote:
>
> > +1 non binding
> >
> > On Mon, Apr 29, 2019 at 5:34 PM Randall Hauch  wrote:
> > >
> > > I would like to start the vote for KIP-449:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs
> > >
> > > The KIP uses the Mapped Diagnostic Contexts (MDC) feature of SLF4J API
> to
> > > add more context to log messages from within Connect workers and
> > connector
> > > implementations. This would not be enabled by default, though it would
> be
> > > easy to enable within the Connect Log4J configuration.
> > >
> > > Thanks!
> > >
> > > Randall
> >
>


Re: [VOT] KIP-449: Add connector contexts to Connect worker logs (vote thread)

2019-04-30 Thread Magesh Nandakumar
This will make connect debugging so much easier. Thanks a lot for driving
this Randall.

+1 (non-binding)

Thanks,
Magesh

On Tue, Apr 30, 2019 at 7:19 PM Jeremy Custenborder 
wrote:

> +1 non binding
>
> On Mon, Apr 29, 2019 at 5:34 PM Randall Hauch  wrote:
> >
> > I would like to start the vote for KIP-449:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs
> >
> > The KIP uses the Mapped Diagnostic Contexts (MDC) feature of SLF4J API to
> > add more context to log messages from within Connect workers and
> connector
> > implementations. This would not be enabled by default, though it would be
> > easy to enable within the Connect Log4J configuration.
> >
> > Thanks!
> >
> > Randall
>


Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-04-30 Thread Magesh Nandakumar
Randall,

The approach of returning the configs to override could make it
cumbersome to implement a custom policy. This is a new configuration, and if
you don't explicitly set it, the existing behavior remains as-is. Like
Chris, I also preferred this approach for the sake of simplicity.  If not
for the default `null`, I would prefer to fall back to using `Ignore`, which
is a misnomer with respect to the interface spec but still gets the job done
via instanceof checks. The other options I could think of are as below:

   - have an enforcePolicy() method in the interface which by default
   returns true and the Ignore implementation could return false
   - introduce another worker config allow.connector.config.overrides with
   a default value of false and the default policy can be None

Let me know what you think.

Thanks
Magesh

On Tue, Apr 30, 2019 at 6:52 PM Randall Hauch  wrote:

> Thanks, Chris. I still think it's strange to have a non-policy, since
> there's now special behavior for when the policy is not specified.
>
> Perhaps the inability for a policy implementation to represent the existing
> behavior suggests that the policy interface isn't quite right. Could the
> policy's "validate" method take the overrides that were supplied and return
> the overrides that should be passed to the connector, yet still throw an
> exception if any supplied overrides are not allowed? Then the different
> policy implementations might be:
>
>- Ignore (default) - returns all supplied override properties
>- None - throws exception if any override properties are supplied;
>always returns empty map if no overrides are provided
>- Principal - throws exception if other override properties are
>provided, but returns an empty map (since no properties should be
> passed to
>the connector)
>- All - returns all provided override properties
>
> All override properties defined on the connector configuration would be
> passed to the policy for validation, and assuming there's no error all of
> these overrides would be used in the producer/consumer/admin client. The
> result of the policy call, however, is used to determine which of these
> overrides are passed to the connector.
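A rough sketch of such a returns-the-overrides policy interface (hypothetical names, a simplified String-to-String config map, and `sasl.jaas.config` as an illustrative allow-listed key; none of this is the final KIP-458 API):

```java
import java.util.Collections;
import java.util.Map;

public class OverridePolicyDemo {
    // Hypothetical shape of the policy interface: validate() throws on
    // disallowed overrides and returns the subset that should also be
    // passed along to the connector itself.
    interface ClientConfigOverridePolicy {
        Map<String, String> validate(Map<String, String> overrides);
    }

    // Ignore (default): all supplied overrides are allowed and passed through.
    static final ClientConfigOverridePolicy IGNORE = overrides -> overrides;

    // None: no overrides may be supplied at all.
    static final ClientConfigOverridePolicy NONE = overrides -> {
        if (!overrides.isEmpty())
            throw new IllegalArgumentException("client overrides not allowed");
        return Collections.emptyMap();
    };

    // Principal: only the allow-listed key may be overridden, and even
    // that one is withheld from the connector.
    static final ClientConfigOverridePolicy PRINCIPAL = overrides -> {
        for (String key : overrides.keySet())
            if (!key.equals("sasl.jaas.config"))
                throw new IllegalArgumentException("disallowed override: " + key);
        return Collections.emptyMap();
    };

    public static void main(String[] args) {
        Map<String, String> supplied = Map.of("sasl.jaas.config", "...");
        System.out.println(IGNORE.validate(supplied));    // forwarded to connector
        System.out.println(PRINCIPAL.validate(supplied)); // {} -> withheld
        try {
            NONE.validate(supplied);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected by None policy");
        }
    }
}
```

The point of the shape is exactly Randall's: every behavior, including the backward-compatible default, is just another implementation, so the framework never needs a null or instanceof special case.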
>
> This approach means that all behaviors can be implemented through a policy
> class, including the defaults. It also gives a bit more control to custom
> policies, should that be warranted. For example, validating the provided
> client overrides but passing all such override properties to the connector,
> which as I stated earlier is something I think connectors likely don't look
> for.
>
> Thoughts?
>
> Randall
>
> On Tue, Apr 30, 2019 at 6:07 PM Chris Egerton  wrote:
>
> > Randall,
> >
> > The special behavior for null was my suggestion. There is no
> implementation
> > of the proposed interface that causes client overrides to be ignored, so
> > the original idea was to have a special implementation that would be
> > checked for by the Connect framework (probably via the instanceof
> operator)
> > and, if present, cause all would-be overrides to be ignored.
> >
> > I thought this may be confusing to people who may see that behavior and
> > wonder how to recreate it themselves, so I suggested leaving that policy
> > out and replacing it with a check to see if a policy was specified at all.
> >
> > Would be interested in your thoughts on this, especially if there's an
> > alternative that hasn't been proposed yet.
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Apr 30, 2019, 18:01 Randall Hauch  wrote:
> >
> > > On Tue, Apr 30, 2019 at 4:20 PM Magesh Nandakumar <
> mage...@confluent.io>
> > > wrote:
> > >
> > > > Randall,
> > > >
> > > > Thanks a lot for your feedback.
> > > >
> > > > You bring up an interesting point regarding the overrides being
> > available
> > > > to the connectors. Today everything that is specified in the config
> > while
> > > > creating is available for the connector. But this is a specific case
> > and
> > > we
> > > > could do either of the following
> > > >
> > > >
> > > >- don't pass any configs with these prefixes to the
> ConnectorConfig
> > > >instance that's passed in the startConnector
> > > >- allow policies as to whether the configurations with the
> prefixes
> > > >should be made available to the connector or not. Should this also
> > > > define a
> > > >list of configurations?
> > > >
> > > > I personally prefer not passing the configs to Connector since that's
> > > > simple and straightforward, and I don't see a reason for the connector to
> > > access
> > > > those.
> > > >
> > >
> > > I agree that these override properties should be effectively new
> > > properties, in which case I'd also prefer that they be removed from the
> > > configuration before it is passed to the connector. Yes, it is
> *possible*
> > > that an existing connector happened to use connector config properties
> > with
> > > these prefixes, but it seems pretty unlikely.
> > >
> > > I'd love to hear 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-30 Thread Paul Whalen
Also +1 on the issues/goals as Michael outlined them, I think that sets a
great framework for the discussion.

Regarding the SortedMap solution, my understanding is that the current
proposal in the KIP is what is in my PR which (pending naming decisions) is
roughly this:

stream.split()
.branch(Predicate, Consumer<KStream<K, V>>)
.branch(Predicate, Consumer<KStream<K, V>>)
.defaultBranch(Consumer<KStream<K, V>>);

Obviously some ordering is necessary, since branching as a construct
doesn't work without it, but this solution seems like it provides as much
associativity as the SortedMap solution, because each branch() call
directly associates the "conditional" with the "code block."  The value it
provides over the KIP solution is the accessing of streams in the same
scope.

The KIP solution is less "dynamic" than the SortedMap solution in the sense
that it is slightly clumsier to add a dynamic number of branches, but it is
certainly possible.  It seems to me like the API should favor the "static"
case anyway, and should make it simple and readable to fluently declare and
access your branches in-line.  It also makes it impossible to ignore a
branch, and it is possible to build an (almost) identical SortedMap
solution on top of it.

I could also see a middle ground where instead of a raw SortedMap being
taken in, branch() takes a name and not a Consumer.  Something like this:

Map<String, KStream<K, V>> branches = stream.split()
.branch("branchOne", Predicate)
.branch("branchTwo", Predicate)
.defaultBranch("defaultBranch", Consumer<KStream<K, V>>);

Pros for that solution:
 - accessing branched KStreams in same scope
 - no double brace initialization, hopefully slightly more readable than
SortedMap

Cons
 - downstream branch logic cannot be specified inline which makes it harder
to read top to bottom (like existing API and SortedMap, but unlike the KIP)
 - you can forget to "handle" one of the branched streams (like existing
API and SortedMap, but unlike the KIP)

(KBranchedStreams could even work *both* ways but perhaps that's overdoing
it).
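The name-based variant above can be prototyped without any Kafka dependencies; this toy sketch uses `List<T>` in place of `KStream<K, V>` and invented names (`Branched`, `branch`, `defaultBranch`) matching the shape under discussion, not any released API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class NamedBranchDemo {
    // Toy stand-in for the proposed KBranchedStream.
    static class Branched<T> {
        private final List<T> source;
        private final List<Map.Entry<String, Predicate<T>>> predicates = new ArrayList<>();

        Branched(List<T> source) { this.source = source; }

        Branched<T> branch(String name, Predicate<T> predicate) {
            predicates.add(Map.entry(name, predicate));
            return this;
        }

        // Terminal call: unmatched elements land in the named default branch,
        // and the whole name -> branch mapping is returned in one scope.
        Map<String, List<T>> defaultBranch(String name) {
            predicates.add(Map.entry(name, t -> true));
            Map<String, List<T>> branches = new LinkedHashMap<>();
            for (Map.Entry<String, Predicate<T>> e : predicates)
                branches.put(e.getKey(), new ArrayList<>());
            for (T t : source)
                for (Map.Entry<String, Predicate<T>> e : predicates)
                    if (e.getValue().test(t)) {
                        branches.get(e.getKey()).add(t);
                        break; // first matching predicate wins
                    }
            return branches;
        }
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> branches = new Branched<>(List.of(1, 2, 3, 4))
            .branch("even", n -> n % 2 == 0)
            .branch("big", n -> n > 2)
            .defaultBranch("rest");
        System.out.println(branches); // {even=[2, 4], big=[3], rest=[1]}
    }
}
```

Making `defaultBranch` the terminal call that returns the map is what gives both properties Paul lists as pros: the branched streams are accessible by name in the caller's scope, and nothing downstream can be declared until every branch has been named.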

Overall I'm curious how important it is to be able to easily access the
branched KStream in the same scope as the original.  It's possible that it
doesn't need to be handled directly by the API, but instead left up to the
user.  I'm sort of in the middle on it.

Paul



On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman 
wrote:

> I'd like to +1 what Michael said about the issues with the existing branch
> method, I agree with what he's outlined and I think we should proceed by
> trying to alleviate these problems. Specifically it seems important to be
> able to cleanly access the individual branches (eg by mapping
> name->stream), which I thought was the original intention of this KIP.
>
> That said, I don't think we should so easily give in to the double brace
> anti-pattern or force our users into it if at all possible to avoid... just
> my two cents.
>
> Cheers,
> Sophie
>
> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
> michael.droga...@confluent.io> wrote:
>
> > I’d like to propose a different way of thinking about this. To me, there
> > are three problems with the existing branch signature:
> >
> > 1. If you use it the way most people do, Java raises unsafe type
> warnings.
> > 2. The way in which you use the stream branches is positionally coupled
> to
> > the ordering of the conditionals.
> > 3. It is brittle to extend existing branch calls with additional code
> > paths.
> >
> > Using associative constructs instead of relying on ordered constructs
> would
> > be a stronger approach. Consider a signature that instead looks like
> this:
> >
> > Map<String, KStream<K, V>> KStream#branch(SortedMap<String, Predicate<? super K, ? super V>>);
> >
> > Branches are given names in a map, and as a result, the API returns a
> > mapping of names to streams. The ordering of the conditionals is
> maintained
> > because it’s a sorted map. Insert order determines the order of
> evaluation.
> >
> > This solves problem 1 because there are no more varargs. It solves
> problem
> > 2 because you no longer lean on ordering to access the branch you’re
> > interested in. It solves problem 3 because you can introduce another
> > conditional by simply attaching another name to the structure, rather
> than
> > messing with the existing indices.
> >
> > One of the drawbacks is that creating the map inline is historically
> > awkward in Java. I know it’s an anti-pattern to use voluminously, but
> > double brace initialization would clean up the aesthetics.
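A dependency-free sketch of this map-in/map-out signature (again with `List<T>` standing in for `KStream<K, V>`; note it uses an insertion-ordered `LinkedHashMap` in the demo, since a `SortedMap` keyed by `String` would order branches by key, so insert order driving evaluation is an assumption about how the map is built):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class MapBranchDemo {
    // Toy version of the proposed signature: branches come in as a named
    // map of predicates, and come back out as a map of named results.
    static <T> Map<String, List<T>> branch(List<T> source,
                                           Map<String, Predicate<T>> branches) {
        Map<String, List<T>> result = new LinkedHashMap<>();
        branches.keySet().forEach(name -> result.put(name, new ArrayList<>()));
        for (T t : source)
            for (Map.Entry<String, Predicate<T>> e : branches.entrySet())
                if (e.getValue().test(t)) {
                    result.get(e.getKey()).add(t);
                    break; // first matching branch wins, as in branch() today
                }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Integer>> branches = new LinkedHashMap<>();
        branches.put("even", n -> n % 2 == 0);
        branches.put("odd", n -> n % 2 != 0);
        System.out.println(branch(List.of(1, 2, 3), branches));
        // {even=[2], odd=[1, 3]}
    }
}
```

Because branches are looked up by name on both sides, adding a new conditional is just another `put`, which is exactly the extensibility point 3 asks for.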
> >
> > On Tue, Apr 30, 2019 at 9:10 AM John Roesler  wrote:
> >
> > > Hi Ivan,
> > >
> > > Thanks for the update.
> > >
> > > FWIW, I agree with Matthias that the current "start branching" operator
> > is
> > > confusing when named the same way as the actual branches. "Split" seems
> > > like a good name. Alternatively, we can do without a "start branching"
> > > operator at all, and just do:
> > >
> > > stream
> > >   .branch(Predicate)
> > >   .branch(Predicate)
> > >   .defaultBranch();
> > >
> > > 

Re: [VOT] KIP-449: Add connector contexts to Connect worker logs (vote thread)

2019-04-30 Thread Jeremy Custenborder
+1 non binding

On Mon, Apr 29, 2019 at 5:34 PM Randall Hauch  wrote:
>
> I would like to start the vote for KIP-449:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs
>
> The KIP uses the Mapped Diagnostic Contexts (MDC) feature of SLF4J API to
> add more context to log messages from within Connect workers and connector
> implementations. This would not be enabled by default, though it would be
> easy to enable within the Connect Log4J configuration.
>
> Thanks!
>
> Randall


Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-04-30 Thread Randall Hauch
Thanks, Chris. I still think it's strange to have a non-policy, since
there's now special behavior for when the policy is not specified.

Perhaps the inability for a policy implementation to represent the existing
behavior suggests that the policy interface isn't quite right. Could the
policy's "validate" method take the overrides that were supplied and return
the overrides that should be passed to the connector, yet still throw an
exception if any supplied overrides are not allowed? Then the different
policy implementations might be:

   - Ignore (default) - returns all supplied override properties
   - None - throws exception if any override properties are supplied;
   always returns empty map if no overrides are provided
   - Principal - throws exception if other override properties are
   provided, but returns an empty map (since no properties should be passed to
   the connector)
   - All - returns all provided override properties

All override properties defined on the connector configuration would be
passed to the policy for validation, and assuming there's no error all of
these overrides would be used in the producer/consumer/admin client. The
result of the policy call, however, is used to determine which of these
overrides are passed to the connector.

This approach means that all behaviors can be implemented through a policy
class, including the defaults. It also gives a bit more control to custom
policies, should that be warranted. For example, validating the provided
client overrides but passing all such override properties to the connector,
which as I stated earlier is something I think connectors likely don't look
for.

Thoughts?

Randall

On Tue, Apr 30, 2019 at 6:07 PM Chris Egerton  wrote:

> Randall,
>
> The special behavior for null was my suggestion. There is no implementation
> of the proposed interface that causes client overrides to be ignored, so
> the original idea was to have a special implementation that would be
> checked for by the Connect framework (probably via the instanceof operator)
> and, if present, cause all would-be overrides to be ignored.
>
> I thought this may be confusing to people who may see that behavior and
> wonder how to recreate it themselves, so I suggested leaving that policy
> out and replacing it with a check to see if a policy was specified at all.
>
> Would be interested in your thoughts on this, especially if there's an
> alternative that hasn't been proposed yet.
>
> Cheers,
>
> Chris
>
> On Tue, Apr 30, 2019, 18:01 Randall Hauch  wrote:
>
> > On Tue, Apr 30, 2019 at 4:20 PM Magesh Nandakumar 
> > wrote:
> >
> > > Randall,
> > >
> > > Thanks a lot for your feedback.
> > >
> > > You bring up an interesting point regarding the overrides being
> available
> > > to the connectors. Today everything that is specified in the config
> while
> > > creating is available for the connector. But this is a specific case
> and
> > we
> > > could do either of the following
> > >
> > >
> > >- don't pass any configs with these prefixes to the ConnectorConfig
> > >instance that's passed in the startConnector
> > >- allow policies as to whether the configurations with the prefixes
> > >should be made available to the connector or not. Should this also
> > > define a
> > >list of configurations?
> > >
> > > I personally prefer not passing the configs to Connector since that's
> > > simple, straight forward and don't see a reason for the connector to
> > access
> > > those.
> > >
> >
> > I agree that these override properties should be effectively new
> > properties, in which case I'd also prefer that they be removed from the
> > configuration before it is passed to the connector. Yes, it is *possible*
> > that an existing connector happened to use connector config properties
> with
> > these prefixes, but it seems pretty unlikely.
> >
> > I'd love to hear whether other people agree.
> >
> >
> > >
> > > For the second point,  None - doesn't allow overrides and the default
> > > policy is null. We preserve backward compatibility when no policy is
> > > configured. Let me know if that's not clear in the KIP.
> > >
> >
> > Why not have a default policy (rather than null) that implements the
> > backward-compatible behavior? It seems strange to have null be the
> default
> > and for non-policy to allow anything.
> >
> >
> > >
> > > Thanks,
> > > Magesh
> > >
> > > On Mon, Apr 29, 2019 at 4:07 PM Randall Hauch 
> wrote:
> > >
> > > > Per the proposal, a connector configuration can define one or more
> > > > properties that begin with any of the three prefixes:
> > > "producer.override.",
> > > > "consumer.override.", and "admin.override.". The proposal states:
> > > >
> > > > Since the users can specify any of these policies, the connector
> > itself
> > > > should not rely on these configurations to be available. The
> overrides
> > > are
> > > > to be used purely from an operational perspective.
> > > >
> > > >
> > > > Does this mean that any such 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-30 Thread Sophie Blee-Goldman
I'd like to +1 what Michael said about the issues with the existing branch
method, I agree with what he's outlined and I think we should proceed by
trying to alleviate these problems. Specifically it seems important to be
able to cleanly access the individual branches (eg by mapping
name->stream), which I thought was the original intention of this KIP.

That said, I don't think we should so easily give in to the double brace
anti-pattern or force our users into it if at all possible to avoid... just
my two cents.

Cheers,
Sophie

On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
michael.droga...@confluent.io> wrote:

> I’d like to propose a different way of thinking about this. To me, there
> are three problems with the existing branch signature:
>
> 1. If you use it the way most people do, Java raises unsafe type warnings.
> 2. The way in which you use the stream branches is positionally coupled to
> the ordering of the conditionals.
> 3. It is brittle to extend existing branch calls with additional code
> paths.
>
> Using associative constructs instead of relying on ordered constructs would
> be a stronger approach. Consider a signature that instead looks like this:
>
> Map<String, KStream<K, V>> KStream#branch(SortedMap<String, Predicate<? super K, ? super V>>);
>
> Branches are given names in a map, and as a result, the API returns a
> mapping of names to streams. The ordering of the conditionals is maintained
> because it’s a sorted map. Insert order determines the order of evaluation.
>
> This solves problem 1 because there are no more varargs. It solves problem
> 2 because you no longer lean on ordering to access the branch you’re
> interested in. It solves problem 3 because you can introduce another
> conditional by simply attaching another name to the structure, rather than
> messing with the existing indices.
>
> One of the drawbacks is that creating the map inline is historically
> awkward in Java. I know it’s an anti-pattern to use voluminously, but
> double brace initialization would clean up the aesthetics.
>
> On Tue, Apr 30, 2019 at 9:10 AM John Roesler  wrote:
>
> > Hi Ivan,
> >
> > Thanks for the update.
> >
> > FWIW, I agree with Matthias that the current "start branching" operator
> is
> > confusing when named the same way as the actual branches. "Split" seems
> > like a good name. Alternatively, we can do without a "start branching"
> > operator at all, and just do:
> >
> > stream
> >   .branch(Predicate)
> >   .branch(Predicate)
> >   .defaultBranch();
> >
> > Tentatively, I think that this branching operation should be terminal.
> That
> > way, we don't create ambiguity about how to use it. That is, `branch`
> > should return `KBranchedStream`, while `defaultBranch` is `void`, to
> > enforce that it comes last, and that there is only one definition of the
> > default branch. Potentially, we should log a warning if there's no
> default,
> > and additionally log a warning (or throw an exception) if a record falls
> > though with no default.
> >
> > Thoughts?
> >
> > Thanks,
> > -John
> >
> > On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax 
> > wrote:
> >
> > > Thanks for updating the KIP and your answers.
> > >
> > >
> > > >  this is to make the name similar to String#split
> > > >> that also returns an array, right?
> > >
> > > The intent was to avoid name duplication. The return type should _not_
> > > be an array.
> > >
> > > The current proposal is
> > >
> > > stream.branch()
> > >   .branch(Predicate)
> > >   .branch(Predicate)
> > >   .defaultBranch();
> > >
> > > IMHO, this reads a little odd, because the first `branch()` does not
> > > take any parameters and has different semantics than the later
> > > `branch()` calls. Note, that from the code snippet above, it's hidden
> > > that the first call is `KStream#branch()` while the others are
> > > `KBranchedStream#branch()` what makes reading the code harder.
> > >
> > > Because I suggested renaming `addBranch()` -> `branch()`, I thought it
> > > might be better to also rename `KStream#branch()` to avoid the naming
> > > overlap that seems to be confusing. The following reads much cleaner to
> > me:
> > >
> > > stream.split()
> > >   .branch(Predicate)
> > >   .branch(Predicate)
> > >   .defaultBranch();
> > >
> > > Maybe there is a better alternative to `split()` though to avoid the
> > > naming overlap.
> > >
> > >
> > > > 'default' is, however, a reserved word, so unfortunately we cannot
> have
> > > a method with such name :-)
> > >
> > > Bummer. Didn't consider this. Maybe we can still come up with a short
> > name?
> > >
> > >
> > > Can you add the interface `KBranchedStream` to the KIP with all it's
> > > methods? It will be part of public API and should be contained in the
> > > KIP. For example, it's unclear atm, what the return type of
> > > `defaultBranch()` is.
> > >
> > >
> > > You did not comment on the idea to add a `KBranchedStream#get(int
> index)
> > > -> KStream` method to get the individual branched KStreams.

Re: [DISCUSS] KIP-462 : Use local thread id for KStreams

2019-04-30 Thread Bill Bejeck
Thanks for the KIP Boyang.  I have no additional comments from the ones
already presented.

+1(binding)

-Bill

On Tue, Apr 30, 2019 at 4:35 PM Boyang Chen  wrote:

> Thank you Guozhang!
>
> 
> From: Guozhang Wang 
> Sent: Wednesday, May 1, 2019 3:54 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
>
> +1 (binding)
>
> Guozhang
>
> On 2019/04/26 07:42:12, "Matthias J. Sax"  wrote:
> > Thanks for the KIP!
> >
> > I agree that the change makes sense, and not only for the static group
> > membership case.
> >
> > For example, if a user `closes()` a `KafkaStreams` client and creates a
> > new one (for example to recover failed threads), while the JVM is still
> > running, it is more intuitive that the thread names are numbered from 1
> > to X again, and not from X+1 to 2*X on restart.
> >
> > Also, the original idea about making thread names unique across
> > applications is non-intuitive itself. It might make sense if there are
> > two instances of the same application within one JVM -- however, this
> > seems to be a rather rare case. Also, the only pattern for this use case
> > seems to be dynamic scaling, and I believe we should actually avoid this
> > pattern by adding a `stopThread()` and `addThread()` method to
> > `KafkaStreams` directly.
> >
> >
> > -Matthias
> >
> >
> > On 4/25/19 11:13 PM, Boyang Chen wrote:
> > > Hey friends,
> > >
> > > I would like to start discussion for a very small KIP:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-462%3A+Use+local+thread+id+for+KStreams
> > >
> > > it is trying to avoid sharing thread-id increment between multiple
> stream instances configured in one JVM. This is an important fix for static
> membership<
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances>
> to be effective for KStreams in edge case like changing `group.instance.id`
> throughout restarts due to thread-id interleaving.
> > >
> > > I will open the vote thread in the meanwhile, since this is a very
> small fix. Feel free to continue the discussion on this thread, thank you!
> > >
> > > Boyang
> > >
> >
> >
>
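Matthias's restart-numbering point can be illustrated with a minimal sketch. The classes below are purely illustrative (only the `StreamThread-` name convention mirrors the real one): a JVM-wide static counter keeps incrementing across client instances, while a per-instance counter restarts at 1 for every new client.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the KIP-462 change, not the actual KafkaStreams
// source: thread names derived from a per-instance counter instead of a
// JVM-wide static one, so a newly created client starts again at 1.
class StreamsClientSketch {
    // Old behavior: one counter shared by every instance in the JVM.
    static final AtomicInteger GLOBAL_THREAD_ID = new AtomicInteger(0);
    // Proposed behavior: each client numbers its own threads locally.
    private final AtomicInteger localThreadId = new AtomicInteger(0);

    String nextGlobalName() { return "StreamThread-" + GLOBAL_THREAD_ID.incrementAndGet(); }
    String nextLocalName()  { return "StreamThread-" + localThreadId.incrementAndGet(); }
}
```

With the local counter, closing a client and creating a new one yields `StreamThread-1` again rather than continuing from where the previous instance left off.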


Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-04-30 Thread Chris Egerton
Randall,

The special behavior for null was my suggestion. There is no implementation
of the proposed interface that causes client overrides to be ignored, so
the original idea was to have a special implementation that would be
checked for by the Connect framework (probably via the instanceof operator)
and, if present, cause all would-be overrides to be ignored.

I thought this may be confusing to people who may see that behavior and
wonder how to recreate it themselves, so I suggested leaving that policy
out and replacing it with a check to see whether a policy was specified at all.

Would be interested in your thoughts on this, especially if there's an
alternative that hasn't been proposed yet.

Cheers,

Chris
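For concreteness, here is one possible shape for the "ignore everything" versus "disallow everything" behaviors being debated. The interface name and `validate` signature below are illustrative assumptions, not the exact API proposed in KIP-458:

```java
import java.util.Map;

// Hedged sketch of two possible override policies. Names and signatures
// are hypothetical; the actual KIP-458 interface may differ.
interface ClientConfigOverridePolicySketch {
    // Returns the subset of requested overrides that are allowed.
    Map<String, Object> validate(Map<String, Object> requestedOverrides);
}

// "Ignore": accepts any request but lets nothing through (a no-op),
// matching today's behavior of silently ignoring overrides.
class IgnorePolicy implements ClientConfigOverridePolicySketch {
    public Map<String, Object> validate(Map<String, Object> requested) {
        return Map.of();
    }
}

// "Disallow": fails fast if any override is present at all.
class DisallowPolicy implements ClientConfigOverridePolicySketch {
    public Map<String, Object> validate(Map<String, Object> requested) {
        if (!requested.isEmpty()) {
            throw new IllegalArgumentException(
                "Client overrides are not permitted: " + requested.keySet());
        }
        return Map.of();
    }
}
```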

On Tue, Apr 30, 2019, 18:01 Randall Hauch  wrote:

> On Tue, Apr 30, 2019 at 4:20 PM Magesh Nandakumar 
> wrote:
>
> > Randall,
> >
> > Thanks a lot for your feedback.
> >
> > You bring up an interesting point regarding the overrides being available
> > to the connectors. Today everything that is specified in the config while
> > creating is available for the connector. But this is a specific case and
> we
> > could do either of the following
> >
> >
> >- don't pass any configs with these prefixes to the ConnectorConfig
> >instance that's passed in the startConnector
> >- allow policies as to whether the configurations with the prefixes
> >should be made available to the connector or not. Should this also
> > define a
> >list of configurations?
> >
> > I personally prefer not passing the configs to Connector since that's
> > simple, straightforward, and I don't see a reason for the connector to
> access
> > those.
> >
>
> I agree that these override properties should be effectively new
> properties, in which case I'd also prefer that they be removed from the
> configuration before it is passed to the connector. Yes, it is *possible*
> that an existing connector happened to use connector config properties with
> these prefixes, but it seems pretty unlikely.
>
> I'd love to hear whether other people agree.
>
>
> >
> > For the second point, None doesn't allow overrides, and the default
> > policy is null. We preserve backward compatibility when no policy is
> > configured. Let me know if that's not clear in the KIP.
> >
>
> Why not have a default policy (rather than null) that implements the
> backward-compatible behavior? It seems strange to have null be the default
> and for no policy to allow anything.
>
>
> >
> > Thanks,
> > Magesh
> >
> > On Mon, Apr 29, 2019 at 4:07 PM Randall Hauch  wrote:
> >
> > > Per the proposal, a connector configuration can define one or more
> > > properties that begin with any of the three prefixes:
> > "producer.override.",
> > > "consumer.override.", and "admin.override.". The proposal states:
> > >
> > > Since the users can specify any of these policies, the connectors
> themselves
> > > should not rely on these configurations to be available. The overrides
> > are
> > > to be used purely from an operational perspective.
> > >
> > >
> > > Does this mean that any such properties are visible to connectors, or
> > will
> > > they be hidden to connectors? Currently no connectors have access to
> such
> > > client properties, and users are unlikely to just put them into a
> connector
> > > configuration unnecessarily. A connector implementation could have
> > defined
> > > such properties as normal connector-specific properties, in which case
> > they
> > > are required, but is that likely given the long prefixes? One concern
> > that I
> > > have is that this might allow connector implementations to start
> attempting
> > to
> > > circumvent the Connect API if these properties are included.
> > >
> > > Second, does the None policy allow but ignore these additional
> properties
> > > (e.g., "validate(...)" is simply a no-op)? Or does the None policy fail
> > if
> > > any client overrides are specified? The former seems more in line with
> > the
> > > current behavior, whereas the "disallows" policy seems useful but not
> > > exactly backward compatible. Should we also offer a "Disallow" policy?
> In
> > > fact, should the policies be named "Ignore" (default), "Disallow",
> > > "Principal", and "All"?
> > >
> > > Otherwise, I like the idea of this. There have been several requests
> over
> > > the past year or two for adding subsets of this functionality. Might be
> > > good to find and list all of the related KAFKA issues.
> > >
> > > Randall
> > >
> > > On Fri, Apr 26, 2019 at 4:04 PM Chris Egerton 
> > wrote:
> > >
> > > > Hi Magesh,
> > > >
> > > > Changes look good to me! Excited to see this happen, hope the KIP
> > passes
> > > :)
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Fri, Apr 26, 2019 at 1:44 PM Magesh Nandakumar <
> > mage...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hi Chris,
> > > > >
> > > > > I have updated the KIP to reflect the changes that we discussed for
> > the
> > > > > prefix. Thanks for all your inputs.
> > > > 

Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-04-30 Thread Randall Hauch
On Tue, Apr 30, 2019 at 4:20 PM Magesh Nandakumar 
wrote:

> Randall,
>
> Thanks a lot for your feedback.
>
> You bring up an interesting point regarding the overrides being available
> to the connectors. Today everything that is specified in the config while
> creating is available for the connector. But this is a specific case and we
> could do either of the following
>
>
>- don't pass any configs with these prefixes to the ConnectorConfig
>instance that's passed in the startConnector
>- allow policies as to whether the configurations with the prefixes
>should be made available to the connector or not. Should this also
> define a
>list of configurations?
>
> I personally prefer not passing the configs to Connector since that's
> simple, straightforward, and I don't see a reason for the connector to access
> those.
>

I agree that these override properties should be effectively new
properties, in which case I'd also prefer that they be removed from the
configuration before it is passed to the connector. Yes, it is *possible*
that an existing connector happened to use connector config properties with
these prefixes, but it seems pretty unlikely.

I'd love to hear whether other people agree.
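As a sketch of the "strip the overrides before handing the config to the connector" idea, the partitioning could look roughly like the following. This is illustrative code under assumed names, not the actual Connect framework implementation:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: split a connector configuration into the part the
// connector sees and the per-client override sections, keyed by prefix.
public class OverrideSplitSketch {
    static final List<String> PREFIXES =
        List.of("producer.override.", "consumer.override.", "admin.override.");

    // Returns "connector" -> connector-visible config, plus one entry per
    // prefix holding the stripped (un-prefixed) overrides.
    static Map<String, Map<String, String>> split(Map<String, String> config) {
        Map<String, Map<String, String>> out = new HashMap<>();
        out.put("connector", new HashMap<>());
        PREFIXES.forEach(p -> out.put(p, new HashMap<>()));
        for (Map.Entry<String, String> e : config.entrySet()) {
            String prefix = PREFIXES.stream()
                .filter(e.getKey()::startsWith)
                .findFirst()
                .orElse(null);
            if (prefix != null) {
                // Strip the prefix; the remainder is the raw client property.
                out.get(prefix).put(e.getKey().substring(prefix.length()), e.getValue());
            } else {
                out.get("connector").put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
```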


>
> For the second point, None doesn't allow overrides, and the default
> policy is null. We preserve backward compatibility when no policy is
> configured. Let me know if that's not clear in the KIP.
>

Why not have a default policy (rather than null) that implements the
backward-compatible behavior? It seems strange to have null be the default
and for no policy to allow anything.


>
> Thanks,
> Magesh
>
> On Mon, Apr 29, 2019 at 4:07 PM Randall Hauch  wrote:
>
> > Per the proposal, a connector configuration can define one or more
> > properties that begin with any of the three prefixes:
> "producer.override.",
> > "consumer.override.", and "admin.override.". The proposal states:
> >
> > Since the users can specify any of these policies, the connectors themselves
> > should not rely on these configurations to be available. The overrides
> are
> > to be used purely from an operational perspective.
> >
> >
> > Does this mean that any such properties are visible to connectors, or
> will
> > they be hidden to connectors? Currently no connectors have access to such
> > client properties, and users are unlikely to just put them into a connector
> > configuration unnecessarily. A connector implementation could have
> defined
> > such properties as normal connector-specific properties, in which case
> they
> > are required, but is that likely given the long prefixes? One concern
> that I
> > have is that this might allow connector implementations to start attempting
> to
> > circumvent the Connect API if these properties are included.
> >
> > Second, does the None policy allow but ignore these additional properties
> > (e.g., "validate(...)" is simply a no-op)? Or does the None policy fail
> if
> > any client overrides are specified? The former seems more in line with
> the
> > current behavior, whereas the "disallows" policy seems useful but not
> > exactly backward compatible. Should we also offer a "Disallow" policy? In
> > fact, should the policies be named "Ignore" (default), "Disallow",
> > "Principal", and "All"?
> >
> > Otherwise, I like the idea of this. There have been several requests over
> > the past year or two for adding subsets of this functionality. Might be
> > good to find and list all of the related KAFKA issues.
> >
> > Randall
> >
> > On Fri, Apr 26, 2019 at 4:04 PM Chris Egerton 
> wrote:
> >
> > > Hi Magesh,
> > >
> > > Changes look good to me! Excited to see this happen, hope the KIP
> passes
> > :)
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Fri, Apr 26, 2019 at 1:44 PM Magesh Nandakumar <
> mage...@confluent.io>
> > > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > I have updated the KIP to reflect the changes that we discussed for
> the
> > > > prefix. Thanks for all your inputs.
> > > >
> > > > Thanks,
> > > > Magesh
> > > >
> > > > On Thu, Apr 25, 2019 at 2:18 PM Chris Egerton 
> > > wrote:
> > > >
> > > > > Hi Magesh,
> > > > >
> > > > > Agreed that we should avoid `dlq.admin`. I also don't have a strong
> > > > opinion
> > > > > between `connector.` and `.override`, but I have a slight
> inclination
> > > > > toward `.override` since `connector.` feels a little redundant
> given
> > > that
> > > > > the whole configuration is for the connector and the use of
> > "override"
> > > > may
> > > > > shed a little light on how the properties for these clients are
> > > computed
> > > > > and help make the learning curve a little gentler on new devs and
> > > users.
> > > > >
> > > > > Regardless, I think the larger issue of conflicts with existing
> > > > properties
> > > > > (both in MM2 and potentially other connectors) has been
> > satisfactorily
> > > > > addressed, so I'm happy.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Wed, Apr 24, 

Re: [VOTE] KIP-411: Make default Kafka Connect worker task client IDs distinct

2019-04-30 Thread Randall Hauch
Nice simple improvement. Thanks, Paul!

+1 (binding)

Randall

On Mon, Apr 29, 2019 at 5:06 PM Magesh Nandakumar 
wrote:

> Looks good to me and a very useful feature.
>
> +1 ( non-binding)
>
> On Mon, Apr 29, 2019, 4:05 PM Arjun Satish  wrote:
>
> > Thanks, Paul! This is very useful.
> >
> > +1 (non-binding)
> >
> > Best,
> > Arjun
> >
> > On Fri, Apr 12, 2019 at 4:13 PM Ryanne Dolan 
> > wrote:
> >
> > > +1 (non binding)
> > >
> > > Thanks
> > > Ryanne
> > >
> > > On Fri, Apr 12, 2019, 11:11 AM Paul Davidson
> > >  wrote:
> > >
> > > > Just a reminder that KIP-411 is open for voting. No votes received
> yet!
> > > >
> > > > On Fri, Apr 5, 2019 at 9:07 AM Paul Davidson <
> pdavid...@salesforce.com
> > >
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Since we seem to have agreement in the discussion I would like to
> > start
> > > > > the vote on KIP-411.
> > > > >
> > > > > See:
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Make+default+Kafka+Connect+worker+task+client+IDs+distinct
> > > > >
> > > > > Also see the related PR: https://github.com/apache/kafka/pull/6097
> > > > >
> > > > > Thanks to everyone who contributed!
> > > > >
> > > > > Paul
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-460: Admin Leader Election RPC

2019-04-30 Thread Jose Armando Garcia Sancio
On Tue, Apr 30, 2019 at 11:39 AM Jason Gustafson  wrote:

> Thanks for the updates, Jose. The proposal looks good to me. Just one minor
> question I had is whether we should even have a default --election-type in
> kafka-leader-election.sh. I am wondering if it is reasonable to make the
> user be explicit about what they are trying to do?
>

This change sounds good to me. We can always add a default to
--election-type in the future without breaking backwards compatibility.

Thanks!


Re: [DISCUSS] KIP-458: Connector Client Config Override Policy

2019-04-30 Thread Magesh Nandakumar
Randall,

Thanks a lot for your feedback.

You bring up an interesting point regarding the overrides being available
to the connectors. Today everything that is specified in the config while
creating is available for the connector. But this is a specific case and we
could do either of the following


   - don't pass any configs with these prefixes to the ConnectorConfig
   instance that's passed in the startConnector
   - allow policies as to whether the configurations with the prefixes
   should be made available to the connector or not. Should this also define a
   list of configurations?

I personally prefer not passing the configs to Connector since that's
simple, straightforward, and I don't see a reason for the connector to access
those.

For the second point, None doesn't allow overrides, and the default
policy is null. We preserve backward compatibility when no policy is
configured. Let me know if that's not clear in the KIP.

Thanks,
Magesh

On Mon, Apr 29, 2019 at 4:07 PM Randall Hauch  wrote:

> Per the proposal, a connector configuration can define one or more
> properties that begin with any of the three prefixes: "producer.override.",
> "consumer.override.", and "admin.override.". The proposal states:
>
> Since the users can specify any of these policies, the connectors themselves
> should not rely on these configurations to be available. The overrides are
> to be used purely from an operational perspective.
>
>
> Does this mean that any such properties are visible to connectors, or will
> they be hidden to connectors? Currently no connectors have access to such
> client properties, and users are unlikely to just put them into a connector
> configuration unnecessarily. A connector implementation could have defined
> such properties as normal connector-specific properties, in which case they
> are required, but is that likely given the long prefixes? One concern that I
> have is that this might allow connector implementations to start attempting to
> circumvent the Connect API if these properties are included.
>
> Second, does the None policy allow but ignore these additional properties
> (e.g., "validate(...)" is simply a no-op)? Or does the None policy fail if
> any client overrides are specified? The former seems more in line with the
> current behavior, whereas the "disallows" policy seems useful but not
> exactly backward compatible. Should we also offer a "Disallow" policy? In
> fact, should the policies be named "Ignore" (default), "Disallow",
> "Principal", and "All"?
>
> Otherwise, I like the idea of this. There have been several requests over
> the past year or two for adding subsets of this functionality. Might be
> good to find and list all of the related KAFKA issues.
>
> Randall
>
> On Fri, Apr 26, 2019 at 4:04 PM Chris Egerton  wrote:
>
> > Hi Magesh,
> >
> > Changes look good to me! Excited to see this happen, hope the KIP passes
> :)
> >
> > Cheers,
> >
> > Chris
> >
> > On Fri, Apr 26, 2019 at 1:44 PM Magesh Nandakumar 
> > wrote:
> >
> > > Hi Chris,
> > >
> > > I have updated the KIP to reflect the changes that we discussed for the
> > > prefix. Thanks for all your inputs.
> > >
> > > Thanks,
> > > Magesh
> > >
> > > On Thu, Apr 25, 2019 at 2:18 PM Chris Egerton 
> > wrote:
> > >
> > > > Hi Magesh,
> > > >
> > > > Agreed that we should avoid `dlq.admin`. I also don't have a strong
> > > opinion
> > > > between `connector.` and `.override`, but I have a slight inclination
> > > > toward `.override` since `connector.` feels a little redundant given
> > that
> > > > the whole configuration is for the connector and the use of
> "override"
> > > may
> > > > shed a little light on how the properties for these clients are
> > computed
> > > > and help make the learning curve a little gentler on new devs and
> > users.
> > > >
> > > > Regardless, I think the larger issue of conflicts with existing
> > > properties
> > > > (both in MM2 and potentially other connectors) has been
> satisfactorily
> > > > addressed, so I'm happy.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Wed, Apr 24, 2019 at 11:14 AM Magesh Nandakumar <
> > mage...@confluent.io
> > > >
> > > > wrote:
> > > >
> > > > > Hi Chris,
> > > > >
> > > > > You are right about the "admin." prefix creating conflicts. Here
> are
> > > few
> > > > > options that I can think of
> > > > >
> > > > > 1. Use `dlq.admin` since admin client is used only for DLQ. But
> this
> > > > might
> > > > > not really be the case in the future. So, we should possibly drop
> > this
> > > > idea
> > > > > :)
> > > > > 2.  Use `connector.producer`, `connector.consumer` and
> > > `connector.admin`
> > > > -
> > > > > provides better context that its connector specific property
> > > > > 3.  Use `producer.override`, '`consumer.override` and
> > `admin.override`
> > > -
> > > > > provides better clarity that these are overrides.
> > > > >
> > > > > I don't have a strong opinion in choosing between #2 and #3. Let me
> > > > > know what you 

[jira] [Created] (KAFKA-8307) Kafka Streams should provide some mechanism to determine topology equality and compatibility

2019-04-30 Thread John Roesler (JIRA)
John Roesler created KAFKA-8307:
---

 Summary: Kafka Streams should provide some mechanism to determine 
topology equality and compatibility
 Key: KAFKA-8307
 URL: https://issues.apache.org/jira/browse/KAFKA-8307
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, Streams provides no mechanism to compare two topologies. This is a 
common operation when users want to have tests verifying that they don't 
accidentally alter their topology. They would save the known-good topology and 
then add a unit test verifying the current code against that known-good state.

However, because there's no way to do this comparison properly, everyone is 
reduced to using the string format of the topology (from 
`Topology#describe().toString()`). The major drawback is that the string format 
is meant for human consumption. It is neither machine-parseable nor stable. So, 
these compatibility tests are doomed to fail when any minor, non-breaking, 
change is made either to the application, or to the library. This trains 
everyone to update the test whenever it fails, undermining its utility.

We should fix this problem, and provide both a mechanism to serialize the 
topology and to compare two topologies for compatibility. All in all, I think 
we need:
# a way to serialize/deserialize topology structure in a machine-parseable 
format that is future-compatible. Offhand, I'd recommend serializing the 
topology structure as JSON, and establishing a policy that attributes should 
only be added to the object graph, never removed. Note, it's out of scope to be 
able to actually run a deserialized topology; we only want to save and load the 
structure (not the logic) to facilitate comparisons.
# a method to verify the *equality* of two topologies... This method tells you 
that the two topologies are structurally identical. We can't know if the logic 
of any operator has changed, only if the structure of the graph is changed. We 
can consider whether other graph properties, like serdes, should be included.
# a method to verify the *compatibility* of two topologies... This method tells 
you that moving from topology A to topology B does not require an application 
reset. Note that this operation is not commutative: `A.compatibleWith(B)` does 
not imply `B.compatibleWith(A)`. We can discuss whether `A.compatibleWith(B) && 
B.compatibleWith(A)` implies `A.equals(B)` (I think not necessarily, because we 
may want "equality" to be stricter than "compatibility").
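The non-commutativity of compatibility can be illustrated with a deliberately oversimplified model, where a topology is just a set of node names and "compatible" means "only adds nodes." A real check would have to consider state stores, serdes, subscription topics, and more; this sketch only shows why `A.compatibleWith(B)` does not imply `B.compatibleWith(A)`:

```java
import java.util.Set;

// Hypothetical, minimal model of the equality-vs-compatibility distinction.
// Not the Kafka Streams Topology class: just a set of named nodes.
public class TopologySketch {
    final Set<String> nodes;

    TopologySketch(Set<String> nodes) { this.nodes = nodes; }

    // Structural equality: exactly the same nodes.
    boolean sameStructure(TopologySketch other) {
        return nodes.equals(other.nodes);
    }

    // One (oversimplified) compatibility rule: moving to `next` needs no
    // application reset if `next` keeps every existing node (only additions).
    boolean compatibleWith(TopologySketch next) {
        return next.nodes.containsAll(nodes);
    }
}
```

Under this rule, growing a topology is compatible while shrinking it is not, so compatibility is strictly weaker than equality, as the issue suggests.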




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : kafka-2.1-jdk8 #176

2019-04-30 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-462 : Use local thread id for KStreams

2019-04-30 Thread Boyang Chen
Thank you Guozhang!


From: Guozhang Wang 
Sent: Wednesday, May 1, 2019 3:54 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams

+1 (binding)

Guozhang

On 2019/04/26 07:42:12, "Matthias J. Sax"  wrote:
> Thanks for the KIP!
>
> I agree that the change makes sense, and not only for the static group
> membership case.
>
> For example, if a user `closes()` a `KafkaStreams` client and creates a
> new one (for example to recover failed threads), while the JVM is still
> running, it is more intuitive that the thread names are numbered from 1 to
> X again, and not from X+1 to 2*X on restart.
>
> Also, the original idea about making thread names unique across
> applications is non-intuitive itself. It might make sense if there are
> two instances of the same application within one JVM -- however, this
> seems to be a rather rare case. Also, the only pattern for this use case
> seems to be dynamic scaling, and I believe we should actually avoid this
> pattern by adding a `stopThread()` and `addThread()` method to
> `KafkaStreams` directly.
>
>
> -Matthias
>
>
> On 4/25/19 11:13 PM, Boyang Chen wrote:
> > Hey friends,
> >
> > I would like to start discussion for a very small KIP:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-462%3A+Use+local+thread+id+for+KStreams
> >
> > it is trying to avoid sharing thread-id increment between multiple stream 
> > instances configured in one JVM. This is an important fix for static 
> > membership
> >  to be effective for KStreams in edge case like changing 
> > `group.instance.id` throughout restarts due to thread-id interleaving.
> >
> > I will open the vote thread in the meanwhile, since this is a very small
> > fix. Feel free to continue the discussion on this thread, thank you!
> >
> > Boyang
> >
>
>


Build failed in Jenkins: kafka-2.2-jdk8 #96

2019-04-30 Thread Apache Jenkins Server
See 


Changes:

[cmccabe] KAFKA-8134: `linger.ms` must be a long

--
[...truncated 2.50 MB...]
kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testReassignPartitionLeaderElectionWithEmptyIsr PASSED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testControlledShutdownPartitionLeaderElectionAllIsrSimultaneouslyShutdown 
STARTED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testControlledShutdownPartitionLeaderElectionAllIsrSimultaneouslyShutdown PASSED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled 
STARTED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled 
PASSED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testPreferredReplicaPartitionLeaderElectionPreferredReplicaNotInIsrNotLive 
STARTED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testPreferredReplicaPartitionLeaderElectionPreferredReplicaNotInIsrNotLive 
PASSED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionDisabled 
STARTED

kafka.controller.PartitionLeaderElectionAlgorithmsTest > 
testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionDisabled 
PASSED

kafka.controller.PartitionStateMachineTest > 
testNonexistentPartitionToNewPartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testNonexistentPartitionToNewPartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOnlinePartitionTransitionErrorCodeFromCreateStates STARTED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOnlinePartitionTransitionErrorCodeFromCreateStates PASSED

kafka.controller.PartitionStateMachineTest > 
testOfflinePartitionToNonexistentPartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testOfflinePartitionToNonexistentPartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testOnlinePartitionToOfflineTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testOnlinePartitionToOfflineTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOfflinePartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOfflinePartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > testUpdatingOfflinePartitionsCount 
STARTED

kafka.controller.PartitionStateMachineTest > testUpdatingOfflinePartitionsCount 
PASSED

kafka.controller.PartitionStateMachineTest > 
testInvalidNonexistentPartitionToOnlinePartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testInvalidNonexistentPartitionToOnlinePartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testInvalidNonexistentPartitionToOfflinePartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testInvalidNonexistentPartitionToOfflinePartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testOnlinePartitionToOnlineTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testOnlinePartitionToOnlineTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOnlinePartitionTransitionZkUtilsExceptionFromCreateStates 
STARTED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOnlinePartitionTransitionZkUtilsExceptionFromCreateStates 
PASSED

kafka.controller.PartitionStateMachineTest > 
testInvalidNewPartitionToNonexistentPartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testInvalidNewPartitionToNonexistentPartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOnlinePartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testNewPartitionToOnlinePartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testInvalidOnlinePartitionToNewPartitionTransition STARTED

kafka.controller.PartitionStateMachineTest > 
testInvalidOnlinePartitionToNewPartitionTransition PASSED

kafka.controller.PartitionStateMachineTest > 
testUpdatingOfflinePartitionsCountDuringTopicDeletion STARTED

kafka.controller.PartitionStateMachineTest > 
testUpdatingOfflinePartitionsCountDuringTopicDeletion PASSED

kafka.controller.PartitionStateMachineTest > 
testOfflinePartitionToOnlinePartitionTransitionErrorCodeFromStateLookup STARTED

kafka.controller.PartitionStateMachineTest > 
testOfflinePartitionToOnlinePartitionTransitionErrorCodeFromStateLookup PASSED

kafka.controller.PartitionStateMachineTest > 
testOnlinePartitionToOnlineTransitionForControlledShutdown STARTED

kafka.controller.PartitionStateMachineTest > 
testOnlinePartitionToOnlineTransitionForControlledShutdown PASSED

kafka.controller.PartitionStateMachineTest > 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-30 Thread Michael Drogalis
I’d like to propose a different way of thinking about this. To me, there
are three problems with the existing branch signature:

1. If you use it the way most people do, Java raises unsafe type warnings.
2. The way in which you use the stream branches is positionally coupled to
the ordering of the conditionals.
3. It is brittle to extend existing branch calls with additional code paths.

Using associative constructs instead of relying on ordered constructs would
be a stronger approach. Consider a signature that instead looks like this:

Map<String, KStream> KStream#branch(SortedMap<String, Predicate>);

Branches are given names in a map, and as a result, the API returns a
mapping of names to streams. The ordering of the conditionals is maintained
because it’s a sorted map. Insert order determines the order of evaluation.

This solves problem 1 because there are no more varargs. It solves problem
2 because you no longer lean on ordering to access the branch you’re
interested in. It solves problem 3 because you can introduce another
conditional by simply attaching another name to the structure, rather than
messing with the existing indices.

One of the drawbacks is that creating the map inline is historically
awkward in Java. I know it’s an anti-pattern to use voluminously, but
double brace initialization would clean up the aesthetics.
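A collection-based sketch of this map-driven idea (illustrative only: `branch` here operates on a `List` stand-in rather than a real `KStream`, and the method is free-standing rather than part of the Streams API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch of the map-based branching proposal: branches are registered by
// name in an insertion-ordered map, so insertion order drives evaluation
// order, and the result maps each name back to its branch.
public class MapBranchSketch {
    static <T> Map<String, List<T>> branch(List<T> stream,
                                           LinkedHashMap<String, Predicate<T>> branches) {
        Map<String, List<T>> result = new LinkedHashMap<>();
        branches.keySet().forEach(name -> result.put(name, new ArrayList<>()));
        for (T record : stream) {
            for (Map.Entry<String, Predicate<T>> e : branches.entrySet()) {
                if (e.getValue().test(record)) { // first matching predicate wins
                    result.get(e.getKey()).add(record);
                    break;
                }
            }
        }
        return result;
    }
}
```

Callers then access branches by name, e.g. `result.get("even")`, instead of by positional index, which is the decoupling argued for above.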

On Tue, Apr 30, 2019 at 9:10 AM John Roesler  wrote:

> Hi Ivan,
>
> Thanks for the update.
>
> FWIW, I agree with Matthias that the current "start branching" operator is
> confusing when named the same way as the actual branches. "Split" seems
> like a good name. Alternatively, we can do without a "start branching"
> operator at all, and just do:
>
> stream
>   .branch(Predicate)
>   .branch(Predicate)
>   .defaultBranch();
>
> Tentatively, I think that this branching operation should be terminal. That
> way, we don't create ambiguity about how to use it. That is, `branch`
> should return `KBranchedStream`, while `defaultBranch` is `void`, to
> enforce that it comes last, and that there is only one definition of the
> default branch. Potentially, we should log a warning if there's no default,
> and additionally log a warning (or throw an exception) if a record falls
> through with no default.
>
> Thoughts?
>
> Thanks,
> -John
>
> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax 
> wrote:
>
> > Thanks for updating the KIP and your answers.
> >
> >
> > >  this is to make the name similar to String#split
> > >> that also returns an array, right?
> >
> > The intend was to avoid name duplication. The return type should _not_
> > be an array.
> >
> > The current proposal is
> >
> > stream.branch()
> >   .branch(Predicate)
> >   .branch(Predicate)
> >   .defaultBranch();
> >
> > IMHO, this reads a little odd, because the first `branch()` does not
> > take any parameters and has different semantics than the later
> > `branch()` calls. Note, that from the code snippet above, it's hidden
> > that the first call is `KStream#branch()` while the others are
> > `KBranchedStream#branch()` what makes reading the code harder.
> >
> > Because I suggested to rename `addBranch()` -> `branch()`, I thought it
> > might be better to also rename `KStream#branch()` to avoid the naming
> > overlap that seems to be confusing. The following reads much cleaner to
> me:
> >
> > stream.split()
> >   .branch(Predicate)
> >   .branch(Predicate)
> >   .defaultBranch();
> >
> > Maybe there is a better alternative to `split()` though to avoid the
> > naming overlap.
> >
> >
> > > 'default' is, however, a reserved word, so unfortunately we cannot have
> > a method with such name :-)
> >
> > Bummer. Didn't consider this. Maybe we can still come up with a short
> name?
> >
> >
> > Can you add the interface `KBranchedStream` to the KIP with all its
> > methods? It will be part of public API and should be contained in the
> > KIP. For example, it's unclear atm, what the return type of
> > `defaultBranch()` is.
> >
> >
> > You did not comment on the idea to add a `KBranchedStream#get(int index)
> > -> KStream` method to get the individually branched-KStreams. Would be
> > nice to get your feedback about it. It seems you suggest that users
> > would need to write custom utility code otherwise, to access them. We
> > should discuss the pros and cons of both approaches. It feels
> > "incomplete" to me atm, if the API has no built-in support to get the
> > branched-KStreams directly.
> >
> >
> >
> > -Matthias
> >
> >
> > On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > > Hi all!
> > >
> > > I have updated the KIP-418 according to the new vision.
> > >
> > > Matthias, thanks for your comment!
> > >
> > >> Renaming KStream#branch() -> #split()
> > >
> > > I can see your point: this is to make the name similar to String#split
> > > that also returns an array, right? But is it worth the loss of
> backwards
> > > compatibility? We can have overloaded branch() as well without
> affecting
> > > the existing code. Maybe the old 

Re: [DISCUSS] KIP-462 : Use local thread id for KStreams

2019-04-30 Thread Guozhang Wang
+1 (binding)

Guozhang

On 2019/04/26 07:42:12, "Matthias J. Sax"  wrote: 
> Thanks for the KIP!
> 
> I agree that the change makes sense, and not only for the static group
> membership case.
> 
> For example, if a user `close()`s a `KafkaStreams` client and creates a
> new one (for example to recover failed threads), while the JVM is still
> running, it is more intuitive that the thread names are numbered from 1 to
> X again, and not from X+1 to 2*X on restart.
> 
> Also, the original idea about making thread names unique across
> applications is non-intuitive itself. It might make sense if there are
> two instances of the same application within one JVM -- however, this
> seems to be a rather rare case. Also, the only pattern for this use case
> seems to be dynamic scaling, and I believe we should actually avoid this
> pattern by adding a `stopThread()` and `addThread()` method to
> `KafkaStreams` directly.
> 
> 
> -Matthias
> 
> 
> On 4/25/19 11:13 PM, Boyang Chen wrote:
> > Hey friends,
> > 
> > I would like to start discussion for a very small KIP:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-462%3A+Use+local+thread+id+for+KStreams
> > 
> > it is trying to avoid sharing the thread-id increment between multiple stream 
> > instances configured in one JVM. This is an important fix for static 
> > membership
> >  to be effective for KStreams in edge cases like changing 
> > `group.instance.id` throughout restarts due to thread-id interleaving.
> > 
> > I will open the vote thread in the meanwhile, since this is a very small 
> > fix. Feel free to continue the discussion on this thread, thank you!
> > 
> > Boyang
> > 
> 
> 

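Matthias's restart example can be sketched with a toy model. This is a standalone illustration of the behavioral difference, with hypothetical class and method names, not Kafka's actual StreamThread code: a JVM-global counter keeps numbering across clients, while a per-instance (local) counter restarts from 1 for each client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: contrasts a JVM-global thread-id counter (the old
// behavior) with a per-instance counter (the behavior proposed in KIP-462).
public class ThreadIdDemo {
    // Shared across every "streams client" in the JVM: a second client
    // continues numbering where the first left off.
    static final AtomicInteger GLOBAL_IDS = new AtomicInteger(1);

    // Per-instance: every client numbers its threads from 1.
    final AtomicInteger localIds = new AtomicInteger(1);

    static List<String> namesWithGlobalCounter(int numThreads) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numThreads; i++)
            names.add("StreamThread-" + GLOBAL_IDS.getAndIncrement());
        return names;
    }

    List<String> namesWithLocalCounter(int numThreads) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numThreads; i++)
            names.add("StreamThread-" + localIds.getAndIncrement());
        return names;
    }

    public static void main(String[] args) {
        // Global counter: a "restarted" client gets threads 3 and 4.
        System.out.println(namesWithGlobalCounter(2)); // [StreamThread-1, StreamThread-2]
        System.out.println(namesWithGlobalCounter(2)); // [StreamThread-3, StreamThread-4]

        // Local counter: each new client instance starts again from 1.
        System.out.println(new ThreadIdDemo().namesWithLocalCounter(2)); // [StreamThread-1, StreamThread-2]
        System.out.println(new ThreadIdDemo().namesWithLocalCounter(2)); // [StreamThread-1, StreamThread-2]
    }
}
```

With the local counter, closing a client and creating a new one in the same JVM yields stable thread names, which is what static membership relies on.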

Re: [DISCUSS] KIP-462 : Use local thread id for KStreams

2019-04-30 Thread Boyang Chen
Thank you Sophie! Added the case Matthias described in the Compatibility 
session.


From: Sophie Blee-Goldman 
Sent: Wednesday, May 1, 2019 1:30 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams

Hey Boyang,

I think this sounds great but one thing you might want to update is the
"Compatibility, Deprecation, and Migration Plan" -- I agree having two
instances in the same JVM is probably a rare occurrence but the (presumably
less rare) situation Matthias described would also be affected in case of
exposed thread ids. Just a small note

Sophie

On Tue, Apr 30, 2019 at 8:25 AM Boyang Chen  wrote:

> Hey Bruno,
>
> "throttling purpose" means that we use `client.id` to track the request
> quota and do the throttling based on that. So in that context, it is
> expected to use the same `client.id` for a certain set of consumers.
>
> Also thank you Guozhang for the comment! Merged with the KIP.
>
> Boyang
> 
> From: Bruno Cadonna 
> Sent: Tuesday, April 30, 2019 4:15 PM
> To: dev@kafka.apache.org
> Subject: Re: Fw: [DISCUSS] KIP-462 : Use local thread id for KStreams
>
> Hi Guozhang,
>
> What do you mean exactly with "throttling purposes"?
>
> @Boyang: Thank you for the KIP!
>
> Best,
> Bruno
>
> On Tue, Apr 30, 2019 at 1:15 AM Guozhang Wang  wrote:
>
> > Hi Boyang,
> >
> > Thanks for the KIP. I think it makes sense.
> >
> > Just following up on the documentation part: since we are effectively
> > removing this guard against same client.ids of instances --- and btw,
> > semantically we would not forbid users to set the same client.ids anyways
> > for throttling purposes for example --- it's worth augmenting the
> > client.id
> > config description by stating what users should expect client.id to be
> > propagated to internal embedded clients, and therefore what's the
> expected
> > outcome if they choose to set same client.ids for different Streams
> client.
> >
> >
> > Otherwise, I've no further comments.
> >
> > Guozhang
> >
> > On Mon, Apr 29, 2019 at 3:42 PM Boyang Chen  wrote:
> >
> > > FYI
> > >
> > >
> > > 
> > > From: Boyang Chen 
> > > Sent: Tuesday, April 30, 2019 4:32 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
> > >
> > > Could we get more discussions on this thread?
> > >
> > > Boyang
> > >
> > > 
> > > From: Boyang Chen 
> > > Sent: Friday, April 26, 2019 10:51 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
> > >
> > > Thanks for the explanation Matthias! Will enhance the KIP motivation by
> > > your example.
> > >
> > >
> > > 
> > > From: Matthias J. Sax 
> > > Sent: Friday, April 26, 2019 3:42 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
> > >
> > > Thanks for the KIP!
> > >
> > > I agree that the change makes sense, and not only for the static group
> > > membership case.
> > >
> > > For example, if a user `close()`s a `KafkaStreams` client and creates a
> > > new one (for example to recover failed threads), while the JVM is still
> > > running, it is more intuitive that the thread names are numbered from 1
> to
> > > X again, and not from X+1 to 2*X on restart.
> > >
> > > Also, the original idea about making thread names unique across
> > > applications is non-intuitive itself. It might make sense if there are
> > > two instances of the same application within one JVM -- however, this
> > > seems to be a rather rare case. Also, the only pattern for this use
> case
> > > seems to be dynamic scaling, and I believe we should actually avoid this
> > > pattern by adding a `stopThread()` and `addThread()` method to
> > > `KafkaStreams` directly.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 4/25/19 11:13 PM, Boyang Chen wrote:
> > > > Hey friends,
> > > >
> > > > I would like to start discussion for a very small KIP:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-462%3A+Use+local+thread+id+for+KStreams
> > > >
> > > > it is trying to avoid sharing thread-id increment between multiple
> > > stream instances configured in one JVM. This is an important fix for
> > static
> > > membership<
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
> > >
> > > to be effective for KStreams in edge cases like changing `
> > group.instance.id`
> > > throughout restarts due to thread-id interleaving.
> > > >
> > > > I will open the vote thread in the meanwhile, since this is a very
> > > small fix. Feel free to continue the discussion on this thread, thank
> > you!
> > > >
> > > > Boyang
> > > >
> > >
> > >
> >
> > --
> > -- Guozhang
> >
>


Re: [DISCUSS] KIP-454: Expansion of the ConnectClusterState interface

2019-04-30 Thread Chris Egerton
Hi Konstantine,

I've updated the KIP to add default method implementations to the list of
rejected alternatives. Technically this makes the changes in the KIP
backwards incompatible, but I think I agree that for the majority of cases
where it would even be an issue a compile-time error is likely to be more
beneficial than one at run time.

Thanks for your thoughts and thanks for the LGTM!

Cheers,

Chris

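As a rough illustration of the interface-vs-class trade-off discussed in this thread, a `ConnectClusterDetails` interface might look like the sketch below. The names are illustrative assumptions; KIP-454 defines the actual API.

```java
// Hypothetical sketch of the shape discussed for KIP-454: ConnectClusterDetails
// as an interface (like ConnectClusterState) rather than a concrete class, so
// accessors can be added later without breaking the extension API surface.
public class ClusterDetailsSketch {
    interface ConnectClusterDetails {
        // ID of the Kafka cluster backing this Connect cluster.
        String kafkaClusterId();
        // Future candidates discussed in the thread (group ID, distributed vs
        // standalone) could be added here as further default or abstract methods.
    }

    public static void main(String[] args) {
        // A toy implementation, as a REST extension might receive it.
        ConnectClusterDetails details = () -> "my-kafka-cluster-id";
        System.out.println(details.kafkaClusterId()); // my-kafka-cluster-id
    }
}
```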
On Mon, Apr 29, 2019 at 12:29 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi Chris,
>
> Thanks for considering my suggestion regarding default implementations for
> the new methods.
> However, given that these implementations don't seem to have sane defaults
> and throw UnsupportedOperationException, I think we'll be better without
> defaults.
> Seems that a compile time error is preferable here, for those who want to
> upgrade their implementations.
>
> Otherwise, the KIP LGTM.
>
> Konstantine
>
> On Thu, Apr 25, 2019 at 10:29 PM Magesh Nandakumar 
> wrote:
>
> > Thanks a lot, Chris. The KIP looks good to me.
> >
> > On Thu, Apr 25, 2019 at 7:35 PM Chris Egerton 
> wrote:
> >
> > > Hi Magesh,
> > >
> > > Sounds good; I've updated the KIP to make ConnectClusterDetails an
> > > interface. If we want to leave the door open to expand it in the future
> > it
> > > definitely makes sense to treat it similarly to how we're treating the
> > > ConnectClusterState interface now.
> > >
> > > Thanks,
> > >
> > > Chris
> > >
> > > On Thu, Apr 25, 2019 at 4:18 PM Magesh Nandakumar <
> mage...@confluent.io>
> > > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > Overall it looks good to me. Just one last comment - I was wondering
> if
> > > > ConnectClusterDetail should be an interface just like
> > > ConnectClusterState.
> > > >
> > > > Thanks,
> > > > Magesh
> > > >
> > > > On Thu, Apr 25, 2019 at 3:54 PM Chris Egerton 
> > > wrote:
> > > >
> > > > > Hi Magesh,
> > > > >
> > > > > Expanding the type we use to convey cluster metadata from just a
> > Kafka
> > > > > cluster ID string to its own class seems like a good idea for the
> > sake
> > > of
> > > > > forwards compatibility, but I'm still not sure what the gains of
> > > > including
> > > > > the cluster group ID would be--it's a simple map lookup away in the
> > > REST
> > > > > extension's configure(...) method. Including information on whether
> > the
> > > > > cluster is distributed or standalone definitely seems convenient;
> as
> > > far
> > > > as
> > > > > I can tell there's no easy way to do that from within a REST
> > extension
> > > at
> > > > > the moment, and relying on something like the presence of a
> group.id
> > > > > property to identify a distributed cluster could result in false
> > > > positives.
> > > > > However, is there a use case for it? If not, we can note that as a
> > > > possible
> > > > > addition to the ConnectClusterDetails class for later but leave it
> > out
> > > > for
> > > > > now.
> > > > >
> > > > > I've updated the KIP to include the new ConnectClusterDetails class
> > but
> > > > > left out the cluster type information for now; let me know what you
> > > > think.
> > > > >
> > > > > Thanks again for your thoughts!
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Wed, Apr 24, 2019 at 4:49 PM Magesh Nandakumar <
> > > mage...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hi Chris,
> > > > > >
> > > > > > Instead of calling it ConnectClusterId, perhaps call it
> > > > > > ConnectClusterDetails which can include things like groupid,
> > > underlying
> > > > > > kafkaclusterId, standalone or distributed, etc. This will help
> > expose
> > > > any
> > > > > > cluster related information in the future.
> > > > > > Let me know if that would work?
> > > > > >
> > > > > > Thanks,
> > > > > > Magesh
> > > > > >
> > > > > > On Wed, Apr 24, 2019 at 4:26 PM Chris Egerton <
> chr...@confluent.io
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi Magesh,
> > > > > > >
> > > > > > > 1. After ruminating for a little while on the inclusion of a
> > method
> > > > to
> > > > > > > retrieve task configurations I've tentatively decided to remove
> > it
> > > > from
> > > > > > the
> > > > > > > proposal and place it in the rejected alternatives section. If
> > > anyone
> > > > > > > presents a reasonable use case for it I'll be happy to discuss
> > > > further
> > > > > > but
> > > > > > > right now I think this is the way to go. Thanks for your
> > > suggestion!
> > > > > > >
> > > > > > > 2. The idea of a Connect cluster ID method is certainly
> > > fascinating,
> > > > > but
> > > > > > > there are a few questions it raises. First off, what would the
> > > > > group.id
> > > > > > be
> > > > > > > for a standalone cluster? Second, why return a formatted string
> > > there
> > > > > > > instead of a new class such as a ConnectClusterId that provides
> > the
> > > > two
> > > > > > in
> > > > > > > separate methods? And lastly, since REST extensions are
> > configured
> > > > with
> > 

Re: [DISCUSS] KIP-464 Default Replication Factor for AdminClient#createTopic

2019-04-30 Thread Almog Gavra
"lack of default doesn't force people to think - it gets them to pick a
random number" - I suppose if I'm being pragmatic this rings very true to
my experience as well... better a cluster administrator sets a sane default
than having users choose random numbers!

Consider me convinced - I'll update the KIP to reflect this: if replicas,
partitions and manual assignments are all missing, the cluster defaults
will be used.

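The agreed fallback semantics can be shown with a standalone sketch (not the AdminClient API itself; the helper name is hypothetical): a value the caller omits resolves to the broker-configured default.

```java
import java.util.Optional;

// Illustration of the fallback discussed for KIP-464: if the caller omits the
// partition count and/or replication factor when creating a topic, the
// broker's configured defaults apply.
public class TopicDefaultsDemo {
    static int resolve(Optional<Integer> requested, int brokerDefault) {
        return requested.orElse(brokerDefault);
    }

    public static void main(String[] args) {
        int defaultPartitions = 6;    // e.g. broker config num.partitions
        int defaultReplication = 3;   // e.g. broker config default.replication.factor

        // Caller specifies neither: both come from the cluster defaults.
        System.out.println(resolve(Optional.empty(), defaultPartitions));  // 6
        System.out.println(resolve(Optional.empty(), defaultReplication)); // 3

        // Caller specifies a partition count but not a replication factor.
        System.out.println(resolve(Optional.of(12), defaultPartitions));   // 12
    }
}
```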
On Tue, Apr 30, 2019 at 11:36 AM Gwen Shapira  wrote:

> Changing number of partitions is complicated in some use-cases and easy in
> other cases (when you use Kafka as a big pipe of events on the way to
> another system).
> I like making easy things easy and complex things complicated. Having
> defaults for both will allow the easy cases to be even easier.
>
> In my experience lack of default doesn't force people to think - it gets
> them to pick a random number... (not sure there is any configuration that
> can get anyone to think, unfortunately).
>
> On Tue, Apr 30, 2019 at 10:22 AM Almog Gavra  wrote:
>
> > I have a preference toward requiring specifying partitions per topic, but
> > I'm happy to be convinced otherwise. Changing replication factor after
> the
> > fact is easy, but changing partitions is complicated since historical
> state
> > gets messed up, so it could be beneficial to force clients to think about
> > it up front. Furthermore, I see partitioning as a function on the scale
> of
> > the data while replication is a function of cluster capacity. Thoughts?
> >
> > On Tue, Apr 30, 2019 at 8:58 AM Ismael Juma  wrote:
> >
> > > Thanks for the KIP, Almog. This is a good change. I think we should
> also
> > > allow the partition count broker default to be used (the one used for
> > auto
> > > topic creation).
> > >
> > > Ismael
> > >
> > > On Tue, Apr 30, 2019, 8:39 AM Almog Gavra  wrote:
> > >
> > > > Hello Everyone,
> > > >
> > > > I'd like to start a discussion on KIP-464:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Default+Replication+Factor+for+AdminClient%23createTopic
> > > >
> > > > It's about allowing users of the AdminClient to supply only a
> partition
> > > > count and to use the default replication factor configured by the
> kafka
> > > > cluster. Happy to receive any and all feedback!
> > > >
> > > > Cheers,
> > > > Almog
> > > >
> > >
> >
>
>
> --
> *Gwen Shapira*
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter  | blog
> 
>


Re: [DISCUSS] KIP-460: Admin Leader Election RPC

2019-04-30 Thread Jason Gustafson
Thanks for the updates, Jose. The proposal looks good to me. Just one minor
question I had is whether we should even have a default --election-type in
kafka-leader-election.sh. I am wondering if it is reasonable to make the
user be explicit about what they are trying to do?

-Jason

On Fri, Apr 26, 2019 at 2:39 PM Jose Armando Garcia Sancio <
jsan...@confluent.io> wrote:

> Hi all,
>
> Jason, Colin and I discuss this KIP offline and decided to make the
> following changes.
>
>1. Change the ElectLeadersRequest RPC so that only one election type can
>be specified and it applies to all of the topic partitions enumerated.
> We
>think that this makes the API easier to use when performing one type of
>election across multiple topic partitions. We think that it is rare that
>they user would like to perform different type of elections in the same
>command (or request).
>2. Change the kafka-leader-election script so that it doesn't default to
>applying the election type to all of the topic partitions. For example
>previously "bin/kafka-preferred-replica-election.sh --bootstrap-server
>$host:$port" would attempt to perform preferred leader election on all
> of
>the partitions. Instead, the user now needs to run the following command
>"bin/kafka-leader-election.sh --bootstrap-server $host:$port
>--all-topic-partitions"
>
> The KIP has been updated to includes these changes. The diff is here:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=113707931&selectedPageVersions=13&selectedPageVersions=12
>
> Thanks!
>
> On Wed, Apr 24, 2019 at 3:45 PM Jose Armando Garcia Sancio <
> jsan...@confluent.io> wrote:
>
> > Hi all,
> >
> > We would like to extend the "preferred leader election" RPC for the admin
> > client to also support unclean leader elections.
> >
> > The KIP can be found here:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-460%3A+Admin+Leader+Election+RPC
> >
> > Thanks!
> > -Jose
> >
>
>
> --
> -Jose
>


Re: [DISCUSS] KIP-464 Default Replication Factor for AdminClient#createTopic

2019-04-30 Thread Gwen Shapira
Changing number of partitions is complicated in some use-cases and easy in
other cases (when you use Kafka as a big pipe of events on the way to
another system).
I like making easy things easy and complex things complicated. Having
defaults for both will allow the easy cases to be even easier.

In my experience lack of default doesn't force people to think - it gets
them to pick a random number... (not sure there is any configuration that
can get anyone to think, unfortunately).

On Tue, Apr 30, 2019 at 10:22 AM Almog Gavra  wrote:

> I have a preference toward requiring specifying partitions per topic, but
> I'm happy to be convinced otherwise. Changing replication factor after the
> fact is easy, but changing partitions is complicated since historical state
> gets messed up, so it could be beneficial to force clients to think about
> it up front. Furthermore, I see partitioning as a function on the scale of
> the data while replication is a function of cluster capacity. Thoughts?
>
> On Tue, Apr 30, 2019 at 8:58 AM Ismael Juma  wrote:
>
> > Thanks for the KIP, Almog. This is a good change. I think we should also
> > allow the partition count broker default to be used (the one used for
> auto
> > topic creation).
> >
> > Ismael
> >
> > On Tue, Apr 30, 2019, 8:39 AM Almog Gavra  wrote:
> >
> > > Hello Everyone,
> > >
> > > I'd like to start a discussion on KIP-464:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Default+Replication+Factor+for+AdminClient%23createTopic
> > >
> > > It's about allowing users of the AdminClient to supply only a partition
> > > count and to use the default replication factor configured by the kafka
> > > cluster. Happy to receive any and all feedback!
> > >
> > > Cheers,
> > > Almog
> > >
> >
>


-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



Re: [DISCUSS] KIP-462 : Use local thread id for KStreams

2019-04-30 Thread Sophie Blee-Goldman
Hey Boyang,

I think this sounds great but one thing you might want to update is the
"Compatibility, Deprecation, and Migration Plan" -- I agree having two
instances in the same JVM is probably a rare occurrence but the (presumably
less rare) situation Matthias described would also be affected in case of
exposed thread ids. Just a small note

Sophie

On Tue, Apr 30, 2019 at 8:25 AM Boyang Chen  wrote:

> Hey Bruno,
>
> "throttling purpose" means that we use `client.id` to track the request
> quota and do the throttling based on that. So in that context, it is
> expected to use the same `client.id` for a certain set of consumers.
>
> Also thank you Guozhang for the comment! Merged with the KIP.
>
> Boyang
> 
> From: Bruno Cadonna 
> Sent: Tuesday, April 30, 2019 4:15 PM
> To: dev@kafka.apache.org
> Subject: Re: Fw: [DISCUSS] KIP-462 : Use local thread id for KStreams
>
> Hi Guozhang,
>
> What do you mean exactly with "throttling purposes"?
>
> @Boyang: Thank you for the KIP!
>
> Best,
> Bruno
>
> On Tue, Apr 30, 2019 at 1:15 AM Guozhang Wang  wrote:
>
> > Hi Boyang,
> >
> > Thanks for the KIP. I think it makes sense.
> >
> > Just following up on the documentation part: since we are effectively
> > removing this guard against same client.ids of instances --- and btw,
> > semantically we would not forbid users to set the same client.ids anyways
> > for throttling purposes for example --- it's worth augmenting the
> > client.id
> > config description by stating what users should expect client.id to be
> > propagated to internal embedded clients, and therefore what's the
> expected
> > outcome if they choose to set same client.ids for different Streams
> client.
> >
> >
> > Otherwise, I've no further comments.
> >
> > Guozhang
> >
> > On Mon, Apr 29, 2019 at 3:42 PM Boyang Chen  wrote:
> >
> > > FYI
> > >
> > >
> > > 
> > > From: Boyang Chen 
> > > Sent: Tuesday, April 30, 2019 4:32 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
> > >
> > > Could we get more discussions on this thread?
> > >
> > > Boyang
> > >
> > > 
> > > From: Boyang Chen 
> > > Sent: Friday, April 26, 2019 10:51 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
> > >
> > > Thanks for the explanation Matthias! Will enhance the KIP motivation by
> > > your example.
> > >
> > >
> > > 
> > > From: Matthias J. Sax 
> > > Sent: Friday, April 26, 2019 3:42 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
> > >
> > > Thanks for the KIP!
> > >
> > > I agree that the change makes sense, and not only for the static group
> > > membership case.
> > >
> > > For example, if a user `close()`s a `KafkaStreams` client and creates a
> > > new one (for example to recover failed threads), while the JVM is still
> > > running, it is more intuitive that the thread names are numbered from 1
> to
> > > X again, and not from X+1 to 2*X on restart.
> > >
> > > Also, the original idea about making thread names unique across
> > > applications is non-intuitive itself. It might make sense if there are
> > > two instances of the same application within one JVM -- however, this
> > > seems to be a rather rare case. Also, the only pattern for this use
> case
> > > seems to be dynamic scaling, and I believe we should actually avoid this
> > > pattern by adding a `stopThread()` and `addThread()` method to
> > > `KafkaStreams` directly.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 4/25/19 11:13 PM, Boyang Chen wrote:
> > > > Hey friends,
> > > >
> > > > I would like to start discussion for a very small KIP:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-462%3A+Use+local+thread+id+for+KStreams
> > > >
> > > > it is trying to avoid sharing thread-id increment between multiple
> > > stream instances configured in one JVM. This is an important fix for
> > static
> > > membership<
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
> > >
> > > to be effective for KStreams in edge cases like changing `
> > group.instance.id`
> > > throughout restarts due to thread-id interleaving.
> > > >
> > > > I will open the vote thread in the meanwhile, since this is a very
> > > small fix. Feel free to continue the discussion on this thread, thank
> > you!
> > > >
> > > > Boyang
> > > >
> > >
> > >
> >
> > --
> > -- Guozhang
> >
>


Re: [DISCUSS] KIP-464 Default Replication Factor for AdminClient#createTopic

2019-04-30 Thread Almog Gavra
I have a preference toward requiring specifying partitions per topic, but
I'm happy to be convinced otherwise. Changing replication factor after the
fact is easy, but changing partitions is complicated since historical state
gets messed up, so it could be beneficial to force clients to think about
it up front. Furthermore, I see partitioning as a function on the scale of
the data while replication is a function of cluster capacity. Thoughts?

On Tue, Apr 30, 2019 at 8:58 AM Ismael Juma  wrote:

> Thanks for the KIP, Almog. This is a good change. I think we should also
> allow the partition count broker default to be used (the one used for auto
> topic creation).
>
> Ismael
>
> On Tue, Apr 30, 2019, 8:39 AM Almog Gavra  wrote:
>
> > Hello Everyone,
> >
> > I'd like to start a discussion on KIP-464:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Default+Replication+Factor+for+AdminClient%23createTopic
> >
> > It's about allowing users of the AdminClient to supply only a partition
> > count and to use the default replication factor configured by the kafka
> > cluster. Happy to receive any and all feedback!
> >
> > Cheers,
> > Almog
> >
>


[jira] [Resolved] (KAFKA-8134) ProducerConfig.LINGER_MS_CONFIG undocumented breaking change in kafka-clients 2.1

2019-04-30 Thread Colin P. McCabe (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe resolved KAFKA-8134.

Resolution: Fixed

> ProducerConfig.LINGER_MS_CONFIG undocumented breaking change in kafka-clients 
> 2.1
> -
>
> Key: KAFKA-8134
> URL: https://issues.apache.org/jira/browse/KAFKA-8134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Sam Lendle
>Assignee: Dhruvil Shah
>Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> Prior to 2.1, the type of the "linger.ms" config was Long, but was changed to 
> Integer in 2.1.0 ([https://github.com/apache/kafka/pull/5270]) A config using 
> a Long value for that parameter which works with kafka-clients < 2.1 will 
> cause a ConfigException to be thrown when constructing a KafkaProducer if 
> kafka-clients is upgraded to >= 2.1.
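The failure mode can be illustrated with a standalone sketch that mimics — but is not — Kafka's actual config type validation: a value typed INT accepts an Integer or a numeric String, but rejects a Long, which the old LONG type used to accept.

```java
import java.util.Map;

// Standalone illustration of why the 2.1 type change broke existing configs:
// once linger.ms is typed INT, a Long value fails validation at construction
// time. The error message is representative, not Kafka's exact text.
public class LingerMsTypeDemo {
    static int parseInt(Map<String, Object> props, String key) {
        Object v = props.get(key);
        if (v instanceof Integer) return (Integer) v;
        if (v instanceof String) return Integer.parseInt(((String) v).trim());
        // A Long value, accepted when linger.ms was typed LONG, now fails.
        throw new IllegalArgumentException(
            "Invalid value " + v + " for configuration " + key + ": expected type INT");
    }

    public static void main(String[] args) {
        // An Integer (or numeric String) works with the INT type.
        System.out.println(parseInt(Map.of("linger.ms", 100), "linger.ms")); // 100

        // A Long value, fine with kafka-clients < 2.1, now throws.
        try {
            parseInt(Map.of("linger.ms", 100L), "linger.ms");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```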



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-30 Thread John Roesler
Hi Ivan,

Thanks for the update.

FWIW, I agree with Matthias that the current "start branching" operator is
confusing when named the same way as the actual branches. "Split" seems
like a good name. Alternatively, we can do without a "start branching"
operator at all, and just do:

stream
  .branch(Predicate)
  .branch(Predicate)
  .defaultBranch();

Tentatively, I think that this branching operation should be terminal. That
way, we don't create ambiguity about how to use it. That is, `branch`
should return `KBranchedStream`, while `defaultBranch` is `void`, to
enforce that it comes last, and that there is only one definition of the
default branch. Potentially, we should log a warning if there's no default,
and additionally log a warning (or throw an exception) if a record falls
through with no default.
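The chaining semantics proposed here can be modeled with a self-contained toy class. It illustrates the API shape only, not the Kafka Streams implementation, and for the demo `defaultBranch()` returns the collected branches so routing can be observed, whereas the proposal makes it `void`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model of the proposed branching: each branch(...) adds a predicate and
// returns the builder for chaining; defaultBranch() is terminal; a record goes
// to the first branch whose predicate matches, or to the default otherwise.
public class BranchedStreamDemo<V> {
    private final List<Predicate<V>> predicates = new ArrayList<>();
    private final List<List<V>> branches = new ArrayList<>();
    private final List<V> source;

    BranchedStreamDemo(List<V> source) { this.source = source; }

    BranchedStreamDemo<V> branch(Predicate<V> p) {
        predicates.add(p);
        branches.add(new ArrayList<>());
        return this; // chaining, like the proposed KBranchedStream#branch
    }

    List<List<V>> defaultBranch() { // terminal, like the proposed defaultBranch()
        List<V> fallback = new ArrayList<>();
        for (V v : source) {
            boolean matched = false;
            for (int i = 0; i < predicates.size() && !matched; i++) {
                if (predicates.get(i).test(v)) {
                    branches.get(i).add(v);
                    matched = true;
                }
            }
            if (!matched) fallback.add(v); // would warn/throw if no default exists
        }
        List<List<V>> all = new ArrayList<>(branches);
        all.add(fallback);
        return all;
    }

    public static void main(String[] args) {
        List<List<Integer>> out = new BranchedStreamDemo<>(List.of(1, 2, 3, 4, 5))
            .branch(v -> v % 2 == 0)   // evens
            .branch(v -> v > 3)        // remaining values greater than 3
            .defaultBranch();          // everything else
        System.out.println(out); // [[2, 4], [5], [1, 3]]
    }
}
```

Note that first-match-wins ordering means a record reaches at most one branch, which matches the semantics of the existing array-based `branch()`.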

Thoughts?

Thanks,
-John

On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax 
wrote:

> Thanks for updating the KIP and your answers.
>
>
> >  this is to make the name similar to String#split
> >> that also returns an array, right?
>
> The intent was to avoid name duplication. The return type should _not_
> be an array.
>
> The current proposal is
>
> stream.branch()
>   .branch(Predicate)
>   .branch(Predicate)
>   .defaultBranch();
>
> IMHO, this reads a little odd, because the first `branch()` does not
> take any parameters and has different semantics than the later
> `branch()` calls. Note, that from the code snippet above, it's hidden
> that the first call is `KStream#branch()` while the others are
> `KBranchedStream#branch()`, which makes reading the code harder.
>
> Because I suggested renaming `addBranch()` -> `branch()`, I thought it
> might be better to also rename `KStream#branch()` to avoid the naming
> overlap that seems to be confusing. The following reads much cleaner to me:
>
> stream.split()
>   .branch(Predicate)
>   .branch(Predicate)
>   .defaultBranch();
>
> Maybe there is a better alternative to `split()` though to avoid the
> naming overlap.
>
>
> > 'default' is, however, a reserved word, so unfortunately we cannot have
> a method with such name :-)
>
> Bummer. Didn't consider this. Maybe we can still come up with a short name?
>
>
> Can you add the interface `KBranchedStream` to the KIP with all its
> methods? It will be part of public API and should be contained in the
> KIP. For example, it's unclear atm, what the return type of
> `defaultBranch()` is.
>
>
> You did not comment on the idea to add a `KBranchedStream#get(int index)
> -> KStream` method to get the individually branched-KStreams. Would be
> nice to get your feedback about it. It seems you suggest that users
> would need to write custom utility code otherwise, to access them. We
> should discuss the pros and cons of both approaches. It feels
> "incomplete" to me atm, if the API has no built-in support to get the
> branched-KStreams directly.
>
>
>
> -Matthias
>
>
> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > Hi all!
> >
> > I have updated the KIP-418 according to the new vision.
> >
> > Matthias, thanks for your comment!
> >
> >> Renaming KStream#branch() -> #split()
> >
> > I can see your point: this is to make the name similar to String#split
> > that also returns an array, right? But is it worth the loss of backwards
> > compatibility? We can have overloaded branch() as well without affecting
> > the existing code. Maybe the old array-based `branch` method should be
> > deprecated, but this is a subject for discussion.
> >
> >> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> > KBranchedStream#defaultBranch() -> BranchingKStream#default()
> >
> > Totally agree with 'addBranch->branch' rename. 'default' is, however, a
> > reserved word, so unfortunately we cannot have a method with such a name
> :-)
> >
> >> defaultBranch() does take an `Predicate` as argument, but I think that
> > is not required?
> >
> > Absolutely! I think that was just copy-paste error or something.
> >
> > Dear colleagues,
> >
> > please revise the new version of the KIP and Paul's PR
> > (https://github.com/apache/kafka/pull/6512)
> >
> > Any new suggestions/objections?
> >
> > Regards,
> >
> > Ivan
> >
> >
> > 11.04.2019 11:47, Matthias J. Sax пишет:
> >> Thanks for driving the discussion of this KIP. It seems that everybody
> >> agrees that the current branch() method using arrays is not optimal.
> >>
> >> I had a quick look into the PR and I like the overall proposal. There
> >> are some minor things we need to consider. I would recommend the
> >> following renaming:
> >>
> >> KStream#branch() -> #split()
> >> KBranchedStream#addBranch() -> BranchingKStream#branch()
> >> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> >>
> >> It's just a suggestion to get slightly shorter method names.
> >>
> >> In the current PR, defaultBranch() does take an `Predicate` as argument,
> >> but I think that is not required?
> >>
> >> Also, we should consider KIP-307, that was recently accepted 

Re: [DISCUSS] KIP-464 Default Replication Factor for AdminClient#createTopic

2019-04-30 Thread Ismael Juma
Thanks for the KIP, Almog. This is a good change. I think we should also
allow the partition count broker default to be used (the one used for auto
topic creation).

Ismael

On Tue, Apr 30, 2019, 8:39 AM Almog Gavra  wrote:

> Hello Everyone,
>
> I'd like to start a discussion on KIP-464:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Default+Replication+Factor+for+AdminClient%23createTopic
>
> It's about allowing users of the AdminClient to supply only a partition
> count and to use the default replication factor configured by the Kafka
> cluster. Happy to receive any and all feedback!
>
> Cheers,
> Almog
>
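The idea in KIP-464 (and Ismael's extension of it to partition counts) can be modeled as "optional fields that fall back to broker-side defaults". Below is a toy model, not the real AdminClient API; the class and method names (`TopicSpec`, `resolvePartitions`, `resolveReplication`) are illustrative only.

```java
import java.util.Optional;

// Toy model of a create-topic request where partition count and replication
// factor are optional; an empty value means "use the broker's configured
// default" (num.partitions / default.replication.factor).
final class TopicSpec {
    final String name;
    final Optional<Integer> partitions;       // empty => broker default
    final Optional<Short> replicationFactor;  // empty => broker default

    TopicSpec(String name, Optional<Integer> partitions, Optional<Short> replicationFactor) {
        this.name = name;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
    }

    // What the broker side would do: fill in its defaults for unset fields.
    int resolvePartitions(int brokerDefault) { return partitions.orElse(brokerDefault); }
    short resolveReplication(short brokerDefault) { return replicationFactor.orElse(brokerDefault); }
}
```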


[DISCUSS] KIP-464 Default Replication Factor for AdminClient#createTopic

2019-04-30 Thread Almog Gavra
Hello Everyone,

I'd like to start a discussion on KIP-464:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Default+Replication+Factor+for+AdminClient%23createTopic

It's about allowing users of the AdminClient to supply only a partition
count and to use the default replication factor configured by the Kafka
cluster. Happy to receive any and all feedback!

Cheers,
Almog


Re: [DISCUSS] KIP-462 : Use local thread id for KStreams

2019-04-30 Thread Boyang Chen
Hey Bruno,

"throttling purpose" means that we use `client.id` to track the request quota 
and do the throttling based on that. So in that context, it is expected to use 
the same `client.id` for a certain set of consumers.

Also thank you Guozhang for the comment! Merged with the KIP.

Boyang

From: Bruno Cadonna 
Sent: Tuesday, April 30, 2019 4:15 PM
To: dev@kafka.apache.org
Subject: Re: Fw: [DISCUSS] KIP-462 : Use local thread id for KStreams

Hi Guozhang,

What do you mean exactly with "throttling purposes"?

@Boyang: Thank you for the KIP!

Best,
Bruno

On Tue, Apr 30, 2019 at 1:15 AM Guozhang Wang  wrote:

> Hi Boyang,
>
> Thanks for the KIP. I think it makes sense.
>
> Just following up on the documentation part: since we are effectively
> removing this guard against same client.ids of instances --- and btw,
> semantically we would not forbid users to set the same client.ids anyways
> for throttling purposes for example --- it's worth augmenting the
> client.id
> config description by stating what users should expect client.id to be
> propagated to internal embedded clients, and therefore what's the expected
> outcome if they choose to set the same client.ids for different Streams clients.
>
>
> Otherwise, I've no further comments.
>
> Guozhang
>
> On Mon, Apr 29, 2019 at 3:42 PM Boyang Chen  wrote:
>
> > FYI
> >
> >
> > 
> > From: Boyang Chen 
> > Sent: Tuesday, April 30, 2019 4:32 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
> >
> > Could we get more discussions on this thread?
> >
> > Boyang
> >
> > 
> > From: Boyang Chen 
> > Sent: Friday, April 26, 2019 10:51 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
> >
> > Thanks for the explanation Matthias! Will enhance the KIP motivation by
> > your example.
> >
> >
> > 
> > From: Matthias J. Sax 
> > Sent: Friday, April 26, 2019 3:42 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
> >
> > Thanks for the KIP!
> >
> > I agree that the change makes sense, and not only for the static group
> > membership case.
> >
> > For example, if a user `closes()` a `KafkaStreams` client and creates a
> > new one (for example to recover failed threads), while the JVM is still
> > running, it is more intuitive that the thread names are numbered from 1 to
> > X again, and not from X+1 to 2*X on restart.
> >
> > Also, the original idea about making thread names unique across
> > applications is non-intuitive in itself. It might make sense if there are
> > two instances of the same application within one JVM -- however, this
> > seems to be a rather rare case. Also, the only pattern for this use case
> > seems to be dynamic scaling, and I believe we should actually avoid this
> > pattern by adding a `stopThread()` and `addThread()` method to
> > `KafkaStreams` directly.
> >
> >
> > -Matthias
> >
> >
> > On 4/25/19 11:13 PM, Boyang Chen wrote:
> > > Hey friends,
> > >
> > > I would like to start discussion for a very small KIP:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-462%3A+Use+local+thread+id+for+KStreams
> > >
> > > it is trying to avoid sharing thread-id increment between multiple
> > stream instances configured in one JVM. This is an important fix for
> static
> > membership<
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
> >
> > to be effective for KStreams in edge cases like changing `
> group.instance.id`
> > throughout restarts due to thread-id interleaving.
> > >
> > > I will open the vote thread in the meanwhile, since this is a very
> > small fix. Feel free to continue the discussion on this thread, thank
> you!
> > >
> > > Boyang
> > >
> >
> >
>
> --
> -- Guozhang
>
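The numbering behavior Matthias describes can be sketched as the difference between a static, JVM-wide counter and a per-instance counter. The names below are illustrative, not the actual KafkaStreams internals.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the KIP-462 change: with a JVM-wide counter, a recreated client's
// threads continue from where the previous instance left off (X+1..2X); with a
// per-instance counter, each client numbers its threads from 1 again.
class StreamsClient {
    // Old behavior (assumed): shared across every instance in the JVM.
    static final AtomicInteger JVM_WIDE_ID = new AtomicInteger(0);
    // New behavior: local to this client instance.
    private final AtomicInteger localId = new AtomicInteger(0);

    String nextThreadNameOld() { return "StreamThread-" + JVM_WIDE_ID.incrementAndGet(); }
    String nextThreadNameNew() { return "StreamThread-" + localId.incrementAndGet(); }
}
```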


Re: Guava version upgrade

2019-04-30 Thread jiahaozhouwh



On 2019/03/15 16:44:32, "Colin McCabe"  wrote: 
> Hi JIAHAO,
> 
> Kafka does not use Guava.
> 
> Some of the packages Kafka Connect depends on use Guava.  Perhaps the right 
> thing to do is track down those projects and see how they are using Guava (if 
> they are vulnerable to the CVE).
> 
> best,
> Colin
> 
> 
> On Mon, Mar 4, 2019, at 15:52, JIAHAO ZHOU wrote:
> > Hello,
> > when downloading Kafka 2.1.1, the  kafka_2.12-2.1.1.tgz still contains
> > guava-20.0.jar. This guava version currently has a vulnerability
> > described here: https://github.com/google/guava/wiki/CVE-2018-10237
> > Versions 24.1.1 and 25.0+ are fixed versions.
> > Are there any plans to upgrade this dependency?
> > 
> > Regards
> > Jiahao Zhou
> >
> Thanks Colin


Build failed in Jenkins: kafka-2.1-jdk8 #175

2019-04-30 Thread Apache Jenkins Server
See 


Changes:

[matthias] MINOR: improve Session expiration notice (#6618)

--
[...truncated 919.52 KB...]
kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
logsShouldNotDivergeOnUncleanLeaderElections STARTED

kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest > 
logsShouldNotDivergeOnUncleanLeaderElections PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldSendLeaderEpochRequestAndGetAResponse STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldSendLeaderEpochRequestAndGetAResponse PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica 
STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnUnknownTopicOrPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnUnknownTopicOrPartitionIfThrown PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnNoLeaderForPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnNoLeaderForPartitionIfThrown PASSED

kafka.server.ListOffsetsRequestTest > testListOffsetsErrorCodes STARTED

kafka.server.ListOffsetsRequestTest > testListOffsetsErrorCodes PASSED

kafka.server.ListOffsetsRequestTest > testCurrentEpochValidation STARTED

kafka.server.ListOffsetsRequestTest > testCurrentEpochValidation PASSED

kafka.server.DelayedOperationTest > testRequestPurge STARTED

kafka.server.DelayedOperationTest > testRequestPurge PASSED

kafka.server.DelayedOperationTest > testRequestExpiry STARTED

kafka.server.DelayedOperationTest > testRequestExpiry PASSED

kafka.server.DelayedOperationTest > 
shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist STARTED

kafka.server.DelayedOperationTest > 
shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist PASSED

kafka.server.DelayedOperationTest > testDelayedOperationLockOverride STARTED

kafka.server.DelayedOperationTest > testDelayedOperationLockOverride PASSED

kafka.server.DelayedOperationTest > testTryCompleteLockContention STARTED

kafka.server.DelayedOperationTest > testTryCompleteLockContention PASSED

kafka.server.DelayedOperationTest > testTryCompleteWithMultipleThreads STARTED

kafka.server.DelayedOperationTest > testTryCompleteWithMultipleThreads PASSED

kafka.server.DelayedOperationTest > 
shouldCancelForKeyReturningCancelledOperations STARTED

kafka.server.DelayedOperationTest > 
shouldCancelForKeyReturningCancelledOperations PASSED

kafka.server.DelayedOperationTest > testRequestSatisfaction STARTED

kafka.server.DelayedOperationTest > testRequestSatisfaction PASSED

kafka.server.DelayedOperationTest > testDelayedOperationLock STARTED

kafka.server.DelayedOperationTest > testDelayedOperationLock PASSED

kafka.server.OffsetsForLeaderEpochRequestTest > 
testOffsetsForLeaderEpochErrorCodes STARTED

kafka.server.OffsetsForLeaderEpochRequestTest > 
testOffsetsForLeaderEpochErrorCodes PASSED

kafka.server.OffsetsForLeaderEpochRequestTest > testCurrentEpochValidation 
STARTED

kafka.server.OffsetsForLeaderEpochRequestTest > testCurrentEpochValidation 
PASSED

kafka.network.DynamicConnectionQuotaTest > testDynamicConnectionQuota STARTED

kafka.network.DynamicConnectionQuotaTest > testDynamicConnectionQuota PASSED

kafka.network.SocketServerTest > testGracefulClose STARTED

kafka.network.SocketServerTest > testGracefulClose PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone PASSED

kafka.network.SocketServerTest > controlThrowable STARTED

kafka.network.SocketServerTest > controlThrowable PASSED

kafka.network.SocketServerTest > testRequestMetricsAfterStop STARTED

kafka.network.SocketServerTest > testRequestMetricsAfterStop PASSED

kafka.network.SocketServerTest > testConnectionIdReuse STARTED

kafka.network.SocketServerTest > testConnectionIdReuse PASSED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
STARTED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
PASSED

kafka.network.SocketServerTest > testProcessorMetricsTags STARTED

kafka.network.SocketServerTest > testProcessorMetricsTags PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > 

Re: [DISCUSS] KIP-448: Add State Stores Unit Test Support to Kafka Streams Test Utils

2019-04-30 Thread Matthias J. Sax
I just re-read the discussion on the original Jira.

It's still a little unclear to me how this should work end-to-end. It
would be good to describe some test patterns that we want to support
first, maybe using some examples that show how a test would be written.

I don't think that we should build a whole mocking framework similar to
EasyMock (or others); why re-invent the wheel? I think the goal should
be, to allow people to use their mocking framework of choice, and to
easily integrate it with `TopologyTestDriver`, without the need to
rewrite the code under test.


For the currently internal `KeyValueStoreTestDriver`, it seems to be a
little different, as the purpose of this driver is to test a store
implementation. Hence, most users won't need this, because they use the
built-in stores anyway; i.e., this driver would be for advanced users that
build their own stores.

I think it's actually two orthogonal things and it might even be good to
split both into two KIPs.



-Matthias


On 4/30/19 7:52 AM, Yishun Guan wrote:
> Sounds good! Let me work on this more and add some more information to this
> KIP before we continue.
> 
> On Tue, Apr 30, 2019, 00:45 Bruno Cadonna  wrote:
> 
>> Hi Yishun,
>>
>> Thank you for continuing with this KIP. IMO, this KIP is very important to
>> develop robust code.
>>
>> I think, a good approach is to do some research on mock development on the
>> internet and in the literatures and then try to prototype the mocks. These
>> activities should yield you a list of pros and cons that you can add to the
>> KIP. With this information it is simpler for everybody to discuss this KIP.
>>
>> Does this make sense to you?
>>
>> Best,
>> Bruno
>>
>> On Mon, Apr 29, 2019 at 7:11 PM Yishun Guan  wrote:
>>
>>> Hi,
>>>
>>> Sorry for the late reply, I have read through all your valuable
>>> comments. The KIP still needs work at this point.
>>>
> >>> I think one question that comes up at this point is how we should
> >>> implement the mock stores. As Sophie suggested, should we be open to any
> >>> store backend and just wrap around the Store class type which the user
> >>> will be providing? Or, as Bruno suggested, should we avoid wrapping a
> >>> production backend store in a mock store and just keep track of the
> >>> state of each method call? Even EasyMock could be one of the options.
>>>
> >>> Personally, I think EasyMock would make the implementation easier, but
> >>> building from scratch provides extra functionality and expandability
> >>> (though I am not sure what kind of extra functionality we will want in
> >>> the future).
>>>
>>> What do you guys think?
>>>
>>> Best,
>>> Yishun
>>>
>>> On Fri, Apr 26, 2019 at 2:03 AM Matthias J. Sax 
>>> wrote:

 What is the status of this KIP?


 Btw: there is also KIP-456. I was wondering if it might be required or
 helpful to align the design of both with each other. Thoughts?



 -Matthias


 On 4/11/19 12:17 AM, Matthias J. Sax wrote:
> Thanks for the KIP. Only one initial comment (Sophie mentioned this
> already but I want to emphasize on it).
>
> You state that
>
>> These will be internal classes, so no public API/interface.
>
> If this is the case, we don't need a KIP. However, the idea of the
> original Jira is to actually make those classes public, as part of
>> the
> `streams-test-utils` package. If it's not public, developers should
>> not
> use them, because they don't have any backward compatibility
>>> guarantees.
>
> Hence, I would suggest that the corresponding classes go into a new
> package `org.apache.kafka.streams.state`.
>
>
> -Matthias
>
>
> On 4/9/19 8:58 PM, Bruno Cadonna wrote:
>> Hi Yishun,
>>
>> Thank you for the KIP.
>>
>> I have a couple of comments:
>>
>> 1. Could you please add an example to the KIP that demonstrates how
>>> the
>> mocks should be used in a test?
>>
>> 2. I am wondering, whether the MockKeyValueStore needs to be backed
>>> by an
>> actual KeyValueStore (in your KIP InMemoryKeyValueStore). Would it
>> not
>> suffice to provide the mock with the entries that it has to check in
>>> case
>> of input operation like put() and with the entries it has to return
>>> in case
>> of an output operation like get()? In my opinion, a mock should have
>>> as
>> little and as simple code as possible. A unit test should depend as
>>> little
>> as possible from productive code that it does not explicitly test.
>>
>> 3. I would be interested in the arguments against using a
>>> well-established
>> and well-tested mock framework like EasyMock. If there are good
>>> arguments,
>> they should be listed under 'Rejected Alternatives'.
>>
> >> 4. What is the purpose of the parameter 'time' in MockStoreFactory?
>>
>> Best,
>> Bruno
>>
>> On Tue, Apr 9, 2019 at 11:29 AM Sophie Blee-Goldman <
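The call-recording style of mock Bruno argues for (no production store backing it) can be sketched as follows. This is not the proposed public API; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of a mock store that is fed canned answers and records the
// interactions a test wants to verify, instead of wrapping a real
// InMemoryKeyValueStore.
class MockKeyValueStore<K, V> {
    final Map<K, V> stubbedGets = new HashMap<>();  // canned answers for get()
    final List<K> getCalls = new ArrayList<>();     // recorded get() interactions
    final Map<K, V> putCalls = new HashMap<>();     // recorded put() interactions

    void stubGet(K key, V value) { stubbedGets.put(key, value); }
    V get(K key) { getCalls.add(key); return stubbedGets.get(key); }
    void put(K key, V value) { putCalls.put(key, value); }
}
```

A test would stub the entries it expects the processor to read, run the processor, and then assert on `getCalls`/`putCalls`, which keeps the mock's code as small and simple as the thread suggests.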

Re: [DISCUSS] KIP-456: Helper classes to make it simpler to write test logic with TopologyTestDriver

2019-04-30 Thread Matthias J. Sax
KIP-451 was discarded in favor of this KIP. So it seems we are all on
the same page.


>> Relating to KIP-448.
>> What kind of alignment did you think about?

Nothing in particular. It was more or less a random thought. Maybe there is
nothing to be aligned. Just wanted to bring it up. :)


>> Some thoughts after reading also the comments in KAFKA-6460:
>> To my understanding, these KIP-448 mock classes need to be fed somehow into
>> TopologyTestDriver.
>> I don't know how those KIP-448 mocks are planned to be set to
>> TopologyTestDriver.

KIP-448 is still quite early, and it's a little unclear... Maybe we
should just ignore it for now. Sorry for the distraction with my comment
about it.


Please give me some more time to review this KIP in detail and I will
follow up later again.


-Matthias

On 4/26/19 2:25 PM, Jukka Karvanen wrote:
> Yes, my understanding was also that this KIP covers all the requirements
> of KIP-451.
> 
> Relating to KIP-448.
> What kind of alignment did you think about?
> 
> Some thoughts after reading also the comments in KAFKA-6460:
> To my understanding, these KIP-448 mock classes need to be fed somehow into
> TopologyTestDriver.
> I don't know how those KIP-448 mocks are planned to be set to
> TopologyTestDriver.
> 
> In contrast, KIP-456 was planned to work on top of an unmodified
> TopologyTestDriver, and now the driver is passed to TestInputTopic and
> TestOutputTopic in the constructor.
> There is also the alternative that these topic objects could be obtained
> from TopologyTestDriver, but that would require duplicating the
> constructors of the topics as methods on TopologyTestDriver.
> 
> Also, related to those store objects: when going through the methods in
> TopologyTestDriver I noticed that accessing the state stores could be the
> third candidate for a helper class or a group of classes.
> So in addition to TestInputTopic and TestOutputTopic, there could also be
> TestKeyValueStore, TestWindowStore, ... to follow the logic in this
> KIP-456, but that would not add any functionality on top of the already
> existing *Store classes. So that's why I did not include those.
> 
> Jukka
> 
> On Fri, Apr 26, 2019 at 12:03, Matthias J. Sax (matth...@confluent.io)
> wrote:
> 
>> Btw: there is also KIP-448. I was wondering if it might be required or
>> helpful to align the design of both with each other. Thoughts?
>>
>>
>>
>> On 4/25/19 11:22 PM, Matthias J. Sax wrote:
>>> Thanks for the KIP!
>>>
>>> I was just comparing this KIP with KIP-451 (even if I did not yet make a
>>> thorough read over this KIP), and I agree that there is a big overlap. It
>>> seems that KIP-456 might subsume KIP-451.
>>>
>>> Let's wait on Patrick's input to see how to proceed.
>>>
>>>
>>> -Matthias
>>>
>>> On 4/25/19 12:03 AM, Jukka Karvanen wrote:
 Hi,

 If you want to see or test the my current idea of the implementation of
 this KIP, you can check it out in my repo:

>> https://github.com/jukkakarvanen/kafka/compare/trunk...jukkakarvanen:KAFKA-8233_InputOutputTopics


 After my test with KIP-451, I do not see a need to add methods for
 Iterables, but I am waiting for Patrick's clarification of the use case.

 Jukka


 On Tue, Apr 23, 2019 at 15:37, Jukka Karvanen (
>> jukka.karva...@jukinimi.com)
 wrote:

> Hi All,
>
> I would like to start the discussion on KIP-456: Helper classes to
>> make it
> simpler to write test logic with TopologyTestDriver:
>
>
>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-456%3A+Helper+classes+to+make+it+simpler+to+write+test+logic+with+TopologyTestDriver
>
>
> There is also related KIP adding methods to TopologyTestDriver:
>
>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-451%3A+Make+TopologyTestDriver+output+iterable
>
>
> I added those new Iterable-based methods to this TestOutputTopic, even
> though I have not tested them myself yet.
> So this version contains both my original List- and Map-based methods
> and these new ones.
> Based on the discussion some of these can be dropped, if those are
>> seen as
> redundant.
>
> Best Regards,
> Jukka
>
>

>>>
>>
>>
> 
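The helper-class shape discussed above can be sketched against a stand-in "driver" (a plain queue) instead of the real TopologyTestDriver, so the example is self-contained. The class and method names (`TestInputTopic`, `TestOutputTopic`, `pipeInput`, `readKeyValue`) follow the KIP draft; everything else is illustrative.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Stand-in for TopologyTestDriver: a queue of (key, value) pairs.
class FakeDriver<K, V> {
    final Deque<Object[]> records = new ArrayDeque<>();
}

// Test-facing input side: push records into the "topology".
class TestInputTopic<K, V> {
    private final FakeDriver<K, V> driver;
    TestInputTopic(FakeDriver<K, V> driver) { this.driver = driver; }
    void pipeInput(K key, V value) { driver.records.add(new Object[] { key, value }); }
}

// Test-facing output side: pop records the "topology" produced.
class TestOutputTopic<K, V> {
    private final FakeDriver<K, V> driver;
    TestOutputTopic(FakeDriver<K, V> driver) { this.driver = driver; }
    Object[] readKeyValue() { return driver.records.poll(); }
    boolean isEmpty() { return driver.records.isEmpty(); }
}
```

As discussed in the thread, the driver is passed to both topic helpers in the constructor, so the driver itself stays unmodified.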





Re: [DISCUSS] KIP-315: Stream Join Sticky Assignor

2019-04-30 Thread Matthias J. Sax
Mike,

I am still not sure, why we need to add this assignor to the project.
Even after you pointed out that you cannot use Kafka Streams, the idea
behind making the consumer's `PartitionAssignor` interface public and
pluggable is that the project does not need to add strategies for all
kinds of use cases, but that people can customize the assignors to their
needs.

My main question is: how generic is this use case (especially with Kafka
Streams offering joins out-of-the-box) and do we really need to add it?
So far, it seems ok to me, if you just write a custom assignor and plug
it into the consumer. I don't see a strong need to add it to the Kafka
code base. Basically, it breaks down to

- How many people use joins?
- How many people can or can't use Kafka Streams joins?
- To what extent can Kafka Streams be improved to increase the use-case
coverage?
- How many people would benefit? (i.e., even with this assignor added,
there might still be users who need to customize their own assignors
because their join use case is still different from yours.)


Also note, that in Kafka Streams you could still provide a custom state
store implementation (with or without using a compacted changelog) and a
`Processor` or `Transformer` to implement a custom join. Even if this
might not work for your specific case, it might work for many other
people who want to customer a join instead of using Kafka Streams'
out-of-the-box join.


Can you elaborate why you think it needs to be part of Kafka directly?


One last question:

> - Our state has a high eviction rate, so kafka compacted topics are not ideal 
> for storing the changelog. The compaction cannot keep up and the topic will 
> be majority tombstones when it is read on partition reassignment. We are 
> using a KV store as the "change log" instead.

What do you mean by 'We are using a KV store as the "change log" instead'?
How do you handle reassignment and state movement? Curious to see if we
could improve Kafka Streams :)


-Matthias


On 4/30/19 3:09 AM, Mike Freyberger wrote:
> In light of KIP-429, I think there will be an increased demand for sticky 
> assignors. So, I'd like to restart the conversation about adding the sticky 
> streams assignor, 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-315%3A+Stream+Join+Sticky+Assignor.
>  
> 
> It’d be great to get feedback on the overall idea and the proposed 
> implementation.
> 
> Thanks,
> 
> Mike
> 
> 
> On 6/20/18, 5:47 PM, "Mike Freyberger"  wrote:
> 
> Matthias, 
> 
> Thanks for the feedback. For our use case, we have some complexities that 
> make using the existing Streams API more complicated than using the Kafka 
> Consumer directly. 
> 
> - We are doing async processing, which I don't think is currently 
> available (KIP-311 is handling this). 
> 
> - Our state has a high eviction rate, so kafka compacted topics are not 
> ideal for storing the changelog. The compaction cannot keep up and the topic 
> will be majority tombstones when it is read on partition reassignment. We are 
> using a KV store as the "change log" instead.
> 
> - We wanted to separate consumer threads from worker threads to maximize 
> parallelization while keeping consumer TCP connections down.
> 
> Ultimately, it was much simpler to use the KafkaConsumer directly rather 
> than peel away a lot of what Streams API does for you. I think we should 
> continue to add support for more complex use cases and processing to the 
> Streams API. However, I think there will remain streaming join use cases that 
> can benefit from the flexibility that comes from using the KafkaConsumer 
> directly. 
> 
> Mike
> 
> On 6/20/18, 5:08 PM, "Matthias J. Sax"  wrote:
> 
> Mike,
> 
> thanks a lot for the KIP. I am wondering why the Streams API cannot be
> used to perform the join. It would be good to understand the advantage of
> adding a `StickyStreamJoinAssignor` compared to using Streams API? 
> Atm,
> it seems to be a redundant feature to me.
> 
> -Matthias
> 
> 
> On 6/20/18 1:07 PM, Mike Freyberger wrote:
> > Hi everybody,
> > 
> > I’ve created a proposal document for KIP-315 which outlines the 
> motivation of adding a new partition assignment strategy that can used for 
> streaming join use cases.
> > 
> > It’d be great to get feedback on the overall idea and the proposed 
> implementation.
> > 
> > KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-315%3A+Stream+Join+Sticky+Assignor
> > 
> > Thanks,
> > 
> > Mike
> > 
> 
> 
> 
> 
> 
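The core property a stream-join assignor needs — the one KIP-315 is about — is co-partitioning: partition p of every joined topic must land on the same consumer so that consumer can join records locally. The sketch below is a toy illustration of that property, not the consumer's actual `PartitionAssignor` interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy co-partitioned assignment: every joined topic has the same partition
// count, and partition p of each topic is given to the same consumer.
class CoPartitionedAssignor {
    static Map<String, List<String>> assign(List<String> consumers, List<String> topics, int partitions) {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String c : consumers) assignment.put(c, new ArrayList<>());
        for (int p = 0; p < partitions; p++) {
            // A deterministic owner per partition number keeps the topics aligned;
            // a "sticky" variant would additionally prefer each partition's previous owner.
            String owner = consumers.get(p % consumers.size());
            for (String t : topics) assignment.get(owner).add(t + "-" + p);
        }
        return assignment;
    }
}
```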





Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-04-30 Thread Bruno Cadonna
Hi,

@Paul: Thank you for the KIP!

I hope you do not mind that I jump in.

I have the following comments:

1) `null` vs empty list in the default implementation
IIUC, returning `null` in the default implementation should basically
signal that the method `stateStores` was not overridden. Why then provide a
default implementation in the first place? Without default implementation
you would discover the missing implementation already at compile-time and
not only at runtime. If you decide not to provide a default implementation,
`XSupplier extends StateStoreConnector` would break existing code as
Matthias has already pointed out.

2) `process` method adding the StoreBuilders to the topology
If the default implementation returned `null` and `XSupplier extends
StateStoreConnector`, then existing code would break, because
`StreamsBuilder#addStateStore()` would throw a NPE.

+1 for opening a WIP PR

Best,
Bruno


On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax 
wrote:

> Thank Paul!
>
> I agree with all of that. If we think that the general design is good,
> refactoring a PR if we want to pick a different name should not be too
> much additional work (hopefully). Thus, if you want to open a WIP PR and
> we use it to nail the open details, it might help to find a good
> conclusion.
>
> >> 2) Default method vs new interface:
>
> This seems to be the hardest tradeoff. I see the point about
> discoveability... Might be good to get input from others, which version
> they would prefer.
>
> Just to make clear: my suggestion from the last email was that
> `Transformer` etc. does not extend the new interface. Instead, a user
> that want to use this feature would need to implement both interfaces.
>
> If `Transformer extends StoreProvider` (just picking a name here)
> without a default implementation, existing code would break, and thus it
> is not an option because it breaks backward compatibility.
>
>
> -Matthias
>
> On 4/28/19 8:37 PM, Paul Whalen wrote:
> > Great thoughts Matthias, thanks! I think we're all agreed that naming and
> > documentation/education are the biggest hurdles for this KIP, and in
> light
> > of that, I think it makes sense for me to just take a stab at a full
> > fledged PR with documentation to convince us that it's possible to do it
> > with enough clarity.
> >
> > In response to your specific thoughts:
> >
> > 1) StateStoreConnector as a name: Really good point about defining the
> > difference between "adding" and "connecting."  Guozhang suggested
> > StateStoreConnector which was definitely an improvement over my
> > StateStoresSupplier, but I think you're right that we need to be careful
> to
> > make it clear that it's really accomplishing both.  Thinking about it
> now,
> > one problem with Connector is that the implementer of the interface is
> not
> > really doing any connecting, it's providing/supplying the store that will
> > be both added and connected.  StoreProvider seems reasonable to me and
> > probably the best candidate at the moment, but it would be nice if the
> name
> > could convey that it's providing the store specifically so the caller can
> > add it to the topology and connect it to the associated transformer.
> >
> > In general I think that really calling out what "adding" versus
> > "connecting" is in the documentation will help make the entire purpose of
> > this feature more clear to the user.
> >
> > 2) Default method vs new interface: The choice of a default method was
> > influenced by Guozhang's fear about API bloat/discoverability.  I can
> > definitely see it both ways.  Would the separate interface be a
> > sub-interface of Processor/TransformerSupplier or standalone?  It seems
> > like you're suggesting standalone and I think that's what I favor.  My
> only
> > concern there is that the interface wouldn't actually be a type to any
> > public API which sort of hurts discoverability.  You would have to read
> the
> > javadocs for stream.process/transform() to discover that implementing the
> > interface in addition to Processor/TransformerSupplier would add and
> > connect the store for you.  But that added burden actually probably helps
> > us in terms of making sure people don't mix and match, like you said.
> >
> > 3) Returning null instead of empty: Seems fair to me.  I always worry
> about
> > returning null when an empty collection can be used instead, but given
> that
> > the library is the caller rather than the client I think your point makes
> > sense here.
> >
> > 4) Returning Set instead of Collection: Agreed, don't see why not to make
> > it more specific.
> >
> > Paul
> >
> > On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax 
> > wrote:
> >
> >> Hi, sorry for the long pause. Just trying to catch up here.
> >>
> >> I think it save to allow `addStateStore()` to be idempotent for the same
> >> `StoreBuilder` object. In fact, the `name` is "hard coded" and thus it's
> >> not really possible to use the same `StoreBuilder` object to create
> >> different 
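The interface shape being debated can be sketched as follows. "StoreProvider" is one of the candidate names mentioned in the thread, not a settled API, and the `Set<String>` of store names is a simplification of the KIP's `StoreBuilder` collection; the sketch only illustrates the null-vs-empty default question.

```java
import java.util.Collections;
import java.util.Set;

// Sketch of the KIP-401 idea: a supplier opts in by implementing this
// interface; the default returns null so the framework can tell "feature not
// used" apart from "no stores needed" -- but the caller must null-check before
// adding stores, or it hits exactly the NPE concern raised in the thread.
interface StoreProvider {
    default Set<String> stores() { return null; }
}

// A supplier that opts in: the framework would add and connect "my-store"
// automatically instead of the user calling addStateStore() separately.
class MyTransformerSupplier implements StoreProvider {
    @Override
    public Set<String> stores() { return Collections.singleton("my-store"); }
}
```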

Re: Fw: [DISCUSS] KIP-462 : Use local thread id for KStreams

2019-04-30 Thread Bruno Cadonna
Hi Guozhang,

What do you mean exactly with "throttling purposes"?

@Boyang: Thank you for the KIP!

Best,
Bruno

On Tue, Apr 30, 2019 at 1:15 AM Guozhang Wang  wrote:

> Hi Boyang,
>
> Thanks for the KIP. I think it makes sense.
>
> Just following up on the documentation part: since we are effectively
> removing this guard against same client.ids of instances --- and btw,
> semantically we would not forbid users to set the same client.ids anyways
> for throttling purposes for example --- it's worth augmenting the
> client.id
> config description by stating what users should expect client.id to be
> propagated to internal embedded clients, and therefore what's the expected
> outcome if they choose to set the same client.ids for different Streams clients.
>
>
> Otherwise, I've no further comments.
>
> Guozhang
>
> On Mon, Apr 29, 2019 at 3:42 PM Boyang Chen  wrote:
>
> > FYI
> >
> >
> > 
> > From: Boyang Chen 
> > Sent: Tuesday, April 30, 2019 4:32 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
> >
> > Could we get more discussions on this thread?
> >
> > Boyang
> >
> > 
> > From: Boyang Chen 
> > Sent: Friday, April 26, 2019 10:51 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
> >
> > Thanks for the explanation Matthias! Will enhance the KIP motivation by
> > your example.
> >
> >
> > 
> > From: Matthias J. Sax 
> > Sent: Friday, April 26, 2019 3:42 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-462 : Use local thread id for KStreams
> >
> > Thanks for the KIP!
> >
> > I agree that the change makes sense, and not only for the static group
> > membership case.
> >
> > For example, if a user `closes()` a `KafkaStreams` client and creates a
> > new one (for example to recover failed threads), while the JVM is still
> > running, it is more intuitive that the thread names are numbered from 1 to
> > X again, and not from X+1 to 2*X on restart.
> >
> > Also, the original idea about making thread names unique across
> > applications is non-intuitive in itself. It might make sense if there are
> > two instances of the same application within one JVM -- however, this
> > seems to be a rather rare case. Also, the only pattern for this use case
> > seems to be dynamic scaling, and I believe we should actually avoid this
> > pattern by adding a `stopThread()` and `addThread()` method to
> > `KafkaStreams` directly.
> >
> >
> > -Matthias
> >
> >
> > On 4/25/19 11:13 PM, Boyang Chen wrote:
> > > Hey friends,
> > >
> > > I would like to start discussion for a very small KIP:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-462%3A+Use+local+thread+id+for+KStreams
> > >
> > > It is trying to avoid sharing the thread-id increment between multiple
> > > stream instances configured in one JVM. This is an important fix for static
> > > membership (https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances)
> > > to be effective for KStreams in edge cases like changing `group.instance.id`
> > > across restarts due to thread-id interleaving.
> > >
> > > I will open the vote thread in the meanwhile, since this is a very
> > > small fix. Feel free to continue the discussion on this thread, thank you!
> > >
> > > Boyang
> > >
> >
> >
>
> --
> -- Guozhang
>
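The naming behavior discussed in this thread can be illustrated with a minimal sketch (plain Java; this is not the actual KafkaStreams code, and the class and field names are hypothetical). A JVM-wide static counter keeps growing across client restarts in the same JVM, while a per-client counter, as KIP-462 proposes, starts from 1 for every new instance:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadIdSketch {
    // JVM-wide counter: pre-KIP-462 behavior, ids keep growing across
    // client restarts within the same JVM.
    static final AtomicInteger GLOBAL_ID = new AtomicInteger(0);

    static List<String> nameThreads(String clientId, int numThreads, boolean perClient) {
        AtomicInteger local = new AtomicInteger(0); // fresh per "client instance"
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            int id = perClient ? local.incrementAndGet() : GLOBAL_ID.incrementAndGet();
            names.add(clientId + "-StreamThread-" + id);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(nameThreads("app", 2, false)); // [app-StreamThread-1, app-StreamThread-2]
        System.out.println(nameThreads("app", 2, false)); // "restart": [app-StreamThread-3, app-StreamThread-4]
        System.out.println(nameThreads("app", 2, true));  // [app-StreamThread-1, app-StreamThread-2]
        System.out.println(nameThreads("app", 2, true));  // numbering starts from 1 again
    }
}
```

With a stable thread numbering, a `group.instance.id` derived from the thread name stays the same across restarts, which is what static membership needs.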


Re: [DISCUSS] KIP-448: Add State Stores Unit Test Support to Kafka Streams Test Utils

2019-04-30 Thread Yishun Guan
Sounds good! Let me work on this more and add some more information to this
KIP before we continue.

On Tue, Apr 30, 2019, 00:45 Bruno Cadonna  wrote:

> Hi Yishun,
>
> Thank you for continuing with this KIP. IMO, this KIP is very important for
> developing robust code.
>
> I think a good approach is to do some research on mock development on the
> internet and in the literature and then try to prototype the mocks. These
> activities should yield a list of pros and cons that you can add to the
> KIP. With this information it is simpler for everybody to discuss this KIP.
>
> Does this make sense to you?
>
> Best,
> Bruno
>
> On Mon, Apr 29, 2019 at 7:11 PM Yishun Guan  wrote:
>
> > Hi,
> >
> > Sorry for the late reply; I have read through all your valuable
> > comments. The KIP still needs work at this point.
> >
> > I think one question that comes up at this point is how we should
> > implement the mock stores - as Sophie suggested, should we be open to any
> > Store backend and just wrap around the Store class type that the user
> > provides - or, as Bruno suggested, should we avoid wrapping a
> > production backend store in a mock store and instead just
> > keep track of the state of each method call, where even EasyMock could be
> > one of the options.
> >
> > Personally, I think EasyMock would make the implementation easier, but
> > building from scratch provides extra functionality and expandability
> > (though I am not sure what kind of extra functionality we want in the
> > future).
> >
> > What do you guys think?
> >
> > Best,
> > Yishun
> >
> > On Fri, Apr 26, 2019 at 2:03 AM Matthias J. Sax 
> > wrote:
> > >
> > > What is the status of this KIP?
> > >
> > >
> > > Btw: there is also KIP-456. I was wondering if it might be required or
> > > helpful to align the design of both with each other. Thoughts?
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 4/11/19 12:17 AM, Matthias J. Sax wrote:
> > > > Thanks for the KIP. Only one initial comment (Sophie mentioned this
> > > > already, but I want to emphasize it).
> > > >
> > > > You state that
> > > >
> > > >> These will be internal classes, so no public API/interface.
> > > >
> > > > If this is the case, we don't need a KIP. However, the idea of the
> > > > original Jira is to actually make those classes public, as part of the
> > > > `streams-test-utils` package. If it's not public, developers should not
> > > > use them, because they don't have any backward compatibility guarantees.
> > > >
> > > > Hence, I would suggest that the corresponding classes go into a new
> > > > package `org.apache.kafka.streams.state`.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 4/9/19 8:58 PM, Bruno Cadonna wrote:
> > > >> Hi Yishun,
> > > >>
> > > >> Thank you for the KIP.
> > > >>
> > > >> I have a couple of comments:
> > > >>
> > > >> 1. Could you please add an example to the KIP that demonstrates how the
> > > >> mocks should be used in a test?
> > > >>
> > > >> 2. I am wondering whether the MockKeyValueStore needs to be backed by an
> > > >> actual KeyValueStore (in your KIP, InMemoryKeyValueStore). Would it not
> > > >> suffice to provide the mock with the entries that it has to check in case
> > > >> of input operations like put() and with the entries it has to return in
> > > >> case of output operations like get()? In my opinion, a mock should have as
> > > >> little and as simple code as possible. A unit test should depend as little
> > > >> as possible on production code that it does not explicitly test.
> > > >>
> > > >> 3. I would be interested in the arguments against using a well-established
> > > >> and well-tested mock framework like EasyMock. If there are good arguments,
> > > >> they should be listed under 'Rejected Alternatives'.
> > > >>
> > > >> 4. What is the purpose of the parameter 'time' in MockStoreFactory?
> > > >>
> > > >> Best,
> > > >> Bruno
> > > >>
> > > >> On Tue, Apr 9, 2019 at 11:29 AM Sophie Blee-Goldman <
> > sop...@confluent.io>
> > > >> wrote:
> > > >>
> > > >>> Hi Yishun, thanks for the KIP! I have a few initial questions/comments:
> > > >>>
> > > >>> 1) It may be useful to capture the iterator results as well (eg with a
> > > >>> MockIterator that wraps the underlying iterator and records the same way
> > > >>> the MockStore wraps/records the underlying store)
> > > >>>
> > > >>> 2) a. Where is the "persistent" variable coming from or being used? It
> > > >>> seems the MockKeyValueStore accepts it in the constructor, but only the
> > > >>> name parameter is passed when constructing a new MockKeyValueStore in
> > > >>> build() ... also, if we extend InMemoryXXXStore shouldn't this always be
> > > >>> false?
> > > >>> b. Is the idea to wrap an in-memory store for each type (key-value,
> > > >>> session, etc)? We don't (yet) offer an in-memory version of the session
> > > >>> store although it is in the works,

Re: [DISCUSS] KIP-448: Add State Stores Unit Test Support to Kafka Streams Test Utils

2019-04-30 Thread Bruno Cadonna
Hi Yishun,

Thank you for continuing with this KIP. IMO, this KIP is very important for
developing robust code.

I think a good approach is to do some research on mock development on the
internet and in the literature and then try to prototype the mocks. These
activities should yield a list of pros and cons that you can add to the
KIP. With this information it is simpler for everybody to discuss this KIP.

Does this make sense to you?

Best,
Bruno
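Bruno's suggestion of a call-recording mock, as opposed to wrapping a production store, can be sketched in plain Java. The class and method names below (`RecordingKeyValueStore`, `recordedCalls`, etc.) are hypothetical and this is not the actual streams-test-utils API; it only shows the recording technique:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A minimal call-recording mock: it wraps no production store. It replays
// canned entries for reads and records which operations were invoked.
public class RecordingKeyValueStore {
    private final Map<String, String> cannedEntries;               // values get() should return
    private final Map<String, String> observedPuts = new HashMap<>();
    private final List<String> calls = new ArrayList<>();

    public RecordingKeyValueStore(Map<String, String> cannedEntries) {
        this.cannedEntries = cannedEntries;
    }

    public void put(String key, String value) {
        calls.add("put(" + key + ")");
        observedPuts.put(key, value);
    }

    public String get(String key) {
        calls.add("get(" + key + ")");
        return cannedEntries.get(key);
    }

    public List<String> recordedCalls() { return calls; }
    public Map<String, String> recordedPuts() { return observedPuts; }
}
```

A test would seed `cannedEntries`, drive the code under test against the mock, and then assert on `recordedCalls()` and `recordedPuts()` instead of inspecting a real store's state.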

On Mon, Apr 29, 2019 at 7:11 PM Yishun Guan  wrote:

> Hi,
>
> Sorry for the late reply; I have read through all your valuable
> comments. The KIP still needs work at this point.
>
> I think one question that comes up at this point is how we should
> implement the mock stores - as Sophie suggested, should we be open to any
> Store backend and just wrap around the Store class type that the user
> provides - or, as Bruno suggested, should we avoid wrapping a
> production backend store in a mock store and instead just
> keep track of the state of each method call, where even EasyMock could be
> one of the options.
>
> Personally, I think EasyMock would make the implementation easier, but
> building from scratch provides extra functionality and expandability
> (though I am not sure what kind of extra functionality we want in the
> future).
>
> What do you guys think?
>
> Best,
> Yishun
>
> On Fri, Apr 26, 2019 at 2:03 AM Matthias J. Sax 
> wrote:
> >
> > What is the status of this KIP?
> >
> >
> > Btw: there is also KIP-456. I was wondering if it might be required or
> > helpful to align the design of both with each other. Thoughts?
> >
> >
> >
> > -Matthias
> >
> >
> > On 4/11/19 12:17 AM, Matthias J. Sax wrote:
> > > Thanks for the KIP. Only one initial comment (Sophie mentioned this
> > > already, but I want to emphasize it).
> > >
> > > You state that
> > >
> > >> These will be internal classes, so no public API/interface.
> > >
> > > If this is the case, we don't need a KIP. However, the idea of the
> > > original Jira is to actually make those classes public, as part of the
> > > `streams-test-utils` package. If it's not public, developers should not
> > > use them, because they don't have any backward compatibility guarantees.
> > >
> > > Hence, I would suggest that the corresponding classes go into a new
> > > package `org.apache.kafka.streams.state`.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 4/9/19 8:58 PM, Bruno Cadonna wrote:
> > >> Hi Yishun,
> > >>
> > >> Thank you for the KIP.
> > >>
> > >> I have a couple of comments:
> > >>
> > >> 1. Could you please add an example to the KIP that demonstrates how the
> > >> mocks should be used in a test?
> > >>
> > >> 2. I am wondering whether the MockKeyValueStore needs to be backed by an
> > >> actual KeyValueStore (in your KIP, InMemoryKeyValueStore). Would it not
> > >> suffice to provide the mock with the entries that it has to check in case
> > >> of input operations like put() and with the entries it has to return in
> > >> case of output operations like get()? In my opinion, a mock should have as
> > >> little and as simple code as possible. A unit test should depend as little
> > >> as possible on production code that it does not explicitly test.
> > >>
> > >> 3. I would be interested in the arguments against using a well-established
> > >> and well-tested mock framework like EasyMock. If there are good arguments,
> > >> they should be listed under 'Rejected Alternatives'.
> > >>
> > >> 4. What is the purpose of the parameter 'time' in MockStoreFactory?
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> On Tue, Apr 9, 2019 at 11:29 AM Sophie Blee-Goldman <
> sop...@confluent.io>
> > >> wrote:
> > >>
> > >>> Hi Yishun, thanks for the KIP! I have a few initial questions/comments:
> > >>>
> > >>> 1) It may be useful to capture the iterator results as well (eg with a
> > >>> MockIterator that wraps the underlying iterator and records the same way
> > >>> the MockStore wraps/records the underlying store)
> > >>>
> > >>> 2) a. Where is the "persistent" variable coming from or being used? It
> > >>> seems the MockKeyValueStore accepts it in the constructor, but only the
> > >>> name parameter is passed when constructing a new MockKeyValueStore in
> > >>> build() ... also, if we extend InMemoryXXXStore shouldn't this always be
> > >>> false?
> > >>> b. Is the idea to wrap an in-memory store for each type (key-value,
> > >>> session, etc)? We don't (yet) offer an in-memory version of the session
> > >>> store although it is in the works, so this will be possible -- I am more
> > >>> wondering if it makes sense to decide this for the user or to allow them to
> > >>> choose between in-memory or rocksDB by setting "persistent"
> > >>>
> > >>> 3) I'm wondering if users might want to be able to plug in their own custom
> > >>> stores as the underlying backend...should we support this as well? WDYT?
> > >>>
> > >>> 4) We probably want to make these stores