Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-07 Thread Matthias J. Sax
I am not sure if I fully understand, hence, I try to rephrase:

> I can't think of an example that would require both ways, or would
> even be more readable using both ways.

Example:

There are two processors, A and B, one store S that both need to
access, and one store S_b that only B needs to access:

If we don't allow mixing both approaches, it would be required to write
the following code:

  Topology t = new Topology();
  t.addProcessor("A", ...); // does not add any store
  t.addProcessor("B", ...); // does not add any store
  t.addStateStore(..., "A", "B"); // adds S and connects it to A and B
  t.addStateStore(..., "B"); // adds S_b and connects it to B

// DSL example:

  StreamsBuilder b = new StreamsBuilder();
  b.addStateStore() // adds S
  b.addStateStore() // adds S_b
  stream1.process(..., "S") // add A and connect S
  stream2.process(..., "S", "S_b") // add B and connect S and S_b


If we allow mixing both approaches, the code could be simplified to:

  Topology t = new Topology();
  t.addProcessor("A", ...); // does not add any store
  t.addProceccor("B", ...); // adds/connects S_b implicitly
  t.addStateStore(..., "A", "B"); // adds S and connect it to A and B

// DSL example

  StreamsBuilder b = new StreamsBuilder();
  b.addStateStore() // adds S
  stream1.process(..., "S") // add A and connect S
  stream2.process(..., "S") // add B and connect S; adds/connects S_b
implicitly

The fact that B has a "private store" could be encapsulated and I don't
see why this would be bad?
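
For concreteness, here is a rough sketch of how B could encapsulate its
"private" store S_b while S stays connected externally. This assumes the
`stores()` method from the current KIP-401 draft (interface and method names
are still under discussion), so it is an illustration, not code that compiles
against a released version:

  import java.util.Collections;
  import java.util.Set;
  import org.apache.kafka.common.serialization.Serdes;
  import org.apache.kafka.streams.processor.AbstractProcessor;
  import org.apache.kafka.streams.processor.Processor;
  import org.apache.kafka.streams.processor.ProcessorSupplier;
  import org.apache.kafka.streams.state.StoreBuilder;
  import org.apache.kafka.streams.state.Stores;

  public class BSupplier implements ProcessorSupplier<String, String> {

      // S_b is "private" to B, so B's supplier owns the builder and exposes
      // it via the proposed stores() method; S is still added and connected
      // externally via addStateStore(..., "A", "B") / process(..., "S").
      private final StoreBuilder<?> sbBuilder = Stores.keyValueStoreBuilder(
          Stores.persistentKeyValueStore("S_b"),
          Serdes.String(), Serdes.String());

      public Set<StoreBuilder<?>> stores() { // method proposed by KIP-401
          return Collections.singleton(sbBuilder);
      }

      @Override
      public Processor<String, String> get() {
          return new AbstractProcessor<String, String>() {
              @Override
              public void process(String key, String value) {
                  // access "S" and "S_b" via context().getStateStore(...)
              }
          };
      }
  }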

> If you can
> do both ways, the actual full set of state stores being connected could be
> in wildly different places in the code, which could create confusion.

I.e., I don't see why the second version would be confusing, or why the
first version would be more readable (I don't argue it's less readable
either, though; I think both are equally readable).



Or do you argue that we should allow the following:

> Shared stores can be passed from
> the outside in an anonymous ProcessorSupplier if desired, making it
> effectively the same as passing the stateStoreNames var args

  Topology t = new Topology();
  t.addProcessor("A", ...); // adds/connects S implicitly
  t.addProceccor("B", ...); // adds/connects S and S_b implicitly

// DSL example

  StreamsBuilder b = new StreamsBuilder();
  stream1.process(...) // add A and add/connect S implicitly
  stream2.process(...) // add B and add/connect S and S_b implicitly

For this case, the second implicit adding of S would require returning
the same `StoreBuilder` instance to make it idempotent, which seems hard
to achieve, because both `ProcessorSupplier`s now have a cross
dependency to use the same object.

Hence, I don't think this would be a good approach.


Also, because we require that the same `StoreBuilder` instance is always
passed for a given store name, we actually have good protection against
user bugs that add two stores with the same name but different builders.


I also do not feel super strongly about it, but I see some advantages to
allowing the mixed approach, and don't see disadvantages. Would be good to
get input from others, too.



-Matthias


On 8/7/19 7:29 PM, Paul Whalen wrote:
> My thinking on restricting the API to enforce only one way of connecting
> stores would make it more simple to use and end up with more readable
> code.  I can't think of an example that would require both ways, or would
> even be more readable using both ways.  Shared stores can be passed from
> the outside in an anonymous ProcessorSupplier if desired, making it
> effectively the same as passing the stateStoreNames var args.  If you can
> do both ways, the actual full set of state stores being connected could be
> in wildly different places in the code, which could create confusion.  I
> personally can't imagine a case in which that would be useful.
> 
> All that being said, I don't feel terribly strongly about it.  I'm just
> trying to make the API as straightforward as possible.  Admittedly a
> runtime check doesn't make for a great API, but I see it more as an
> opportunity to educate the user to make it clear that "connecting" a state
> store is a thing that can be done in two different ways, but there is no
> reason to mix both.  If it seems like there's a compelling reason to mix
> them then I would abandon the idea in a heartbeat.
> 
> Paul
> 
> On Wed, Aug 7, 2019 at 5:48 PM Matthias J. Sax 
> wrote:
> 
>> Sorry for the long silence on this KIP Paul! I guess the 2.3 release
>> distracted us somewhat.
>>
>> Overall, I am +1.
>>
>> With regard to John's point about owned vs shared state stores, I think
> it describes a valid use case, and throwing an exception if people want
>> to mix both features might be too restrictive?
>>
>> We could of course later relax the restriction, but atm I am not sure
>> what the main argument for adding the restriction is?
>>
>> (a) In the current API, one could connect the same store multiple times
>> to the same processor without 

Re: [DISCUSS] KIP-491: Preferred Leader Deprioritized List (Temporary Blacklist)

2019-08-07 Thread George Li
 Hi Colin,

> In your example, I think we're comparing apples and oranges.  You started by 
> outlining a scenario where "an empty broker... comes up... [without] any 
> leadership[s]."  But then you criticize using reassignment to switch the 
> order of preferred replicas because it "would not actually switch the leader 
> automatically."  If the empty broker doesn't have any leaderships, there is 
> nothing to be switched, right?

Let me explain this particular use case example in detail, comparing apples 
to apples. 

Let's say a healthy broker hosts 3000 partitions, of which 1000 are the 
preferred leaders (leader count is 1000). There is a hardware failure 
(disk/memory, etc.) and the Kafka process crashes. We swap this host with 
another host but keep the same broker.id. When this new broker comes up, it 
has no historical data, and we manage to have the current last offsets of all 
partitions set in replication-offset-checkpoint (if we don't set them, it 
could cause crazy ReplicaFetcher pulling of historical data from other brokers 
and cause high cluster latency and other instabilities), so when Kafka is 
brought up, it quickly catches up as a follower in the ISR.  Note, we have 
auto.leader.rebalance.enable disabled, so it's not serving any traffic as 
leader (leader count = 0), even though there are 1000 partitions for which 
this broker is the preferred leader. 

We need to keep this broker from serving leader traffic for a few hours or 
days, depending on the SLA of the topic retention requirement, until it has 
enough historical data. 


* The traditional way: use reassignments to move this broker to the end of 
the assignment for the 1000 partitions where it is the preferred leader; this 
is an O(N) operation. From my experience, we can't submit all 1000 at the same 
time, otherwise it causes higher latencies, even though the reassignments in 
this case can complete almost instantly.  After a few hours/days, when this 
broker is ready to serve traffic, we have to run reassignments again to 
restore the preferred leadership of those 1000 partitions for this broker: 
another O(N) operation. Then run preferred leader election: O(N) again. So in 
total 3 x O(N) operations.  The point is that since the new empty broker is 
expected to be the same as the old one in terms of hosting partitions/leaders, 
it would seem unnecessary to do reassignments (reordering of replicas) while 
the broker is catching up. 



* The new feature, the preferred leader "blacklist": just put a dynamic 
config in place to indicate that this broker should be considered for 
leadership (preferred leader election, broker failover, or unclean leader 
election) with the lowest priority. NO need to run any reassignments. After a 
few hours/days, when this broker is ready, remove the dynamic config and run 
preferred leader election, and this broker will serve traffic for the 1000 
partitions it was originally the preferred leader of. So in total 1 x O(N) 
operation. 
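
For illustration, the operational flow could then be as simple as the sketch 
below. Note that the config name `leader.deprioritized.list` is only a 
placeholder for whatever KIP-491 finally defines; the incrementalAlterConfigs 
Admin API used to set/remove it already exists (since Kafka 2.3):

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.admin.AdminClient;
  import org.apache.kafka.clients.admin.AlterConfigOp;
  import org.apache.kafka.clients.admin.ConfigEntry;
  import org.apache.kafka.common.config.ConfigResource;

  public class DeprioritizeBroker {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          // cluster-wide dynamic broker config (empty name = cluster default)
          ConfigResource cluster = new ConfigResource(ConfigResource.Type.BROKER, "");
          try (AdminClient admin = AdminClient.create(props)) {
              // step 1: deprioritize broker 1001 for leadership (placeholder config name)
              admin.incrementalAlterConfigs(Collections.singletonMap(cluster,
                  Collections.singleton(new AlterConfigOp(
                      new ConfigEntry("leader.deprioritized.list", "1001"),
                      AlterConfigOp.OpType.SET)))).all().get();

              // step 2 (hours/days later, once 1001 has caught up): remove the
              // config again and run preferred leader election as usual
              admin.incrementalAlterConfigs(Collections.singletonMap(cluster,
                  Collections.singleton(new AlterConfigOp(
                      new ConfigEntry("leader.deprioritized.list", ""),
                      AlterConfigOp.OpType.DELETE)))).all().get();
          }
      }
  }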


If auto.leader.rebalance.enable is enabled, the preferred leader "blacklist" 
can be put in place before Kafka is started to prevent this broker from 
serving traffic.  With the traditional way of running reassignments, once the 
broker is up with auto.leader.rebalance.enable, if leadership starts going to 
this new empty broker, we might have to run preferred leader election after 
the reassignments to remove its leaderships. E.g. a (1,2,3) => (2,3,1) 
reassignment only changes the ordering; 1 remains the current leader, and a 
preferred leader election is needed to change it to 2 after the reassignment. 
So potentially one more O(N) operation. 

I hope the above example shows how easy it is to "blacklist" a broker from 
serving leadership.  For someone managing a production Kafka cluster, it's 
important to react fast to certain alerts and mitigate/resolve issues. As with 
the other use cases I listed in KIP-491, I think this feature can make the 
Kafka product easier to manage/operate. 

> In general, using an external rebalancing tool like Cruise Control is a good 
> idea to keep things balanced without having to deal with manual rebalancing. 
> We expect more and more people who have a complex or large cluster will start 
> using tools like this.
> 
> However, if you choose to do manual rebalancing, it shouldn't be that bad.  
> You would save the existing partition ordering before making your changes, 
> then make your changes (perhaps by running a simple command line tool that 
> switches the order of the replicas).  Then, once you felt like the broker was 
> ready to serve traffic, you could just re-apply the old ordering which you 
> had saved.


We do have our own rebalancing tool, which has its own criteria like rack 
diversity, disk usage, spreading partitions/leaders across all brokers in the 
cluster per topic, leadership bytes/bytes-in served per broker, etc.  We can 
run reassignments. The point is whether it's really necessary, and whether 
there is a more effective, easier, safer way to do it.    

take another use case 

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-07 Thread Paul Whalen
My thinking is that restricting the API to enforce only one way of connecting
stores would make it simpler to use and end up with more readable
code.  I can't think of an example that would require both ways, or would
even be more readable using both ways.  Shared stores can be passed from
the outside in an anonymous ProcessorSupplier if desired, making it
effectively the same as passing the stateStoreNames var args.  If you can
do both ways, the actual full set of state stores being connected could be
in wildly different places in the code, which could create confusion.  I
personally can't imagine a case in which that would be useful.

All that being said, I don't feel terribly strongly about it.  I'm just
trying to make the API as straightforward as possible.  Admittedly a
runtime check doesn't make for a great API, but I see it more as an
opportunity to educate the user to make it clear that "connecting" a state
store is a thing that can be done in two different ways, but there is no
reason to mix both.  If it seems like there's a compelling reason to mix
them then I would abandon the idea in a heartbeat.

Paul

On Wed, Aug 7, 2019 at 5:48 PM Matthias J. Sax 
wrote:

> Sorry for the long silence on this KIP Paul! I guess the 2.3 release
> distracted us somewhat.
>
> Overall, I am +1.
>
> With regard to John's point about owned vs shared state stores, I think
> it describes a valid use case, and throwing an exception if people want
> to mix both features might be too restrictive?
>
> We could of course later relax the restriction, but atm I am not sure
> what the main argument for adding the restriction is?
>
> (a) In the current API, one could connect the same store multiple times
> to the same processor without getting an exception, because the
> operation is idempotent.
>
> (b) The KIP also suggests to relax the current restriction to add the
> same store twice, as long as store name and `StoreBuilder` instance are
> the same, because it's an idempotent (hence, safe) operation too.
>
> Because we have already (a) and (b) and consider both as safe, it seems
> we could also treat the case of mixing both patterns as idempotent and
> hence safe. And if we do this, we enable to mix both patterns for
> different stores implicitly.
>
>
> Thoughts?
>
>
> -Matthias
>
>
>
> On 6/17/19 2:31 PM, John Roesler wrote:
> > Hey, all,
> >
> > Sorry I'm late to the party. I meant to read into this KIP before, but
> > didn't get around to it. I was just reminded when Paul mentioned it in
> > a different thread. Please feel free to bump a discussion any time it
> > stalls!
> >
> > I've just read through the whole discussion so far, and, to echo the
> > earlier sentiments, the motivation seems very clear. I remember how
> > hard it was to figure out how to actually wire up a stateful processor
> > properly the first couple of times. Not a very good user experience.
> >
> > I looked over the whole conversation to date, as well as the KIP and
> > the latest PR (https://github.com/apache/kafka/pull/6824). FWIW, The
> > current approach looks good to me. I was concerned about the "cheat
> > codes"-style mixin interface. Discoverability would have been a
> > problem, and it's also not a very normal pattern for Java APIs. It
> > actually looks a little more like something you'd do with an
> > annotation.
> >
> > So the current approach seems good:
> > * The new interface with a default to return `null` is effectively
> > shipping the feature flagged "off" (which is nice and safe)
> > * Shared stores are "supported" the same way they always have been, by
> > connecting them externally. This makes sense, since those stores
> > aren't "owned" by any of the connected processors.
> > * Processors that do own their stores can configure them in the same
> > file they use them, which decreases the probability of cast exceptions
> > when they get the stores from the context.
> > * Stateful processors that own their stores are available for one-shot
> > definition of the stores and the processor all in the same file (this
> > is the main point of the KIP)
> >
> > The runtime check that stores can't be both defined in the processor
> > and referenced by name might be a little restrictive (since we already
> > have the restriction that same-name stores can't be registered), but
> > it would also be easy to remove it later. I'm just thinking that if I
> > have a processor that owns one store and shares another, it would be
> > pretty obvious how to hook it up in the proposed API, except for that
> > check.
> >
> > One last thought, regarding the all-important interface name: If you
> > wanted to indicate more that the stores are available for Streams to
> > connect, rather than that they are already connected, you could call
> > it ConnectableStoreProvider (similar to AutoCloseable).
> >
> > I just thought I'd summarize the current state, since it's been a
> > while and no one has voted yet. I'll go ahead and vote now on the
> > voting thread, 

Build failed in Jenkins: kafka-2.0-jdk8 #286

2019-08-07 Thread Apache Jenkins Server
See 


Changes:

[matthias] KAFKA-8602: Backport bugfix for standby task creation (#7146)

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H50 (ubuntu bionic) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/2.0^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/2.0^{commit} # timeout=10
Checking out Revision 79eaddd25f8d030f18618b367f608b80f5862ea5 
(refs/remotes/origin/2.0)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 79eaddd25f8d030f18618b367f608b80f5862ea5
Commit message: "KAFKA-8602: Backport bugfix for standby task creation (#7146)"
 > git rev-list --no-walk 57352bb163ca54c8ff257300632cd28a8a35900f # timeout=10
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
[kafka-2.0-jdk8] $ /bin/bash -xe /tmp/jenkins3630615610596436902.sh
+ rm -rf 
+ /home/jenkins/tools/gradle/4.8.1/bin/gradle
/tmp/jenkins3630615610596436902.sh: line 4: 
/home/jenkins/tools/gradle/4.8.1/bin/gradle: No such file or directory
Build step 'Execute shell' marked build as failure
[FINDBUGS] Collecting findbugs analysis files...
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
[FINDBUGS] Searching for all files in 
 that match the pattern 
**/build/reports/findbugs/*.xml
[FINDBUGS] No files found. Configuration error?
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
No credentials specified
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
 Using GitBlamer to create author and commit information for all 
warnings.
 GIT_COMMIT=79eaddd25f8d030f18618b367f608b80f5862ea5, 
workspace=
[FINDBUGS] Computing warning deltas based on reference build #281
Recording test results
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
Not sending mail to unregistered user b...@confluent.io


Re: [DISCUSS] KIP-491: Preferred Leader Deprioritized List (Temporary Blacklist)

2019-08-07 Thread Colin McCabe
On Wed, Aug 7, 2019, at 12:48, George Li wrote:
>  Hi Colin,
> 
> Thanks for your feedbacks.  Comments below:
> > Even if you have a way of blacklisting an entire broker all at once, you 
> > still would need to run a leader election for each partition where you 
> > want to move the leader off of the blacklisted broker.  So the operation is 
> > still O(N) in that sense-- you have to do something per partition.
> 
> For a failed broker and swapped with an empty broker, when it comes up, 
> it will not have any leadership, and we would like it to remain not 
> having leaderships for a couple of hours or days. So there is no 
> preferred leader election needed which incurs O(N) operation in this 
> case.  Putting the preferred leader blacklist would safe guard this 
> broker serving traffic during that time. otherwise, if another broker 
> fails(if this broker is the 1st, 2nd in the assignment), or someone 
> runs preferred leader election, this new "empty" broker can still get 
> leaderships. 
> 
> Also running reassignment to change the ordering of preferred leader 
> would not actually switch the leader automatically.  e.g.  (1,2,3) => 
> (2,3,1). unless preferred leader election is run to switch current 
> leader from 1 to 2.  So the operation is at least 2 x O(N).  and then 
> after the broker is back to normal, another 2 x O(N) to rollback. 

Hi George,

Hmm.  I guess I'm still on the fence about this feature.

In your example, I think we're comparing apples and oranges.  You started by 
outlining a scenario where "an empty broker... comes up... [without] any 
leadership[s]."  But then you criticize using reassignment to switch the order 
of preferred replicas because it "would not actually switch the leader 
automatically."  If the empty broker doesn't have any leaderships, there is 
nothing to be switched, right?

> 
> 
> > In general, reassignment will get a lot easier and quicker once KIP-455 is 
> > implemented.  Reassignments that just change the order of preferred 
> > replicas for a specific partition should complete pretty much instantly.
> >
> >> I think it's simpler and easier just to have one source of truth for what 
> >> the preferred replica is for a partition, rather than two.  So for me, 
> >> the fact that the replica assignment ordering isn't changed is actually a 
> >> big disadvantage of this KIP.  If you are a new user (or just an 
> >> existing user that didn't read all of the documentation) and you just look 
> >> at the replica assignment, you might be confused by why a particular 
> >> broker wasn't getting any leaderships, even though it appeared like it 
> >> should.  More mechanisms mean more complexity for users and developers 
> >> most of the time.
> 
> 
> I would like stress the point that running reassignment to change the 
> ordering of the replica (putting a broker to the end of partition 
> assignment) is unnecessary, because after some time the broker is 
> caught up, it can start serving traffic and then need to run 
> reassignments again to "rollback" to previous states. As I mentioned in 
> KIP-491, this is just tedious work. 

In general, using an external rebalancing tool like Cruise Control is a good 
idea to keep things balanced without having to deal with manual rebalancing.  We 
expect more and more people who have a complex or large cluster will start 
using tools like this.

However, if you choose to do manual rebalancing, it shouldn't be that bad.  You 
would save the existing partition ordering before making your changes, then 
make your changes (perhaps by running a simple command line tool that switches 
the order of the replicas).  Then, once you felt like the broker was ready to 
serve traffic, you could just re-apply the old ordering which you had saved.
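
As a concrete sketch of that save/demote/restore workflow (assuming the Admin 
API shape from KIP-455 -- alterPartitionReassignments / NewPartitionReassignment 
-- which has not shipped yet; topic, partition and broker ids are made up):

  import java.util.Arrays;
  import java.util.Collections;
  import java.util.Optional;
  import java.util.Properties;
  import org.apache.kafka.clients.admin.AdminClient;
  import org.apache.kafka.clients.admin.NewPartitionReassignment;
  import org.apache.kafka.common.TopicPartition;

  public class DemoteThenRestore {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          try (AdminClient admin = AdminClient.create(props)) {
              TopicPartition tp = new TopicPartition("my-topic", 0);

              // saved ordering: (1, 2, 3); demote broker 1 by moving it to the
              // end of the replica list (order-only change, data stays put)
              admin.alterPartitionReassignments(Collections.singletonMap(tp,
                  Optional.of(new NewPartitionReassignment(Arrays.asList(2, 3, 1)))))
                  .all().get();

              // ... later, once broker 1 has caught up, re-apply the saved
              // ordering so it becomes the preferred leader again
              admin.alterPartitionReassignments(Collections.singletonMap(tp,
                  Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2, 3)))))
                  .all().get();
          }
      }
  }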

> 
> I agree this might introduce some complexities for users/developers. 
> But if this feature is good, and well documented, it is good for the 
> kafka product/community.  Just like KIP-460 enabling unclean leader 
> election to override TopicLevel/Broker Level config of 
> `unclean.leader.election.enable`
> 
> > I agree that it would be nice if we could treat some brokers differently 
> > for the purposes of placing replicas, selecting leaders, etc.  Right now, 
> > we don't have any way of implementing that without forking the broker.  I 
> > would support a new PlacementPolicy class that would close this gap.  But 
> > I don't think this KIP is flexible enough to fill this role.  For example, 
> > it can't prevent users from creating new single-replica topics that get 
> > put on the "bad" replica.  Perhaps we should reopen the discussion about 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-201%3A+Rationalising+Policy+interfaces
> 
> Creating topic with single-replica is beyond what KIP-491 is trying to 
> achieve.  The user needs to take responsibility of doing that. I do see 
> some Samza clients notoriously creating single-replica topics and that 
> got flagged by 

Re: [VOTE] KIP-455: Create an Administrative API for Replica Reassignment

2019-08-07 Thread Colin McCabe
On Wed, Aug 7, 2019, at 15:41, George Li wrote:
> This email seemed to get lost in the dev email server.  Resending. 
> 
> 
> On Tuesday, August 6, 2019, 10:16:57 PM PDT, George Li 
>  wrote:
> 
> 
> The pending reassignments partitions would be reported as URP (Under 
> Replicated Partitions).  or maybe reported as a separate metrics of 
> RURP (Reassignment URP) since now we can derived from the new 
> AddingReplicas. An alert could be triggered based on this. 
> 

Hi George,

I agree that this would be a great idea for follow up work.  Check out KIP-352, 
which discusses creating such a metric. :)

> 
> 
> It would be nice if ListPartitionReassignmentResult could return the 
> "elapsed time/duration" of the current pending reassignments, the 
> calling client can flag those current long running reassignments and 
> alert.  However, what I would be interested is probably the total # of 
> pending reassignments because I will submit reassignments in batches, 
> e.g. 50 reassignments per batch.  If the pending reassignments # is 
> below that per batch #, submit more new reassignments = (per_batch_# - 
> pending_#).
> 

It is definitely useful to know what reassignments exist.  If you call 
ListPartitionReassignments, you can count how many results you get, in order to 
implement a policy like that.
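
A hedged sketch of such a batching policy, again assuming the KIP-455 Admin 
API (listPartitionReassignments / alterPartitionReassignments); the queue of 
not-yet-submitted reassignments and the batch size of 50 are just George's 
example, not anything defined by the KIP:

  import java.util.HashMap;
  import java.util.Map;
  import java.util.Optional;
  import java.util.Queue;
  import org.apache.kafka.clients.admin.Admin;
  import org.apache.kafka.clients.admin.NewPartitionReassignment;
  import org.apache.kafka.common.TopicPartition;

  public class ReassignmentBatcher {
      private static final int MAX_IN_FLIGHT = 50;

      // Submit up to (MAX_IN_FLIGHT - currently pending) new reassignments.
      static void topUp(Admin admin,
                        Queue<Map.Entry<TopicPartition, NewPartitionReassignment>> pending)
              throws Exception {
          int inFlight = admin.listPartitionReassignments()
                              .reassignments().get().size();
          int budget = Math.max(0, MAX_IN_FLIGHT - inFlight);

          Map<TopicPartition, Optional<NewPartitionReassignment>> next = new HashMap<>();
          while (budget-- > 0 && !pending.isEmpty()) {
              Map.Entry<TopicPartition, NewPartitionReassignment> e = pending.poll();
              next.put(e.getKey(), Optional.of(e.getValue()));
          }
          if (!next.isEmpty()) {
              admin.alterPartitionReassignments(next).all().get();
          }
      }
  }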

I'm not sure if knowing how long reassignments have been in progress will be 
important or not.  I think we should give people some time to try out the new 
APIs and see what could be improved based on their experience.

> 
> 
> It seems currently, the ReplicaFetcher threads could quite easily crash 
> because of some exceptions. e.g. Java Out Of Memory, and would just 
> remain dead (jstack to dump threads to check the # of running 
> ReplicaFetcher threads) without getting restarted automatically, so 
> needs to bounce the broker.  It would be nice to make the 
> ReplicaFetcher more robust/resilient of catching more exceptions, and 
> if crashed, get restarted after some time. 
> 

This has definitely been an issue in the past, I agree.  Thankfully, we 
recently did improve the robustness of the ReplicaFetcher.  Check out "KIP-461: 
Improve Replica Fetcher behavior at handling partition failure."

cheers,
Colin

> 
> 
> Thanks,
> 
> George
> 
> 
> 
> On 2019/08/06 23:07:19, "Colin McCabe"  wrote: 
> > Hi Koushik,
> > 
> > Thanks for the idea.  This KIP is already pretty big, so I think we'll have 
> > to consider ideas like this in follow-on KIPs.
> > 
> > In general, figuring out what's wrong with replication is a pretty tough 
> > problem.  If we had an API for this, we'd probably want it to be unified, 
> > and not specific to reassigning partitions.
> > 
> > regards,
> > Colin
> > 
> > 
> > On Tue, Aug 6, 2019, at 10:57, Koushik Chitta wrote:
> > > Hey Colin,
> > > 
> > > Can the ListPartitionReassignmentsResult include the status of the 
> > > current reassignment progress of each partition? A reassignment can be 
> > > in progress for different reasons and the status can give the option to 
> > > alter the current reassignment.
> > > 
> > > Example -  A leaderISRRequest of a new assigned replicas can be 
> > > ignored/errored because of a storage exception.  And reassignment batch 
> > > will be waiting indefinitely for the new assigned replicas to be in 
> > > sync with the leader of the partition.  
> > > Showing the status will give an option to alter the affected 
> > > partitions and allow the batch to complete reassignment.
> > > 
> > > OAR = {1, 2, 3} and RAR = {4,5,6}
> > > 
> > > AR = {1,2,3,4,5,6}, leader/isr = 1/{1,2,3,4,6}   =>  LeaderISRRequest 
> > > was lost/skipped for 5 and the reassignment operation will be waiting 
> > > indefinitely for the 5 to be insync.
> > > 
> > > 
> > > 
> > > Thanks,
> > > Koushik
> > > 
> > > -Original Message-
> > > From: Jun Rao  
> > > Sent: Friday, August 2, 2019 10:04 AM
> > > To: dev 
> > > Subject: Re: [VOTE] KIP-455: Create an Administrative API for Replica 
> > > Reassignment
> > > 
> > > Hi, Colin,
> > > 
> > > First, since we are changing the format of LeaderAndIsrRequest, which 
> > > is an inter broker request, it seems that we will need IBP during 
> > > rolling upgrade. Could we add that to the compatibility section?
> > > 
> > > Regarding UnsupportedVersionException, even without ZK node version 
> > > bump, we probably want to only use the new ZK value fields after all 
> > > brokers have been upgraded to the new binary. Otherwise, the 
> > > reassignment task may not be completed if the controller changes to a 
> > > broker still on the old binary.
> > > IBP is one way to achieve that. The main thing is that we need some way 
> > > for the controller to deal with the new ZK fields. Dealing with the 
> > > additional ZK node version bump seems a small thing on top of that?
> > > 
> > > Thanks,
> > > 
> > > Jun
> > > 
> > > On Thu, Aug 1, 2019 at 3:05 PM Colin McCabe  wrote:
> > > 
> 

Re: KIP-352: Distinguish URPs caused by reassignment

2019-08-07 Thread Jason Gustafson
Hi All,

Since KIP-455 is passed, I would like to revive this proposal. I have
reduced the scope so that it is just changing the way we compute URP and
adding a new metric for the number of reassigning partitions. Please take a
look:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-352%3A+Distinguish+URPs+caused+by+reassignment
.
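
As an illustration of the re-computation being proposed (a sketch only, not
the actual broker code): a partition would count as under-replicated when its
ISR is smaller than its replica set minus any replicas that exist only because
of an in-progress reassignment, so reassignment traffic no longer shows up as
URPs:

  import java.util.Set;

  public class UrpCheck {
      // addingReplicas: replicas present only due to an ongoing reassignment
      public static boolean isUnderReplicated(Set<Integer> isr,
                                              Set<Integer> replicas,
                                              Set<Integer> addingReplicas) {
          int expectedReplicationFactor = replicas.size() - addingReplicas.size();
          return isr.size() < expectedReplicationFactor;
      }
  }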

Thanks,
Jason

On Sun, Aug 5, 2018 at 3:41 PM Dong Lin  wrote:

> Hey Jason,
>
> Yes, I also think the right solution is probably to include the
> expected_replication_factor in the per-topic znode and include this
> information in the UpdateMetadataRequest.
>
> And I agree with Gwen and Ismael that it is reasonable to redefine URP as
> those partitions whose ISR set size < expected replication factor. Given
> that URP is being used for monitoring cluster availability and
> reassignment progress, it seems that one of them will break depending on
> the URP definition. It seems better to keep the legitimate use-case, i.e.
> monitoring cluster availability. Users who want to monitor reassignment
> progress probably should use the "kafka-reassign-partitions.sh --verify".
>
>
> Thanks,
> Dong
>
>
>
> On Fri, Aug 3, 2018 at 2:57 PM, Jason Gustafson 
> wrote:
>
> > @Dong
> >
> > Ugh, you are right. This is indeed trickier than I imagined. You could
> > argue that the problem here is that there is no single source of truth
> for
> > the expected replication factor. While a reassignment is in progress, the
> > reassignment itself has the expected replica set. Otherwise, it is stored
> > partition metadata itself. This is why manually deleting the reassignment
> > is problematic. We lose the expected replica set and we depend on users
> to
> > reinstall it. I guess I'm starting to think that the way we track
> > reassignment state in the controller is problematic. In addition to the
> > problems caused by deletion, we cannot easily change an existing
> > reassignment.
> >
> > High level what I'm thinking is that we need to move the pending
> > reassignment state out of the single znode and into the individual
> metadata
> > of the reassigned partitions so that there is a single source of truth
> for
> > the expected replicas (and hence the replication factor). This would also
> > give us an easier mechanism to manage the batching of multiple
> > reassignments. Let me think on it a bit and see if I can come up with a
> > proposal.
> >
> > @Gwen, @Ismael
> >
> > That is fair. I also prefer to redefine URP if we think the compatibility
> > impact is a lower concern than continued misuse.
> >
> > -Jason
> >
> >
> >
> > On Fri, Aug 3, 2018 at 12:25 PM, Ismael Juma  wrote:
> >
> > > RIght, that was my thinking too.
> > >
> > > Ismael
> > >
> > > On Fri, Aug 3, 2018 at 12:04 PM Gwen Shapira 
> wrote:
> > >
> > > > On Fri, Aug 3, 2018 at 11:23 AM, Jason Gustafson  >
> > > > wrote:
> > > >
> > > > > Hey Ismael,
> > > > >
> > > > > Yeah, my initial inclination was to redefine URP as well. My only
> > doubt
> > > > was
> > > > > how it would affect existing tools which might depend on URPs to
> > track
> > > > the
> > > > > progress of a reassignment. I decided to be conservative in the
> end,
> > > but
> > > > > I'd reconsider if we think it is not a major concern. It is
> annoying
> > to
> > > > > need a new category.
> > > > >
> > > >
> > > > There are existing tools that use URP to track reassignment, but
> there
> > > are
> > > > many more tools that use URP for monitoring and alerting. If I
> > understand
> > > > Ismael's suggestion correctly, a re-definition will improve the
> > > reliability
> > > > of the monitoring tools (since there won't be false alerts in case of
> > > > re-assignment) without having to switch to a new metric.
> > > >
> > > > I think we should choose the proposal that improves the more common
> > usage
> > > > of the metric, in this case, failure monitoring rather than
> > reassignment.
> > > >
> > > >
> > > > >
> > > > > About your question about storage in ZK, I can't think of anything
> > > > > additional that we need. Probably the main difficulty is getting
> > access
> > > > to
> > > > > the replication factor in the topic utility. My basic thought was
> > just
> > > to
> > > > > collect the URPs (as we know them today) and use the config API to
> > > > > partition them based on the replication factor. Do you see any
> > problems
> > > > > with this?
> > > > >
> > > > > -Jason
> > > > >
> > > > >
> > > > > On Thu, Aug 2, 2018 at 12:14 PM, Ismael Juma 
> > > wrote:
> > > > >
> > > > > > Thanks Jason. This is definitely a pain point. I actually prefer
> > the
> > > > > option
> > > > > > to redefine what under-replicated means (currently under rejected
> > > > > > alternatives). Also, do we need to make changes to what we store
> in
> > > ZK?
> > > > > If
> > > > > > so, that should be in the KIP too.
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Thu, Aug 2, 2018 at 11:45 AM Jason Gustafson <
> > ja...@confluent.io>
> > > > > > wrote:
> > > > > >

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-07 Thread Matthias J. Sax
Sorry for the long silence on this KIP Paul! I guess the 2.3 release
distracted us somewhat.

Overall, I am +1.

With regard to John's point about owned vs shared state stores, I think
it describes a valid use case, and throwing an exception if people want
to mix both features might be too restrictive?

We could of course later relax the restriction, but atm I am not sure
what the main argument for adding the restriction is?

(a) In the current API, one could connect the same store multiple times
to the same processor without getting an exception, because the
operation is idempotent.

(b) The KIP also suggests relaxing the current restriction to add the
same store twice, as long as store name and `StoreBuilder` instance are
the same, because it's an idempotent (hence, safe) operation too.

Because we already have (a) and (b) and consider both as safe, it seems
we could also treat the case of mixing both patterns as idempotent and
hence safe. And if we do this, we implicitly enable mixing both patterns
for different stores.
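
To illustrate the semantics of (a)/(b), here is a hedged sketch of the relaxed
duplicate check (the class below is made up for illustration and does not
mirror Kafka's internal topology builder):

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.kafka.streams.errors.TopologyException;
  import org.apache.kafka.streams.state.StoreBuilder;

  class StoreRegistry {
      private final Map<String, StoreBuilder<?>> storesByName = new HashMap<>();

      void register(StoreBuilder<?> builder) {
          StoreBuilder<?> existing = storesByName.putIfAbsent(builder.name(), builder);
          if (existing != null && existing != builder) {
              // same name but a different builder instance: a real user bug
              throw new TopologyException("Store " + builder.name()
                  + " was already added with a different StoreBuilder");
          }
          // existing == builder: registering twice is an idempotent no-op
      }
  }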


Thoughts?


-Matthias



On 6/17/19 2:31 PM, John Roesler wrote:
> Hey, all,
> 
> Sorry I'm late to the party. I meant to read into this KIP before, but
> didn't get around to it. I was just reminded when Paul mentioned it in
> a different thread. Please feel free to bump a discussion any time it
> stalls!
> 
> I've just read through the whole discussion so far, and, to echo the
> earlier sentiments, the motivation seems very clear. I remember how
> hard it was to figure out how to actually wire up a stateful processor
> properly the first couple of times. Not a very good user experience.
> 
> I looked over the whole conversation to date, as well as the KIP and
> the latest PR (https://github.com/apache/kafka/pull/6824). FWIW, The
> current approach looks good to me. I was concerned about the "cheat
> codes"-style mixin interface. Discoverability would have been a
> problem, and it's also not a very normal pattern for Java APIs. It
> actually looks a little more like something you'd do with an
> annotation.
> 
> So the current approach seems good:
> * The new interface with a default to return `null` is effectively
> shipping the feature flagged "off" (which is nice and safe)
> * Shared stores are "supported" the same way they always have been, by
> connecting them externally. This makes sense, since those stores
> aren't "owned" by any of the connected processors.
> * Processors that do own their stores can configure them in the same
> file they use them, which decreases the probability of cast exceptions
> when they get the stores from the context.
> * Stateful processors that own their stores are available for one-shot
> definition of the stores and the processor all in the same file (this
> is the main point of the KIP)
> 
> The runtime check that stores can't be both defined in the processor
> and referenced by name might be a little restrictive (since we already
> have the restriction that same-name stores can't be registered), but
> it would also be easy to remove it later. I'm just thinking that if I
> have a processor that owns one store and shares another, it would be
> pretty obvious how to hook it up in the proposed API, except for that
> check.
> 
> One last thought, regarding the all-important interface name: If you
> wanted to indicate more that the stores are available for Streams to
> connect, rather than that they are already connected, you could call
> it ConnectableStoreProvider (similar to AutoCloseable).
> 
> I just thought I'd summarize the current state, since it's been a
> while and no one has voted yet. I'll go ahead and vote now on the
> voting thread, since I'm +1 on the current proposal.
> 
> Thanks,
> -John
> 
> On Mon, May 27, 2019 at 1:59 PM Paul Whalen  wrote:
>>
>> It wasn't much of a lift changing option B to work for option C, so I
>> closed that PR and made a new one, which should be identical to the KIP
>> right now: https://github.com/apache/kafka/pull/6824.  There are a few
>> todos still which I will hold off until the KIP is accepted.
>>
>> I created a voting thread about a month ago, so I'll bump that now that
>> we're nearly there.
>>
>> Paul
>>
>> On Sun, May 26, 2019 at 2:21 PM Paul Whalen  wrote:
>>
>>> Per Matthias's suggestion from a while ago, I actually implemented a good
>>> amount of option B to get a sense of the user experience and documentation
>>> requirements.  For a few reasons mentioned below, I think it's not my
>>> favorite option, and I prefer option C.  But since I did the work and it
>>> can help discussion, I may as well share:
>>> https://github.com/apache/kafka/pull/6821.
>>>
>>> Things I learned along the way implementing Option B:
>>>  - For the name of the interface, I like ConnectedStoreProvider.  It isn't
>>> perfect but it seems to capture the general gist without being overly
>>> verbose.  I get that from a strict standpoint it's not "providing connected
>>> stores" but is instead "providing stores to be 

Re: [VOTE] KIP-455: Create an Administrative API for Replica Reassignment

2019-08-07 Thread George Li
This email seemed to get lost in the dev email server.  Resending. 


On Tuesday, August 6, 2019, 10:16:57 PM PDT, George Li 
 wrote:


The pending reassignment partitions would be reported as URP (Under Replicated 
Partitions), or maybe reported as a separate metric, RURP (Reassignment 
URP), since now we can derive it from the new AddingReplicas. An alert could be 
triggered based on this. 



It would be nice if ListPartitionReassignmentsResult could return the "elapsed 
time/duration" of the current pending reassignments, so the calling client can 
flag long-running reassignments and alert.  However, what I would be interested 
in is probably the total # of pending reassignments, because I will submit 
reassignments in batches, e.g. 50 reassignments per batch.  If the pending 
reassignment # is below that per-batch #, submit more new reassignments = 
(per_batch_# - pending_#).



It seems that currently the ReplicaFetcher threads can quite easily crash because 
of some exceptions, e.g. Java out of memory, and would just remain dead (jstack 
to dump threads to check the # of running ReplicaFetcher threads) without 
getting restarted automatically, so the broker needs to be bounced.  It would be 
nice to make the ReplicaFetcher more robust/resilient by catching more 
exceptions and, if it crashed, getting it restarted after some time. 



Thanks,

George



On 2019/08/06 23:07:19, "Colin McCabe"  wrote: 
> Hi Koushik,
> 
> Thanks for the idea.  This KIP is already pretty big, so I think we'll have 
> to consider ideas like this in follow-on KIPs.
> 
> In general, figuring out what's wrong with replication is a pretty tough 
> problem.  If we had an API for this, we'd probably want it to be unified, and 
> not specific to reassigning partitions.
> 
> regards,
> Colin
> 
> 
> On Tue, Aug 6, 2019, at 10:57, Koushik Chitta wrote:
> > Hey Colin,
> > 
> > Can the ListPartitionReassignmentsResult include the status of the 
> > current reassignment progress of each partition? A reassignment can be 
> > in progress for different reasons and the status can give the option to 
> > alter the current reassignment.
> > 
> > Example -  A leaderISRRequest of a new assigned replicas can be 
> > ignored/errored because of a storage exception.  And reassignment batch 
> > will be waiting indefinitely for the new assigned replicas to be in 
> > sync with the leader of the partition.  
> >   Showing the status will give an option to alter the affected 
> > partitions and allow the batch to complete reassignment.
> > 
> > OAR = {1, 2, 3} and RAR = {4,5,6}
> > 
> > AR = {1,2,3,4,5,6}, leader/isr = 1/{1,2,3,4,6}   =>  LeaderISRRequest 
> > was lost/skipped for 5 and the reassignment operation will be waiting 
> > indefinitely for the 5 to be insync.
> > 
> > 
> > 
> > Thanks,
> > Koushik
> > 
> > -Original Message-
> > From: Jun Rao  
> > Sent: Friday, August 2, 2019 10:04 AM
> > To: dev 
> > Subject: Re: [VOTE] KIP-455: Create an Administrative API for Replica 
> > Reassignment
> > 
> > Hi, Colin,
> > 
> > First, since we are changing the format of LeaderAndIsrRequest, which 
> > is an inter broker request, it seems that we will need IBP during 
> > rolling upgrade. Could we add that to the compatibility section?
> > 
> > Regarding UnsupportedVersionException, even without ZK node version 
> > bump, we probably want to only use the new ZK value fields after all 
> > brokers have been upgraded to the new binary. Otherwise, the 
> > reassignment task may not be completed if the controller changes to a 
> > broker still on the old binary.
> > IBP is one way to achieve that. The main thing is that we need some way 
> > for the controller to deal with the new ZK fields. Dealing with the 
> > additional ZK node version bump seems a small thing on top of that?
> > 
> > Thanks,
> > 
> > Jun
> > 
> > On Thu, Aug 1, 2019 at 3:05 PM Colin McCabe  wrote:
> > 
> > > On Thu, Aug 1, 2019, at 12:00, Jun Rao wrote:
> > > > Hi, Colin,
> > > >
> > > > 10. Sounds good.
> > > >
> > > > 13. Our current convention is to bump up the version of ZK value if 
> > > > there is any format change. For example, we have bumped up the 
> > > > version of the value in /brokers/ids/nnn multiple times and all of 
> > > > those changes are compatible (just adding new fields). This has the 
> > > > slight benefit that it makes it clear there is a format change. 
> > > > Rolling upgrades and downgrades can still be supported with the 
> > > > version bump. For example, if you
> > > downgrade
> > > > from a compatible change, you can leave the new format in ZK and the 
> > > > old code will only pick up fields relevant to the old version. 
> > > > Upgrade will
> > > be
> > > > controlled by inter broker protocol.
> > >
> > > Hmm.  If we bump that ZK node version, we will need a new inter-broker 
> > > protocol version.  We also need to return UnsupportedVersionException 
> > > from the alterPartitionReassignments and 

[jira] [Created] (KAFKA-8767) Optimize StickyAssignor for Cooperative mode

2019-08-07 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8767:
--

 Summary: Optimize StickyAssignor for Cooperative mode
 Key: KAFKA-8767
 URL: https://issues.apache.org/jira/browse/KAFKA-8767
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sophie Blee-Goldman


In some rare cases, the StickyAssignor will fail to balance an assignment 
without violating stickiness despite a balanced and sticky assignment being 
possible. The implications of this for cooperative rebalancing are that an 
unnecessary additional rebalance will be triggered.

This was seen to happen for example when each consumer is subscribed to some 
random subset of all topics and all their subscriptions change to a different 
random subset, as occurs in 
AbstractStickyAssignorTest#testReassignmentWithRandomSubscriptionsAndChanges.

The initial assignment after the random subscription change obviously involved 
migrating partitions, so following the cooperative protocol those partitions 
are removed from the balanced first assignment, and a second rebalance is 
triggered. In some cases, during the second rebalance the assignor was unable 
to reach a balanced assignment without migrating a few partitions, even though 
one must have been possible (since the first assignment was balanced). A third 
rebalance was needed to reach a stable balanced state.

Under the conditions in the previously mentioned test (between 20-40 consumers 
and 10-20 topics with 0-20 partitions each), this third rebalance was required 
roughly 30% of the time. Some initial improvements to the sticky assignment 
logic reduced this to under 15%, but we should consider closing this gap and 
optimizing the cooperative sticky assignment.

 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-503: deleted topics metric

2019-08-07 Thread Jason Gustafson
Thanks for the KIP. This is useful. The controller also maintains a set for
topics which are awaiting deletion, but currently ineligible. A topic which
is undergoing reassignment, for example, is ineligible for deletion. Would
it make sense to have a metric for this as well?
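
As a sketch of how an operator could poll such metrics over JMX (the MBean
names below are only placeholders for whatever KIP-503 finally defines; the
JMX plumbing itself is standard):

  import javax.management.MBeanServerConnection;
  import javax.management.ObjectName;
  import javax.management.remote.JMXConnector;
  import javax.management.remote.JMXConnectorFactory;
  import javax.management.remote.JMXServiceURL;

  public class DeletionMetricsProbe {
      public static void main(String[] args) throws Exception {
          JMXServiceURL url = new JMXServiceURL(
              "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
          JMXConnector connector = JMXConnectorFactory.connect(url);
          try {
              MBeanServerConnection mbs = connector.getMBeanServerConnection();
              // placeholder MBean names -- not (yet) defined by Kafka itself
              Object topics = mbs.getAttribute(new ObjectName(
                  "kafka.controller:type=KafkaController,name=TopicsToDeleteCount"), "Value");
              Object replicas = mbs.getAttribute(new ObjectName(
                  "kafka.controller:type=KafkaController,name=ReplicasToDeleteCount"), "Value");
              System.out.println("topics pending deletion: " + topics
                  + ", replicas pending deletion: " + replicas);
          } finally {
              connector.close();
          }
      }
  }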

-Jason

On Wed, Aug 7, 2019 at 1:52 PM David Arthur  wrote:

> Updated the KIP with a count of replicas awaiting deletion.
>
> On Wed, Aug 7, 2019 at 9:37 AM David Arthur  wrote:
>
> > Thanks for the feedback, Stan. That's a good point about the partition
> > count -- I'll poke around and see if I can surface this value in the
> > Controller.
> >
> > On Tue, Aug 6, 2019 at 8:13 AM Stanislav Kozlovski <
> stanis...@confluent.io>
> > wrote:
> >
> >> Thanks for the KIP David,
> >>
> >> As you mentioned in the KIP - "when a large number of topics
> (partitions,
> >> really) are deleted at once, it can take significant time for the
> >> Controller to process everything.
> >> In that sense, does it make sense to have the metric expose the number
> of
> >> partitions that are pending deletion, as opposed to topics? Perhaps even
> >> both?
> >> My reasoning is that this metric alone wouldn't say much if we had one
> >> topic with 1000 partitions versus a topic with 1 partition
> >>
> >> On Mon, Aug 5, 2019 at 8:19 PM Harsha Chintalapani 
> >> wrote:
> >>
> >> > Thanks for the KIP.  Its useful metric to have.  LGTM.
> >> > -Harsha
> >> >
> >> >
> >> > On Mon, Aug 05, 2019 at 11:24 AM, David Arthur <
> davidart...@apache.org>
> >> > wrote:
> >> >
> >> > > Hello all, I'd like to start a discussion for
> >> > > https://cwiki.apache.org/confluence/display/KAFKA/
> >> > > KIP-503%3A+Add+metric+for+number+of+topics+marked+for+deletion
> >> > >
> >> > > Thanks!
> >> > > David
> >> > >
> >> >
> >>
> >>
> >> --
> >> Best,
> >> Stanislav
> >>
> >
> >
> > --
> > David Arthur
> >
>
>
> --
> David Arthur
>


Re: [DISCUSS] KIP-503: deleted topics metric

2019-08-07 Thread David Arthur
Updated the KIP with a count of replicas awaiting deletion.

On Wed, Aug 7, 2019 at 9:37 AM David Arthur  wrote:

> Thanks for the feedback, Stan. That's a good point about the partition
> count -- I'll poke around and see if I can surface this value in the
> Controller.
>
> On Tue, Aug 6, 2019 at 8:13 AM Stanislav Kozlovski 
> wrote:
>
>> Thanks for the KIP David,
>>
>> As you mentioned in the KIP - "when a large number of topics (partitions,
>> really) are deleted at once, it can take significant time for the
>> Controller to process everything.
>> In that sense, does it make sense to have the metric expose the number of
>> partitions that are pending deletion, as opposed to topics? Perhaps even
>> both?
>> My reasoning is that this metric alone wouldn't say much if we had one
>> topic with 1000 partitions versus a topic with 1 partition
>>
>> On Mon, Aug 5, 2019 at 8:19 PM Harsha Chintalapani 
>> wrote:
>>
>> > Thanks for the KIP.  Its useful metric to have.  LGTM.
>> > -Harsha
>> >
>> >
>> > On Mon, Aug 05, 2019 at 11:24 AM, David Arthur 
>> > wrote:
>> >
>> > > Hello all, I'd like to start a discussion for
>> > > https://cwiki.apache.org/confluence/display/KAFKA/
>> > > KIP-503%3A+Add+metric+for+number+of+topics+marked+for+deletion
>> > >
>> > > Thanks!
>> > > David
>> > >
>> >
>>
>>
>> --
>> Best,
>> Stanislav
>>
>
>
> --
> David Arthur
>


-- 
David Arthur


[jira] [Created] (KAFKA-8766) Allow a custom offset policy for Kafka Streams applications

2019-08-07 Thread Patrik Kleindl (JIRA)
Patrik Kleindl created KAFKA-8766:
-

 Summary: Allow a custom offset policy for Kafka Streams 
applications 
 Key: KAFKA-8766
 URL: https://issues.apache.org/jira/browse/KAFKA-8766
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Patrik Kleindl


Currently when starting a new streams application (= new consumer group) you 
can only choose between starting from the beginning of all topics or only 
processing newly arriving records.

To start processing at any given point in the past (e.g. only processing data of 
the last month), the application has to be started (so the consumer group 
exists), stopped, the offsets reset, and then restarted.

It would be helpful if this could be passed in with the help of some kind of 
"offset reset strategy" which could be provided by the user.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-08-07 Thread Rajini Sivaram
Hi Ron/Harsha/Satish,

Thanks for reviewing the KIP!

We should perhaps have a wider discussion outside this KIP for refactoring
clients so that others who are not following this KIP also notice the
discussion. Satish, would you like to start a discussion thread on dev?

Regards,

Rajini


On Wed, Aug 7, 2019 at 6:21 PM Satish Duggana 
wrote:

> I felt the same need when we want to add a pluggable API for core
> server functionality. This does not need to be part of this KIP, it
> can be a separate KIP. I can contribute those refactoring changes if
> others are OK with that.
>
> It is better to have a structure like below.
>
> kafka-common:
> common classes which can be used in any of the other modules in Kafka
> like client, Kafka-server-common and server etc.
>
> kafka-client-common:
> common classes which can be used in the client module. This can be
> part of client module itself.
>
> kafka-server-common:
> classes required only for kafka-server.
>
> Thanks.
> Satish.
>
> On Wed, Aug 7, 2019 at 9:28 PM Harsha Chintalapani 
> wrote:
> >
> > Thanks for the KIP Rajini.
> > Quick thought, it would be good to have a common module outside of
> clients
> > that only applies to server side interfaces & changes. It looks like we
> are
> > increasingly in favor of using Java interface for pluggable modules  on
> the
> > broker side.
> >
> > Thanks,
> > Harsha
> >
> >
> > On Tue, Aug 06, 2019 at 2:31 PM, Rajini Sivaram  >
> > wrote:
> >
> > > Hi all,
> > >
> > > I have created a KIP to replace the Scala Authorizer API with a new
> Java
> > > API:
> > >
> > > -
> > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > KIP-504+-+Add+new+Java+Authorizer+Interface
> > >
> > > This is replacement for KIP-50 which was accepted but never merged.
> Apart
> > > from moving to a Java API consistent with other pluggable interfaces
> in the
> > > broker, KIP-504 also attempts to address known limitations in the
> > > authorizer. If you have come across other limitations that you would
> like
> > > to see addressed in the new API, please raise these on the discussion
> > > thread so that we can consider those too. All suggestions and feedback
> are
> > > welcome.
> > >
> > > Thank you,
> > >
> > > Rajini
> > >
>


Re: [DISCUSS] KIP-491: Preferred Leader Deprioritized List (Temporary Blacklist)

2019-08-07 Thread George Li
 Hi Colin,

Thanks for your feedbacks.  Comments below:
> Even if you have a way of blacklisting an entire broker all at once, you 
> still would need to run a leader election for each partition where you want 
> to move the leader off of the blacklisted broker.  So the operation is still 
> O(N) in that sense-- you have to do something per partition.

For a failed broker swapped with an empty broker: when it comes up, it will 
not have any leadership, and we would like it to remain without leaderships 
for a couple of hours or days. So there is no preferred leader election needed, 
which would incur an O(N) operation in this case.  Putting the preferred leader 
blacklist in place would safeguard this broker from serving traffic during that 
time; otherwise, if another broker fails (if this broker is the 1st or 2nd in 
the assignment), or someone runs preferred leader election, this new "empty" 
broker can still get leaderships. 

Also, running a reassignment to change the ordering of the preferred leader would 
not actually switch the leader automatically, e.g. (1,2,3) => (2,3,1), unless 
preferred leader election is run to switch the current leader from 1 to 2.  So 
the operation is at least 2 x O(N), and then, after the broker is back to 
normal, another 2 x O(N) to roll back. 


> In general, reassignment will get a lot easier and quicker once KIP-455 is 
> implemented.  Reassignments that just change the order of preferred 
> replicas for a specific partition should complete pretty much instantly.
>
>> I think it's simpler and easier just to have one source of truth for what 
>> the preferred replica is for a partition, rather than two.  So for me, the 
>> fact that the replica assignment ordering isn't changed is actually a big 
>> disadvantage of this KIP.  If you are a new user (or just an existing user 
>> that didn't read all of the documentation) and you just look at the replica 
>> assignment, you might be confused by why a particular broker wasn't getting 
>> any leaderships, even though it appeared like it should.  More mechanisms 
>> mean more complexity for users and developers most of the time.


I would like to stress the point that running reassignments to change the 
ordering of the replicas (putting a broker at the end of the partition 
assignment) is unnecessary, because after some time the broker is caught up and 
can start serving traffic, and then we need to run reassignments again to 
"rollback" to the previous state. As I mentioned in KIP-491, this is just 
tedious work. 

I agree this might introduce some complexities for users/developers. But if 
this feature is good, and well documented, it is good for the kafka 
product/community.  Just like KIP-460 enabling unclean leader election to 
override TopicLevel/Broker Level config of `unclean.leader.election.enable`

> I agree that it would be nice if we could treat some brokers differently for 
> the purposes of placing replicas, selecting leaders, etc.  Right now, we 
> don't have any way of implementing that without forking the broker.  I would 
> support a new PlacementPolicy class that would close this gap.  But I don't 
> think this KIP is flexible enough to fill this role.  For example, it can't 
> prevent users from creating new single-replica topics that get put on the 
> "bad" replica.  Perhaps we should reopen the discussion about 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-201%3A+Rationalising+Policy+interfaces

Creating topics with a single replica is beyond what KIP-491 is trying to 
achieve.  The user needs to take responsibility for doing that. I do see some 
Samza clients notoriously creating single-replica topics, and that gets flagged 
by alerts, because a single broker being down/in maintenance will cause offline 
partitions. With the KIP-491 preferred leader "blacklist", the single replica 
will still serve as leader, because there is no alternative replica to be 
chosen as leader. 

Even with a new PlacementPolicy for topic creation/partition expansion, it 
would still need the blacklist info (e.g. a ZK path node, or a broker-level/ 
topic-level config) to "blacklist" the broker from being preferred leader. 
Wouldn't that be the same as what KIP-491 is introducing? 


Thanks,
George

On Wednesday, August 7, 2019, 11:01:51 AM PDT, Colin McCabe 
 wrote:  
 
 On Fri, Aug 2, 2019, at 20:02, George Li wrote:
>  Hi Colin,
> Thanks for looking into this KIP.  Sorry for the late response. been busy. 
> 
> If a cluster has MANY topic partitions, moving this "blacklist" broker 
> to the end of replica list is still a rather "big" operation, involving 
> submitting reassignments.  The KIP-491 way of blacklist is much 
> simpler/easier and can undo easily without changing the replica 
> assignment ordering. 

Hi George,

Even if you have a way of blacklisting an entire broker all at once, you still 
would need to run a leader election for each partition where you want to move 
the leader off of the blacklisted broker.  So the operation is still O(N) in 
that sense-- you have to do something per partition.

Build failed in Jenkins: kafka-trunk-jdk8 #3835

2019-08-07 Thread Apache Jenkins Server
See 


Changes:

[manikumar] KAFKA-8599: Use automatic RPC generation in ExpireDelegationToken

--
[...truncated 2.60 MB...]

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-07 Thread John Roesler
Hey Guozhang,

Thanks for the review!

1. Yes, even with `num.standby.replicas := 0`, we will still temporarily
allocate standby tasks to accomplish a no-downtime task migration.
Although, I'd argue that this doesn't really violate the config, as the
task isn't a true hot standby. As soon as it catches up, we'll rebalance
again, that task will become active, and the original instance that hosted
the active task will no longer have the task assigned at all. Once the
stateDirCleaner kicks in, we'll free the disk space from it, and return to
the steady-state of having just one copy of the task in the cluster.

We can of course do without this, but I feel the current proposal is
operationally preferable, since it doesn't make configuring hot-standbys a
pre-requisite for fast rebalances.

2. Yes, I think your interpretation is what we intended. The default
balance_factor would be 1, as it is implicitly today. What this does is
allows operators to trade off less balanced assignments against fewer
rebalances. If you have lots of spare capacity in your instances, this may
be a perfectly fine tradeoff, and you may prefer for Streams not to bother
streaming GBs of data from the broker in pursuit of perfect balance. Not
married to this configuration, though. It was inspired by the related work
research we did.
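
To make that reading concrete, a minimal sketch (illustrative only, not the
proposed assignor code):

  import java.util.Arrays;
  import java.util.Collection;
  import java.util.Collections;

  public class BalanceFactorSketch {
      // Spread = difference in task counts between the most and least loaded instances.
      static int spread(Collection<Integer> taskCountsPerInstance) {
          return Collections.max(taskCountsPerInstance) - Collections.min(taskCountsPerInstance);
      }

      // An assignment counts as "balanced enough" once its spread is within
      // balance_factor, so the assignor can stop shuffling tasks (and triggering
      // restores) for marginal gains.
      static boolean balancedEnough(Collection<Integer> taskCountsPerInstance, int balanceFactor) {
          return spread(taskCountsPerInstance) <= balanceFactor;
      }

      public static void main(String[] args) {
          System.out.println(balancedEnough(Arrays.asList(4, 5, 6), 2)); // true: spread 2 <= 2
          System.out.println(balancedEnough(Arrays.asList(2, 8), 1));    // false: spread 6 > 1
      }
  }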

3. I'll take a look

3a. I think this is a good idea. I'd classify it as a type of grey failure
detection. It may make more sense to tackle grey failures as part of the
heartbeat protocol (as I POCed here:
https://github.com/apache/kafka/pull/7096/files). WDYT?

4. Good catch! I didn't think about that before. Looking at it now, though,
I wonder if we're actually protected already. The stateDirCleaner thread
only executes if the instance is in RUNNING state, and KIP-441 proposes to
use "probing rebalances" to report task lag. Hence, during the window
between when the instance reports a lag and the assignor makes a decision
about it, the instance should remain in REBALANCING state, right? If so,
then this should prevent the race condition. If not, then we do indeed need
to do something about it.

5. Good idea. I think that today, you can only see the consumer lag, which
is a poor substitute. I'll add some metrics to the proposal.

Thanks again for the comments!
-John

On Tue, Aug 6, 2019 at 4:27 PM Guozhang Wang  wrote:

> Hello Sophie,
>
> Thanks for the proposed KIP. I left some comments on the wiki itself, and I
> think I'm still not very clear on a couple of those:
>
> 1. With this proposal, does that mean with num.standby.replicas == 0, we
> may sometimes still have some standby tasks which may violate the config?
>
> 2. I think I understand the rationale to consider lags that are below the
> specified threshold to be equal, rather than still considering 5000 is
> better than 5001 -- we do not want to "over-optimize" and potentially fall
> into endless rebalances back and forth.
>
> But I'm not clear about the rationale of the second parameter of
> constrainedBalancedAssignment(StatefulTasksToRankedCandidates,
> balance_factor):
>
> Does that mean, e.g. with balance_factor of 3, we'd consider two
> assignments one resulting balance_factor 0 and one resulting balance_factor
> 3 to be equally optimized assignment and therefore may "stop early"? This
> was not very convincing to me :P
>
> 3. There are a couple of minor comments about the algorithm itself, left on
> the wiki page since it needs to refer to the exact line and better
> displayed there.
>
> 3.a Another wild thought about the threshold itself: today the assignment
> itself is memoryless, so we would not know if the reported `TaskLag` itself
> is increasing or decreasing even if the current value is under the
> threshold. I wonder if it is worth making it a bit more complicated to track
> the task lag trend at the assignor? Practically it may not be very uncommon
> that standby tasks are not keeping up due to the fact that other active
> tasks hosted on the same thread are starving the standby tasks.
>
> 4. There's a potential race condition risk when reporting `TaskLags` in the
> subscription: right after reporting it to the leader, the cleanup thread
> kicks in and deletes the state directory. If the task was assigned to the
> host it would cause it to restore from beginning and effectively make the
> seemingly optimized assignment very sub-optimal.
>
> To be on the safer side we should consider either pruning out those tasks
> that are "close to being cleaned up" from the subscription, or delaying
> the cleanup right after we've included them in the subscription, in case
> they have been selected as assigned tasks by the assignor.
>
> 5. This is a meta comment: I think it would be helpful to add some user
> visibility on the standby tasks lagging as well, via metrics for example.
> Today it is hard for us to observe how far are our current standby tasks
> compared to the active tasks and whether that lag is being increasing or
> decreasing. As a 

[VOTE] KIP-497: Add inter-broker API to alter ISR

2019-08-07 Thread Jason Gustafson
Hi All,

I'd like to start a vote on KIP-497:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-497%3A+Add+inter-broker+API+to+alter+ISR.
+1
from me.

-Jason


[VOTE] KIP-496: Administrative API to delete consumer offsets

2019-08-07 Thread Jason Gustafson
Hi All,

I'd like to start a vote on KIP-496:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets.
+1
from me of course.

-Jason


Re: [DISCUSS] KIP-487: Automatic Topic Creation on Producer

2019-08-07 Thread Justine Olshan
Hi Harsha,

I think my message may have gotten lost in all the others.

Two of the goals of this KIP are to 1) allow auto-creation on specific
clients when the broker default is false and 2) eventually replace the
broker config.

In order to accomplish these two goals, we need the producer to be able to
create topics despite the broker config. (How can we replace a function
when we rely on it?)
I think at this point we have a fundamental disagreement in what we should
allow the producer to do.
In my previous message I mentioned a config that would allow for the broker
to prevent producer auto-creation. (It would be disabled by default.) It
would fix your issue for now, but could lead to more complications later.

Thank you,
Justine


On Wed, Aug 7, 2019 at 10:56 AM Harsha Chintalapani  wrote:

> On Wed, Aug 07, 2019 at 9:50 AM, Colin McCabe  wrote:
>
> > On Wed, Aug 7, 2019, at 09:24, Harsha Ch wrote:
> >
> > On Tue, Aug 06, 2019 at 11:46 PM, Colin McCabe < cmcc...@apache.org >
> > wrote:
> >
> > On Tue, Aug 6, 2019, at 21:38, Harsha Ch wrote:
> >
> > Hi Colin,
> > "Hmm... I'm not sure I follow. Users don't have to build their own
> > tooling, right? They can use any of the shell scripts that we've shipped
> in
> > the last few releases. For example, if any of your users run it, this
> shell
> > script will delete all of the topics from your non-security-enabled
> > cluster:
> >
> > ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list 2>/dev/null
> > | xargs -l ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete
> > --topic
> >
> > They will need to fill in the correct bootstrap servers list, of course,
> > not localhost. This deletion script will work on some pretty old brokers,
> > even back to the 0.10 releases. It seems a little odd to trust your users
> > with this power, but not trust them to avoid changing a particular
> > configuration key."
> >
> > The above will blocked by the server if we set delete.topic.enable to
> > false and thats exactly what I am asking for.
> >
> > Hi Harsha,
> >
> > I was wondering if someone was going to bring up that configuration :)
> >
> > it's an interesting complication, but globally disabling topic deletion
> is
> > not very practical for most use-cases.
> >
> > In any case, there are plenty of other bad things that users with full
> > permissions can do that aren't blocked by any server configuration. For
> > example, they can delete every record in every topic. I can write a
> script
> > for that too, and there's no server configuration you can set to disable
> > it. Or I could simply create hundreds of thousands of topics, until
> cluster
> > performance becomes unacceptable (this will be even more of a problem if
> > someone configured delete.topic.enable as false). Or publish bad data to
> > every topic, etc. etc.
> >
> > The point I'm trying to make here is that you can't rely on these kind of
> > server-side configurations for security. At most, they're a way to set up
> > certain very simple policies. But the policies are so simple that they're
> > hardly ever useful any more.
> >
> > For example, if the problem you want to solve is that you want a user to
> > only be able to create 50 topics and not delete anyone else's topics, you
> > can solve that with a CreateTopicsPolicy that limits the number of
> topics,
> > and some ACLs. There's no combination of auto.create.topics.enable and
> > delete.topic.enable that will help here.
> >
> > Hi Colin,
> >
> > Well you gave the example that a user can delete the topics
> > just by running that script  :).
> >
> > I understand there are open APIs in Kafka and can lead to rogue clients
> > taking advantage of it without proper security in place.
> >
> > What I am asking so far in this thread is , this KIP is changing the
> > producer behavior and its not backward compatible.
> >
> > The change is backwards compatible. The default will still be server-side
> > topic auto-creation, just like now.
> >
> You will have to specifically change the producer config to get the new
> > behavior.
> >
>
>
> I disagree.  Today the server can turn off topic creation so that neither a
> producer nor a consumer can create a topic. With this KIP, a producer can create
> a topic by turning on a client-side config even when the server-side config is turned off.
>
>
> We can still achieve
> > the main goal of the KIP which is to change MetadataRequest  creating
> > topics and send a CreateTopicRequest from Producer and also keep the
> server
> > side config to have precedence.  This KIP originally written to have
> server
> > side preference and there is not much explanation why it changed to have
> > Producer side preference.
> >
> > Arguing that AdminClient can do that and so we are going to make Producer
> > do the same doesn't make sense.
> >
> > "The downside is that if we wanted to check a server side configuration
> > before sending the 

Re: [VOTE] KIP-396: Add Commit/List Offsets Operations to AdminClient

2019-08-07 Thread Jason Gustafson
Thanks Mickael, +1 from me.

Just a couple more comments:

> Yes mapping it to a friendlier error would be nice but I'm not sure
if we can do that because UNKNOWN_MEMBER_ID is also returned if the
group is dead.

I think the logic was changed recently to return
`COORDINATOR_NOT_AVAILABLE` if the group is Dead, so I don't think this is
a problem, at least not for new versions. One thing to keep in mind is that
the Dead state is just a transient state while we're in the process of
unloading the group. It should be rare that we hit it. For old brokers
which still return UNKNOWN_MEMBER_ID in this state, I don't think it would
be so bad to return a NotEmptyGroup error. Basically we have to throw
something in this case, so we may as well throw something which indicates
the most likely problem.

4. I assume the default isolation level for listOffsets would be
read_uncommitted?

5. I don't feel too strongly about it, but other admin APIs have tended to
use "alter" for updates (e.g. `alterConfigs` and `alterReplicaLogDirs`).
You might consider `alterConsumerGroupOffsets` over
`resetConsumerGroupOffsets`.
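
To make the discussion concrete, here is a rough sketch of how the proposed APIs
could read; all names are tentative and follow the KIP discussion rather than a
committed signature:

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.admin.AdminClient;
  import org.apache.kafka.clients.admin.ListOffsetsResult;
  import org.apache.kafka.clients.admin.OffsetSpec;
  import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  import org.apache.kafka.common.TopicPartition;

  public class Kip396Sketch {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          try (AdminClient admin = AdminClient.create(props)) {
              TopicPartition tp = new TopicPartition("my-topic", 0);

              // List the latest offset using an OffsetSpec instead of a sentinel value.
              ListOffsetsResult offsets =
                  admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()));
              long endOffset = offsets.partitionResult(tp).get().offset();

              // "alter" vs. "reset" naming is exactly the open question above.
              admin.alterConsumerGroupOffsets("my-group",
                  Collections.singletonMap(tp, new OffsetAndMetadata(endOffset))).all().get();
          }
      }
  }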

-Jason

On Wed, Aug 7, 2019 at 10:15 AM Mickael Maison 
wrote:

> Hi Jason,
>
> Thanks for the feedback
>
> 1. Yes listOffsets() should be able to retrieve earliest, latest and
> by-timestamp offsets.
> I agree, it's better to avoid exposing magic values. I've updated the
> KIP to use an OffsetSpec object as suggested.
>
> 2. Yes let's expose the leader epoch in ListOffsetsResultInfo.
>
> 3. Yes mapping it to a friendlier error would be nice but I'm not sure
> if we can do that because UNKNOWN_MEMBER_ID is also returned if the
> group is dead.
>
> On Tue, Aug 6, 2019 at 6:38 PM Jason Gustafson  wrote:
> >
> > Thanks for the KIP. This makes sense to me. Just a couple small comments:
> >
> > 1. Can the listOffsets API be used to get the start and end offsets? In
> the
> > consumer, we use separate APIs for this: `beginningOffsets` and
> > `endOffsets` to avoid the need for sentinels. An alternative would be to
> > introduce an `OffsetSpec` (or maybe `OffsetQuery`) object to customize
> the
> > query. For example:
> >
> > public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec>
> > partitionOffsetSpecs)
> >
> > The benefit is that we can avoid sentinel values and we have an extension
> > point for additional query options in the future. What do you think?
> >
> > 2. The ListOffset response includes the leader epoch corresponding to the
> > offset that was found. This is useful for finer-grained reasoning about
> the
> > log. We expose this in the consumer in the OffsetAndTimestamp object
> which
> > is returned from `offsetsForTimes`. Does it make sense to add this to
> > `ListOffsetsResultInfo` as well?
> >
> > 3. If the group is still active, the call to reset offsets will fail.
> > Currently this would result in an UNKNOWN_MEMBER_ID error. I think it
> would
> > make sense to map this exception to a friendlier error before raising to
> > the user. For example, `NonEmptyGroupException` or something like that.
> >
> > -Jason
> >
> >
> >
> >
> >
> > On Tue, Aug 6, 2019 at 9:33 AM Mickael Maison 
> > wrote:
> >
> > > Hi Colin,
> > >
> > > Thank you for taking a look!
> > > I agree, being able to set consumer group offsets via the AdminClient
> > > would be really useful, hence I created this KIP.
> > >
> > > With the total absence of binding votes, I guessed I needed to make
> > > some changes. Do you mean you preferred the previous naming
> > > (commitConsumerGroupOffsets) over "resetConsumerGroupOffsets"?
> > >
> > > Thanks
> > >
> > > On Mon, Aug 5, 2019 at 8:26 PM Colin McCabe 
> wrote:
> > > >
> > > > I think it would be useful to have this in AdminClient.  Especially
> if
> > > we implement KIP-496: Administrative API to delete consumer offsets.
> It
> > > would be odd to have a way to delete consumer offsets in AdminClient,
> but
> > > not to create them.  What do you think?
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Sun, Aug 4, 2019, at 09:27, Mickael Maison wrote:
> > > > > Hi,
> > > > >
> > > > > In an attempt to unblock this KIP, I've made some adjustments:
> > > > > I've renamed the commitConsumerGroupOffsets() methods to
> > > > > resetConsumerGroupOffsets() to reduce confusion. That should better
> > > > > highlight the differences with the regular commit() operation from
> the
> > > > > Consumer API. I've also added some details to the motivation
> section.
> > > > >
> > > > > So we have +5 non binding votes and 0 binding votes
> > > > >
> > > > > On Mon, Mar 25, 2019 at 1:10 PM Mickael Maison <
> > > mickael.mai...@gmail.com> wrote:
> > > > > >
> > > > > > Bumping this thread once again
> > > > > >
> > > > > > Ismael, have I answered your questions?
> > > > > > While this has received a few non-binding +1s, no committers have
> > > > > > voted yet. If you have concerns or questions, please let me know.
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > > > On Mon, Feb 11, 2019 at 11:51 AM Mickael 

Re: [DISCUSS] KIP-491: Preferred Leader Deprioritized List (Temporary Blacklist)

2019-08-07 Thread Colin McCabe
On Fri, Aug 2, 2019, at 20:02, George Li wrote:
>  Hi Colin,
> Thanks for looking into this KIP.  Sorry for the late response. been busy. 
> 
> If a cluster has MANY topic partitions, moving this "blacklist" broker 
> to the end of replica list is still a rather "big" operation, involving 
> submitting reassignments.  The KIP-491 way of blacklist is much 
> simpler/easier and can undo easily without changing the replica 
> assignment ordering. 

Hi George,

Even if you have a way of blacklisting an entire broker all at once, you still 
would need to run a leader election for each partition where you want to move 
the leader off of the blacklisted broker.  So the operation is still O(N) in 
that sense-- you have to do something per partition.

In general, reassignment will get a lot easier and quicker once KIP-455 is 
implemented.  Reassignments that just change the order of preferred replicas 
for a specific partition should complete pretty much instantly.

I think it's simpler and easier just to have one source of truth for what the 
preferred replica is for a partition, rather than two.  So for me, the fact 
that the replica assignment ordering isn't changed is actually a big 
disadvantage of this KIP.  If you are a new user (or just an existing user that 
didn't read all of the documentation) and you just look at the replica 
assignment, you might be confused by why a particular broker wasn't getting any 
leaderships, even  though it appeared like it should.  More mechanisms mean 
more complexity for users and developers most of the time.

> Major use case for me, a failed broker got swapped with new hardware, 
> and starts up as empty (with latest offset of all partitions), the SLA 
> of retention is 1 day, so before this broker is up to be in-sync for 1 
> day, we would like to blacklist this broker from serving traffic. after 
> 1 day, the blacklist is removed and run preferred leader election.  
> This way, no need to run reassignments before/after.  This is the 
> "temporary" use-case.

What if we just add an option to the reassignment tool to generate a plan to 
move all the leaders off of a specific broker?  The tool could also run a 
leader election as well.  That would be a simple way of doing this without 
adding new mechanisms or broker-side configurations, etc.
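
For illustration, the reordering such a generated plan would apply to each affected
partition (a sketch only; no such tool option exists today):

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;

  public class DemoteBrokerSketch {
      // Move the broker being drained to the end of the replica list so that a
      // subsequent preferred leader election shifts leadership away from it.
      static List<Integer> demote(List<Integer> replicas, int brokerId) {
          List<Integer> reordered = new ArrayList<>(replicas);
          if (reordered.remove(Integer.valueOf(brokerId))) {
              reordered.add(brokerId);
          }
          return reordered;
      }

      public static void main(String[] args) {
          System.out.println(demote(Arrays.asList(1, 2, 3), 1)); // [2, 3, 1]
      }
  }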

> 
> There are use-cases that this Preferred Leader "blacklist" can be 
> somewhat permanent, as I explained in the AWS data center instances Vs. 
> on-premises data center bare metal machines (heterogenous hardware), 
> that the AWS broker_ids will be blacklisted.  So new topics created,  
> or existing topic expansion would not make them serve traffic even they 
> could be the preferred leader. 

I agree that it would be nice if we could treat some brokers differently for 
the purposes of placing replicas, selecting leaders, etc.  Right now, we don't 
have any way of implementing that without forking the broker.  I would support 
a new PlacementPolicy class that would close this gap.  But I don't think this 
KIP is flexible enough to fill this role.  For example, it can't prevent users 
from creating new single-replica topics that get put on the "bad" replica.  
Perhaps we should reopen the discussion about 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-201%3A+Rationalising+Policy+interfaces

regards,
Colin

> 
> Please let me know there are more question. 
> 
> 
> Thanks,
> George
> 
> On Thursday, July 25, 2019, 08:38:28 AM PDT, Colin McCabe 
>  wrote:  
>  
>  We still want to give the "blacklisted" broker the leadership if 
> nobody else is available.  Therefore, isn't putting a broker on the 
> blacklist pretty much the same as moving it to the last entry in the 
> replicas list and then triggering a preferred leader election?
> 
> If we want this to be undone after a certain amount of time, or under 
> certain conditions, that seems like something that would be more 
> effectively done by an external system, rather than putting all these 
> policies into Kafka.
> 
> best,
> Colin
> 
> 
> On Fri, Jul 19, 2019, at 18:23, George Li wrote:
> >  Hi Satish,
> > Thanks for the reviews and feedbacks.
> > 
> > > > The following are the requirements this KIP is trying to accomplish:
> > > This can be moved to the "Proposed changes" section.
> > 
> > Updated the KIP-491. 
> > 
> > > >>The logic to determine the priority/order of which broker should be
> > > preferred leader should be modified.  The broker in the preferred leader
> > > blacklist should be moved to the end (lowest priority) when
> > > determining leadership.
> > >
> > > I believe there is no change required in the ordering of the preferred
> > > replica list. Brokers in the preferred leader blacklist are skipped
> > > until other brokers in the list are unavailable.
> > 
> > Yes. partition assignment remained the same, replica & ordering. The 
> > blacklist logic can be optimized during implementation. 
> > 
> > > >>The blacklist can be at the broker level. 

Re: [DISCUSS] KIP-487: Automatic Topic Creation on Producer

2019-08-07 Thread Harsha Chintalapani
On Wed, Aug 07, 2019 at 9:50 AM, Colin McCabe  wrote:

> On Wed, Aug 7, 2019, at 09:24, Harsha Ch wrote:
>
> On Tue, Aug 06, 2019 at 11:46 PM, Colin McCabe < cmcc...@apache.org >
> wrote:
>
> On Tue, Aug 6, 2019, at 21:38, Harsha Ch wrote:
>
> Hi Colin,
> "Hmm... I'm not sure I follow. Users don't have to build their own
> tooling, right? They can use any of the shell scripts that we've shipped in
> the last few releases. For example, if any of your users run it, this shell
> script will delete all of the topics from your non-security-enabled
> cluster:
>
> ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list 2>/dev/null
> | xargs -l ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete
> --topic
>
> They will need to fill in the correct bootstrap servers list, of course,
> not localhost. This deletion script will work on some pretty old brokers,
> even back to the 0.10 releases. It seems a little odd to trust your users
> with this power, but not trust them to avoid changing a particular
> configuration key."
>
> The above will blocked by the server if we set delete.topic.enable to
> false and thats exactly what I am asking for.
>
> Hi Harsha,
>
> I was wondering if someone was going to bring up that configuration :)
>
> it's an interesting complication, but globally disabling topic deletion is
> not very practical for most use-cases.
>
> In any case, there are plenty of other bad things that users with full
> permissions can do that aren't blocked by any server configuration. For
> example, they can delete every record in every topic. I can write a script
> for that too, and there's no server configuration you can set to disable
> it. Or I could simply create hundreds of thousands of topics, until cluster
> performance becomes unacceptable (this will be even more of a problem if
> someone configured delete.topic.enable as false). Or publish bad data to
> every topic, etc. etc.
>
> The point I'm trying to make here is that you can't rely on these kind of
> server-side configurations for security. At most, they're a way to set up
> certain very simple policies. But the policies are so simple that they're
> hardly ever useful any more.
>
> For example, if the problem you want to solve is that you want a user to
> only be able to create 50 topics and not delete anyone else's topics, you
> can solve that with a CreateTopicsPolicy that limits the number of topics,
> and some ACLs. There's no combination of auto.create.topics.enable and
> delete.topic.enable that will help here.
>
> Hi Colin,
>
> Well you gave the example that a user can delete the topics
> just by running that script  :).
>
> I understand there are open APIs in Kafka and can lead to rogue clients
> taking advantage of it without proper security in place.
>
> What I am asking so far in this thread is , this KIP is changing the
> producer behavior and its not backward compatible.
>
> The change is backwards compatible. The default will still be server-side
> topic auto-creation, just like now.
>
You will have to specifically change the producer config to get the new
> behavior.
>


I disagree.  Today the server can turn off topic creation so that neither a producer
nor a consumer can create a topic. With this KIP, a producer can create a topic
by turning on a client-side config even when the server-side config is turned off.


> We can still achieve
> the main goal of the KIP which is to change MetadataRequest creating
> topics and send a CreateTopicRequest from Producer and also keep the server
> side config to have precedence.  This KIP was originally written to have server
> side preference and there is not much explanation why it changed to have
> Producer side preference.
>
> Arguing that AdminClient can do that and so we are going to make Producer
> do the same doesn't make sense.
>
> "The downside is that if we wanted to check a server side configuration
> before sending the create topics request, the code would be more complex.
> The behavior would also not be consistent with how topic auto-creation is
> handled in Kafka Streams."
>
> I am not sure why you need to check server side configuration before
> sending create topics request. A user enables producer side config to
> create topics.
> Producer sends a request to the broker and if the broker has
> auto.topic.create.enable to true (default) it will allow creation of
> topics. If it set to false it returns error back to the client.
>
> auto.topic.create.enable has never affected CreateTopicsRequest. If you
> submit a CreateTopicsRequest and you are authorized, a topic will be
> created, regardless of what the value of auto.topic.create.enable is. This
> behavior goes back a long way, to about 2016 (Kafka 0.10.1, I think?)
>
> I don't see how this behavior will be different in Kafka streams. By
> default server allows the topic creation and with this KIP, It will only
> allow creation of topic when both 

[jira] [Created] (KAFKA-8765) Remove "unstable" annotations

2019-08-07 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-8765:
--

 Summary: Remove "unstable" annotations
 Key: KAFKA-8765
 URL: https://issues.apache.org/jira/browse/KAFKA-8765
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Matthias J. Sax
 Fix For: 2.4.0


Kafka Streams is still annotating some public API as unstable, via 
`@InterfaceStability.Evolving`.

We should remove all those annotations, as these APIs have been considered stable for 
some time and we already maintain backward compatibility for them.
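
For example (illustrative interface name only, not an actual Streams API):

{code}
import org.apache.kafka.common.annotation.InterfaceStability;

// Illustrative only: the kind of annotation this ticket proposes removing.
@InterfaceStability.Evolving
public interface ExampleStreamsApi {
    void process(String key, String value);
}
{code}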



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum

2019-08-07 Thread Mickael Maison
Thank Colin for kickstarting this initiative.

Just one question.
- A nice feature of Zookeeper is the ability to use chroots and have
several Kafka clusters use the same Zookeeper ensemble. Is this
something we should keep?

Thanks

On Mon, Aug 5, 2019 at 7:44 PM Colin McCabe  wrote:
>
> On Mon, Aug 5, 2019, at 10:02, Tom Bentley wrote:
> > Hi Colin,
> >
> > Thanks for the KIP.
> >
> > Currently ZooKeeper provides a convenient notification mechanism for
> > knowing that broker and topic configuration has changed. While KIP-500 does
> > suggest that incremental metadata update is expected to come to clients
> > eventually, that would seem to imply that for some number of releases there
> > would be no equivalent mechanism for knowing about config changes. Is there
> > any thinking at this point about how a similar notification might be
> > provided in the future?
>
> We could eventually have some inotify-like mechanism where clients could 
> register interest in various types of events and got notified when they 
> happened.  Reading the metadata log is conceptually simple.  The main 
> complexity would be in setting up an API that made sense and that didn't 
> unduly constrain future implementations.  We'd have to think carefully about 
> what the real use-cases for this were, though.
>
> best,
> Colin
>
> >
> > Thanks,
> >
> > Tom
> >
> > On Mon, Aug 5, 2019 at 3:49 PM Viktor Somogyi-Vass 
> > wrote:
> >
> > > Hey Colin,
> > >
> > > I think this is a long-awaited KIP, thanks for driving it. I'm excited to
> > > see this in Kafka once. I collected my questions (and I accept the "TBD"
> > > answer as they might be a bit deep for this high level :) ).
> > > 1.) Are there any specific reasons for the Controller just periodically
> > > persisting its state on disk instead of asynchronously with
> > > every update? Wouldn't less frequent saves increase the chance for missing
> > > a state change if the controller crashes between two saves?
> > > 2.) Why can't we allow brokers to fetch metadata from the follower
> > > controllers? I assume that followers would have up-to-date information
> > > therefore brokers could fetch from there in theory.
> > >
> > > Thanks,
> > > Viktor
> > >
> > > On Sun, Aug 4, 2019 at 6:58 AM Boyang Chen 
> > > wrote:
> > >
> > > > Thanks for explaining Ismael! Breaking down into follow-up KIPs sounds
> > > like
> > > > a good idea.
> > > >
> > > > On Sat, Aug 3, 2019 at 10:14 AM Ismael Juma  wrote:
> > > >
> > > > > Hi Boyang,
> > > > >
> > > > > Yes, there will be several KIPs that will discuss the items you
> > > describe
> > > > in
> > > > > detail. Colin, it may be helpful to make this clear in the KIP 500
> > > > > description.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Sat, Aug 3, 2019 at 9:32 AM Boyang Chen  > > >
> > > > > wrote:
> > > > >
> > > > > > Thanks Colin for initiating this important effort!
> > > > > >
> > > > > > One question I have is whether we have a session discussing the
> > > > > controller
> > > > > > failover in the new architecture? I know we are using Raft protocol
> > > to
> > > > > > failover, yet it's still valuable to discuss the steps new cluster 
> > > > > > is
> > > > > going
> > > > > > to take to reach the stable stage again, so that we could easily
> > > > measure
> > > > > > the availability of the metadata servers.
> > > > > >
> > > > > > Another suggestion I have is to write a step-by-step design doc like
> > > > what
> > > > > > we did in KIP-98
> > > > > > <
> > > > > >
> > > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > >,
> > > > > > including the new request protocols and how they are interacting in
> > > the
> > > > > new
> > > > > > cluster. For a complicated change like this, an implementation 
> > > > > > design
> > > > doc
> > > > > > help a lot in the review process, otherwise most discussions we have
> > > > will
> > > > > > focus on high level and lose important details as we discover them 
> > > > > > in
> > > > the
> > > > > > post-agreement phase.
> > > > > >
> > > > > > Boyang
> > > > > >
> > > > > > On Fri, Aug 2, 2019 at 5:17 PM Colin McCabe 
> > > > wrote:
> > > > > >
> > > > > > > On Fri, Aug 2, 2019, at 16:33, Jose Armando Garcia Sancio wrote:
> > > > > > > > Thanks Colin for the detail KIP. I have a few comments and
> > > > questions.
> > > > > > > >
> > > > > > > > In the KIP's Motivation and Overview you mentioned the
> > > LeaderAndIsr
> > > > > and
> > > > > > > > UpdateMetadata RPC. For example, "updates which the controller
> > > > > pushes,
> > > > > > > such
> > > > > > > > as LeaderAndIsr and UpdateMetadata messages". Is your thinking
> > > that
> > > > > we
> > > > > > > will
> > > > > > > > use MetadataFetch as a replacement to just UpdateMetadata only
> > > and
> > > > > add
> > > > > > > > topic configuration in this state?
> > > > > > > >
> > > > > > >
> > > > > > > Hi Jose,
> > > > > > >
> > 

Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-08-07 Thread Satish Duggana
I have felt the same need when we wanted to add a pluggable API for core
server functionality. This does not need to be part of this KIP; it
can be a separate KIP. I can contribute those refactoring changes if
others are OK with that.

It is better to have a structure like below.

kafka-common:
common classes which can be used in any of the other modules in Kafka,
such as client, kafka-server-common, and server.

kafka-client-common:
common classes which can be used in the client module. This can be
part of client module itself.

kafka-server-common:
classes required only for kafka-server.

Thanks.
Satish.

On Wed, Aug 7, 2019 at 9:28 PM Harsha Chintalapani  wrote:
>
> Thanks for the KIP Rajini.
> Quick thought, it would be good to have a common module outside of clients
> that only applies to server side interfaces & changes. It looks like we are
> increasingly in favor of using Java interface for pluggable modules  on the
> broker side.
>
> Thanks,
> Harsha
>
>
> On Tue, Aug 06, 2019 at 2:31 PM, Rajini Sivaram 
> wrote:
>
> > Hi all,
> >
> > I have created a KIP to replace the Scala Authorizer API with a new Java
> > API:
> >
> > -
> > https://cwiki.apache.org/confluence/display/KAFKA/
> > KIP-504+-+Add+new+Java+Authorizer+Interface
> >
> > This is replacement for KIP-50 which was accepted but never merged. Apart
> > from moving to a Java API consistent with other pluggable interfaces in the
> > broker, KIP-504 also attempts to address known limitations in the
> > authorizer. If you have come across other limitations that you would like
> > to see addressed in the new API, please raise these on the discussion
> > thread so that we can consider those too. All suggestions and feedback are
> > welcome.
> >
> > Thank you,
> >
> > Rajini
> >


Re: [VOTE] KIP-396: Add Commit/List Offsets Operations to AdminClient

2019-08-07 Thread Mickael Maison
Hi Jason,

Thanks for the feedback

1. Yes listOffsets() should be able to retrieve earliest, latest and
by-timestamp offsets.
I agree, it's better to avoid exposing magic values. I've updated the
KIP to use an OffsetSpec object as suggested.

2. Yes let's expose the leader epoch in ListOffsetsResultInfo.

3. Yes mapping it to a friendlier error would be nice but I'm not sure
if we can do that because UNKNOWN_MEMBER_ID is also returned if the
group is dead.

On Tue, Aug 6, 2019 at 6:38 PM Jason Gustafson  wrote:
>
> Thanks for the KIP. This makes sense to me. Just a couple small comments:
>
> 1. Can the listOffsets API be used to get the start and end offsets? In the
> consumer, we use separate APIs for this: `beginningOffsets` and
> `endOffsets` to avoid the need for sentinels. An alternative would be to
> introduce an `OffsetSpec` (or maybe `OffsetQuery`) object to customize the
> query. For example:
>
> public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec>
> partitionOffsetSpecs)
>
> The benefit is that we can avoid sentinel values and we have an extension
> point for additional query options in the future. What do you think?
>
> 2. The ListOffset response includes the leader epoch corresponding to the
> offset that was found. This is useful for finer-grained reasoning about the
> log. We expose this in the consumer in the OffsetAndTimestamp object which
> is returned from `offsetsForTimes`. Does it make sense to add this to
> `ListOffsetsResultInfo` as well?
>
> 3. If the group is still active, the call to reset offsets will fail.
> Currently this would result in an UNKNOWN_MEMBER_ID error. I think it would
> make sense to map this exception to a friendlier error before raising to
> the user. For example, `NonEmptyGroupException` or something like that.
>
> -Jason
>
>
>
>
>
> On Tue, Aug 6, 2019 at 9:33 AM Mickael Maison 
> wrote:
>
> > Hi Colin,
> >
> > Thank you for taking a look!
> > I agree, being able to set consumer group offsets via the AdminClient
> > would be really useful, hence I created this KIP.
> >
> > With the total absence of binding votes, I guessed I needed to make
> > some changes. Do you mean you preferred the previous naming
> > (commitConsumerGroupOffsets) over "resetConsumerGroupOffsets"?
> >
> > Thanks
> >
> > On Mon, Aug 5, 2019 at 8:26 PM Colin McCabe  wrote:
> > >
> > > I think it would be useful to have this in AdminClient.  Especially if
> > we implement KIP-496: Administrative API to delete consumer offsets.  It
> > would be odd to have a way to delete consumer offsets in AdminClient, but
> > not to create them.  What do you think?
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Sun, Aug 4, 2019, at 09:27, Mickael Maison wrote:
> > > > Hi,
> > > >
> > > > In an attempt to unblock this KIP, I've made some adjustments:
> > > > I've renamed the commitConsumerGroupOffsets() methods to
> > > > resetConsumerGroupOffsets() to reduce confusion. That should better
> > > > highlight the differences with the regular commit() operation from the
> > > > Consumer API. I've also added some details to the motivation section.
> > > >
> > > > So we have +5 non binding votes and 0 binding votes
> > > >
> > > > On Mon, Mar 25, 2019 at 1:10 PM Mickael Maison <
> > mickael.mai...@gmail.com> wrote:
> > > > >
> > > > > Bumping this thread once again
> > > > >
> > > > > Ismael, have I answered your questions?
> > > > > While this has received a few non-binding +1s, no committers have
> > > > > voted yet. If you have concerns or questions, please let me know.
> > > > >
> > > > > Thanks
> > > > >
> > > > > On Mon, Feb 11, 2019 at 11:51 AM Mickael Maison
> > > > >  wrote:
> > > > > >
> > > > > > Bumping this thread as it's been a couple of weeks.
> > > > > >
> > > > > > On Tue, Jan 22, 2019 at 2:26 PM Mickael Maison <
> > mickael.mai...@gmail.com> wrote:
> > > > > > >
> > > > > > > Thanks Ismael for the feedback. I think your point has 2 parts:
> > > > > > > - Having the reset functionality in the AdminClient:
> > > > > > > The fact we have a command line tool illustrates that this
> > > > > > > operation is
> > > > > > > relatively common. It seems valuable to be able to perform this
> > > > > > > operation directly via a proper API in addition to the CLI tool.
> > > > > > >
> > > > > > > - Sending an OffsetCommit directly instead of relying on
> > KafkaConsumer:
> > > > > > > The KafkaConsumer requires a lot of stuff to commit offsets. Its
> > group
> > > > > > > cannot change so you need to start a new Consumer every time,
> > that
> > > > > > > creates new connections and overall sends more requests. Also
> > there are
> > > > > > > already  a bunch of AdminClient APIs that have logic very close
> > to
> > > > > > > what needs to be done to send a commit request, keeping the code
> > small
> > > > > > > and consistent.
> > > > > > >
> > > > > > > I've updated the KIP with these details and moved the 2nd part to
> > > > > > > "Proposed changes" as it's more an implementation detail.
> > > > > > >
> > > > > > > I hope 

[jira] [Resolved] (KAFKA-8716) broker cannot join the cluster after upgrading kafka binary from 2.1.1 to 2.2.1 or 2.3.0

2019-08-07 Thread Yu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang resolved KAFKA-8716.

Resolution: Not A Problem

> broker cannot join the cluster after upgrading kafka binary from 2.1.1 to 
> 2.2.1 or 2.3.0
> 
>
> Key: KAFKA-8716
> URL: https://issues.apache.org/jira/browse/KAFKA-8716
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Yu Yang
>Priority: Critical
>
> We are trying to upgrade the kafka binary from 2.1 to 2.2.1 or 2.3.0. For both 
> versions, the broker with the updated binary (2.2.1 or 2.3.0) could not get 
> started due to a zookeeper session expiration exception. The error happens 
> repeatedly and prevents the broker from starting. 
> Below is our zk related setting in server.properties:
> {code}
> zookeeper.connection.timeout.ms=6000
> zookeeper.session.timeout.ms=6000
> {code}
> The following is the stack trace, and we are using zookeeper 3.5.3. Instead 
> of waiting for a few seconds, the SESSIONEXPIRED error returned immediately 
> in CheckedEphemeral.create call.  Any insights? 
> [2019-07-25 18:07:35,712] INFO Creating /brokers/ids/80 (is it secure? false) 
> (kafka.zk.KafkaZkClient)
> [2019-07-25 18:07:35,724] ERROR Error while creating ephemeral at 
> /brokers/ids/80 with return code: SESSIONEXPIRED 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)
> [2019-07-25 18:07:35,731] ERROR [KafkaServer id=80] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode 
> = Session expired
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
> at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1725)
> at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1689)
> at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:97)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:260)
> at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
> at kafka.Kafka$.main(Kafka.scala:75)
> at kafka.Kafka.main(Kafka.scala)



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-481: SerDe Improvements for Connect Decimal type in JSON

2019-08-07 Thread Almog Gavra
EDIT: everywhere I've been using "HEX" I meant to be using "BASE64". I will
update the KIP to reflect this.

On Wed, Aug 7, 2019 at 9:44 AM Almog Gavra  wrote:

> Thanks for the feedback Arjun! I'm happy changing the default config to
> HEX instead of BINARY, no strong feelings there.
>
> I'll also clarify the example in the KIP to be clearer:
>
> - serialize the decimal field "foo" with value "10.2345" with the HEX
> setting: {"foo": "D3J5"}
> - serialize the decimal field "foo" with value "10.2345" with the NUMERIC
> setting: {"foo": 10.2345}
>
> With regards to the precision issue, that was my original concern as well
> (and why I originally suggested a TEXT format). Many JSON deserializers
> (e.g. Jackson with DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS),
> however, have the ability to deserialize decimals correctly so I will
> configure that as the default for Connect's JsonDeserializer. It's probably
> a good idea to call out that using other deserializers must be done with
> care - I will add that documentation to the serialization config.
>
> Note that there would not be an issue on the _serialization_ side of
> things as Jackson respects BigDecimal.
>
> Almog
>
> On Tue, Aug 6, 2019 at 11:23 PM Arjun Satish 
> wrote:
>
>> hey Almog, nice work! couple of thoughts (hope I'm not late since you
>> started the voting thread already):
>>
>> can you please add some examples to show the changes that you are
>> proposing. makes me think that for a given decimal number, we will have
>> two
>> encodings: “asHex” and “asNumber”.
>>
>> should we call the default config value “HEX” instead of “BINARY”?
>>
>> Should we call out the fact that JS systems might be susceptible to double
>> precision round offs with the new numeric format? here are some people
>> discussing a similar problem
>> https://github.com/EventStore/EventStore/issues/1541
>>
>> On Tue, Aug 6, 2019 at 1:40 PM Almog Gavra  wrote:
>>
>> > Hello Everyone,
>> >
>> > Summarizing an in-person discussion with Randall (this is copied from
>> the
>> > KIP):
>> >
>> > The original KIP suggested supporting an additional representation -
>> base10
>> > encoded text (e.g. `{"asText":"10.2345"}`). This causes issues because
>> it
>> > is impossible to disambiguate between TEXT and BINARY without an
>> additional
>> > config - furthermore, this makes the migration from one to the other
>> nearly
>> > impossible because it would require that all consumers stop consuming
>> and
>> > producers stop producing and atomically updating the config on all of
>> them
>> > after deploying the new code, or waiting for the full retention period
>> to
>> > pass - neither option is viable. The suggestion in the KIP is strictly
>> an
>> > improvement over the existing behavior, even if it doesn't support all
>> > combinations.
>> >
>> > It seems that since most real-world use cases actually use the numeric
>> > representation (not string) we can consider this an improvement. With
>> the
>> > new suggestion, we don't need a deserialization configuration (only a
>> > serialization option) and the consumers will be able to always
>> > automatically determine the serialization format.
>> >
>> > Based on this, I'll be opening up the simplified version of the KIP to a
>> > vote.
>> >
>> > Almog
>> >
>> > On Mon, Jul 29, 2019 at 9:29 AM Almog Gavra  wrote:
>> >
>> > > I'm mostly happy with your current suggestion (two configs, one for
>> > > serialization and one for deserialization) and your implementation
>> > > suggestion. One thing to note:
>> > >
>> > > > We should _always_ be able to deserialize a standard JSON
>> > > > number as a decimal
>> > >
>> > > I was doing some research into decimals and JSON, and I can imagine a
>> > > compelling reason to require string representations to avoid losing
>> > > precision and to be certain that whomever is sending the data isn't
>> > losing
>> > > precision (e.g. https://stackoverflow.com/a/38357877/2258040).
>> > >
>> > > I'm okay with always allowing numerics, but thought it's worth raising
>> > the
>> > > thought.
>> > >
>> > > On Mon, Jul 29, 2019 at 4:57 AM Andy Coates 
>> wrote:
>> > >
>> > >> The way I see it, we need to control two seperate things:
>> > >>
>> > >> 1. How do we _deserialize_ a decimal type if we encounter a text
>> node in
>> > >> the JSON?(We should _always_ be able to deserialize a standard
>> JSON
>> > >> number as a decimal).
>> > >> 2. How do we chose how we want decimals to be _serialized_.
>> > >>
>> > >> This looks to fits well with your second suggestion of slightly
>> > different
>> > >> configs names for serialization vs deserialization.
>> > >> a, For deserialization we only care about how to handle text nodes: `
>> > >> deserialization.decimal.*text*.format`, which should only have two
>> valid
>> > >> values BINARY | TEXT.
>> > >> b. For serialization we need all three:
>> `serialization.decimal.format`,
>> > >> which should support all three options: BINARY | TEXT | NUMERIC.
>> > >>
>> 

Re: [DISCUSS] KIP-487: Automatic Topic Creation on Producer

2019-08-07 Thread Colin McCabe
On Wed, Aug 7, 2019, at 09:24, Harsha Ch wrote:
> On Tue, Aug 06, 2019 at 11:46 PM, Colin McCabe < cmcc...@apache.org > wrote:
> 
> > On Tue, Aug 6, 2019, at 21:38, Harsha Ch wrote:
> >> 
> >> Hi Colin,
> >> "Hmm... I'm not sure I follow. Users don't have to build their own
> >> tooling, right? They can use any of the shell scripts that we've shipped
> >> in the last few releases. For example, if any of your users run it, this
> >> shell script will delete all of the topics from your non-security-enabled
> >> cluster:
> >> 
> >> 
> >> 
> >> ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list 2>/dev/null
> >> | xargs -l ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete
> >> --topic
> >> 
> >> 
> >> 
> >> They will need to fill in the correct bootstrap servers list, of course,
> >> not localhost. This deletion script will work on some pretty old brokers,
> >> even back to the 0.10 releases. It seems a little odd to trust your users
> >> with this power, but not trust them to avoid changing a particular
> >> configuration key."
> >> 
> >> 
> >> 
> >> 
> >> The above will blocked by the server if we set delete.topic.enable to
> >> false and thats exactly what I am asking for.
> >> 
> >> 
> >> 
> > 
> > 
> > 
> > Hi Harsha,
> > 
> > 
> > 
> > 
> > I was wondering if someone was going to bring up that configuration :)
> > 
> > 
> > 
> > 
> > it's an interesting complication, but globally disabling topic deletion is
> > not very practical for most use-cases.
> > 
> > 
> > 
> > 
> > In any case, there are plenty of other bad things that users with full
> > permissions can do that aren't blocked by any server configuration. For
> > example, they can delete every record in every topic. I can write a script
> > for that too, and there's no server configuration you can set to disable
> > it. Or I could simply create hundreds of thousands of topics, until
> > cluster performance becomes unacceptable (this will be even more of a
> > problem if someone configured delete.topic.enable as false). Or publish
> > bad data to every topic, etc. etc.
> > 
> > 
> > 
> > 
> > The point I'm trying to make here is that you can't rely on these kind of
> > server-side configurations for security. At most, they're a way to set up
> > certain very simple policies. But the policies are so simple that they're
> > hardly ever useful any more.
> > 
> > 
> > 
> > 
> > For example, if the problem you want to solve is that you want a user to
> > only be able to create 50 topics and not delete anyone else's topics, you
> > can solve that with a CreateTopicsPolicy that limits the number of topics,
> > and some ACLs. There's no combination of auto.create.topics.enable and
> > delete.topic.enable that will help here.
> > 
> > 
> > 
> > 
> 
> Hi Colin,
> 
>             Well you gave the example that a user can delete the topics 
> just by running that script  :).  
> 
> I understand there are open APIs in Kafka and can lead to rogue clients 
> taking advantage of it without proper security in place.
> 
> What I am asking so far in this thread is , this KIP is changing the 
> producer behavior and its not backward compatible. 

The change is backwards compatible.  The default will still be server-side 
topic auto-creation, just like now.

You will have to specifically change the producer config to get the new 
behavior.

> We can still achieve 
> the main goal of the KIP which is to change MetadataRequest  creating 
> topics and send a CreateTopicRequest from Producer and also keep the 
> server side config to have precedence.  This KIP originally written to 
> have server side preference and there is not much explanation why it 
> changed to have Producer side preference.
> 
> Arguing that AdminClient can do that and so we are going to make 
> Producer do the same doesn't make sense.
> 
> > 
> > 
> > 
> > 
> > 
> > 
> > 
> >> 
> >> 
> >> "The downside is that if we wanted to check a server side configuration
> >> before sending the create topics request, the code would be more complex.
> >> The behavior would also not be consistent with how topic auto-creation is
> >> handled in Kafka Streams."
> >> 
> >> 
> >> 
> >> 
> >> I am not sure why you need to check server side configuration before
> >> sending create topics request. A user enables producer side config to
> >> create topics.
> >> Producer sends a request to the broker and if the broker has
> >> auto.topic.create.enable to true (default) it will allow creation of
> >> topics. If it set to false it returns error back to the client.
> >> 
> >> 
> > 
> > 
> > 
> > auto.topic.create.enable has never affected CreateTopicsRequest. If you
> > submit a CreateTopicsRequest and you are authorized, a topic will be
> > created, regardless of what the value of auto.topic.create.enable is. This
> > behavior goes back a long way, to about 2016 (Kafka 0.10.1, I think?)
> > 
> > 
> > 
> >> 
> >> 
> 

Re: [DISCUSS] KIP-481: SerDe Improvements for Connect Decimal type in JSON

2019-08-07 Thread Almog Gavra
Thanks for the feedback Arjun! I'm happy changing the default config to HEX
instead of BINARY, no strong feelings there.

I'll also clarify the example in the KIP to be clearer:

- serialize the decimal field "foo" with value "10.2345" with the HEX
setting: {"foo": "D3J5"}
- serialize the decimal field "foo" with value "10.2345" with the NUMERIC
setting: {"foo": 10.2345}

With regards to the precision issue, that was my original concern as well
(and why I originally suggested a TEXT format). Many JSON deserializers
(e.g. Jackson with DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS),
however, have the ability to deserialize decimals correctly so I will
configure that as the default for Connect's JsonDeserializer. It's probably
a good idea to call out that using other deserializers must be done with
care - I will add that documentation to the serialization config.

Note that there would not be an issue on the _serialization_ side of things
as Jackson respects BigDecimal.
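
For readers who want to see the Jackson behavior being referenced, here is a
minimal, self-contained sketch (plain Jackson usage, not Connect's
JsonDeserializer itself; the field name and value mirror the example above):

  import java.math.BigDecimal;
  import com.fasterxml.jackson.databind.DeserializationFeature;
  import com.fasterxml.jackson.databind.JsonNode;
  import com.fasterxml.jackson.databind.ObjectMapper;

  public class DecimalReadExample {
      public static void main(String[] args) throws Exception {
          ObjectMapper mapper = new ObjectMapper()
              // Without this feature, 10.2345 is read as a double and may lose precision.
              .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);

          JsonNode node = mapper.readTree("{\"foo\": 10.2345}");
          BigDecimal value = node.get("foo").decimalValue();
          System.out.println(value); // 10.2345, preserved as a BigDecimal
      }
  }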

Almog

On Tue, Aug 6, 2019 at 11:23 PM Arjun Satish  wrote:

> hey Almog, nice work! couple of thoughts (hope I'm not late since you
> started the voting thread already):
>
> can you please add some examples to show the changes that you are
> proposing. makes me think that for a given decimal number, we will have two
> encodings: “asHex” and “asNumber”.
>
> should we call the default config value “HEX” instead of “BINARY”?
>
> Should we call out the fact that JS systems might be susceptible to double
> precision round offs with the new numeric format? here are some people
> discussing a similar problem
> https://github.com/EventStore/EventStore/issues/1541
>
> On Tue, Aug 6, 2019 at 1:40 PM Almog Gavra  wrote:
>
> > Hello Everyone,
> >
> > Summarizing an in-person discussion with Randall (this is copied from the
> > KIP):
> >
> > The original KIP suggested supporting an additional representation -
> base10
> > encoded text (e.g. `{"asText":"10.2345"}`). This causes issues because it
> > is impossible to disambiguate between TEXT and BINARY without an
> additional
> > config - furthermore, this makes the migration from one to the other
> nearly
> > impossible because it would require that all consumers stop consuming and
> > producers stop producing and atomically updating the config on all of
> them
> > after deploying the new code, or waiting for the full retention period to
> > pass - neither option is viable. The suggestion in the KIP is strictly an
> > improvement over the existing behavior, even if it doesn't support all
> > combinations.
> >
> > It seems that since most real-world use cases actually use the numeric
> > representation (not string) we can consider this an improvement. With the
> > new suggestion, we don't need a deserialization configuration (only a
> > serialization option) and the consumers will be able to always
> > automatically determine the serialization format.
> >
> > Based on this, I'll be opening up the simplified version of the KIP to a
> > vote.
> >
> > Almog
> >
> > On Mon, Jul 29, 2019 at 9:29 AM Almog Gavra  wrote:
> >
> > > I'm mostly happy with your current suggestion (two configs, one for
> > > serialization and one for deserialization) and your implementation
> > > suggestion. One thing to note:
> > >
> > > > We should _always_ be able to deserialize a standard JSON
> > > > number as a decimal
> > >
> > > I was doing some research into decimals and JSON, and I can imagine a
> > > compelling reason to require string representations to avoid losing
> > > precision and to be certain that whomever is sending the data isn't
> > losing
> > > precision (e.g. https://stackoverflow.com/a/38357877/2258040).
> > >
> > > I'm okay with always allowing numerics, but thought it's worth raising
> > the
> > > thought.
> > >
> > > On Mon, Jul 29, 2019 at 4:57 AM Andy Coates  wrote:
> > >
> > >> The way I see it, we need to control two seperate things:
> > >>
> > >> 1. How do we _deserialize_ a decimal type if we encounter a text node
> in
> > >> the JSON?(We should _always_ be able to deserialize a standard
> JSON
> > >> number as a decimal).
> > >> 2. How do we chose how we want decimals to be _serialized_.
> > >>
> > >> This looks to fit well with your second suggestion of slightly different
> > >> config names for serialization vs deserialization.
> > >> a, For deserialization we only care about how to handle text nodes: `
> > >> deserialization.decimal.*text*.format`, which should only have two
> valid
> > >> values BINARY | TEXT.
> > >> b. For serialization we need all three:
> `serialization.decimal.format`,
> > >> which should support all three options: BINARY | TEXT | NUMERIC.
> > >>
> > >> Implementation wise, I think these should be two separate enums,
> rather
> > >> than one shared enum and throwing an error if the deserializer is set
> to
> > >> NUMERIC.  Mainly as this means the enums reflect the options
> available,
> > >> rather than this being hidden in config checking code.  But that's a

Re: [DISCUSS] KIP-487: Automatic Topic Creation on Producer

2019-08-07 Thread Harsha Ch
On Tue, Aug 06, 2019 at 11:46 PM, Colin McCabe < cmcc...@apache.org > wrote:

> 
> 
> 
> On Tue, Aug 6, 2019, at 21:38, Harsha Ch wrote:
> 
> 
> 
>> 
>> 
>> Hi Colin,
>> "Hmm... I'm not sure I follow. Users don't have to build their own
>> tooling, right? They can use any of the shell scripts that we've shipped
>> in the last few releases. For example, if any of your users run it, this
>> shell script will delete all of the topics from your non-security-enabled
>> cluster:
>> 
>> 
>> 
>> ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list 2>/dev/null
>> | xargs -l ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete
>> --topic
>> 
>> 
>> 
>> They will need to fill in the correct bootstrap servers list, of course,
>> not localhost. This deletion script will work on some pretty old brokers,
>> even back to the 0.10 releases. It seems a little odd to trust your users
>> with this power, but not trust them to avoid changing a particular
>> configuration key."
>> 
>> 
>> 
>> 
>> The above will be blocked by the server if we set delete.topic.enable to
>> false, and that's exactly what I am asking for.
>> 
>> 
>> 
> 
> 
> 
> Hi Harsha,
> 
> 
> 
> 
> I was wondering if someone was going to bring up that configuration :)
> 
> 
> 
> 
> it's an interesting complication, but globally disabling topic deletion is
> not very practical for most use-cases.
> 
> 
> 
> 
> In any case, there are plenty of other bad things that users with full
> permissions can do that aren't blocked by any server configuration. For
> example, they can delete every record in every topic. I can write a script
> for that too, and there's no server configuration you can set to disable
> it. Or I could simply create hundreds of thousands of topics, until
> cluster performance becomes unacceptable (this will be even more of a
> problem if someone configured delete.topic.enable as false). Or publish
> bad data to every topic, etc. etc.
> 
> 
> 
> 
> The point I'm trying to make here is that you can't rely on these kind of
> server-side configurations for security. At most, they're a way to set up
> certain very simple policies. But the policies are so simple that they're
> hardly ever useful any more.
> 
> 
> 
> 
> For example, if the problem you want to solve is that you want a user to
> only be able to create 50 topics and not delete anyone else's topics, you
> can solve that with a CreateTopicsPolicy that limits the number of topics,
> and some ACLs. There's no combination of auto.create.topics.enable and
> delete.topic.enable that will help here.
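
For illustration, a CreateTopicsPolicy is a broker-side plug-in registered via
create.topic.policy.class.name. The sketch below only caps the partition count of a
single request (the "max.partitions.per.topic" key and the limit of 50 are made up);
a real "at most 50 topics per user" policy would additionally need to consult
cluster state, which is omitted here:

  import java.util.Map;
  import org.apache.kafka.common.errors.PolicyViolationException;
  import org.apache.kafka.server.policy.CreateTopicsPolicy;

  public class MaxPartitionsPolicy implements CreateTopicsPolicy {
      private int maxPartitions = 50; // illustrative default

      @Override
      public void configure(Map<String, ?> configs) {
          Object limit = configs.get("max.partitions.per.topic"); // hypothetical key
          if (limit != null) {
              maxPartitions = Integer.parseInt(limit.toString());
          }
      }

      @Override
      public void validate(RequestMetadata metadata) throws PolicyViolationException {
          Integer partitions = metadata.numPartitions();
          if (partitions != null && partitions > maxPartitions) {
              throw new PolicyViolationException("Topic " + metadata.topic() + " requests "
                  + partitions + " partitions; the limit is " + maxPartitions);
          }
      }

      @Override
      public void close() {}
  }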
> 
> 
> 
> 

Hi Colin,

            Well, you gave the example that a user can delete the topics just by 
running that script :).

I understand there are open APIs in Kafka that can lead to rogue clients taking 
advantage of them without proper security in place.

What I have been asking for in this thread is that this KIP changes the producer 
behavior and is not backwards compatible. We can still achieve the main goal of 
the KIP, which is to stop the MetadataRequest from creating topics and to send a 
CreateTopicsRequest from the producer instead, while also keeping the server-side 
config's precedence.  This KIP was originally written to give the server side 
preference, and there is not much explanation of why it changed to give the 
producer side preference.

Arguing that the AdminClient can do that, so we are going to make the producer do 
the same, doesn't make sense.

> 
> 
> 
> 
> 
> 
> 
>> 
>> 
>> "The downside is that if we wanted to check a server side configuration
>> before sending the create topics request, the code would be more complex.
>> The behavior would also not be consistent with how topic auto-creation is
>> handled in Kafka Streams."
>> 
>> 
>> 
>> 
>> I am not sure why you need to check server side configuration before
>> sending create topics request. A user enables producer side config to
>> create topics.
>> Producer sends a request to the broker and if the broker has
>> auto.topic.create.enable set to true (the default), it will allow creation of
>> topics. If it is set to false, it returns an error back to the client.
>> 
>> 
> 
> 
> 
> auto.topic.create.enable has never affected CreateTopicsRequest. If you
> submit a CreateTopicsRequest and you are authorized, a topic will be
> created, regardless of what the value of auto.topic.create.enable is. This
> behavior goes back a long way, to about 2016 (Kafka 0.10.1, I think?)
> 
> 
> 
>> 
>> 
>> I don't see how this behavior will be different in Kafka Streams. By
>> default the server allows topic creation, and with this KIP, it will only
>> allow creation of a topic when both the producer and the server side are turned on.
>> It's exactly the same behavior as in KIP-361.
>> 
>> 
>> 
> 
> 
> 
> I suggest running a streams job as a test. It creates the topics it needs
> using AdminClient. The value of auto.topic.create.enable is not relevant.
> Whether it is true or false, the topics will still be created.
> 
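
As a concrete sketch of that AdminClient path, the snippet below creates a topic with
an explicit CreateTopicsRequest; the broker's auto.create.topics.enable setting is not
consulted (topic name, partition count and replication factor are placeholders):

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.admin.AdminClient;
  import org.apache.kafka.clients.admin.AdminClientConfig;
  import org.apache.kafka.clients.admin.NewTopic;

  public class CreateTopicExample {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

          try (AdminClient admin = AdminClient.create(props)) {
              NewTopic topic = new NewTopic("example-topic", 3, (short) 1);
              // Sends CreateTopicsRequest; subject to ACLs and any CreateTopicsPolicy.
              admin.createTopics(Collections.singleton(topic)).all().get();
          }
      }
  }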

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-07 Thread Jason Gustafson
Hi Boyang,

> We already persist member.id, instance.id and generation.id in the offset
topic, what extra fields we need to store?

Yeah, you're right. I was a little confused and thought this information
was needed by the transaction coordinator.

> This should be easily done on the Streams side, as we have
> StreamsPartitionAssignor to reflect metadata changes upon #onAssignment(),
> but a non-Streams user has to code the callback by hand. Do you think the
> convenience we sacrifice here is worth the simplification benefit?

Either way, you need a reference to the consumer. I was mostly just
thinking it would be better to reduce the integration point to its minimum.
Have you thought through the implications of needing to keep around a
reference to the consumer in the producer? What if it gets closed? It seems
better not to have to think about these cases.

-Jason

On Tue, Aug 6, 2019 at 9:53 PM Boyang Chen 
wrote:

> Thank you for the suggestions Jason. And a side note for Guozhang, I
> updated the KIP to reflect the dependency on 447.
>
> On Tue, Aug 6, 2019 at 11:35 AM Jason Gustafson 
> wrote:
>
> > Hi Boyang, thanks for the updates. I have a few more comments:
> >
> > 1. We are adding some new fields to TxnOffsetCommit to support
> group-based
> > fencing. Do we need these fields to be persisted in the offsets topic to
> > ensure that the fencing still works after a coordinator failover?
> >
> > We already persist member.id, instance.id and generation.id in the
> offset
> topic, what extra fields we need to store?
>
>
> > 2. Since you are proposing a new `groupMetadata` API, have you considered
> > whether we still need the `initTransactions` overload? Another way would
> be
> > to pass it through the `sendOffsetsToTransaction` API:
> >
> > void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
> > offsets, GroupMetadata groupMetadata) throws
> > ProducerFencedException, IllegalGenerationException;
> >
> > This seems a little more consistent with the current API and avoids the
> > direct dependence on the Consumer in the producer.
> >
> Note that although we avoid one dependency on the consumer, the producer needs to
> periodically update its group metadata; in this case the caller of
> *sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
> GroupMetadata groupMetadata)* is responsible for getting the
> latest value of the group metadata.
> This should be easily done on the Streams side, as we have
> StreamsPartitionAssignor to reflect metadata changes upon #onAssignment(),
> but a non-Streams user has to code the callback by hand. Do you think the
> convenience we sacrifice here is worth the simplification benefit?
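
As a rough sketch of that calling pattern for a non-Streams application (assuming the
overload above plus a consumer-side groupMetadata() accessor; setup, error handling and
abort paths are omitted), one consume-transform-produce transaction would look like:

  import java.time.Duration;
  import java.util.HashMap;
  import java.util.Map;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.TopicPartition;

  final class ConsumeTransformProduce {
      static void processOnce(KafkaConsumer<String, String> consumer,
                              KafkaProducer<String, String> producer,
                              String outputTopic) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
          producer.beginTransaction();
          Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
          for (ConsumerRecord<String, String> record : records) {
              producer.send(new ProducerRecord<>(outputTopic, record.key(), record.value()));
              offsets.put(new TopicPartition(record.topic(), record.partition()),
                          new OffsetAndMetadata(record.offset() + 1));
          }
          // Re-read the group metadata on every commit so fencing information stays current.
          producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
          producer.commitTransaction();
      }
  }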
>
>
> > 3. Can you clarify the behavior of the clients when the brokers do not
> > support the latest API versions? This is both for the new TxnOffsetCommit
> > and the OffsetFetch APIs. I guess the high level idea in streams is to
> > detect broker support before instantiating the producer and consumer. I
> > think that's reasonable, but we might need some approach for non-streams
> > use cases. One option I was considering is enforcing the latest version
> > through the new `sendOffsetsToTransaction` API. Basically when you use
> the
> > new API, we require support for the latest TxnOffsetCommit version. This
> > puts some burden on users, but it avoids breaking correctness assumptions
> > when the new APIs are in use. What do you think?
> >
> Yes, I think we haven't covered this case, so the plan is to crash the
> non-Streams application when the job is using the new sendOffsets API.
>
> >
> > -Jason
> >
> >
> >
> >
> > On Mon, Aug 5, 2019 at 6:06 PM Boyang Chen 
> > wrote:
> >
> > > Yep, Guozhang I think that would be best as passing in an entire
> consumer
> > > instance is indeed cumbersome.
> > >
> > > Just saw you updated KIP-429, I will follow-up to change 447 as well.
> > >
> > > Best,
> > > Boyang
> > >
> > > On Mon, Aug 5, 2019 at 3:18 PM Guozhang Wang 
> wrote:
> > >
> > > > okay I think I understand your concerns about ConsumerGroupMetadata
> > now:
> > > if
> > > > we still want to only call initTxns once, then we should allow the
> > > whatever
> > > > passed-in parameter to reflect the latest value of generation id
> > whenever
> > > > sending the offset fetch request.
> > > >
> > > > Whereas the current ConsumerGroupMetadata is a static object.
> > > >
> > > > Maybe we can consider having an extended class of
> ConsumerGroupMetadata
> > > > whose values are updated from the consumer's rebalance callback?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, Aug 5, 2019 at 9:26 AM Boyang Chen <
> reluctanthero...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Thank you Guozhang for the reply! I'm curious whether KIP-429 has
> > > > reflected
> > > > > the latest change on ConsumerGroupMetadata? Also regarding question
> > > one,
> > > > > the group metadata needs to be accessed via callback, does that
> mean
> > we
> > > > > need a separate producer API such like
> > > > > 

Re: [DISCUSS] KIP-317: Transparent Data Encryption

2019-08-07 Thread Andrew Schofield
Hi,
I think this is a useful KIP and it looks good in principle. While it can all 
be done using
interceptors, if the brokers do not know anything about it, you need to 
maintain the
mapping from topics to key ids somewhere external. I'd prefer the way you've 
done it.

I'm not sure whether you'll manage to interest any committers in volunteering an
opinion, and you'll need that before you can get the KIP accepted into Kafka.

Thanks,
Andrew Schofield (IBM)

On 06/08/2019, 15:46, "Sönke Liebau"  
wrote:

Hi,

I have so far received pretty much no comments on the technical details
outlined in the KIP. While I am happy to continue with my own ideas of how
to implement this, I would much prefer to at least get a very broad "looks
good in principle, but still lots to flesh out" from a few people before I
put more work into this.

Best regards,
Sönke




On Tue, 21 May 2019 at 14:15, Sönke Liebau 
wrote:

> Hi everybody,
>
> I'd like to rekindle the discussion around KIP-317.
> I have reworked the KIP a little bit in order to design everything as a
> pluggable implementation. During the course of that work I've also decided
> to rename the KIP, as encryption will only be transparent in some cases. 
It
> is now called "Add end to end data encryption functionality to Apache
> Kafka" [1].
>
> I'd very much appreciate it if you could give the KIP a quick read. This
> is not at this point a fully fleshed out design, as I would like to agree
> on the underlying structure that I came up with first, before spending 
time
> on details.
>
> TL/DR is:
> Create three pluggable classes:
> KeyManager runs on the broker and manages which keys to use, key rollover
> etc
> KeyProvider runs on the client and retrieves keys based on what the
> KeyManager tells it
> EncryptionEngine runs on the client and handles the actual encryption
> First idea of control flow between these components can be seen at [2]
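
To make the proposed split easier to picture, here is a purely hypothetical sketch of the
three pluggable roles described above; none of these interfaces exist in Kafka, and all
names and signatures are guesses rather than part of the KIP:

  public interface KeyManager {                  // runs on the broker
      String currentKeyIdFor(String topic);      // which key id new records should use
      void rotateKey(String topic);              // key rollover
  }

  public interface KeyProvider {                 // runs on the client
      byte[] keyMaterial(String keyId);          // fetches the key the KeyManager refers to
  }

  public interface EncryptionEngine {            // runs on the client
      byte[] encrypt(byte[] plaintext, String keyId);
      byte[] decrypt(byte[] ciphertext, String keyId);
  }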
>
> Please let me know any thoughts or concerns that you may have!
>
> Best regards,
> Sönke
>
> [1]
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+end-to-end+data+encryption+functionality+to+Apache+Kafka
> [2]
> 
https://cwiki.apache.org/confluence/download/attachments/85479936/kafka_e2e-encryption_control-flow.png?version=1&modificationDate=1558439227551&api=v2
>
>
>
> On Fri, 10 Aug 2018 at 14:05, Sönke Liebau 
> wrote:
>
>> Hi Viktor,
>>
>> thanks for your input! We could accommodate magic headers by removing any
>> known fixed bytes pre-encryption, sticking them in a header field and
>> prepending them after decryption. However, I am not sure whether this is
>> actually necessary, as most modern (AES for sure) algorithms are 
considered
>> to be resistant to known-plaintext types of attack. Even if the entire
>> plaintext is known to the attacker he still needs to brute-force the key 
-
>> which may take a while.
>>
>> Something different to consider in this context are compression
>> sidechannel attacks like CRIME or BREACH, which may be relevant depending
>> on what type of data is being sent through Kafka. Both these attacks 
depend
>> on the encrypted record containing a combination of secret and user
>> controlled data.
>> For example if Kafka was used to forward data that the user entered on a
>> website along with a secret API key that the website adds to a back-end
>> server and the user can obtain the Kafka messages, these attacks would
>> become relevant. Not much we can do about that except disallow encryption
>> when compression is enabled (TLS chose this approach in version 1.3)
>>
>> I agree with you, that we definitely need to clearly document any risks
>> and how much security can reasonably be expected in any given scenario. 
We
>> might even consider logging a warning message when sending data that is
>> compressed and encrypted.
>>
>> On a different note, I've started amending the KIP to make key management
>> and distribution pluggable, should hopefully be able to publish sometime
>> Monday.
>>
>> Best regards,
>> Sönke
>>
>>
>> On Thu, Jun 21, 2018 at 12:26 PM, Viktor Somogyi > > wrote:
>>
>>> Hi Sönke,
>>>

Build failed in Jenkins: kafka-trunk-jdk8 #3834

2019-08-07 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Upgrade jackson-databind to 2.9.9.3 (#7125)

--
[...truncated 1.79 MB...]

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldHaveMeteredStoreAsOuterStore STARTED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldHaveMeteredStoreAsOuterStore PASSED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldWrapPlainKeyValueStoreAsTimestampStore STARTED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldWrapPlainKeyValueStoreAsTimestampStore PASSED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldThrowNullPointerIfTimeIsNull STARTED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldThrowNullPointerIfTimeIsNull PASSED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldHaveCachingAndChangeLoggingWhenBothEnabled STARTED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldHaveCachingAndChangeLoggingWhenBothEnabled PASSED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldNotHaveChangeLoggingStoreWhenDisabled STARTED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldNotHaveChangeLoggingStoreWhenDisabled PASSED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldThrowNullPointerIfInnerIsNull STARTED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldThrowNullPointerIfInnerIsNull PASSED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldThrowNullPointerIfKeySerdeIsNull STARTED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldThrowNullPointerIfKeySerdeIsNull PASSED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldHaveChangeLoggingStoreWhenLoggingEnabled STARTED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldHaveChangeLoggingStoreWhenLoggingEnabled PASSED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldThrowNullPointerIfMetricsScopeIsNull STARTED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldThrowNullPointerIfMetricsScopeIsNull PASSED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldThrowNullPointerIfValueSerdeIsNull STARTED

org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderTest > 
shouldThrowNullPointerIfValueSerdeIsNull PASSED

org.apache.kafka.streams.state.internals.MergedSortedCacheWrappedWindowStoreIteratorTest
 > shouldIterateOverValueFromBothIterators STARTED

org.apache.kafka.streams.state.internals.MergedSortedCacheWrappedWindowStoreIteratorTest
 > shouldIterateOverValueFromBothIterators PASSED

org.apache.kafka.streams.state.internals.MergedSortedCacheWrappedWindowStoreIteratorTest
 > shouldPeekNextCacheKey STARTED

org.apache.kafka.streams.state.internals.MergedSortedCacheWrappedWindowStoreIteratorTest
 > shouldPeekNextCacheKey PASSED

org.apache.kafka.streams.state.internals.MergedSortedCacheWrappedWindowStoreIteratorTest
 > shouldPeekNextStoreKey STARTED

org.apache.kafka.streams.state.internals.MergedSortedCacheWrappedWindowStoreIteratorTest
 > shouldPeekNextStoreKey PASSED

org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStoreTest > 
shouldDelegateToUnderlyingStoreWhenFetchingRange STARTED

org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStoreTest > 
shouldDelegateToUnderlyingStoreWhenFetchingRange PASSED

org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStoreTest > 
shouldLogPuts STARTED

org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStoreTest > 
shouldLogPuts PASSED

org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStoreTest > 
shouldRetainDuplicatesWhenSet STARTED

org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStoreTest > 
shouldRetainDuplicatesWhenSet PASSED

org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStoreTest > 
shouldDelegateToUnderlyingStoreWhenFetching STARTED

org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStoreTest > 
shouldDelegateToUnderlyingStoreWhenFetching PASSED

org.apache.kafka.streams.state.internals.GlobalStateStoreProviderTest > 
shouldReturnKeyValueStore STARTED

org.apache.kafka.streams.state.internals.GlobalStateStoreProviderTest > 
shouldReturnKeyValueStore PASSED

org.apache.kafka.streams.state.internals.GlobalStateStoreProviderTest > 
shouldReturnSingleItemListIfStoreExists STARTED

org.apache.kafka.streams.state.internals.GlobalStateStoreProviderTest > 

Re: [DISCUSS] KIP-487: Automatic Topic Creation on Producer

2019-08-07 Thread Justine Olshan
Hello all,

Thank you for all the feedback!

It seems that one of the main issues is how the client-side auto-creation
can act on its own and does not simply block or allow auto-creation as
configured by the broker.

I think I was a bit unclear about this, but the idea of this KIP is to
eventually replace the functionality of the broker config. We don't want to
check if the broker config is also enabled because the idea is that the
broker config would not be enabled, and only specific producers/clients
would be auto-creating topics.
Some older clients require auto-topic creation for only the topics they
need, and this KIP would make these clients compatible with brokers that
disable autocreation.
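
For reference, this is the implicit path those older clients rely on today: a plain send
to a missing topic only results in creation if the broker has auto.create.topics.enable
set to true, and the client has no explicit say (bootstrap server and topic name below
are placeholders):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.serialization.StringSerializer;

  public class ImplicitAutoCreateExample {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

          try (Producer<String, String> producer = new KafkaProducer<>(props)) {
              // The metadata request for the unknown topic is what triggers creation,
              // and only when the broker allows it.
              producer.send(new ProducerRecord<>("not-yet-existing-topic", "key", "value"));
          }
      }
  }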

I now understand the worry about security and 'overriding' the broker's
auto.create.topic.enable configuration. However, in the case I explained
above, having the broker stop the producer would prevent the clients I
described above from being able to create topics (basically defeating the
main point of this KIP).
I'm not sure how to go about adding such a security feature. Perhaps adding
another config on the broker to prevent this, one that would be off by default
but could be turned on? It would complicate things more, but I'm open to
discussing it.

Thank you,
Justine

On Tue, Aug 6, 2019 at 11:47 PM Colin McCabe  wrote:

> On Tue, Aug 6, 2019, at 21:38, Harsha Ch wrote:
> > Hi Colin,
> > "Hmm... I'm not sure I follow.  Users don't have to build their own
> > tooling, right?  They can use any of the shell scripts that we've shipped
> > in the last few releases.  For example, if any of your users run it, this
> > shell script will delete all of the topics from your non-security-enabled
> > cluster:
> >
> > ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
> 2>/dev/null
> > | xargs -l ./bin/kafka-topics.sh --bootstrap-server localhost:9092
> --delete
> > --topic
> >
> > They will need to fill in the correct bootstrap servers list, of course,
> > not localhost.  This deletion script will work on some pretty old
> brokers,
> > even back to the 0.10 releases.  It seems a little odd to trust your
> users
> > with this power, but not trust them to avoid changing a particular
> > configuration key."
> >
> > The above will be blocked by the server if we set delete.topic.enable to
> > false, and that's exactly what I am asking for.
>
> Hi Harsha,
>
> I was wondering if someone was going to bring up that configuration :)
>
> it's an interesting complication, but globally disabling topic deletion is
> not very practical for most use-cases.
>
> In any case, there are plenty of other bad things that users with full
> permissions can do that aren't blocked by any server configuration.  For
> example, they can delete every record in every topic.  I can write a script
> for that too, and there's no server configuration you can set to disable
> it.  Or I could simply create hundreds of thousands of topics, until
> cluster performance becomes unacceptable (this will be even more of a
> problem if someone configured delete.topic.enable as false).  Or publish
> bad data to every topic, etc. etc.
>
> The point I'm trying to make here is that you can't rely on these kind of
> server-side configurations for security.  At most, they're a way to set up
> certain very simple policies.  But the policies are so simple that they're
> hardly ever useful any more.
>
> For example, if the problem you want to solve is that you want a user to
> only be able to create 50 topics and not delete anyone else's topics, you
> can solve that with a CreateTopicsPolicy that limits the number of topics,
> and some ACLs.  There's no combination of auto.create.topics.enable and
> delete.topic.enable that will help here.
>
> >
> > "The downside is that if we wanted to check a server side configuration
> > before sending the create topics request, the code would be more complex.
> > The behavior would also not be consistent with how topic auto-creation is
> > handled in Kafka Streams."
> >
> > I am not sure why you need to check server side configuration before
> > sending create topics request. A user enables producer side config to
> > create topics.
> > Producer sends a request to the broker and if the broker has
> > auto.topic.create.enable set to true (the default), it will allow creation of
> > topics. If it is set to false, it returns an error back to the client.
>
> auto.topic.create.enable has never affected CreateTopicsRequest.  If you
> submit a CreateTopicsRequest and you are authorized, a topic will be
> created, regardless of what the value of auto.topic.create.enable is.  This
> behavior goes back a long way, to about 2016 (Kafka 0.10.1, I think?)
>
> > I don't see how this behavior will be different in Kafka Streams. By
> > default the server allows topic creation, and with this KIP, it will only
> > allow creation of a topic when both the producer and the server side are turned on.
> > It's exactly the same behavior as in KIP-361.
>
> I suggest running a 

Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-08-07 Thread Harsha Chintalapani
Thanks for the KIP Rajini.
Quick thought, it would be good to have a common module outside of clients
that only applies to server side interfaces & changes. It looks like we are
increasingly in favor of using Java interfaces for pluggable modules on the
broker side.

Thanks,
Harsha


On Tue, Aug 06, 2019 at 2:31 PM, Rajini Sivaram 
wrote:

> Hi all,
>
> I have created a KIP to replace the Scala Authorizer API with a new Java
> API:
>
> -
> https://cwiki.apache.org/confluence/display/KAFKA/
> KIP-504+-+Add+new+Java+Authorizer+Interface
>
> This is replacement for KIP-50 which was accepted but never merged. Apart
> from moving to a Java API consistent with other pluggable interfaces in the
> broker, KIP-504 also attempts to address known limitations in the
> authorizer. If you have come across other limitations that you would like
> to see addressed in the new API, please raise these on the discussion
> thread so that we can consider those too. All suggestions and feedback are
> welcome.
>
> Thank you,
>
> Rajini
>


Re: [DISCUSS] KIP-503: deleted topics metric

2019-08-07 Thread David Arthur
Thanks for the feedback, Stan. That's a good point about the partition
count -- I'll poke around and see if I can surface this value in the
Controller.

On Tue, Aug 6, 2019 at 8:13 AM Stanislav Kozlovski 
wrote:

> Thanks for the KIP David,
>
> As you mentioned in the KIP - "when a large number of topics (partitions,
> really) are deleted at once, it can take significant time for the
> Controller to process everything."
> In that sense, does it make sense to have the metric expose the number of
> partitions that are pending deletion, as opposed to topics? Perhaps even
> both?
> My reasoning is that this metric alone wouldn't say much if we had one
> topic with 1000 partitions versus a topic with 1 partition
>
> On Mon, Aug 5, 2019 at 8:19 PM Harsha Chintalapani 
> wrote:
>
> > Thanks for the KIP.  Its useful metric to have.  LGTM.
> > -Harsha
> >
> >
> > On Mon, Aug 05, 2019 at 11:24 AM, David Arthur 
> > wrote:
> >
> > > Hello all, I'd like to start a discussion for
> > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > KIP-503%3A+Add+metric+for+number+of+topics+marked+for+deletion
> > >
> > > Thanks!
> > > David
> > >
> >
>
>
> --
> Best,
> Stanislav
>


-- 
David Arthur


Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-08-07 Thread Sönke Liebau
Hi Rajini,

looks great and addresses a few gripes I had in the past, thanks for that!

One idea that I had while reading, though I am not sure if this is taking
"being flexible" a step too far: would it make sense to make the severity at
which an authorization decision is logged pluggable/configurable? We
have a few customers that have different regulatory requirements for
auditing access depending on the type of data that is in their topics. So for
some topics they might actually need to log every access, for some just the
denied ones, and for some no one cares at all.

Best regards,
Sönke



On Wed, 7 Aug 2019 at 12:28, Ron Dagostino  wrote:

> Looks great, Rajini — a detailed and complete KIP with a great
> backwards-compatibility plan.  Nothing came to mind aside from how easy it
> was to read and understand.  Thanks for writing it so clearly.
>
> Ron
>
> > On Aug 6, 2019, at 5:31 PM, Rajini Sivaram 
> wrote:
> >
> > Hi all,
> >
> > I have created a KIP to replace the Scala Authorizer API with a new Java
> > API:
> >
> >   -
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-504+-+Add+new+Java+Authorizer+Interface
> >
> > This is replacement for KIP-50 which was accepted but never merged. Apart
> > from moving to a Java API consistent with other pluggable interfaces in
> the
> > broker, KIP-504 also attempts to address known limitations in the
> > authorizer. If you have come across other limitations that you would like
> > to see addressed in the new API, please raise these on the discussion
> > thread so that we can consider those too. All suggestions and feedback
> are
> > welcome.
> >
> > Thank you,
> >
> > Rajini
>


-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany


[jira] [Created] (KAFKA-8764) LogCleanerManager endless loop while compacting/cleaning segments

2019-08-07 Thread Tomislav Rajakovic (JIRA)
Tomislav Rajakovic created KAFKA-8764:
-

 Summary: LogCleanerManager endless loop while compacting/cleaning 
segments
 Key: KAFKA-8764
 URL: https://issues.apache.org/jira/browse/KAFKA-8764
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Affects Versions: 2.2.1
 Environment: docker base image: openjdk:8-jre-alpine base image, kafka 
from http://ftp.carnet.hr/misc/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz
Reporter: Tomislav Rajakovic
 Attachments: log-cleaner-bug-reproduction.zip

{{LogCleanerManager is stuck in an endless loop while cleaning segments for one 
partition, resulting in many log outputs and heavy disk reads/writes/IOPS.}}

 

Issue appeared on follower brokers, and it happens on every (new) broker if 
partition assignment is changed.

 

Original issue setup:
 * kafka_2.12-2.2.1 deployed as statefulset on kubernetes, 5 brokers
 * log directory is (AWS) EBS mounted PV, gp2 (ssd) kind of 750GiB
 * 5 zookeepers
 * topic created with config:
 ** name = "backup_br_domain_squad"
partitions = 36
replication_factor = 3

config = {
 "cleanup.policy" = "compact"
 "min.compaction.lag.ms" = "8640"
 "min.cleanable.dirty.ratio" = "0.3"
}

 

 

Log excerpt:

{{[2019-08-07 12:10:53,895] INFO [Log partition=backup_br_domain_squad-14, 
dir=/var/lib/kafka/data/topics] Deleting segment 0 (kafka.log.Log)}}
{{[2019-08-07 12:10:53,895] INFO Deleted log 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.log.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:53,896] INFO Deleted offset index 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.index.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:53,896] INFO Deleted time index 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.timeindex.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:53,964] INFO [Log partition=backup_br_domain_squad-14, 
dir=/var/lib/kafka/data/topics] Deleting segment 0 (kafka.log.Log)}}
{{[2019-08-07 12:10:53,964] INFO Deleted log 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.log.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:53,964] INFO Deleted offset index 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.index.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:53,964] INFO Deleted time index 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.timeindex.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:54,031] INFO [Log partition=backup_br_domain_squad-14, 
dir=/var/lib/kafka/data/topics] Deleting segment 0 (kafka.log.Log)}}
{{[2019-08-07 12:10:54,032] INFO Deleted log 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.log.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:54,032] INFO Deleted offset index 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.index.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:54,032] INFO Deleted time index 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.timeindex.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:54,101] INFO [Log partition=backup_br_domain_squad-14, 
dir=/var/lib/kafka/data/topics] Deleting segment 0 (kafka.log.Log)}}
{{[2019-08-07 12:10:54,101] INFO Deleted log 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.log.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:54,101] INFO Deleted offset index 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.index.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:54,101] INFO Deleted time index 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.timeindex.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:54,173] INFO [Log partition=backup_br_domain_squad-14, 
dir=/var/lib/kafka/data/topics] Deleting segment 0 (kafka.log.Log)}}
{{[2019-08-07 12:10:54,173] INFO Deleted log 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.log.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:54,173] INFO Deleted offset index 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.index.deleted.
 (kafka.log.LogSegment)}}
{{[2019-08-07 12:10:54,173] INFO Deleted time index 
/var/lib/kafka/data/topics/backup_br_domain_squad-14/.timeindex.deleted.
 (kafka.log.LogSegment)}}

 

And such log keeps repeating forever.

 

 

I've been able to extract segment files from (running) leader broker, and 
replicated same behaviour locally.
Reproduction setup:
 * start single broker kafka_2.12-2.2.1
 * create topic
 ** {{./bin/kafka-topics.sh --bootstrap-server *__BOOTSTRAP_SERVER__*:9092 
--create --topic backup_br_domain_squad --partitions 1 --replication-factor 1 
--config cleanup.policy=compact 

Build failed in Jenkins: kafka-trunk-jdk11 #738

2019-08-07 Thread Apache Jenkins Server
See 


Changes:

[manikumar] KAFKA-8599: Use automatic RPC generation in ExpireDelegationToken

--
[...truncated 2.58 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 

Jenkins build is back to normal : kafka-2.2-jdk8 #157

2019-08-07 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-2.3-jdk8 #82

2019-08-07 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-08-07 Thread Ron Dagostino
Looks great, Rajini — a detailed and complete KIP with a great 
backwards-compatibility plan.  Nothing came to mind aside from how easy it was 
to read and understand.  Thanks for writing it so clearly.

Ron

> On Aug 6, 2019, at 5:31 PM, Rajini Sivaram  wrote:
> 
> Hi all,
> 
> I have created a KIP to replace the Scala Authorizer API with a new Java
> API:
> 
>   -
>   
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-504+-+Add+new+Java+Authorizer+Interface
> 
> This is replacement for KIP-50 which was accepted but never merged. Apart
> from moving to a Java API consistent with other pluggable interfaces in the
> broker, KIP-504 also attempts to address known limitations in the
> authorizer. If you have come across other limitations that you would like
> to see addressed in the new API, please raise these on the discussion
> thread so that we can consider those too. All suggestions and feedback are
> welcome.
> 
> Thank you,
> 
> Rajini


Build failed in Jenkins: kafka-trunk-jdk11 #737

2019-08-07 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Upgrade jackson-databind to 2.9.9.3 (#7125)

--
[...truncated 2.60 MB...]

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingDataWithTooManyBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testDeserializingDataWithTooManyBytes PASSED

org.apache.kafka.connect.converters.FloatConverterTest > 
testConvertingSamplesToAndFromBytes STARTED

org.apache.kafka.connect.converters.FloatConverterTest > 
testConvertingSamplesToAndFromBytes PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectSchemaless STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectSchemaless PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnectNull 
STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnectNull 
PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectBadSchema STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectBadSchema PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectNull STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectNull PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnect 
STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testToConnect 
PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testFromConnect 
STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > testFromConnect 
PASSED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectInvalidValue STARTED

org.apache.kafka.connect.converters.ByteArrayConverterTest > 
testFromConnectInvalidValue PASSED

> Task :streams:streams-scala:test

org.apache.kafka.streams.scala.WordCountTest > testShouldCountWordsMaterialized 
STARTED

org.apache.kafka.streams.scala.WordCountTest > testShouldCountWordsMaterialized 
PASSED

org.apache.kafka.streams.scala.WordCountTest > testShouldCountWordsJava STARTED

org.apache.kafka.streams.scala.WordCountTest > testShouldCountWordsJava PASSED

org.apache.kafka.streams.scala.WordCountTest > testShouldCountWords STARTED

org.apache.kafka.streams.scala.WordCountTest > testShouldCountWords PASSED

org.apache.kafka.streams.scala.kstream.KTableTest > filter a KTable should 
filter records satisfying the predicate STARTED

org.apache.kafka.streams.scala.kstream.KTableTest > filter a KTable should 
filter records satisfying the predicate PASSED

org.apache.kafka.streams.scala.kstream.KTableTest > filterNot a KTable should 
filter records not satisfying the predicate STARTED

org.apache.kafka.streams.scala.kstream.KTableTest > filterNot a KTable should 
filter records not satisfying the predicate PASSED

org.apache.kafka.streams.scala.kstream.KTableTest > join 2 KTables should join 
correctly records STARTED

org.apache.kafka.streams.scala.kstream.KTableTest > join 2 KTables should join 
correctly records PASSED

org.apache.kafka.streams.scala.kstream.KTableTest > join 2 KTables with a 
Materialized should join correctly records and state store STARTED

org.apache.kafka.streams.scala.kstream.KTableTest > join 2 KTables with a 
Materialized should join correctly records and state store PASSED

org.apache.kafka.streams.scala.kstream.KTableTest > windowed KTable#suppress 
should correctly suppress results using Suppressed.untilTimeLimit STARTED

org.apache.kafka.streams.scala.kstream.KTableTest > windowed KTable#suppress 
should correctly suppress results using Suppressed.untilTimeLimit PASSED

org.apache.kafka.streams.scala.kstream.KTableTest > windowed KTable#suppress 
should correctly suppress results using Suppressed.untilWindowCloses STARTED

org.apache.kafka.streams.scala.kstream.KTableTest > windowed KTable#suppress 
should correctly suppress results using Suppressed.untilWindowCloses PASSED

org.apache.kafka.streams.scala.kstream.KTableTest > session windowed 
KTable#suppress should correctly suppress results using 
Suppressed.untilWindowCloses STARTED

org.apache.kafka.streams.scala.kstream.KTableTest > session windowed 
KTable#suppress should correctly suppress results using 
Suppressed.untilWindowCloses PASSED

org.apache.kafka.streams.scala.kstream.KTableTest > non-windowed 
KTable#suppress should correctly suppress results using 
Suppressed.untilTimeLimit STARTED

org.apache.kafka.streams.scala.kstream.KTableTest > non-windowed 
KTable#suppress should correctly suppress results using 
Suppressed.untilTimeLimit PASSED

org.apache.kafka.streams.scala.kstream.MaterializedTest > Create a Materialized 
should create a Materialized with Serdes STARTED

org.apache.kafka.streams.scala.kstream.MaterializedTest > Create a Materialized 
should create a Materialized with Serdes PASSED


[jira] [Resolved] (KAFKA-8599) Replace ExpireDelegationToken request/response with automated protocol

2019-08-07 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-8599.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 7098
[https://github.com/apache/kafka/pull/7098]

> Replace ExpireDelegationToken request/response with automated protocol
> --
>
> Key: KAFKA-8599
> URL: https://issues.apache.org/jira/browse/KAFKA-8599
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 2.4.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-491: Preferred Leader Deprioritized List (Temporary Blacklist)

2019-08-07 Thread Satish Duggana
Hi George,
Thanks for addressing the comments. I do not have any more questions.

On Wed, Aug 7, 2019 at 11:08 AM George Li
 wrote:
>
>  Hi Colin, Satish, Stanislav,
>
> Did I answer all your comments/concerns for KIP-491 ?  Please let me know if 
> you have more questions regarding this feature.  I would like to start coding 
> soon. I hope this feature can get into the open source trunk so every time we 
> upgrade Kafka in our environment, we don't need to cherry pick this.
>
> BTW, I have added below in KIP-491 for auto.leader.rebalance.enable behavior 
> with the new Preferred Leader "Blacklist".
>
> "When auto.leader.rebalance.enable is enabled.  The broker(s) in the 
> preferred leader "blacklist" should be excluded from being elected leaders. "
>
>
> Thanks,
> George
>
> On Friday, August 2, 2019, 08:02:07 PM PDT, George Li 
>  wrote:
>
>   Hi Colin,
> Thanks for looking into this KIP.  Sorry for the late response. been busy.
>
> If a cluster has MANY topic partitions, moving this "blacklist" broker to the 
> end of the replica list is still a rather "big" operation, involving submitting 
> reassignments.  The KIP-491 way of blacklisting is much simpler/easier and can 
> be undone easily without changing the replica assignment ordering.
> The major use case for me: a failed broker gets swapped with new hardware and 
> starts up empty (with the latest offset of all partitions); the retention SLA is 
> 1 day, so until this broker has been in-sync for 1 day, we would like to 
> blacklist it from serving traffic. After 1 day, the blacklist is removed and 
> preferred leader election is run.  This way, there is no need to 
> run reassignments before/after.  This is the "temporary" use-case.
>
> There are use-cases where this Preferred Leader "blacklist" can be somewhat 
> permanent, as I explained for the AWS data center instances vs. on-premises 
> data center bare-metal machines (heterogeneous hardware), where the AWS 
> broker_ids would be blacklisted.  So newly created topics or existing topic 
> expansion would not make them serve traffic even when they could be the preferred 
> leader.
> 
> Please let me know if there are more questions.
>
>
> Thanks,
> George
>
> On Thursday, July 25, 2019, 08:38:28 AM PDT, Colin McCabe 
>  wrote:
>
>  We still want to give the "blacklisted" broker the leadership if nobody else 
> is available.  Therefore, isn't putting a broker on the blacklist pretty much 
> the same as moving it to the last entry in the replicas list and then 
> triggering a preferred leader election?
>
> If we want this to be undone after a certain amount of time, or under certain 
> conditions, that seems like something that would be more effectively done by 
> an external system, rather than putting all these policies into Kafka.
>
> best,
> Colin
>
>
> On Fri, Jul 19, 2019, at 18:23, George Li wrote:
> >  Hi Satish,
> > Thanks for the reviews and feedbacks.
> >
> > > > The following is the requirements this KIP is trying to accomplish:
> > > This can be moved to the"Proposed changes" section.
> >
> > Updated the KIP-491.
> >
> > > >>The logic to determine the priority/order of which broker should be
> > > preferred leader should be modified.  The broker in the preferred leader
> > > blacklist should be moved to the end (lowest priority) when
> > > determining leadership.
> > >
> > > I believe there is no change required in the ordering of the preferred
> > > replica list. Brokers in the preferred leader blacklist are skipped
> > > until other brokers in the list are unavailable.
> >
> > Yes, the partition assignment remains the same (replicas & ordering). The
> > blacklist logic can be optimized during implementation.
> >
> > > >>The blacklist can be at the broker level. However, there might be use 
> > > >>cases
> > > where a specific topic should blacklist particular brokers, which
> > > would be at the
> > > Topic level Config. For this use cases of this KIP, it seems that broker 
> > > level
> > > blacklist would suffice.  Topic level preferred leader blacklist might
> > > be future enhancement work.
> > >
> > > I agree that the broker level preferred leader blacklist would be
> > > sufficient. Do you have any use cases which require topic level
> > > preferred blacklist?
> >
> >
> >
> > I don't have any concrete use cases for Topic level preferred leader
> > blacklist.  One scenarios I can think of is when a broker has high CPU
> > usage, trying to identify the big topics (High MsgIn, High BytesIn,
> > etc), then try to move the leaders away from this broker,  before doing
> > an actual reassignment to change its preferred leader,  try to put this
> > preferred_leader_blacklist in the Topic Level config, and run preferred
> > leader election, and see whether CPU decreases for this broker,  if
> > yes, then do the reassignments to change the preferred leaders to be
> > "permanent" (the topic may have many partitions like 256 that has quite
> > a few of them having this broker as preferred leader).  So this 

Re: Alternative of poll(0) without pulling records

2019-08-07 Thread Jungtaek Lim
If we just wanted to remove the deprecation and let both co-exist, that would
also be viable, though `poll(0)` is still a hack and it would be ideal to
provide an official approach to do this.

On Wed, Aug 7, 2019 at 4:24 PM Jungtaek Lim  wrote:

> Hi devs,
>
> I'm trying to replace deprecated poll(long) with poll(Duration), and
> realized there's no alternative which behaves exactly same as poll(0), as
> poll(0) has been used as a hack to only update metadata instead of pulling
> records. poll(Duration.ZERO) wouldn't behave same since even updating
> metadata will be timed-out. So now end users would need to give more
> timeout and even pull some records even they're only interested in metadata.
>
> I looked back at some KIPs which brought the change, and a "discarded" KIP
> (KIP-288 [1]) actually proposed a new API which only pulls metadata.
> KIP-266 [2] was picked up instead, but it didn't cover all the things
> KIP-288 proposed. I'm seeing some docs explaining that poll(0) has never been
> officially supported, but the hack has been widely used and can't be
> ignored.
>
> Kafka test code itself relies on either the deprecated poll(0)
> or updateAssignmentMetadataIfNeeded, which seems to be a private API intended
> only for testing.
> (Btw, I'd try replacing poll(0) with updateAssignmentMetadataIfNeeded to
> avoid the deprecated method - if it works I'll submit a PR.)
>
> I feel that it would be ideal to expose
> `updateAssignmentMetadataIfNeeded` as a public API, maybe renamed to
> `waitForAssignment` (which was proposed in KIP-288) if the current name feels
> too long.
>
> What do you think? If it sounds feasible I'd like to try contributing
> this. I'm new to contributing to the Kafka community, so I'm not sure whether
> it would require a new KIP or not.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 1.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-288%3A+%5BDISCARDED%5D+Consumer.poll%28%29+timeout+semantic+change+and+new+waitForAssignment+method
> 2.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior
>
>

-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior


Re: [DISCUSS] KIP-487: Automatic Topic Creation on Producer

2019-08-07 Thread Colin McCabe
On Tue, Aug 6, 2019, at 21:38, Harsha Ch wrote:
> Hi Colin,
> "Hmm... I'm not sure I follow.  Users don't have to build their own
> tooling, right?  They can use any of the shell scripts that we've shipped
> in the last few releases.  For example, if any of your users run it, this
> shell script will delete all of the topics from your non-security-enabled
> cluster:
> 
> ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list 2>/dev/null
> | xargs -l ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete
> --topic
> 
> They will need to fill in the correct bootstrap servers list, of course,
> not localhost.  This deletion script will work on some pretty old brokers,
> even back to the 0.10 releases.  It seems a little odd to trust your users
> with this power, but not trust them to avoid changing a particular
> configuration key."
> 
> The above will be blocked by the server if we set delete.topic.enable to false,
> and that's exactly what I am asking for.

Hi Harsha,

I was wondering if someone was going to bring up that configuration :)

It's an interesting complication, but globally disabling topic deletion is not
very practical for most use cases.

In any case, there are plenty of other bad things that users with full 
permissions can do that aren't blocked by any server configuration.  For 
example, they can delete every record in every topic.  I can write a script for 
that too, and there's no server configuration you can set to disable it.  Or I 
could simply create hundreds of thousands of topics, until cluster performance 
becomes unacceptable (this will be even more of a problem if someone configured 
delete.topic.enable as false).  Or publish bad data to every topic, etc. etc.

The point I'm trying to make here is that you can't rely on these kinds of
server-side configurations for security.  At most, they're a way to set up 
certain very simple policies.  But the policies are so simple that they're 
hardly ever useful any more.

For example, if the problem you want to solve is that you want a user to only
be able to create 50 topics and not delete anyone else's topics, you can solve
that with a CreateTopicPolicy that limits the number of topics, plus some ACLs.
There's no combination of auto.create.topics.enable and delete.topic.enable
that will help here.
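
(For reference, a minimal sketch of such a policy, registered on the brokers
via create.topic.policy.class.name.  The partition cap below is only an
illustration; a real "50 topics per user" quota would need extra bookkeeping,
since the policy callback does not receive the requesting principal.)

  import java.util.Map;
  import org.apache.kafka.common.errors.PolicyViolationException;
  import org.apache.kafka.server.policy.CreateTopicPolicy;

  public class MaxPartitionsPolicy implements CreateTopicPolicy {
      private static final int MAX_PARTITIONS = 50;

      @Override
      public void configure(Map<String, ?> configs) {
          // the limit could be read from a custom broker config here
      }

      @Override
      public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
          // numPartitions() may be null when an explicit replica assignment is given
          Integer partitions = requestMetadata.numPartitions();
          if (partitions != null && partitions > MAX_PARTITIONS) {
              throw new PolicyViolationException("Topic " + requestMetadata.topic()
                  + " requests " + partitions + " partitions; the limit is " + MAX_PARTITIONS);
          }
      }

      @Override
      public void close() {}
  }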

> 
> "The downside is that if we wanted to check a server side configuration
> before sending the create topics request, the code would be more complex.
> The behavior would also not be consistent with how topic auto-creation is
> handled in Kafka Streams."
> 
> I am not sure why you need to check the server side configuration before
> sending a create topics request. A user enables the producer side config to
> create topics.
> The producer sends a request to the broker, and if the broker has
> auto.create.topics.enable set to true (the default) it will allow creation of
> topics. If it is set to false, it returns an error back to the client.

auto.create.topics.enable has never affected CreateTopicsRequest.  If you submit
a CreateTopicsRequest and you are authorized, a topic will be created,
regardless of what the value of auto.create.topics.enable is.  This behavior
goes back a long way, to about 2016 (Kafka 0.10.1, I think?)

> I don't see how this behavior will be different in Kafka Streams. By
> default the server allows topic creation, and with this KIP it will only
> allow creation of a topic when both the producer and server side are turned on.
> It's exactly the same behavior as in KIP-361.

I suggest running a Streams job as a test.  It creates the topics it needs
using AdminClient.  The value of auto.create.topics.enable is not relevant.
Whether it is true or false, the topics will still be created.
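
(To illustrate: something like the following sketch will create a topic on any
cluster where the caller is authorized, whether auto.create.topics.enable is
true or false.  The topic name and settings here are made up.)

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.admin.AdminClient;
  import org.apache.kafka.clients.admin.AdminClientConfig;
  import org.apache.kafka.clients.admin.NewTopic;

  public class CreateTopicExample {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

          try (AdminClient admin = AdminClient.create(props)) {
              // Sends a CreateTopicsRequest; the broker's auto.create.topics.enable
              // setting is not consulted for this code path.
              NewTopic topic = new NewTopic("my-new-topic", 3, (short) 1);
              admin.createTopics(Collections.singleton(topic)).all().get();
          }
      }
  }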

> 
> "In general, it would be nice if we could keep the security and access
> control model simple and not introduce a lot of special cases and
> exceptions.  Kafka has basically converged on using ACLs and
> CreateTopicPolicy classes to control who can create topics and where.
> Adding more knobs seems like a step backwards, especially when the
> proposed knobs don't work consistently across components, and don't provide
> true security."
>
> This is not about access control at all. Shipping sane defaults should
> be prioritized.

I don't think the default is really the question here.  I think everyone agrees 
that the default for client-side auto-topic creation should be off.

The scenarios that you're describing (such as dealing with a poorly configured 
client that tries to create topics it should not) do seem to be about access 
control.  

> We keep talking about CreateTopicPolicy and yet we don't have a default
> one, and asking every user of Kafka to implement their own doesn't make sense
> here.

The point of CreateTopicPolicy is exactly that you can implement your own, 
though.  It's a way of customizing what the policy is in your specific cluster.

I agree that most people don't want to write a custom policy.  But most of 
those people are 

Re: [DISCUSS] KIP-481: SerDe Improvements for Connect Decimal type in JSON

2019-08-07 Thread Arjun Satish
Hey Almog, nice work! A couple of thoughts (hope I'm not late since you
started the voting thread already):

Can you please add some examples to show the changes that you are
proposing? It makes me think that for a given decimal number, we will have two
encodings: "asHex" and "asNumber".

Should we call the default config value "HEX" instead of "BINARY"?

Should we call out the fact that JS systems might be susceptible to
double-precision round-offs with the new numeric format? Here are some people
discussing a similar problem:
https://github.com/EventStore/EventStore/issues/1541
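
(For a concrete sketch of the two shapes: "asHex"/"asNumber" are just
placeholder field names from this thread, not the KIP's actual property names,
and the hex conversion below simply mirrors the unscaledValue().toByteArray()
description quoted later in this thread.)

  import java.math.BigDecimal;

  public class DecimalEncodingExample {
      public static void main(String[] args) {
          BigDecimal value = new BigDecimal("10.2345");

          // Legacy/BINARY-style: encode the unscaled value's two's-complement bytes
          byte[] unscaled = value.unscaledValue().toByteArray();
          StringBuilder hex = new StringBuilder();
          for (byte b : unscaled) {
              hex.append(String.format("%02X", b & 0xFF));
          }
          System.out.println("{\"asHex\":\"" + hex + "\"}");   // prints {"asHex":"018FC9"}

          // Proposed NUMERIC-style: a plain JSON number
          System.out.println("{\"asNumber\":" + value.toPlainString() + "}");  // prints {"asNumber":10.2345}
      }
  }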

On Tue, Aug 6, 2019 at 1:40 PM Almog Gavra  wrote:

> Hello Everyone,
>
> Summarizing an in-person discussion with Randall (this is copied from the
> KIP):
>
> The original KIP suggested supporting an additional representation - base10
> encoded text (e.g. `{"asText":"10.2345"}`). This causes issues because it
> is impossible to disambiguate between TEXT and BINARY without an additional
> config - furthermore, this makes the migration from one to the other nearly
> impossible, because it would require that all consumers stop consuming and
> producers stop producing and that the config be updated atomically on all of
> them after deploying the new code, or that we wait for the full retention
> period to pass - neither option is viable. The suggestion in the KIP is
> strictly an improvement over the existing behavior, even if it doesn't
> support all combinations.
>
> It seems that since most real-world use cases actually use the numeric
> representation (not string) we can consider this an improvement. With the
> new suggestion, we don't need a deserialization configuration (only a
> serialization option) and the consumers will be able to always
> automatically determine the serialization format.
>
> Based on this, I'll be opening up the simplified version of the KIP to a
> vote.
>
> Almog
>
> On Mon, Jul 29, 2019 at 9:29 AM Almog Gavra  wrote:
>
> > I'm mostly happy with your current suggestion (two configs, one for
> > serialization and one for deserialization) and your implementation
> > suggestion. One thing to note:
> >
> > > We should _always_ be able to deserialize a standard JSON
> > > number as a decimal
> >
> > I was doing some research into decimals and JSON, and I can imagine a
> > compelling reason to require string representations to avoid losing
> > precision and to be certain that whomever is sending the data isn't
> > losing precision (e.g. https://stackoverflow.com/a/38357877/2258040).
> >
> > I'm okay with always allowing numerics, but thought it's worth raising
> > the thought.
> >
> > On Mon, Jul 29, 2019 at 4:57 AM Andy Coates  wrote:
> >
> >> The way I see it, we need to control two separate things:
> >>
> >> 1. How do we _deserialize_ a decimal type if we encounter a text node in
> >> the JSON? (We should _always_ be able to deserialize a standard JSON
> >> number as a decimal.)
> >> 2. How do we choose how we want decimals to be _serialized_?
> >>
> >> This looks to fit well with your second suggestion of slightly different
> >> config names for serialization vs deserialization.
> >> a. For deserialization we only care about how to handle text nodes:
> >> `deserialization.decimal.*text*.format`, which should only have two valid
> >> values: BINARY | TEXT.
> >> b. For serialization we need all three: `serialization.decimal.format`,
> >> which should support all three options: BINARY | TEXT | NUMERIC.
> >>
> >> Implementation-wise, I think these should be two separate enums, rather
> >> than one shared enum and throwing an error if the deserializer is set to
> >> NUMERIC.  Mainly as this means the enums reflect the options available,
> >> rather than this being hidden in config checking code.  But that's a
> >> minor implementation detail.
> >>
> >> Personally, I'd be tempted to have the BINARY value named something like
> >> `LEGACY` or `LEGACY_BINARY` as a way of encouraging users to move away
> >> from it.
> >>
> >> It's a real shame that both of these settings require a default of BINARY
> >> for backwards compatibility, but I agree that discussions / plans around
> >> switching the defaults should not block this KIP.
> >>
> >> Andy
> >>
> >>
> >> On Thu, 25 Jul 2019 at 18:26, Almog Gavra  wrote:
> >>
> >> > Thanks for the replies Andy and Andrew (2x Andy?)!
> >> >
> >> > > Is the text decimal a base16 encoded number, or is it base16 encoded
> >> > > binary form of the number?
> >> >
> >> > The conversion happens as decimal.unscaledValue().toByteArray() and then
> >> > the byte array is converted to a hex string, so it's definitely the
> >> > binary form of the number converted to base16. Whether or not that's the
> >> > same as the base16 encoded number is a good question (toByteArray returns
> >> > a byte array containing a signed, big-endian, two's complement
> >> > representation of the big integer).
> >> >
> >> > > One suggestion I have is to change the proposed new config