[jira] [Created] (KAFKA-9950) MirrorMaker2 sharing of ConfigDef can lead to ConcurrentModificationException

2020-05-03 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-9950:


 Summary: MirrorMaker2 sharing of ConfigDef can lead to 
ConcurrentModificationException
 Key: KAFKA-9950
 URL: https://issues.apache.org/jira/browse/KAFKA-9950
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.4.1, 2.5.0, 2.4.0
Reporter: Chris Egerton
Assignee: Chris Egerton


The 
[MirrorConnectorConfig::CONNECTOR_CONFIG_DEF|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L397]
 object is reused across multiple MirrorMaker2 classes, which is fine for the most 
part since it's a constant. However, the actual {{ConfigDef}} object itself is 
mutable, and is mutated when the {{MirrorTaskConfig}} class [statically 
constructs its own 
ConfigDef|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java#L62].

This has two unintended effects:
 # Since the two {{ConfigDef}} objects for the {{MirrorConnectorConfig}} and 
{{MirrorTaskConfig}} classes are actually the same object, the additional 
properties that the {{MirrorTaskConfig}} class defines for its {{ConfigDef}} 
are also added to the {{MirrorConnectorConfig}} class's {{ConfigDef}}. The 
impact of this isn't huge since both additional properties have default values, 
but this does cause those properties to appear in the 
{{/connectors/{name}/config/validate}} endpoint once the {{MirrorTaskConfig}} 
class is loaded for the first time.
 # It's possible that, if a config for a MirrorMaker2 connector is submitted at 
approximately the same time that the {{MirrorTaskConfig}} class is loaded, a 
{{ConcurrentModificationException}} will be thrown by the {{AbstractHerder}} 
class when it tries to [iterate over all of the keys of the connector's 
ConfigDef|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L357].
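For illustration, the failure mode reduces to iterating a shared mutable collection while it is being mutated. The following standalone sketch (plain Java, not Kafka code; the class, field, and key names are hypothetical stand-ins) reproduces the fail-fast exception with a plain HashMap:

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class SharedConfigDefDemo {
    // One shared mutable "config def", standing in for the shared
    // MirrorConnectorConfig.CONNECTOR_CONFIG_DEF constant (illustration only).
    static final Map<String, String> SHARED_DEF = new HashMap<>();

    // Simulates AbstractHerder iterating the connector's ConfigDef keys while
    // MirrorTaskConfig's static initializer adds its extra properties.
    static boolean iterationFailsWhenDefIsMutated() {
        SHARED_DEF.put("source.cluster.alias", "doc");
        SHARED_DEF.put("target.cluster.alias", "doc");
        try {
            for (String key : SHARED_DEF.keySet()) {
                // Structural modification during iteration, as in the bug report.
                SHARED_DEF.put("task.assigned.partitions", "doc");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true; // the fail-fast iterator detected the mutation
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + iterationFailsWhenDefIsMutated());
    }
}
```

The usual fix for this class of bug is to give each config class its own defensive copy of the shared definition rather than mutating the shared instance.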



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-585: Conditional SMT

2020-05-03 Thread Christopher Egerton
Hi all,

This has certainly taken quite a turn! I wish we could get this done
without adding another pluggable interface but if the benefit is that now
any SMT--new or pre-existing--can be conditionally applied, it seems like
it might be worth it.

As far as the proposed design goes, some thoughts:

1. There isn't a clear reason why exceptions thrown from predicates should
be treated differently from exceptions thrown by transformations. We have
reasonable error-tolerance mechanisms in the Connect framework; why
silently swallow errors, especially if the risk is publishing potentially
corrupted data to Kafka or an external system because an SMT that should
have been applied wasn't?

2. It might be worth noting that the interface is a worker plugin, and that
presumably it would be loaded in the same way as other worker plugins such
as converters, connectors, and REST extensions. This would include aliasing
behavior that would allow users to specify predicates using their simple
class names as long as no two predicate plugins with the same simple name
were available on the worker.

3. Why would the Filter SMT explicitly take in a predicate in its
configuration if predicates can be applied to all SMTs? Just reading the
idea for a filter SMT, it seemed like the behavior would be that the SMT
would take in no configuration parameters and just blindly drop everything
that it sees, but typical use would involve pairing it with a predicate.

4. The question mark syntax seems a little cryptic, and combining the
configuration properties for the predicate and the transformation in the
same namespace ("transforms.<name>.*") seems a little noisy. What do you
think about allowing predicates to be configured in their own namespace,
and referenced by name in a single "predicate" (or maybe "?predicate" if we
really want to avoid risking backwards compatibility concerns) property?
This would also enable them to be more easily reused across several SMTs.

For example, you might configure a worker with these properties (assuming
plugin aliasing is supported for predicates in the same way that it is for
transformations):

transforms: t2
transforms.t2.predicate: has-my-prefix
transforms.t2.type: ExtractField$Key
transforms.t2.field: c1

predicates: has-my-prefix
predicates.has-my-prefix.type: TopicNameMatch
predicates.has-my-prefix.negate: true
predicates.has-my-prefix.pattern: my-prefix-.*
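To make the shape of the proposal concrete, here is a minimal sketch in plain Java (this is not the actual Connect API; Rec, topicNameMatch, and applyConditionally are all hypothetical stand-ins) of how a negatable topic-name predicate could gate whether an SMT is applied:

```java
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class PredicateSketch {
    // Hypothetical record type; the real API would operate on ConnectRecord<R>.
    record Rec(String topic, String value) {}

    // Analogous to the proposed TopicNameMatch predicate with a "negate" option.
    static Predicate<Rec> topicNameMatch(String pattern, boolean negate) {
        Predicate<Rec> p = r -> r.topic().matches(pattern);
        return negate ? p.negate() : p;
    }

    // The transformation runs only when the configured predicate passes;
    // otherwise the record passes through unchanged.
    static Rec applyConditionally(Rec r, Predicate<Rec> pred, UnaryOperator<Rec> smt) {
        return pred.test(r) ? smt.apply(r) : r;
    }

    public static void main(String[] args) {
        // predicates.has-my-prefix: TopicNameMatch, negate=true, pattern=my-prefix-.*
        Predicate<Rec> hasMyPrefix = topicNameMatch("my-prefix-.*", true);
        UnaryOperator<Rec> upperCase = r -> new Rec(r.topic(), r.value().toUpperCase());

        Rec applied = applyConditionally(new Rec("other-topic", "v"), hasMyPrefix, upperCase);
        Rec skipped = applyConditionally(new Rec("my-prefix-a", "v"), hasMyPrefix, upperCase);
        System.out.println(applied.value() + " " + skipped.value()); // V v
    }
}
```

Naming predicates in their own `predicates.*` namespace, as in the config above, would let the same predicate instance gate several transformations.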

Excited to see how this is evolving and I think we're heading towards
something really useful for the Connect framework.

Cheers,

Chris


On Fri, May 1, 2020 at 9:51 AM Andrew Schofield 
wrote:

> Hi Tom,
> Looks good to me.
>
> Thanks,
> Andrew
>
> On 01/05/2020, 16:24, "Tom Bentley"  wrote:
>
> Hi,
>
> I've updated the KIP to reflect recent discussions. Please note the
> following:
>
> 1. I've described a HasHeaderKey predicate rather than HeaderKeyMatches
> because at the moment Headers doesn't expose the whole set of headers,
> only
> those with a specific name. Exposing this would significantly increase
> the
> scope of this KIP for relatively little extra benefit.
> 2. I used org.apache.kafka.connect.transformer.predicates rather than
> org.apache.kafka.connect.predicates, since I thought it better
> reflected
> the use of predicates within transforms. I'm flexible on this one if
> someone has a good case for the original name. For example the original
> package name would be more appropriate if we were expecting Connectors
> to
> make use of Predicates somehow.
> 3. I've dropped the special case of not needing to provide a FQN for
> predicate classes in that package, since this isn't something
> supported by
> transformations themselves as far as I know. I like the idea, but it
> seemed
> inconsistent to need it for transformations but not for predicates.
> 4. Chris, I've used your suggestions for a validate() method for the
> extra
> configurability needed. I've also added this to Predicate (even though
> I
> don't need it) for consistency with Connector and Transformation.
> 5. For negating the result of a predicate I decided it was clearer to
> just
> have a "negate" config parameter, supported by all predicates, than to
> reach for more punctuation.
>
> I'd be grateful for any more feedback people might have.
>
> Kind regards,
>
> Tom
>
> On Tue, Apr 28, 2020 at 3:50 PM Andrew Schofield <
> andrew_schofi...@live.com>
> wrote:
>
> > Hi Tom,
> > I think we should go for a consistent naming convention for the
> > predicates, maybe so they
> > read nicely if you say "IF" first. Here's a suggestion for
> predicates that
> > the KIP could introduce:
> >  - TopicNameMatches
> >  - HeaderKeyMatches
> >  - RecordIsTombstone
> >
> > On naming, I also suggest using
> > org.apache.kafka.connect.predicates.Predicate
> > as the name for the interface.
> >
> > I hadn't settled on a pre

[jira] [Created] (KAFKA-9949) Flaky Test GlobalKTableIntegrationTest#shouldKStreamGlobalKTableLeftJoin

2020-05-03 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9949:
--

 Summary: Flaky Test 
GlobalKTableIntegrationTest#shouldKStreamGlobalKTableLeftJoin
 Key: KAFKA-9949
 URL: https://issues.apache.org/jira/browse/KAFKA-9949
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Reporter: Matthias J. Sax


[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/248/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableIntegrationTest/shouldKStreamGlobalKTableLeftJoin/]
{quote}java.lang.AssertionError: Condition not met within timeout 3. 
waiting for final values at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at 
org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at 
org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableIntegrationTest.java:175){quote}





Re: [Discuss] KIP-581: Value of optional null field which has default value

2020-05-03 Thread Christopher Egerton
Hi Cheng,

I think refactoring that method should be fine (if maybe a little painful);
the method itself is private and all places that it's invoked directly are
either package-private or non-static, so it shouldn't affect any of the
public methods of the JSON converter to change "convertToConnect" to be
non-static. Even if it did, the only parts of the JSON converter that are
public API (and therefore officially subject to concerns about
compatibility) are the methods it implements that satisfy the "Converter"
and "HeaderConverter" interfaces.
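As a rough illustration of why the refactoring matters (plain Java, not the real JsonConverter; the class and the acceptOptionalNull flag are hypothetical stand-ins for the proposed property): a static helper cannot see per-instance configuration, while an instance method can honor it during deserialization:

```java
public class ConverterSketch {
    // Hypothetical stand-in for the proposed "accept.optional.null" config.
    private final boolean acceptOptionalNull;

    ConverterSketch(boolean acceptOptionalNull) {
        this.acceptOptionalNull = acceptOptionalNull;
    }

    // Before: a static method like convertToConnect cannot read instance
    // config, so it must always substitute the schema default for a null
    // optional field.
    static String convertStatic(String value, String schemaDefault) {
        return value != null ? value : schemaDefault;
    }

    // After: as an instance method it can consult the configured flag and
    // preserve null for optional fields when asked to.
    String convert(String value, String schemaDefault) {
        if (value == null) {
            return acceptOptionalNull ? null : schemaDefault;
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(new ConverterSketch(false).convert(null, "default")); // default
        System.out.println(new ConverterSketch(true).convert(null, "default"));  // null
    }
}
```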

Would you mind explicitly specifying in the KIP that the new property will
be added for the JSON converter only, and that it will affect both
serialization and deserialization?

Cheers,

Chris

On Tue, Apr 28, 2020 at 10:52 AM 379377944 <379377...@qq.com> wrote:

> Hi Chris,
>
>
> Thanks for your reminder; the original implementation is deprecated, and I just
> updated the JIRA with the new
> PR link:  https://github.com/apache/kafka/pull/8575
>
>
> As for question 2), I agree with you that we should consider both
> serialization and deserialization, and as you said, I only implemented the
> serialization for now. This is because the original serde implementation is not
> symmetrical: convertToConnect is a static method and can’t access the
> fields of the JsonConverter
> instance, so maybe I should do some refactoring to implement the
> deserialization.
>
>
> Thanks,
> Cheng Pan
>  Original Message
> Sender: Christopher Egerton
> Recipient: dev
> Date: Wednesday, Apr 15, 2020 02:28
> Subject: Re: [Discuss] KIP-581: Value of optional null field which has
> default value
>
>
> Hi Cheng, Thanks for the KIP! I really appreciate the care that was taken
> to ensure backwards compatibility for existing users, and the minimal
> changes to public interface that are suggested to address this. I have two
> quick requests for clarification: 1) Where is the proposed
> "accept.optional.null" property going to apply? It's hinted that it'll take
> effect on the JSON converter but not actually called out anywhere. 2)
> Assuming this takes effect on the JSON converter, is the intent to alter
> the semantics for both serialization and deserialization? The code snippet
> from the JSON converter that's included in the KIP comes from the
> "convertToJson" method, which is used for serialization. However, based on
> https://github.com/apache/kafka/blob/ea47a885b1fe47dfb87c1dc86db1b0e7eb8a273c/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L712-L713
> it looks like the converter also inserts the default value for
> optional-but-null data during deserialization. Thanks again for the KIP!
> Cheers, Chris On Wed, Mar 18, 2020 at 12:00 AM Cheng Pan <379377...@qq.com>
> wrote: > Hi all, > > I'd like to use this thread to discuss KIP-581: Value
> of optional null > field which has default value, please see detail at: >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-581%3A+Value+of+optional+null+field+which+has+default+value
> > > > There are some previous discussion at: >
> https://github.com/apache/kafka/pull/7112 > > > I'm a beginner for apache
> project, please let me know if I did any thing > wrong. > > > Best regards,
> > Cheng Pan


Re: [VOTE] KIP-441: Smooth Scaling Out for Kafka Streams

2020-05-03 Thread John Roesler
Hi Guozhang,

Ah, good question. Yes, the assignor will always now try to achieve a perfect 
balance. This was also the proposed default in the KIP before. The config would 
have allowed users to relax the search for perfection.

This is actually one of our motivations now to remove it. We feel it’s simpler 
to reason about the behavior of the system if you know it’s always going to 
produce a balanced assignment. 

Thanks,
John 

On Sun, May 3, 2020, at 19:03, Guozhang Wang wrote:
> Hello John / Sophie:
> 
> With this config removed, would the assignor always try to achieve the
> "perfect balance" (of course, it may be a sub-optimal local plateau) or
> there's an internal hard-coded factor to still retain some satisfying
> threshold?
> 
> Guozhang
> 
> On Sun, May 3, 2020 at 9:23 AM John Roesler  wrote:
> 
> > Hi Matthias,
> >
> > We originally proposed that config to allow us to skip migrating tasks if
> > the current balance is “good enough”. But during implementation, we became
> > concerned that supporting this option increased code complexity, and it’s
> > also an extra concept for users to have to learn.
> >
> > To keep the new balancing system simpler both internally and externally,
> > we’d like to drop it from the API for now, with the idea of adding it later
> > if needed.
> >
> > Does that seem reasonable?
> >
> > Thanks,
> > John
> >
> > On Fri, May 1, 2020, at 14:18, Matthias J. Sax wrote:
> > > Can you elaborate why to remove it?
> > >
> > > On 5/1/20 11:29 AM, Sophie Blee-Goldman wrote:
> > > > Hey all,
> > > >
> > > > We'd like to make a slight modification to the proposal in this KIP and
> > > > remove
> > > > the *balance.factor* config. We will update the KIP accordingly.
> > Please let
> > > > us know
> > > > if you have any concerns.
> > > >
> > > > Cheers,
> > > > Sophie
> > > >
> > > > On Wed, Jan 15, 2020 at 12:48 PM John Roesler 
> > wrote:
> > > >
> > > >> Hello all,
> > > >>
> > > >> After a long hiatus, I've just realized that I'm now able to upgrade
> > my
> > > >> non-binding support to a binding +1 for KIP-441.
> > > >>
> > > >> This brings the vote tally to:
> > > >> 3 binding +1s: Guozhang, Bill, and myself
> > > >> 3 non-binding +1s: Bruno, Vinoth, and Sophie
> > > >>
> > > >> Since the vote has been open for at least 72 hours, the KIP is
> > accepted.
> > > >>
> > > >> Thanks all,
> > > >> -John
> > > >>
> > > >>
> > > >>
> > > >> On Mon, Oct 28, 2019 at 21:02 PM John Roesler 
> > wrote:
> > > >>> Hey all,
> > > >>>
> > > >>> Now that the 2.4 release storm is over, I'd like to bump this vote
> > > >> thread.
> > > >>>
> > > >>> Currently, we have two binding +1s (Guozhang and Bill), and four
> > > >>> non-binding ones (Bruno, Vinoth, Sophie, and myself), and no vetoes.
> > > >>>
> > > >>> Thanks,
> > > >>> -John
> > > >>>
> > > >>> On Thu, Sep 12, 2019 at 12:54 PM Bill Bejeck 
> > wrote:
> > > 
> > >  +1 (binding)
> > > 
> > >  On Thu, Sep 12, 2019 at 1:53 PM Sophie Blee-Goldman <
> > > >> sop...@confluent.io> wrote:
> > > 
> > > > +1 (non-binding)
> > > >
> > > > On Wed, Sep 11, 2019 at 11:38 AM Vinoth Chandar <
> > > >> vchan...@confluent.io> wrote:
> > > >
> > > >> +1 (non-binding).
> > > >>
> > > >> On Fri, Sep 6, 2019 at 12:46 AM Bruno Cadonna  > >
> > > >> wrote:
> > > >>
> > > >>> +1 (non-binding)
> > > >>>
> > > >>> On Fri, Sep 6, 2019 at 12:32 AM Guozhang Wang <
> > > >> wangg...@gmail.com> wrote:
> > > 
> > >  +1 (binding).
> > > 
> > >  On Thu, Sep 5, 2019 at 2:47 PM John Roesler 
> > > >> wrote:
> > > 
> > > > Hello, all,
> > > >
> > > > After a great discussion, I'd like to open voting on KIP-441,
> > > > to avoid long restore times in Streams after rebalancing.
> > > > Please cast your votes!
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > 
> > > 
> > >  --
> > >  -- Guozhang
> > > >>>
> > > >>
> > > >
> > > >>>
> > > >>
> > > >
> > >
> > >
> > > Attachments:
> > > * signature.asc
> >
> 
> 
> -- 
> -- Guozhang
>


Re: [DISCUSS] KIP-595: A Raft Protocol for the Metadata Quorum

2020-05-03 Thread Guozhang Wang
Hello Jun,

Thanks for your comments! I'm replying inline below:

On Fri, May 1, 2020 at 12:36 PM Jun Rao  wrote:

> Hi, Jason,
>
> Thanks for the KIP. Great writeup. A few comments below.
>
> 100. Do we need AlterQuorum in the first version? Quorum changes are rare
> and the implementation is involved. ZK doesn't have that until 4 years
> after the initial version. Dropping that in the first version could speed
> up this KIP.
>

Yes, the first version of the implementation does not necessarily need
quorum re-configuration. In fact, in our current on-going work we did not
put it among the top priorities to complete before we push out the first
working prototype --- the quorum members can just be a static “quorum.voters”
config.

However, we still want to include the design of quorum re-configuration in
this KIP for discussion, so that we are confident that when adding the
dynamic reconfiguration feature it is well-aligned with the protocol we've
proposed and implemented, and we would not need to rework a lot of the
implementation in our first version.


> 101. Bootstrapping related issues.
> 101.1 Currently, we support auto broker id generation. Is this supported
> for bootstrap brokers?
>

The vote ids would just be the broker ids. "bootstrap.servers" would be
similar to what client configs have today, where "quorum.voters" would be
pre-defined config values.


> 101.2 As Colin mentioned, sometimes we may need to load the security
> credentials to be broker before it can be connected to. Could you provide a
> bit more detail on how this will work?
>
>
This is a good question. Whether the credentials are stored in a remote
source or in a local JAAS file, I think we need to load them before the broker
tries to find out the quorum.



> 102. Log compaction. One weak spot of log compaction is for the consumer to
> deal with deletes. When a key is deleted, it's retained as a tombstone
> first and then physically removed. If a client misses the tombstone
> (because it's physically removed), it may not be able to update its
> metadata properly. The way we solve this in Kafka is based on a
> configuration (log.cleaner.delete.retention.ms) and we expect a consumer
> having seen an old key to finish reading the deletion tombstone within that
> time. There is no strong guarantee for that since a broker could be down
> for a long time. It would be better if we can have a more reliable way of
> dealing with deletes.
>

We propose to capture this in the "FirstDirtyOffset" field of the quorum
record fetch response: the offset is the maximum offset that log compaction
has reached. If the follower has fetched beyond this offset, it is
safe, since it has seen all records up to that offset. On getting
the response, the follower can then check whether its end offset is actually below
that dirty offset (and hence it may have missed some tombstones). If that's the case:

1) Naively, it could re-bootstrap the metadata log from the very beginning to
catch up.
2) During that time, it would refrain from answering MetadataRequests
from any clients.
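The follower-side decision described above could be sketched roughly as follows (all names are hypothetical; the real protocol and handling will differ in detail):

```java
public class DirtyOffsetCheck {
    enum Action { CONTINUE_FETCHING, REBOOTSTRAP_AND_PAUSE_METADATA }

    // If the follower's end offset is at or beyond the leader's FirstDirtyOffset,
    // it has seen every record (including tombstones) that compaction could have
    // removed. Otherwise, compaction may already have dropped tombstones the
    // follower never observed, so it must re-bootstrap the metadata log and
    // stop answering MetadataRequests in the meantime.
    static Action onFetchResponse(long followerEndOffset, long firstDirtyOffset) {
        return followerEndOffset >= firstDirtyOffset
                ? Action.CONTINUE_FETCHING
                : Action.REBOOTSTRAP_AND_PAUSE_METADATA;
    }

    public static void main(String[] args) {
        System.out.println(onFetchResponse(120L, 100L)); // CONTINUE_FETCHING
        System.out.println(onFetchResponse(80L, 100L));  // REBOOTSTRAP_AND_PAUSE_METADATA
    }
}
```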


> 103. For the newly introduced configurations related to timeouts, could we
> describe the defaults?
>

We are still discussing the default values, and I think some
benchmarking experiments will be needed. At the moment, just based on the
literature, our general thinking is:

1) the fetch.timeout should be on roughly the same scale as the ZK session
timeout, which is now 18 seconds by default -- in practice we've seen
unstable networks having more than 10 secs of transient connectivity;
2) the election.timeout, however, should be smaller than the fetch timeout,
as is also suggested as a practical optimization in the literature:
https://www.cl.cam.ac.uk/~ms705/pub/papers/2015-osr-raft.pdf


> 104. "This proposal requires a persistent log as well as a separate file to
> maintain the current quorum state". In JBOD, are the quorum log and quorum
> state file kept together on the same disk?
>
>
I think for correctness there's no requirement that they be either
both intact or both corrupted: the quorum-state file is the source of truth for the
current quorum state, especially for the current candidate (if any) that
this voter has voted for. If it is missing, the broker would just rely on
the gossiping FindQuorum to refresh its knowledge. The quorum log, on the
other hand, just stores the metadata updates (including the quorum
changes, for reconfiguration purposes) for Kafka. So it is okay if either
of them is corrupted while the other is intact.


> 105. Quorum State: In addition to VotedId, do we need the epoch
> corresponding to VotedId? Over time, the same broker Id could be voted in
> different generations with different epoch.
>
>
Hmm, this is a good point. Originally I think the "LeaderEpoch" field in
that file is corresponding to the "latest known leader epoch", not the
"current leader epoch". For example, if the current epoch is N, and then a
vote-request with epoch N+1 is received and the voter

Re: [DISCUSS] KIP-554: Add Broker-side SCRAM Config API

2020-05-03 Thread Guozhang Wang
Hello Colin,

Thanks for the KIP. The proposal itself looks good to me; but could you
elaborate a bit more on the rejected alternative of reusing
IncrementalAlterConfigs? What do you mean by complex string manipulation,
as well as error conditions?

Guozhang


On Fri, May 1, 2020 at 5:12 PM Colin McCabe  wrote:

> On Fri, May 1, 2020, at 08:35, Aneel Nazareth wrote:
> > Hi Colin,
> >
> > Thanks for the KIP. Is it also in scope to add support for the new API
> > to the Admin interface and the implementation in KafkaAdminClient?
> >
>
> Hi Aneel,
>
> Yes, we will have a Java API.  The new Admin API is described in the KIP.
>
> best,
> Colin
>
>
> > On Fri, May 1, 2020 at 1:18 AM Colin McCabe  wrote:
> > >
> > > Hi all,
> > >
> > > I posted a KIP about adding a new SCRAM configuration API on the
> broker.  Check it out here if you get a chance:
> https://cwiki.apache.org/confluence/x/ihERCQ
> > >
> > > cheers,
> > > Colin
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-598: Augment TopologyDescription with store and source / sink serde information

2020-05-03 Thread Guozhang Wang
Hello Matthias and John, thanks for your comments! I've replied inline below.

I think there are a couple open questions that I'd like to hear your
opinions on with the context:

a. For stores' serdes, the reason I proposed to expose a set of serde
names instead of a pair of key / value serdes is to allow for future possible store
types which may not be key-value. I admit it could just be overkill
here, so if you have a strong preference for the latter, I could be convinced
to change that part, but I want to make the original motivation clear.

b. I think I'm convinced that I'd just augment the `toString` result
regardless of which function generated the Topology (and hence its
TopologyDescription); note this would mean that we break the compatibility
of the current `toString` function. As a remedy for that, we will also
expose a `toJson` function for programmatic purposes.

Guozhang


> (1) In the new TopologyDescription output, the line for the
> windowed-count processor is:
>
> >  Processor: myname (stores: [(myname-store, serdes:
[SessionWindowedSerde, FullChangeSerde])])
>
> For this case, both Serdes are wrappers and user would actually only
> specified wrapped Serdes for the key and value. Can we do anything about
> this? Otherwise, there might still be a runtime `ClassCastException`
> that a user cannot easily debug.
>
>
> (2) Nit: The JavaDocs of `Processor#storeSet()` seems to be incorrect
> (it says "The names of all connected stores." -- guess it's c&p error)?
>
Yes! Fixed.

>
> (3) The KIP mentioned to add `Store#changelogTopic()` method, but the
> output of `TopologyDescription#toString()` does not contain it. I think
> it might be good do add it, too?
>
Yes, that's right. I'm going to add to the example as well.

>
> (4) The KIP also list https://issues.apache.org/jira/browse/KAFKA-9913
> but it seems not to address it yet?
>
I actually did intend to have it addressed; the proposal includes:

a. Return the set of source / sink nodes of a sub-topology, and their
corresponding source / sink topics could be accessed.
b. Return the set of stores of a sub-topology, and their corresponding
changelog topics could be accessed.

The reason I did not choose to just expose the set of all topics directly,
but indirectly, is stated in the wiki:

"the reason we did not expose APIs for topic names directly is that for
source nodes, it is possible to have Pattern and for sink nodes, it is
possible to have topic-extractors, and hence it's better to let users
leveraging on the lower-level APIs to construct the topic names
programmatically themselves."

>
> (5) Like John, I also noticed that `List<String> Store#serdesNames()` is
> not a great API. I am not sure if I understand your reply though.
> AFAIK, there is no existing API
>
> > List<String> StoreBuilder#serdes()
>
> (cf
>
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
)
>
Ah yes, that's added as part of this KIP.

>
> (6) Atm, we return `String` type for the Serdes. Do we think it's
> sufficient? Just want to double check.

The reason is that we can only get the serde name at the time the
topology is described, since it may come from config and hence there may be no
actual serde object.

> (10) Can we avoid coupling this KIP’s behavior to the choice of ‘build’
method? I.e., can we return the improved description even when people just
call ‘build()’?

Yes, as I replied in the above comment to yours, I've changed my mind to
just return the augmented description regardless of the function, and will
expose toJson() for future compatibility. I've not updated the wiki
yet.

> Clearly, we need a placeholder if no serde is specified. How about
“unknown”, or the name of the config keys,
“default.key.serde”/“default.value.serde”?

I think if `build(props)` is used, we can use the name of the configured
values; otherwise since we do not know the config yet we'd have to use
"unknown".

> I still have some deep reservation about the ‘build(Parameters)’ method
itself. I don’t really want to side-track this conversation with all my
concerns if we can avoid it, though. It seems like justification enough
that introducing dramatically different behavior based in on seemingly
minor differences in api calls will be a source of mystery and complexity
for users.

> I.e., I’m characterizing a completely different string format as
“dramatically different”, as opposed to just having a placeholder string.

> (11) Regarding the wrapper serdes, I bet we can capture and print the
inner types as well.

Ack, I can do that.

On Sat, May 2, 2020 at 8:19 AM John Roesler  wrote:

> Hi all,
>
> I’ve been sitting on another concern about this proposal. Since Matthias
> has just submitted a few questions, perhaps I can pile on two more this
> round.
>
> (10) Can we avoid coupling this KIP’s behavior to the choice of ‘build’
> method? I.e., can we return the improved description even when people just
> call ‘build()’?
>
> Clearly, we need a placeholder if no serde is sp

Re: [VOTE] KIP-441: Smooth Scaling Out for Kafka Streams

2020-05-03 Thread Guozhang Wang
Hello John / Sophie:

With this config removed, would the assignor always try to achieve the
"perfect balance" (of course, it may be a sub-optimal local plateau) or
there's an internal hard-coded factor to still retain some satisfying
threshold?

Guozhang

On Sun, May 3, 2020 at 9:23 AM John Roesler  wrote:

> Hi Matthias,
>
> We originally proposed that config to allow us to skip migrating tasks if
> the current balance is “good enough”. But during implementation, we became
> concerned that supporting this option increased code complexity, and it’s
> also an extra concept for users to have to learn.
>
> To keep the new balancing system simpler both internally and externally,
> we’d like to drop it from the API for now, with the idea of adding it later
> if needed.
>
> Does that seem reasonable?
>
> Thanks,
> John
>
> On Fri, May 1, 2020, at 14:18, Matthias J. Sax wrote:
> > Can you elaborate why to remove it?
> >
> > On 5/1/20 11:29 AM, Sophie Blee-Goldman wrote:
> > > Hey all,
> > >
> > > We'd like to make a slight modification to the proposal in this KIP and
> > > remove
> > > the *balance.factor* config. We will update the KIP accordingly.
> Please let
> > > us know
> > > if you have any concerns.
> > >
> > > Cheers,
> > > Sophie
> > >
> > > On Wed, Jan 15, 2020 at 12:48 PM John Roesler 
> wrote:
> > >
> > >> Hello all,
> > >>
> > >> After a long hiatus, I've just realized that I'm now able to upgrade
> my
> > >> non-binding support to a binding +1 for KIP-441.
> > >>
> > >> This brings the vote tally to:
> > >> 3 binding +1s: Guozhang, Bill, and myself
> > >> 3 non-binding +1s: Bruno, Vinoth, and Sophie
> > >>
> > >> Since the vote has been open for at least 72 hours, the KIP is
> accepted.
> > >>
> > >> Thanks all,
> > >> -John
> > >>
> > >>
> > >>
> > >> On Mon, Oct 28, 2019 at 21:02 PM John Roesler 
> wrote:
> > >>> Hey all,
> > >>>
> > >>> Now that the 2.4 release storm is over, I'd like to bump this vote
> > >> thread.
> > >>>
> > >>> Currently, we have two binding +1s (Guozhang and Bill), and four
> > >>> non-binding ones (Bruno, Vinoth, Sophie, and myself), and no vetoes.
> > >>>
> > >>> Thanks,
> > >>> -John
> > >>>
> > >>> On Thu, Sep 12, 2019 at 12:54 PM Bill Bejeck 
> wrote:
> > 
> >  +1 (binding)
> > 
> >  On Thu, Sep 12, 2019 at 1:53 PM Sophie Blee-Goldman <
> > >> sop...@confluent.io> wrote:
> > 
> > > +1 (non-binding)
> > >
> > > On Wed, Sep 11, 2019 at 11:38 AM Vinoth Chandar <
> > >> vchan...@confluent.io> wrote:
> > >
> > >> +1 (non-binding).
> > >>
> > >> On Fri, Sep 6, 2019 at 12:46 AM Bruno Cadonna  >
> > >> wrote:
> > >>
> > >>> +1 (non-binding)
> > >>>
> > >>> On Fri, Sep 6, 2019 at 12:32 AM Guozhang Wang <
> > >> wangg...@gmail.com> wrote:
> > 
> >  +1 (binding).
> > 
> >  On Thu, Sep 5, 2019 at 2:47 PM John Roesler 
> > >> wrote:
> > 
> > > Hello, all,
> > >
> > > After a great discussion, I'd like to open voting on KIP-441,
> > > to avoid long restore times in Streams after rebalancing.
> > > Please cast your votes!
> > >
> > >
> > >
> > >
> > >
> > >
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > >
> > > Thanks,
> > > -John
> > >
> > 
> > 
> >  --
> >  -- Guozhang
> > >>>
> > >>
> > >
> > >>>
> > >>
> > >
> >
> >
> > Attachments:
> > * signature.asc
>


-- 
-- Guozhang


Build failed in Jenkins: kafka-trunk-jdk11 #1421

2020-05-03 Thread Apache Jenkins Server
See 


Changes:

[cshapi] MINOR: Annotate KafkaAdminClientTest.testAlterClientQuotas() with @Test


--
[...truncated 3.08 MB of test output; the surviving portion lists org.apache.kafka.streams.TestTopicsTest and org.apache.kafka.streams.test.ConsumerRecordFactoryTest results, all completed tests PASSED, before the log cuts off mid-listing...]

Build failed in Jenkins: kafka-trunk-jdk8 #4496

2020-05-03 Thread Apache Jenkins Server
See 


Changes:

[cshapi] MINOR: Annotate KafkaAdminClientTest.testAlterClientQuotas() with @Test


--
[...truncated 3.06 MB of test output; the surviving portion lists org.apache.kafka.streams.TopologyTestDriverTest, MockTimeTest, internals.WindowStoreFacadeTest, and test.TestRecordTest results, all completed tests PASSED, before the log cuts off mid-listing...]

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-05-03 Thread Ivan Ponomarev

Hello everyone,

Will someone please take a look at the reworked KIP?

I believe that now it follows the design principles and takes into account 
all the arguments discussed here.



Regards,

Ivan


23.04.2020 2:45, Ivan Ponomarev пишет:

Hi,

I have read John's "DSL design principles" and have completely 
rewritten the KIP, see 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream 




This version includes all the previous discussion results and follows 
the design principles, with one exception.


The exception is

branch(Predicate predicate, Branched branched)

which formally violates the 'no more than one parameter' rule, but I think 
here it is justified.


We must provide a predicate for a branch and don't need to provide one 
for the default branch. Thus for both operations we may use a single 
Branched parameter class, with an extra method parameter for `branch`.


Since a predicate is a natural, necessary part of a branch, no 
'proliferation of overloads, deprecations, etc.' is expected here, as 
stated in the rationale for the 'single parameter rule'.


WDYT, is this KIP mature enough to begin voting?

Regards,

Ivan

21.04.2020 2:09, Matthias J. Sax пишет:

Ivan,

no worries about getting side tracked. Glad to have you back!

The DSL improved further in the meantime and we already have a `Named`
config object to name operators. It seems reasonable to me to build on 
this.


Furthermore, John did a writeup about "DSL design principles" that we
want to follow:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar 


-- might be worth checking out.


-Matthias


On 4/17/20 4:30 PM, Ivan Ponomarev wrote:

Hi everyone!

Let me revive the discussion of this KIP.

I'm very sorry for stopping my participation in the discussion in June
2019. My project work was very intensive then and it didn't leave me
spare time. But I think I must finish this, because we invested
substantial effort into this discussion and I don't feel entitled to
propose other things before this one is finalized.

During these months I proceeded with writing and reviewing Kafka
Streams-related code. Every time I needed branching, Spring-Kafka's
KafkaStreamBrancher class of my invention (the original idea for this
KIP) worked for me -- that's another reason why I gave up pushing the
KIP forward. When I came across the problem with the scope of
branches, I worked around it this way:

AtomicReference<KStream<String, String>> result = new AtomicReference<>();
new KafkaStreamBrancher<>()
 .branch()
 .defaultBranch(result::set)
 .onTopOf(someStream);
result.get()...


And yes, of course I don't feel very happy with this approach.

I think that Matthias came up with a bright solution in his post from
May, 24th 2019. Let me quote it:

KStream#split() -> KBranchedStream
// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer)
   -> KBranchedStream
// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function, String)
   -> KBranchedStream
// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Consumer)
   -> Map
// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function, String)
   -> Map
// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function)
   -> Map
// return map of all named sub-streams into current scope
KBranchedStream#noDefaultBranch()
   -> Map

I believe this would satisfy everyone. Optional names seem to be a good
idea: when you don't need to have the branches in the same scope, you
just don't use names and you don't risk making your code brittle. Or,
you might want to add names just for debugging purposes. Or, finally,
you might use the returned Map to have the named branches in the
original scope.
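The scope behaviour described above (named branches collected into a map that is returned into the caller's scope, unnamed branches handled purely by their consumers) can be illustrated independently of Kafka. The class and method names below are plain-Python stand-ins for the proposed API, not the actual Kafka Streams interfaces:

```python
# Toy model of the proposed split()/branch()/defaultBranch() chaining.
# All names are illustrative stand-ins for the proposed Kafka Streams API.

class BranchedStream:
    def __init__(self, records):
        self._records = records
        self._branches = {}       # branch name -> matched records
        self._predicates = []     # (predicate, name) in declaration order

    def branch(self, predicate, name=None):
        # Register a branch; returns self to allow method chaining.
        self._predicates.append((predicate, name))
        return self

    def default_branch(self, name="default"):
        # Route each record to the first matching branch, else the default,
        # then return the *named* branches into the caller's scope as a map.
        for record in self._records:
            target = next((n for p, n in self._predicates if p(record)), name)
            self._branches.setdefault(target, []).append(record)
        return {n: recs for n, recs in self._branches.items() if n is not None}

branches = (BranchedStream([1, 2, 3, 4, 5])
            .branch(lambda r: r % 2 == 0, "even")
            .branch(lambda r: r > 3, "big-odd")
            .default_branch("rest"))
# branches maps "even" -> [2, 4], "big-odd" -> [5], "rest" -> [1, 3]
```

Note how a branch declared without a name would simply not appear in the returned map, which mirrors the "names are optional" idea: callers who don't need the branches in scope pay nothing for the feature.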

There was also input from John Roesler on June 4th, 2019, who
suggested using the `Named` class. I can't comment on this. The idea seems
reasonable, but in this matter I'd rather trust people who are more
familiar with Streams API design principles than me.

Regards,

Ivan



08.10.2019 1:38, Matthias J. Sax пишет:
I am moving this KIP into "inactive status". Feel free to resume the KIP
at any point.

If anybody else is interested in picking up this KIP, feel free to 
do so.




-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:

Ivan,

did you see my last reply? What do you think about my proposal to mix
both approaches and try to get best-of-both worlds?


-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:

Thanks for the input John!


under your suggestion, it seems that the name is required


If you want to get the `KStream` as p

Re: [DISCUSS] KIP-578: Add configuration to limit number of partitions

2020-05-03 Thread Gokul Ramanan Subramanian
Thanks Stanislav. Apologies for the long absence from this thread.

I would prefer having per-user max partition limits in a separate KIP; I
don't see them as part of the MVP for this KIP. I will add this as an
alternative approach in the KIP.

I was in two minds about whether or not to impose the partition limit
for internal topics as well. I can be convinced both ways. On the one hand,
internal topics should be purely internal i.e. users of a cluster should
not have to care about them. In this sense, the partition limit should not
apply to internal topics. On the other hand, Kafka allows configuring
internal topics by specifying their replication factor etc. Therefore, they
don't feel all that internal to me. In any case, I'll modify the KIP to
exclude internal topics.

I'll also add to the KIP the alternative approach Tom suggested around
using topic policies to limit partitions, and explain why it does not help
to solve the problem that the KIP is trying to address (as I have done in a
previous correspondence on this thread).

Cheers.
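The guard described in this thread (rejecting a create request that would push the cluster past a configured partition limit, with a graceful error at topic-creation time, internal topics excluded, and a default of INT32_MAX) reduces to a simple check. This is a sketch of the semantics only; the function and parameter names are assumptions, not the KIP's actual config keys:

```python
# Sketch of the partition-limit validation discussed for KIP-578.
# Names are illustrative; the real check would live in the broker's
# topic-creation path.
INT32_MAX = 2**31 - 1  # proposed default: effectively no limit

def validate_create(current_partitions, requested_partitions,
                    max_partitions=INT32_MAX, internal_topic=False):
    """Return None if creation is allowed, else a graceful error message."""
    if internal_topic:
        # Per the discussion, internal topics are excluded from the limit.
        return None
    if current_partitions + requested_partitions > max_partitions:
        return ("Adding %d partitions would exceed the configured limit of %d"
                % (requested_partitions, max_partitions))
    return None
```

With a limit of 1000 (the m5.large figure mentioned above), a cluster at 900 partitions would accept a 50-partition topic but reject a 200-partition one, instead of silently entering a pathological state.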

On Fri, Apr 24, 2020 at 4:24 PM Stanislav Kozlovski 
wrote:

> Thanks for the KIP, Gokul!
>
> I like the overall premise - I think it's more user-friendly to have
> configs for this than to have users implement their own config policy, so
> unless it's very complex to implement, it seems worth it.
> I agree that having the topic policy on the CreatePartitions path makes
> sense as well.
>
> Multi-tenancy was a good point. It would be interesting to see how easy it
> is to extend the max partition limit to a per-user basis. Perhaps this can
> be done in a follow-up KIP, as a natural extension of the feature.
>
> I'm wondering whether there's a need to enforce this on internal topics,
> though. Given they're internal and critical to the function of Kafka, I
> believe we'd rather always ensure they're created, regardless if over some
> user-set limit. It brings up the question of forward compatibility - what
> happens if a user's cluster is at the maximum partition capacity, yet a new
> release of Kafka introduces a new topic (e.g KIP-500)?
>
> Best,
> Stanislav
>
> On Fri, Apr 24, 2020 at 2:39 PM Gokul Ramanan Subramanian <
> gokul24...@gmail.com> wrote:
>
> > Hi Tom.
> >
> > With KIP-578, we are not trying to model the load on each partition, and
> > come up with an exact limit on what the cluster or broker can handle in
> > terms of number of partitions. We understand that not all partitions are
> > equal, and the actual load per partition varies based on the message
> size,
> > throughput, whether the broker is a leader for that partition or not etc.
> >
> > What we are trying to achieve with KIP-578 is to disallow a pathological
> > number of partitions that will surely put the cluster in bad shape. For
> > example, in KIP-578's appendix, we have described a case where we could
> not
> > delete a topic with 30k partitions, because the brokers could not
> > handle all the work that needed to be done. We have also described how
> > a producer performance test with 10k partitions observed basically 0
> > throughput. In these cases, having a limit on number of partitions
> > would allow the cluster to produce a graceful error message at topic
> > creation time, and prevent the cluster from entering a pathological
> state.
> > These are not just hypotheticals. We definitely see many of these
> > pathological cases happen in production, and we would like to avoid them.
> >
> > The actual limit on number of partitions is something we do not want to
> > suggest in the KIP. The limit will depend on various tests that owners of
> > their clusters will have to perform, including perf tests, identifying
> > topic creation / deletion times, etc. For example, the tests we did for
> the
> > KIP-578 appendix were enough to convince us that we should not have
> > anywhere close to 10k partitions on the setup we describe there.
> >
> > What we want to do with KIP-578 is provide the flexibility to set a limit
> > on number of partitions based on tests cluster owners choose to perform.
> > Cluster owners can do the tests however often they wish and dynamically
> > adjust the limit on number of partitions. For example, we found in our
> > production environment that we don't want to have more than 1k partitions
> > on an m5.large EC2 broker instances, or more than 300 partitions on a
> > t3.medium EC2 broker, for typical produce / consume use cases.
> >
> > Cluster owners are free to not configure the limit on number of
> partitions
> > if they don't want to spend the time coming up with a limit. The limit
> > defaults to INT32_MAX, which is basically infinity in this context, and
> > should be practically backwards compatible with current behavior.
> >
> > Further, the limit on number of partitions should not come in the way of
> > rebalancing tools under normal operation. For example, if the partition
> > limit per broker is set to 1k, unless the number of partitions comes
> close
> > t

Re: [DISCUSS] KIP-605 Expand Connect Worker Internal Topic Settings

2020-05-03 Thread Christopher Egerton
Hi Randall,

Thanks for the KIP! I have a few questions and suggestions but no major
objections.

1. The motivation is pretty clear for altering the various
"*.storage.replication.factor" properties to allow -1 as a value now. Are
there expected use cases for allowing modification of other properties of
these topic configs? It'd be nice to understand why we're adding this extra
configurability to the worker.

2. Should the "cleanup.policy" property have some additional guarding logic
to make sure that people don't set it to "delete" or "both"?

3. The lack of a "config.storage.partitions" property seems intentional
because the config topic should only ever have one partition. Now that
we're adding all of these other internal topic-related properties, do you
think it might be helpful to users if we emit a warning message of some
sort when they try to configure their worker with this property?

4. On the topic of compatibility--this is a fairly niche edge case, but any
time we add new configs to the worker we run the risk of overlap with
existing configs for REST extensions that users may have implemented. This
is different from other pluggable interfaces like config providers and
converters, whose properties are namespaced (presumably to avoid collisions
like this). Might be worth it to note this in a small paragraph or even
just a single sentence.

Cheers,

Chris
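For concreteness, a worker configuration using the expanded internal-topic settings under discussion might look like the fragment below. This is a hypothetical sketch based on this thread only; the exact key names beyond the existing replication-factor properties are assumptions, not confirmed KIP-605 syntax:

```properties
# Hypothetical Connect distributed-worker snippet (names are assumptions).
config.storage.topic=connect-configs
config.storage.replication.factor=-1      # -1: use the broker's default
offset.storage.topic=connect-offsets
offset.storage.replication.factor=-1
offset.storage.partitions=25
status.storage.topic=connect-status
status.storage.replication.factor=-1
status.storage.partitions=5
# Note: no config.storage.partitions entry; the config topic must stay at
# exactly one partition, which is why that property is intentionally absent.
```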

On Thu, Apr 30, 2020 at 4:32 PM Ryanne Dolan  wrote:

> Much needed, thanks.
>
> Ryanne
>
> On Thu, Apr 30, 2020 at 4:59 PM Randall Hauch  wrote:
>
> > Hello!
> >
> > I'd like to use this thread to discuss KIP-605, which expands some of the
> > properties that the Connect distributed worker uses when creating
> internal
> > topics:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-605%3A+Expand+Connect+Worker+Internal+Topic+Settings
> >
> > Best regards,
> >
> > Randall
> >
>


Re: [Discuss] KIP-582 Add a "continue" option for Kafka Connect error handling

2020-05-03 Thread Christopher Egerton
Hi Zihan,

I guess I'm still unclear on exactly what form this analysis might take. If
a converter has an issue (de)-serializing a record, for example, the first
thing I check out is the stack trace in the worker logs that tells me what
went wrong and where. The same goes for errors thrown during
transformation. Can we have some concrete examples of what kind of analysis
performed on byte arrays in external systems might be more informative,
especially when it would either be performed without easy-to-find log
messages or require extra effort to make those log messages easy to find
and associate with the bytes in the external system?

Cheers,

Chris

On Thu, Apr 30, 2020 at 1:01 PM Zihan Li  wrote:

> Hi Chris and Andrew,
>
> Thanks a lot for your reply!
>
> I think in most cases it is easier to analyze broken records in an
> external system rather than in a Kafka DLQ topic. While it might be
> possible to analyze broken records directly with Kafka, people are
> generally more familiar with external tools, such as file systems and
> relational databases. Exporting broken records to those external systems
> would enable many more analysis tools. Users can use those tools to audit
> end-to-end data flow and work with upstream teams to improve data
> quality. As a result, in many cases, the DLQ is consumed again by an
> additional connector for further analysis. So as Chris mentioned, the
> point of this KIP is to save users the extra time and effort of
> maintaining and tuning this additional DLQ sink connector.
>
> The expected behavior of this new error-handling option should be
> consistent with the DLQ. Namely, if any of the key, value, or headers is
> broken, the record should be sent to SinkTask.putBrokenRecord() instead
> of the DLQ.
>
> Best,
> Zihan
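The routing behaviour proposed in this thread (a record whose key, value, or headers fail conversion goes to a `SinkTask.putBrokenRecord()`-style path instead of the DLQ when the "continue" option is set) can be sketched as a small dispatcher. `putBrokenRecord` and the "continue" option come from the KIP discussion; everything else below is illustrative:

```python
# Toy dispatcher modelling the proposed "continue" error-handling option.
def route_record(raw_record, convert, errors_tolerance="continue"):
    """Deliver converted records normally; route broken ones either to the
    sink's putBrokenRecord() path ("continue") or to the DLQ (otherwise)."""
    try:
        converted = {part: convert(raw_record[part])
                     for part in ("key", "value", "headers")}
        return ("put", converted)
    except ValueError:
        if errors_tolerance == "continue":
            # Hand the raw bytes to the sink connector, as the KIP proposes.
            return ("putBrokenRecord", raw_record)
        return ("dead_letter_queue", raw_record)
```

A record whose key bytes fail deserialization would thus reach the sink via the broken-record path with its raw bytes intact, letting the external system receive it for later analysis.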
>
> On 2020/04/25 20:05:37, Christopher Egerton  wrote:
> > Hi Zihan,
> >
> > Thanks for the changes and the clarifications! I agree that the
> complexity
> > of maintaining a second topic and a second connector is a fair amount of
> > work; to Andrew's question, it seems less about the cost of just running
> > another connector, and more about managing that second connector (and
> > topic) when a lot of the logic is identical, such as topic ACLs,
> > credentials for the connector to access the external system, and other
> > fine-tuning.
> >
> > However, I'm still curious about the general use case here. For example,
> if
> > a converter fails to deserialize a record, it seems like the right thing
> to
> > do would be to examine the record, try to understand why it's failing,
> and
> > then find a converter that can handle it. If the raw byte array for the
> > Kafka message gets written to the external system instead, what's the
> > benefit to the user? Yes, they won't have to configure another connector
> > and manage another topic, but they're still going to want to examine that
> > data at some point; why would it be easier to deal with malformed records
> > from an external system than it would from where they originally broke,
> in
> > Kafka?
> >
> > If we're going to add a new feature like this to the framework, I just
> want
> > to make sure that there's a general use case for this that isn't tied to
> > one specific type of connector, external system, usage pattern, etc.
> >
> > Oh, and one other question that came to mind--what would the expected
> > behavior be if a converter was unable to deserialize a record's key, but
> > was able to deserialize its value?
> >
> > Cheers,
> >
> > Chris
> >
> > On Sat, Apr 25, 2020 at 12:27 PM Andrew Schofield <
> andrew_schofi...@live.com>
> > wrote:
> >
> > > Hi Zihan,
> > > Thanks for the KIP. I have a question about the proposal.
> > >
> > > Why do you think putting a broken record somewhere other than a
> > > dead-letter topic is
> > > better? For a reliable system, you really want zero broken records, or
> > > perhaps close to zero.
> > > Broken records represent exceptions that need to be alerted and dealt
> > > with, either by
> > > fixing an incorrect application, or improving the quality of the data
> > > incoming. I wouldn't
> > > imagine a second set of connectors reading dead-letter topics storing
> the
> > > broken events
> > > elsewhere.
> > >
> > > If you did want to store them in S3, HDFS or wherever, why couldn't you
> > > run another
> > > connector off the dead-letter topic, with the ByteArrayConverter, that
> > > just bundles up
> > > the broken records as raw bytes. This seems to me very close to what
> this
> > > KIP is trying to
> > > achieve, only without needing any interface or behaviour changes in the
> > > connectors. Yes,
> > > you need to run more connectors, but in a distributed connect cluster,
> > > that's easy to achieve.
> > >
> > > Thanks,
> > > Andrew Schofield
> > > IBM
> > >
> > > On 24/04/2020, 22:00, "Zihan Li"  wrote:
> > >
> > > Hi Chris,
> > >
> > > Thanks a lot for your comments.
> > >
> > > 1. The complexity comes from maintaining an additional topic and a
> > 

Re: [DISCUSS] KIP-597: MirrorMaker2 internal topics Formatters

2020-05-03 Thread Christopher Egerton
Hi Mickael,

Thanks! This looks great, +1 non-binding.

Cheers,

Chris

On Sun, May 3, 2020 at 4:13 AM Mickael Maison 
wrote:

> Thanks Chris for the feedback.
>
> 1. I've added sample output for the 3 formatters
> 2. Good idea, instead of deleting the existing trait, let's mark it as
> deprecated and make it extend the new interface. I've updated the KIP.
>
>
> I've also opened a draft PR: https://github.com/apache/kafka/pull/8604
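The compatibility approach agreed above (keep the old trait, deprecate it, and make it extend the new interface so existing formatters automatically satisfy the new API) is a standard Java pattern. The interface bodies below are illustrative stand-ins, not Kafka's actual MessageFormatter definitions:

```java
// Sketch of the proposed compatibility shim; interface bodies are stand-ins.
public class FormatterCompatSketch {

    // Stand-in for the new public org.apache.kafka.common.MessageFormatter.
    interface NewMessageFormatter {
        String format(byte[] value);
    }

    // Stand-in for the existing kafka.common.MessageFormatter trait: kept
    // and deprecated, but extending the new interface so old implementations
    // remain valid instances of the new API.
    @Deprecated
    interface OldMessageFormatter extends NewMessageFormatter {
    }

    // A user formatter written long ago against the old type...
    static class LegacyHexFormatter implements OldMessageFormatter {
        @Override
        public String format(byte[] value) {
            StringBuilder sb = new StringBuilder();
            for (byte b : value) {
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        }
    }

    public static void main(String[] args) {
        // ...still works anywhere the new interface is expected.
        NewMessageFormatter formatter = new LegacyHexFormatter();
        System.out.println(formatter.format(new byte[]{1, (byte) 255}));
    }
}
```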
>
> On Fri, Apr 24, 2020 at 12:17 AM Christopher Egerton
>  wrote:
> >
> > Hi Mickael,
> >
> > The KIP looks great and the additional formatters seem like excellent
> tools
> > for debugging and diving into the internals of Mirror Maker 2.0. I've
> got a
> > few thoughts on the proposed changes:
> >
> > 1. Would it be possible to provide examples for what the actual output
> > might look like when the console consumer is run with the newly-proposed
> > formatters, and the commands users could run to make that happen?
> >
> > 2. Even though the existing MessageFormatter trait is currently in the
> core
> > project, it's still mentioned in the usage for the console consumer, and
> > it's possible that users may have already implemented their own
> formatters
> > based on this information, which is part of the public API. At the same
> > time, if we do want to make this an official part of the public API for
> > Kafka, I do agree that it'd be best to make that interface available in
> the
> > clients project. What do you think about retaining the existing
> > "kafka.common.MessageFormatter" trait, but making it extend from the
> > newly-proposed "org.apache.kafka.common.MessageFormatter" interface? That
> > way, any existing tools that extend from the old MessageFormatter trait
> > will still work, and we can have our shiny new interface in the clients
> > package available for anyone who wants to roll their own in the future.
> >
> > Looking forward to your thoughts on this!
> >
> > Cheers,
> >
> > Chris
> >
> > On Thu, Apr 16, 2020 at 10:21 AM Ryanne Dolan 
> wrote:
> >
> > > Thanks Mickael, this will be very useful.
> > >
> > > Ryanne
> > >
> > > On Thu, Apr 16, 2020 at 11:44 AM Mickael Maison <
> mickael.mai...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I have submitted a small KIP to provide Formatters for the
> > > > MirrorMaker2 internal topics.
> > > >
> > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-597%3A+MirrorMaker2+internal+topics+Formatters
> > > >
> > > > Looking forward to your feedback. Thanks
> > > >
> > >
>


Re: [VOTE] KIP-441: Smooth Scaling Out for Kafka Streams

2020-05-03 Thread John Roesler
Hi Matthias,

We originally proposed that config to allow us to skip migrating tasks if the 
current balance is “good enough”. But during implementation, we became 
concerned that supporting this option increased code complexity, and it’s also 
an extra concept for users to have to learn. 

To keep the new balancing system simpler both internally and externally, we’d 
like to drop it from the API for now, with the idea of adding it later if 
needed. 

Does that seem reasonable?

Thanks,
John

On Fri, May 1, 2020, at 14:18, Matthias J. Sax wrote:
> Can you elaborate why to remove it?
> 
> On 5/1/20 11:29 AM, Sophie Blee-Goldman wrote:
> > Hey all,
> > 
> > We'd like to make a slight modification to the proposal in this KIP and
> > remove
> > the *balance.factor* config. We will update the KIP accordingly. Please let
> > us know
> > if you have any concerns.
> > 
> > Cheers,
> > Sophie
> > 
> > On Wed, Jan 15, 2020 at 12:48 PM John Roesler  wrote:
> > 
> >> Hello all,
> >>
> >> After a long hiatus, I've just realized that I'm now able to upgrade my
> >> non-binding support to a binding +1 for KIP-441.
> >>
> >> This brings the vote tally to:
> >> 3 binding +1s: Guozhang, Bill, and myself
> >> 3 non-binding +1s: Bruno, Vinoth, and Sophie
> >>
> >> Since the vote has been open for at least 72 hours, the KIP is accepted.
> >>
> >> Thanks all,
> >> -John
> >>
> >>
> >>
> >> On Mon, Oct 28, 2019 at 21:02 PM John Roesler  wrote:
> >>> Hey all,
> >>>
> >>> Now that the 2.4 release storm is over, I'd like to bump this vote
> >> thread.
> >>>
> >>> Currently, we have two binding +1s (Guozhang and Bill), and four
> >>> non-binding ones (Bruno, Vinoth, Sophie, and myself), and no vetoes.
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Thu, Sep 12, 2019 at 12:54 PM Bill Bejeck  wrote:
> 
>  +1 (binding)
> 
>  On Thu, Sep 12, 2019 at 1:53 PM Sophie Blee-Goldman <
> >> sop...@confluent.io> wrote:
> 
> > +1 (non-binding)
> >
> > On Wed, Sep 11, 2019 at 11:38 AM Vinoth Chandar <
> >> vchan...@confluent.io> wrote:
> >
> >> +1 (non-binding).
> >>
> >> On Fri, Sep 6, 2019 at 12:46 AM Bruno Cadonna 
> >> wrote:
> >>
> >>> +1 (non-binding)
> >>>
> >>> On Fri, Sep 6, 2019 at 12:32 AM Guozhang Wang <
> >> wangg...@gmail.com> wrote:
> 
>  +1 (binding).
> 
>  On Thu, Sep 5, 2019 at 2:47 PM John Roesler 
> >> wrote:
> 
> > Hello, all,
> >
> > After a great discussion, I'd like to open voting on KIP-441,
> > to avoid long restore times in Streams after rebalancing.
> > Please cast your votes!
> >
> >
> >
> >
> >
> >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> >
> > Thanks,
> > -John
> >
> 
> 
>  --
>  -- Guozhang
> >>>
> >>
> >
> >>>
> >>
> > 
> 
> 


Re: Permission to create a KIP

2020-05-03 Thread Guozhang Wang
Hello Aakash,

I've added you to the wiki space.

Cheers,
Guozhang

On Sat, May 2, 2020 at 10:12 PM Aakash Shah  wrote:

> Hello,
>
> I would like to request permission to create a KIP.
>
> My Wiki ID is aakash33 and my email is as...@confluent.io.
>
> Thank you!
>
> Best,
>
> Aakash Shah
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-597: MirrorMaker2 internal topics Formatters

2020-05-03 Thread Mickael Maison
Thanks Chris for the feedback.

1. I've added sample output for the 3 formatters
2. Good idea. Instead of deleting the existing trait, let's mark it as
deprecated and make it extend the new interface. I've updated the
KIP.


I've also opened a draft PR: https://github.com/apache/kafka/pull/8604
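
The deprecate-and-extend approach described in point 2 can be sketched as
follows. All names and signatures here are illustrative stand-ins, not the
KIP's actual API; the real legacy type is a Scala trait, shown as a Java
interface for brevity.

```java
import java.io.PrintStream;
import java.util.Map;

// Illustrative stand-in for the proposed public interface in the clients
// project (the real method signatures may differ from the KIP).
interface NewMessageFormatter extends AutoCloseable {
    default void configure(Map<String, ?> configs) {}
    void writeTo(byte[] key, byte[] value, PrintStream output);
    @Override
    default void close() {}
}

// Stand-in for the old core trait: deprecated, but kept as a subtype of
// the new interface so existing implementations satisfy both types.
@Deprecated
interface OldMessageFormatter extends NewMessageFormatter {}

public class FormatterCompatSketch {
    // A formatter written against the old, deprecated type...
    static class PlainFormatter implements OldMessageFormatter {
        @Override
        public void writeTo(byte[] key, byte[] value, PrintStream output) {
            output.println(new String(key) + " -> " + new String(value));
        }
    }

    public static void main(String[] args) throws Exception {
        // ...is usable anywhere the new interface is expected.
        NewMessageFormatter formatter = new PlainFormatter();
        formatter.writeTo("k".getBytes(), "v".getBytes(), System.out);
        formatter.close();
    }
}
```

With the real classes, a tool that already implements the old trait would keep
working with the console consumer's --formatter option unchanged, while new
formatters (such as the three proposed for the MirrorMaker2 internal topics)
would target only the new interface.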

On Fri, Apr 24, 2020 at 12:17 AM Christopher Egerton
 wrote:
>
> Hi Mickael,
>
> The KIP looks great and the additional formatters seem like excellent tools
> for debugging and diving into the internals of Mirror Maker 2.0. I've got a
> few thoughts on the proposed changes:
>
> 1. Would it be possible to provide examples for what the actual output
> might look like when the console consumer is run with the newly-proposed
> formatters, and the commands users could run to make that happen?
>
> 2. Even though the existing MessageFormatter trait is currently in the core
> project, it's still mentioned in the usage message for the console consumer,
> and it's possible that users have already implemented their own formatters
> based on this information, which is part of the public API. At the same
> time, if we do want to make this an official part of the public API for
> Kafka, I do agree that it'd be best to make that interface available in the
> clients project. What do you think about retaining the existing
> "kafka.common.MessageFormatter" trait, but making it extend from the
> newly-proposed "org.apache.kafka.common.MessageFormatter" interface? That
> way, any existing tools that extend from the old MessageFormatter trait
> will still work, and we can have our shiny new interface in the clients
> package available for anyone who wants to roll their own in the future.
>
> Looking forward to your thoughts on this!
>
> Cheers,
>
> Chris
>
> On Thu, Apr 16, 2020 at 10:21 AM Ryanne Dolan  wrote:
>
> > Thanks Mickael, this will be very useful.
> >
> > Ryanne
> >
> > On Thu, Apr 16, 2020 at 11:44 AM Mickael Maison 
> > wrote:
> >
> > > Hi,
> > >
> > > I have submitted a small KIP to provide Formatters for the
> > > MirrorMaker2 internal topics.
> > >
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-597%3A+MirrorMaker2+internal+topics+Formatters
> > >
> > > Looking forward to your feedback. Thanks
> > >
> >
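
For reference, the kind of command Chris asks about in point 1 might look like
the following. The formatter class, package, and topic names here are guesses
based on this discussion, not the KIP's confirmed API.

```shell
# Illustrative only: the formatter class and internal topic name are
# assumptions. Reads MirrorMaker2 checkpoint records through a dedicated
# formatter instead of printing raw bytes.
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic primary.checkpoints.internal \
  --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
  --from-beginning
```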