Re: [DISCUSS] KIP-481: SerDe Improvements for Connect Decimal type in JSON

2019-08-08 Thread Arjun Satish
Cool!

Couple of nits:

- In public interfaces, typo: *json.decimal.serialization.fromat*
- In public interfaces, you use the term "HEX" instead of "BASE64".



On Wed, Aug 7, 2019 at 9:51 AM Almog Gavra  wrote:

> EDIT: everywhere I've been using "HEX" I meant to be using "BASE64". I will
> update the KIP to reflect this.
>
> On Wed, Aug 7, 2019 at 9:44 AM Almog Gavra  wrote:
>
> > Thanks for the feedback Arjun! I'm happy changing the default config to
> > HEX instead of BINARY, no strong feelings there.
> >
> > I'll also clarify the example in the KIP to be clearer:
> >
> > - serialize the decimal field "foo" with value "10.2345" with the HEX
> > setting: {"foo": "D3J5"}
> > - serialize the decimal field "foo" with value "10.2345" with the NUMERIC
> > setting: {"foo": 10.2345}
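[Editor's illustration] For concreteness, here is a minimal stdlib-only sketch of what the base64 encoding of such a decimal involves, assuming it follows Connect's existing convention of base64-encoding the unscaled value's big-endian two's-complement bytes (the "D3J5" string above is the thread's illustrative placeholder, not an actual encoding):

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

public class DecimalEncodingDemo {
    public static void main(String[] args) {
        // "10.2345" = unscaled value 102345 with scale 4.
        BigDecimal value = new BigDecimal("10.2345");
        byte[] unscaled = value.unscaledValue().toByteArray();
        String base64 = Base64.getEncoder().encodeToString(unscaled);
        System.out.println("{\"foo\": \"" + base64 + "\"}"); // {"foo": "AY/J"}

        // Decoding reverses it, provided the reader also knows the scale (4),
        // which Connect carries in the record schema, not in the JSON payload.
        BigDecimal decoded = new BigDecimal(
            new BigInteger(Base64.getDecoder().decode(base64)), 4);
        System.out.println(decoded); // 10.2345
    }
}
```

Note that the base64 string alone is not self-describing: without the schema's scale, a consumer cannot recover the decimal point, which is part of why the NUMERIC representation is attractive.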
> >
> > With regards to the precision issue, that was my original concern as well
> > (and why I originally suggested a TEXT format). Many JSON deserializers
> > (e.g. Jackson with DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS),
> > however, have the ability to deserialize decimals correctly so I will
> > configure that as the default for Connect's JsonDeserializer. It's
> > probably a good idea to call out that using other deserializers must be
> > done with care - I will add that documentation to the serialization config.
> >
> > Note that there would not be an issue on the _serialization_ side of
> > things as Jackson respects BigDecimal.
> >
> > Almog
> >
> > On Tue, Aug 6, 2019 at 11:23 PM Arjun Satish  wrote:
> >
> >> hey Almog, nice work! couple of thoughts (hope I'm not late since you
> >> started the voting thread already):
> >>
> >> Can you please add some examples to show the changes that you are
> >> proposing? It makes me think that for a given decimal number, we will
> >> have two encodings: “asHex” and “asNumber”.
> >>
> >> Should we call the default config value “HEX” instead of “BINARY”?
> >>
> >> Should we call out the fact that JS systems might be susceptible to
> >> double-precision round-offs with the new numeric format? Here are some
> >> people discussing a similar problem:
> >> https://github.com/EventStore/EventStore/issues/1541
> >>
> >> On Tue, Aug 6, 2019 at 1:40 PM Almog Gavra  wrote:
> >>
> >> > Hello Everyone,
> >> >
> >> > Summarizing an in-person discussion with Randall (this is copied from
> >> > the KIP):
> >> >
> >> > The original KIP suggested supporting an additional representation -
> >> > base10 encoded text (e.g. `{"asText":"10.2345"}`). This causes issues
> >> > because it is impossible to disambiguate between TEXT and BINARY
> >> > without an additional config - furthermore, this makes the migration
> >> > from one to the other nearly impossible, because it would require that
> >> > all consumers stop consuming and producers stop producing, atomically
> >> > update the config on all of them after deploying the new code, or wait
> >> > for the full retention period to pass - neither option is viable. The
> >> > suggestion in the KIP is strictly an improvement over the existing
> >> > behavior, even if it doesn't support all combinations.
> >> >
> >> > It seems that since most real-world use cases actually use the numeric
> >> > representation (not string) we can consider this an improvement. With
> >> > the new suggestion, we don't need a deserialization configuration (only
> >> > a serialization option) and the consumers will be able to always
> >> > automatically determine the serialization format.
> >> >
> >> > Based on this, I'll be opening up the simplified version of the KIP
> >> > to a vote.
> >> >
> >> > Almog
> >> >
> >> > On Mon, Jul 29, 2019 at 9:29 AM Almog Gavra  wrote:
> >> >
> >> > > I'm mostly happy with your current suggestion (two configs, one for
> >> > > serialization and one for deserialization) and your implementation
> >> > > suggestion. One thing to note:
> >> > >
> >> > > > We should _always_ be able to deserialize a standard JSON
> >> > > > number as a decimal
> >> > >
> >> > > I was doing some research into decimals and JSON, and I can imagine
> >> > > a compelling reason to require string representations: to avoid losing
> >> > > precision and to be certain that whoever is sending the data isn't
> >> > > losing precision (e.g. https://stackoverflow.com/a/38357877/2258040).
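[Editor's illustration] The precision hazard being discussed can be shown with plain Java, nothing Kafka-specific: a reader that maps JSON numbers to 64-bit doubles silently drops digits, while one that maps them to BigDecimal keeps every digit (this is the behavior that Jackson's USE_BIG_DECIMAL_FOR_FLOATS feature, mentioned earlier in the thread, toggles):

```java
import java.math.BigDecimal;

public class PrecisionDemo {
    public static void main(String[] args) {
        String json = "10.23450000000000000001";

        // Mapping the JSON number to double silently loses the tail digits,
        // because the nearest double is the same one "10.2345" parses to.
        double asDouble = Double.parseDouble(json);
        System.out.println(asDouble); // 10.2345

        // Mapping the same text to BigDecimal preserves every digit.
        BigDecimal asDecimal = new BigDecimal(json);
        System.out.println(asDecimal); // 10.23450000000000000001
    }
}
```

The loss happens at parse time, so neither side gets an error; this is why requiring a decimal-aware deserializer (or a string representation) is worth calling out in the docs.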
> >> > >
> >> > > I'm okay with always allowing numerics, but thought it's worth
> >> > > raising the thought.
> >> > >
> >> > > On Mon, Jul 29, 2019 at 4:57 AM Andy Coates  wrote:
> >> > >
> >> > >> The way I see it, we need to control two separate things:
> >> > >>
> >> > >> 1. How do we _deserialize_ a decimal type if we encounter a text
> >> > >> node in the JSON? (We should _always_ be able to deserialize a
> >> > >> standard JSON number as a decimal.)
> >> > >> 2. How do we choose how we want decimals to be _serialized_?
> >> > >>
> >> > >> This looks to fit well with your second 

[DISCUSS] KIP-505 : Add new public method to only update assignment metadata in consumer

2019-08-08 Thread Jungtaek Lim
Hi devs,

I'd like to initiate a discussion around KIP-505, exposing a new public
method to only update assignment metadata in the consumer.

`poll(0)` has been misused for this, as the Kafka docs don't guarantee that
it won't pull any records, and the new `poll(Duration)` method doesn't have
the same semantics, so I'd like to propose a new public API which does only
the desired behavior.

KIP page: https://cwiki.apache.org/confluence/x/z5NiBw

Please feel free to suggest any improvements to the proposal, as I'm new to
the Kafka community and may not be aware of preferences (like
TimeoutException vs. boolean, etc.) on the Kafka project.

Thanks in advance!
Jungtaek Lim (HeartSaVioR)


[jira] [Created] (KAFKA-8776) Add new public method to only update assignment metadata in consumer

2019-08-08 Thread Jungtaek Lim (JIRA)
Jungtaek Lim created KAFKA-8776:
---

 Summary: Add new public method to only update assignment metadata 
in consumer
 Key: KAFKA-8776
 URL: https://issues.apache.org/jira/browse/KAFKA-8776
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Jungtaek Lim


In KIP-266, Kafka deprecated `poll(long)` in favor of `poll(Duration)`, as the 
former method applies no timeout to the assignment metadata update. The new 
method applies the timeout value to both "update assignment metadata" and 
"poll some records", which doesn't cover the case where the caller is only 
interested in assignment metadata. `poll(0)` has been used as a kind of hack 
for that purpose (though the Kafka documentation clarifies the behavior is not 
guaranteed), and there's no alternative - `poll(Duration.ZERO)` would apply 
the timeout to updating assignment metadata as well.

This issue proposes a new public method to only trigger updating assignment 
metadata.

Please refer to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer
for more details.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-08 Thread Paul Whalen
Matthias,

You did summarize my thinking correctly, thanks for writing it out.  I
think the disconnect in opinion is due to a couple of things influenced by my
habits while writing streams code:

1) I don't see state stores that are "individually owned" versus "shared"
as that much different at all, at least from the perspective of the
business logic for the Processor. So it is actually a negative to separate
the connecting of stores, because it appears in the topology wiring that
fewer stores are being used by the Processor than actually are.  A reader
might assume that the Processor doesn't need other state to do its job
which could cause confusion.
2) In practice, my addProcessor() and addStateStore() (or
builder.addStateStore() and stream.process() ) calls are very near each
other anyway, so the shared dependency on StoreBuilder is not a burden;
passing the same object could even bring clarity to the idea that the store
is shared and not individually owned.

Hearing your thoughts though, I think I have imposed a bit too much of my
own style and assumptions on the API, especially with the shared dependency
on a single StoreBuilder and thoughts about store ownership/sharing.  I'm
going to update the KIP, since the one +1 vote comes from John, who is in
favor of relaxing the restriction anyway.

Paul

On Wed, Aug 7, 2019 at 11:11 PM Matthias J. Sax 
wrote:

> I am not sure if I fully understand, hence, let me try to rephrase:
>
> > I can't think of an example that would require both ways, or would
> > even be more readable using both ways.
>
> Example:
>
> There are two processors, A and B, one store S that both need to
> access and one store S_b that only B needs to access:
>
> If we don't allow mixing both approaches, it would be required to write
> the following code:
>
>   Topology t = new Topology();
>   t.addProcessor("A", ...); // does not add any store
>   t.addProcessor("B", ...); // does not add any store
>   t.addStateStore(..., "A", "B"); // adds S and connects it to A and B
>   t.addStateStore(..., "B"); // adds S_b and connects it to B
>
> // DSL example:
>
>   StreamsBuilder b = new StreamsBuilder();
>   b.addStateStore() // adds S
>   b.addStateStore() // adds S_b
>   stream1.process(..., "S") // adds A and connects S
>   stream2.process(..., "S", "S_b") // adds B and connects S and S_b
>
>
> If we allow mixing both approaches, the code could be (simplified):
>
>   Topology t = new Topology();
>   t.addProcessor("A", ...); // does not add any store
>   t.addProcessor("B", ...); // adds/connects S_b implicitly
>   t.addStateStore(..., "A", "B"); // adds S and connects it to A and B
>
> // DSL example
>
>   StreamsBuilder b = new StreamsBuilder();
>   b.addStateStore() // adds S
>   stream1.process(..., "S") // adds A and connects S
>   stream2.process(..., "S") // adds B and connects S; adds/connects S_b implicitly
>
> The fact that B has a "private store" could be encapsulated and I don't
> see why this would be bad?
>
> > If you can
> > do both ways, the actual full set of state stores being connected could
> > be in wildly different places in the code, which could create confusion.
>
> I.e., I don't see why the second version would be confusing, or why the
> first version would be more readable (I don't argue it's less readable
> either though; I think both are equally readable)?
>
>
>
> Or do you argue that we should allow the following:
>
> > Shared stores can be passed from
> > the outside in an anonymous ProcessorSupplier if desired, making it
> > effectively the same as passing the stateStoreNames var args
>
>   Topology t = new Topology();
>   t.addProcessor("A", ...); // adds/connects S implicitly
>   t.addProcessor("B", ...); // adds/connects S and S_b implicitly
>
> // DSL example
>
>   StreamsBuilder b = new StreamsBuilder();
>   stream1.process(...) // adds A and adds/connects S implicitly
>   stream2.process(...) // adds B and adds/connects S and S_b implicitly
>
> For this case, the second implicit adding of S would require returning
> the same `StoreBuilder` instance to make it idempotent, which seems hard
> to achieve, because both `ProcessorSupplier`s now have a cross
> dependency to use the same object.
>
> Hence, I don't think this would be a good approach.
>
>
> Also, because we require that, for a unique store name, the same
> `StoreBuilder` instance is always passed, we actually have good protection
> against a user bug that adds the same store name twice with different
> builders.
>
>
> I also do not feel super strongly about it, but see some advantages to
> allowing the mixed approach, and don't see disadvantages. Would be good to
> get input from others, too.
>
>
>
> -Matthias
>
>
> On 8/7/19 7:29 PM, Paul Whalen wrote:
> > My thinking was that restricting the API to enforce only one way of
> > connecting stores would make it simpler to use and end up with more
> > readable code.  I can't think of an example that would require both
> > ways, or would even be more readable using both ways.  

Re: [VOTE] KIP-499 - Unify connection name flag for command line tool

2019-08-08 Thread Harsha Chintalapani
+1  (binding). much needed!!


On Thu, Aug 08, 2019 at 6:43 PM, Gwen Shapira  wrote:

> +1 (binding) THANK YOU. It would be +100 if I could.
>
> On Thu, Aug 8, 2019 at 6:37 PM Mitchell  wrote:
>
> Hello Dev,
> After the discussion I would like to start the vote for KIP-499
>
> The following command line tools will have the `--bootstrap-server`
> command line argument added: kafka-console-producer.sh,
> kafka-consumer-groups.sh, kafka-consumer-perf-test.sh,
> kafka-verifiable-consumer.sh, kafka-verifiable-producer.sh
>
> Thanks,
> -Mitch
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


Re: Ask for contributor access and write permission on wiki

2019-08-08 Thread Jungtaek Lim
Thanks Harsha for granting permission; now I can create a subpage on the wiki.

-Jungtaek Lim (HeartSaVioR)

On Fri, Aug 9, 2019 at 11:22 AM Harsha Chintalapani  wrote:

> Hi Jungtaek,
>  Gave you permissions on wiki. Please check.
> Thanks,
> Harsha
>
>
> On Thu, Aug 08, 2019 at 7:03 PM, Jungtaek Lim  wrote:
>
> > Hi devs,
> >
> > I'd like to give making my first contribution to the Kafka community a
> > shot, as I initiated a thread on the need for a new public API for
> > metadata-only updates [1].
> >
> > Could you please grant me contributor access in JIRA as well as write
> > permission on the wiki?
> >
> > Thanks in advance!
> > Jungtaek Lim (HeartSaVioR)
> >
> > 1,
> > https://lists.apache.org/thread.html/017cf631ef981ab1b494b1249be5c11d7edfe5f4867770a18188ebdc@%3Cdev.kafka.apache.org%3E
> >
>


-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior


Re: Ask for contributor access and write permission on wiki

2019-08-08 Thread Harsha Chintalapani
Hi Jungtaek,
 Gave you permissions on wiki. Please check.
Thanks,
Harsha


On Thu, Aug 08, 2019 at 7:03 PM, Jungtaek Lim  wrote:

> Hi devs,
>
> I'd like to give making my first contribution to the Kafka community a
> shot, as I initiated a thread on the need for a new public API for
> metadata-only updates [1].
>
> Could you please grant me contributor access in JIRA as well as write
> permission on the wiki?
>
> Thanks in advance!
> Jungtaek Lim (HeartSaVioR)
>
> 1,
> https://lists.apache.org/thread.html/017cf631ef981ab1b494b1249be5c11d7edfe5f4867770a18188ebdc@%3Cdev.kafka.apache.org%3E
>


Build failed in Jenkins: kafka-trunk-jdk11 #740

2019-08-08 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-8179: Part 3, Add PartitionsLost API for resetGenerations and

--
[...truncated 2.59 MB...]
org.apache.kafka.trogdor.agent.AgentTest > testAgentProgrammaticShutdown STARTED

org.apache.kafka.trogdor.agent.AgentTest > testAgentProgrammaticShutdown PASSED

org.apache.kafka.trogdor.agent.AgentTest > testDestroyWorkers STARTED

org.apache.kafka.trogdor.agent.AgentTest > testDestroyWorkers PASSED

org.apache.kafka.trogdor.agent.AgentTest > testKiboshFaults STARTED

org.apache.kafka.trogdor.agent.AgentTest > testKiboshFaults PASSED

org.apache.kafka.trogdor.agent.AgentTest > testAgentExecWithTimeout STARTED

org.apache.kafka.trogdor.agent.AgentTest > testAgentExecWithTimeout PASSED

org.apache.kafka.trogdor.agent.AgentTest > testAgentExecWithNormalExit STARTED

org.apache.kafka.trogdor.agent.AgentTest > testAgentExecWithNormalExit PASSED

org.apache.kafka.trogdor.agent.AgentTest > testWorkerCompletions STARTED

org.apache.kafka.trogdor.agent.AgentTest > testWorkerCompletions PASSED

org.apache.kafka.trogdor.agent.AgentTest > testAgentFinishesTasks STARTED

org.apache.kafka.trogdor.agent.AgentTest > testAgentFinishesTasks PASSED

org.apache.kafka.trogdor.basic.BasicPlatformTest > testCreateBasicPlatform 
STARTED

org.apache.kafka.trogdor.basic.BasicPlatformTest > testCreateBasicPlatform 
PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTaskRequest STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTaskRequest PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTaskDestruction 
STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTaskDestruction 
PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTasksRequestMatches 
STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTasksRequestMatches 
PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > 
testWorkersExitingAtDifferentTimes STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > 
testWorkersExitingAtDifferentTimes PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > 
testAgentFailureAndTaskExpiry STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > 
testAgentFailureAndTaskExpiry PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTaskDistribution 
STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTaskDistribution 
PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > 
testTaskRequestWithOldStartMsGetsUpdated STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > 
testTaskRequestWithOldStartMsGetsUpdated PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > 
testNetworkPartitionFault STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > 
testNetworkPartitionFault PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > 
testTaskRequestWithFutureStartMsDoesNotGetRun STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > 
testTaskRequestWithFutureStartMsDoesNotGetRun PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTaskCancellation 
STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTaskCancellation 
PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testCreateTask STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testCreateTask PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTasksRequest STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testTasksRequest PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testCoordinatorStatus 
STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testCoordinatorStatus 
PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testCoordinatorUptime 
STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorTest > testCoordinatorUptime 
PASSED

org.apache.kafka.trogdor.coordinator.CoordinatorClientTest > 
testPrettyPrintTaskInfo STARTED

org.apache.kafka.trogdor.coordinator.CoordinatorClientTest > 
testPrettyPrintTaskInfo PASSED

org.apache.kafka.trogdor.common.JsonUtilTest > 
testObjectFromCommandLineArgument STARTED

org.apache.kafka.trogdor.common.JsonUtilTest > 
testObjectFromCommandLineArgument PASSED

org.apache.kafka.trogdor.common.JsonUtilTest > testOpenBraceComesFirst STARTED

org.apache.kafka.trogdor.common.JsonUtilTest > testOpenBraceComesFirst PASSED

org.apache.kafka.trogdor.common.TopologyTest > testAgentNodeNames STARTED

org.apache.kafka.trogdor.common.TopologyTest > testAgentNodeNames PASSED

org.apache.kafka.trogdor.common.StringFormatterTest > testDurationString STARTED

org.apache.kafka.trogdor.common.StringFormatterTest > testDurationString PASSED

org.apache.kafka.trogdor.common.StringFormatterTest > testDateString STARTED

org.apache.kafka.trogdor.common.StringFormatterTest > testDateString PASSED


[jira] [Created] (KAFKA-8775) Test cases frequently fail with an error message that ZooKeeper failed to start

2019-08-08 Thread zedong.Huang (JIRA)
zedong.Huang created KAFKA-8775:
---

 Summary: test cases frequently fail with an error message that ZooKeeper 
failed to start
 Key: KAFKA-8775
 URL: https://issues.apache.org/jira/browse/KAFKA-8775
 Project: Kafka
  Issue Type: Test
  Components: unit tests
Affects Versions: 2.2.0
 Environment: macOS

docker

kafka-2.2.0
Reporter: zedong.Huang


I discovered that the kafka-run-class script cannot find the java command: 
$JAVA_HOME is empty, i.e. the $PATH variable does not include the JDK's 
executable path.

To solve this problem, I tried two approaches:

  First approach: passing the variable with `docker run -e`, which failed.

  Second approach: using CMD, which did not work either.

I'm pretty confused; thanks for any help!




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Ask for contributor access and write permission on wiki

2019-08-08 Thread Jungtaek Lim
Hi devs,

I'd like to give making my first contribution to the Kafka community a shot,
as I initiated a thread on the need for a new public API for metadata-only
updates [1].

Could you please grant me contributor access in JIRA as well as write
permission on the wiki?

Thanks in advance!
Jungtaek Lim (HeartSaVioR)

1,
https://lists.apache.org/thread.html/017cf631ef981ab1b494b1249be5c11d7edfe5f4867770a18188ebdc@%3Cdev.kafka.apache.org%3E


Re: Alternative of poll(0) without pulling records

2019-08-08 Thread Jungtaek Lim
Thanks Viktor for guiding me through this!

I'll initiate a new thread to ask for edit permission on the wiki. Once I've
got permission, I'll come up with a simple KIP page and initiate a discussion
thread.

Thanks again,
Jungtaek Lim

On Thu, Aug 8, 2019 at 9:42 PM Viktor Somogyi-Vass 
wrote:

> Hey Jungtaek,
>
> Thanks for your interest, sometimes I also think such an API would be a
> good thing.
> I don't see any strong reasons in either KIP-288 or KIP-266 why such
> an API shouldn't be created, so go ahead with it, although you'll need to
> create a short KIP for this, as the KafkaConsumer class is considered to
> be a public API.
>
> Best,
> Viktor
>
> On Wed, Aug 7, 2019 at 9:26 AM Jungtaek Lim  wrote:
>
> > If we just wanted to remove the deprecation and let both co-exist, that
> > would also be viable, though `poll(0)` is still a hack and it would be
> > ideal to provide an official approach.
> >
> > On Wed, Aug 7, 2019 at 4:24 PM Jungtaek Lim  wrote:
> >
> > > Hi devs,
> > >
> > > I'm trying to replace the deprecated poll(long) with poll(Duration), and
> > > realized there's no alternative which behaves exactly the same as
> > > poll(0), as poll(0) has been used as a hack to only update metadata
> > > instead of pulling records. poll(Duration.ZERO) wouldn't behave the
> > > same, since even updating metadata would be timed out. So now end users
> > > would need to give a larger timeout and may even pull some records when
> > > they're only interested in metadata.
> > >
> > > I looked back at some KIPs which brought the change, and a "discarded"
> > > KIP (KIP-288 [1]) actually proposed a new API which only updates
> > > metadata. KIP-266 [2] was picked up instead, but it didn't cover all
> > > the things KIP-288 proposed. I'm seeing some docs explaining that
> > > poll(0) has never been supported officially, but the hack has been
> > > widely used and those uses can't be ignored.
> > >
> > > Kafka test code itself relies on either the deprecated poll(0)
> > > or updateAssignmentMetadataIfNeeded, which seems to be a private API
> > > only for testing.
> > > (Btw, I'd try replacing poll(0) with updateAssignmentMetadataIfNeeded
> > > to avoid the deprecated method - if it works I'll submit a PR.)
> > >
> > > I feel it would be ideal to expose
> > > `updateAssignmentMetadataIfNeeded` as a public API, maybe renaming it
> > > to `waitForAssignment` (which was proposed in KIP-288) if the name
> > > feels too long.
> > >
> > > What do you think? If it sounds feasible, I'd like to try contributing
> > > this. I'm new to contributing to the Kafka community, so I'm not sure
> > > whether it would require a new KIP or not.
> > >
> > > Thanks,
> > > Jungtaek Lim (HeartSaVioR)
> > >
> > > 1.
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-288%3A+%5BDISCARDED%5D+Consumer.poll%28%29+timeout+semantic+change+and+new+waitForAssignment+method
> > > 2.
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior
> > >
> > >
> >
> > --
> > Name : Jungtaek Lim
> > Blog : http://medium.com/@heartsavior
> > Twitter : http://twitter.com/heartsavior
> > LinkedIn : http://www.linkedin.com/in/heartsavior
> >
>


-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior


Re: [VOTE] KIP-499 - Unify connection name flag for command line tool

2019-08-08 Thread Gwen Shapira
+1 (binding) THANK YOU. It would be +100 if I could.

On Thu, Aug 8, 2019 at 6:37 PM Mitchell  wrote:
>
> Hello Dev,
> After the discussion I would like to start the vote for KIP-499
>
>
> The following command line tools will have the `--bootstrap-server`
> command line argument added: kafka-console-producer.sh,
> kafka-consumer-groups.sh, kafka-consumer-perf-test.sh,
> kafka-verifiable-consumer.sh, kafka-verifiable-producer.sh
>
> Thanks,
> -Mitch



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


[VOTE] KIP-499 - Unify connection name flag for command line tool

2019-08-08 Thread Mitchell
Hello Dev,
After the discussion I would like to start the vote for KIP-499


The following command line tools will have the `--bootstrap-server`
command line argument added: kafka-console-producer.sh,
kafka-consumer-groups.sh, kafka-consumer-perf-test.sh,
kafka-verifiable-consumer.sh, kafka-verifiable-producer.sh

Thanks,
-Mitch


Build failed in Jenkins: kafka-trunk-jdk8 #3837

2019-08-08 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-8179: Part 3, Add PartitionsLost API for resetGenerations and

--
[...truncated 2.58 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 

[jira] [Created] (KAFKA-8774) Connect REST API exposes plaintext secrets in tasks endpoint

2019-08-08 Thread Oleksandr Diachenko (JIRA)
Oleksandr Diachenko created KAFKA-8774:
--

 Summary: Connect REST API exposes plaintext secrets in tasks 
endpoint
 Key: KAFKA-8774
 URL: https://issues.apache.org/jira/browse/KAFKA-8774
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.3.0
Reporter: Oleksandr Diachenko
Assignee: Oleksandr Diachenko


I have configured a Connector to use externalized secrets, and the following 
endpoint returns secrets in the externalized form: 
{code:java}
curl localhost:8083/connectors/foobar|jq
{code}
{code:java}
{
"name": "foobar",
"config": {

"connector.class": "io.confluent.connect.s3.S3SinkConnector",
...
"consumer.override.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"${file:/some/secret/path/secrets.properties:kafka.api.key}\" 
password=\"${file:/some/secret/path/secrets.properties:kafka.api.secret}\";",
"admin.override.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"${file:/some/secret/path/secrets.properties:kafka.api.key}\" 
password=\"${file:/some/secret/path/secrets.properties:kafka.api.secret}\";",
"consumer.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"${file:/some/secret/path/secrets.properties:kafka.api.key}\" 
password=\"${file:/some/secret/path/secrets.properties:kafka.api.secret}\";",
"producer.override.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"${file:/some/secret/path/secrets.properties:kafka.api.key}\" 
password=\"${file:/some/secret/path/secrets.properties:kafka.api.secret}\";",
"producer.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"${file:/some/secret/path/secrets.properties:kafka.api.key}\" 
password=\"${file:/some/secret/path/secrets.properties:kafka.api.secret}\";",
...
},
"tasks": [

{ "connector": "foobar", "task": 0 }

],
"type": "sink"
}{code}
But another endpoint returns secrets in plain text:
{code:java}
curl localhost:8083/connectors/foobar/tasks|jq
{code}
{code:java}
[
  {
"id": {
  "connector": "lcc-kgkpm",
  "task": 0
},
"config": {
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  ...
  "errors.log.include.messages": "true",
  "flush.size": "1000",
  "consumer.override.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"OOPS\" password=\"SURPRISE\";",
  "admin.override.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"OOPS\" password=\"SURPRISE\";",
  "consumer.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"OOPS\" password=\"SURPRISE\";",
  "producer.override.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"OOPS\" password=\"SURPRISE\";",
  "producer.sasl.jaas.config": 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"OOPS\" password=\"SURPRISE\";",
  ...
}
  }
]
{code}
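For context on why the two endpoints differ: Connect resolves `${provider:path:key}` placeholders through a configured ConfigProvider before handing configs to tasks, so any endpoint that returns post-resolution task configs will expose the plaintext values. A purely illustrative sketch of that substitution step (invented names, not Connect's actual implementation):

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of ${file:path:key} substitution, illustrating why the
// /tasks endpoint can return resolved (plaintext) values while the connector
// config endpoint still shows the raw placeholders.
public class PlaceholderResolver {
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{file:([^:}]+):([^}]+)\\}");

    // "secrets" stands in for the contents of the referenced properties file.
    static String resolve(String value, Map<String, String> secrets) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String key = m.group(2);
            // Unknown keys are left as the original placeholder.
            m.appendReplacement(sb, Matcher.quoteReplacement(
                    secrets.getOrDefault(key, m.group(0))));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> secrets = Map.of("kafka.api.key", "OOPS");
        String raw = "username=\"${file:/secret/secrets.properties:kafka.api.key}\"";
        System.out.println(resolve(raw, secrets)); // prints: username="OOPS"
    }
}
```

A fix would keep the tasks endpoint returning the pre-resolution (placeholder) form, as the connector config endpoint already does.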
 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-08 Thread Guozhang Wang
I think I agree with you Sophie. My gut feeling is that 1) standby tasks not
catching up should not be the major concern in the assignor's algorithm, but
rather be tackled in different modules, and 2) a lot of optimization can be
done at the stream thread itself, like dedicated threading and larger
batching, or even more sophisticated scheduling mechanisms between running,
restoring, and standby tasks. In any case, I think we can take this out of
the scope of KIP-441 for now.


Guozhang


On Thu, Aug 8, 2019 at 5:48 PM Sophie Blee-Goldman 
wrote:

> > we may have other ways to avoid starving the standby tasks, for example,
> > by using dedicated threads for standby tasks or even consider having
> > *higher priority for standby than active* so that we always try to catch
> > up standby first, then process active
>
> This is an interesting idea, but seems likely to get in the way of the
> original idea of this KIP
> -- if we always process standby tasks first, then if we are assigned a new
> standby task we
> will have to wait for it to catch up completely before processing any
> active tasks! That's
> even worse than the situation this KIP is trying to help with, since a new
> standby task has to
> restore from 0 (whereas an active task at least can take over from wherever
> the standby was).
>
> During restoration -- while there exist any restoring tasks -- I think it's
> reasonable to de-prioritize the
> standby tasks and just process restoring and active tasks so both can make
> progress. But we should
> let them catch up afterwards somehow -- maybe we can apply some kind of
> heuristic, like "if we haven't
> processed standbys for X iterations, or Y milliseconds, do so now."
>
> Actually, it might even be beneficial to avoid processing standbys a record
> or two at a time and instead
> wait for a large enough batch to build up for the RocksDB bulk-loading
> benefits.
>
> I think the "use dedicated threads for standby" is the more promising end
> goal, especially since
> if we split restoration into "restoring tasks" then active and standbys
> share almost nothing. But
> that seems like follow-up work to the current KIP :)
>
> On Thu, Aug 8, 2019 at 5:31 PM Sophie Blee-Goldman 
> wrote:
>
> > Stateful tasks with logging disabled seem to be an interesting edge case.
> > On the one hand,
> > for balancing purposes they should be considered stateful since as
> > Guozhang pointed out
> > they are still "heavy" in IO costs. But for "catching up" purposes, ie
> > when allocating standby
> > tasks that will become active tasks, they should be considered stateless
> > as there is no
> > meaningful sense of their lag. We should never allocate standby tasks for
> > them during the
> > first rebalance, but should ensure they are evenly distributed across
> > instances. Maybe we
> > should split these into a third category -- after we assign all stateful
> > tasks with logging, we
> > then distribute the set of logging-disabled stateful tasks to improve
> > balance, before lastly
> > distributing stateless tasks?
> >
> > This actually leads into what I was just thinking, which is that we
> really
> > should distinguish the
> > "catch-up" standbys from normal standbys as well as distinguishing
> > actively processing tasks
> > from active tasks that are still in the restore phase. It's somewhat
> > awkward that today, some
> > active tasks just start processing immediately while others behave more
> > like standby than active
> > tasks for some time, before switching to real active. They first use the
> > restoreConsumer, then
> > later only the "normal" consumer.
> >
> > However, this restore period is still distinct from normal standbys in a
> > lot of ways -- the code path
> > for restoring is different than for updating standbys, for example in how
> > long we block on #poll.
> > So in addition to giving them their own name -- let's go with restoring
> > task for now -- they really
> > do seem to deserve being their own distinct task. We can optimize them
> for
> > efficient conversion
> > to active tasks since we know that's what they will be.
> >
> > This resolves some of the awkwardness of dealing with the special case
> > mentioned above: we
> > find a balanced assignment of stateful and stateless tasks, and create
> > restoring tasks as needed.
> > If logging is disabled, no restoring task is created.
> >
> >
> > On Thu, Aug 8, 2019 at 3:44 PM Guozhang Wang  wrote:
> >
> >> Regarding 3) above: I think active tasks should still be considered
> >> stateful, since the processor would still pay the IO cost of accessing
> >> the store, but they would not have standby tasks?
> >>
> >> On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna 
> wrote:
> >>
> >> > Hi,
> >> >
> >> > Thank you for the KIP!
> >> >
> >> > Some questions/comments:
> >> >
> >> > 1. I am wondering if the "stand-by" tasks that catch up state before
> >> > the active task is switched deserve their own name in this KIP and maybe
> >> > in the code. We 

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-08 Thread Sophie Blee-Goldman
> we may have other ways to avoid starving the standby tasks, for example,
> by using dedicated threads for standby tasks or even consider having
> *higher priority for standby than active* so that we always try to catch
> up standby first, then process active

This is an interesting idea, but seems likely to get in the way of the
original idea of this KIP
-- if we always process standby tasks first, then if we are assigned a new
standby task we
will have to wait for it to catch up completely before processing any
active tasks! That's
even worse than the situation this KIP is trying to help with, since a new
standby task has to
restore from 0 (whereas an active task at least can take over from wherever
the standby was).

During restoration -- while there exist any restoring tasks -- I think it's
reasonable to de-prioritize the
standby tasks and just process restoring and active tasks so both can make
progress. But we should
let them catch up afterwards somehow -- maybe we can apply some kind of
heuristic, like "if we haven't
processed standbys for X iterations, or Y milliseconds, do so now."

Actually, it might even be beneficial to avoid processing standbys a record
or two at a time and instead
wait for a large enough batch to build up for the RocksDB bulk-loading
benefits.

I think the "use dedicated threads for standby" is the more promising end
goal, especially since
if we split restoration into "restoring tasks" then active and standbys
share almost nothing. But
that seems like follow-up work to the current KIP :)
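The "X iterations, or Y milliseconds" heuristic suggested above could be sketched roughly like this (all names are hypothetical; nothing here is actual Kafka Streams code):

```java
// Hypothetical sketch of the "process standbys every X iterations or Y ms"
// heuristic discussed above; none of these names exist in Kafka Streams.
public class StandbyScheduler {
    private final int maxIterations;
    private final long maxIntervalMs;
    private int iterationsSinceStandby = 0;
    private long lastStandbyTimeMs;

    StandbyScheduler(int maxIterations, long maxIntervalMs, long nowMs) {
        this.maxIterations = maxIterations;
        this.maxIntervalMs = maxIntervalMs;
        this.lastStandbyTimeMs = nowMs;
    }

    // Called once per processing-loop iteration; returns true when the loop
    // should pause active/restoring work and update standby tasks instead.
    boolean shouldProcessStandbys(long nowMs) {
        iterationsSinceStandby++;
        if (iterationsSinceStandby >= maxIterations
                || nowMs - lastStandbyTimeMs >= maxIntervalMs) {
            iterationsSinceStandby = 0;
            lastStandbyTimeMs = nowMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        StandbyScheduler s = new StandbyScheduler(3, 1000L, 0L);
        System.out.println(s.shouldProcessStandbys(10)); // false
        System.out.println(s.shouldProcessStandbys(20)); // false
        System.out.println(s.shouldProcessStandbys(30)); // true: 3rd iteration
    }
}
```

A batching variant per the paragraph above would simply raise X/Y so standby records accumulate into larger RocksDB write batches.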

On Thu, Aug 8, 2019 at 5:31 PM Sophie Blee-Goldman 
wrote:

> Stateful tasks with logging disabled seem to be an interesting edge case.
> On the one hand,
> for balancing purposes they should be considered stateful since as
> Guozhang pointed out
> they are still "heavy" in IO costs. But for "catching up" purposes, ie
> when allocating standby
> tasks that will become active tasks, they should be considered stateless
> as there is no
> meaningful sense of their lag. We should never allocate standby tasks for
> them during the
> first rebalance, but should ensure they are evenly distributed across
> instances. Maybe we
> should split these into a third category -- after we assign all stateful
> tasks with logging, we
> then distribute the set of logging-disabled stateful tasks to improve
> balance, before lastly
> distributing stateless tasks?
>
> This actually leads into what I was just thinking, which is that we really
> should distinguish the
> "catch-up" standbys from normal standbys as well as distinguishing
> actively processing tasks
> from active tasks that are still in the restore phase. It's somewhat
> awkward that today, some
> active tasks just start processing immediately while others behave more
> like standby than active
> tasks for some time, before switching to real active. They first use the
> restoreConsumer, then
> later only the "normal" consumer.
>
> However, this restore period is still distinct from normal standbys in a
> lot of ways -- the code path
> for restoring is different than for updating standbys, for example in how
> long we block on #poll.
> So in addition to giving them their own name -- let's go with restoring
> task for now -- they really
> do seem to deserve being their own distinct task. We can optimize them for
> efficient conversion
> to active tasks since we know that's what they will be.
>
> This resolves some of the awkwardness of dealing with the special case
> mentioned above: we
> find a balanced assignment of stateful and stateless tasks, and create
> restoring tasks as needed.
> If logging is disabled, no restoring task is created.
>
>
> On Thu, Aug 8, 2019 at 3:44 PM Guozhang Wang  wrote:
>
>> Regarding 3) above: I think active tasks should still be considered
>> stateful, since the processor would still pay the IO cost of accessing the
>> store, but they would not have standby tasks?
>>
>> On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna  wrote:
>>
>> > Hi,
>> >
>> > Thank you for the KIP!
>> >
>> > Some questions/comments:
>> >
>> > 1. I am wondering if the "stand-by" tasks that catch up state before
>> > the active task is switched deserve their own name in this KIP and maybe
>> > in the code. We have already stated that they are not true stand-by
>> > tasks, they are not configured through `num.standby.replicas`, and
>> > maybe they have also other properties that distinguish them from true
>> > stand-by tasks of which we are not aware yet. For example, they may be
>> > prioritized differently than other tasks. Furthermore, the name
>> > "stand-by" does not really fit with the planned functionality of those
>> > tasks. In the following, I will call them false stand-by tasks.
>> >
> >> > 2. Did you consider triggering the probing rebalances not at regular
>> > time intervals but when the false stand-by tasks reach an acceptable
>> > lag? If you did consider, could you add a paragraph why you rejected
>> > this idea to the "Rejected Alternatives" 

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-08 Thread Sophie Blee-Goldman
Stateful tasks with logging disabled seem to be an interesting edge case.
On the one hand,
for balancing purposes they should be considered stateful since as Guozhang
pointed out
they are still "heavy" in IO costs. But for "catching up" purposes, ie when
allocating standby
tasks that will become active tasks, they should be considered stateless as
there is no
meaningful sense of their lag. We should never allocate standby tasks for
them during the
first rebalance, but should ensure they are evenly distributed across
instances. Maybe we
should split these into a third category -- after we assign all stateful
tasks with logging, we
then distribute the set of logging-disabled stateful tasks to improve
balance, before lastly
distributing stateless tasks?

This actually leads into what I was just thinking, which is that we really
should distinguish the
"catch-up" standbys from normal standbys as well as distinguishing actively
processing tasks
from active tasks that are still in the restore phase. It's somewhat
awkward that today, some
active tasks just start processing immediately while others behave more
like standby than active
tasks for some time, before switching to real active. They first use the
restoreConsumer, then
later only the "normal" consumer.

However, this restore period is still distinct from normal standbys in a
lot of ways -- the code path
for restoring is different than for updating standbys, for example in how
long we block on #poll.
So in addition to giving them their own name -- let's go with restoring
task for now -- they really
do seem to deserve being their own distinct task. We can optimize them for
efficient conversion
to active tasks since we know that's what they will be.

This resolves some of the awkwardness of dealing with the special case
mentioned above: we
find a balanced assignment of stateful and stateless tasks, and create
restoring tasks as needed.
If logging is disabled, no restoring task is created.


On Thu, Aug 8, 2019 at 3:44 PM Guozhang Wang  wrote:

> Regarding 3) above: I think active tasks should still be considered
> stateful, since the processor would still pay the IO cost of accessing the
> store, but they would not have standby tasks?
>
> On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna  wrote:
>
> > Hi,
> >
> > Thank you for the KIP!
> >
> > Some questions/comments:
> >
> > 1. I am wondering if the "stand-by" tasks that catch up state before
> > the active task is switched deserve their own name in this KIP and maybe
> > in the code. We have already stated that they are not true stand-by
> > tasks, they are not configured through `num.standby.replicas`, and
> > maybe they have also other properties that distinguish them from true
> > stand-by tasks of which we are not aware yet. For example, they may be
> > prioritized differently than other tasks. Furthermore, the name
> > "stand-by" does not really fit with the planned functionality of those
> > tasks. In the following, I will call them false stand-by tasks.
> >
> > 2. Did you consider triggering the probing rebalances not at regular
> > time intervals but when the false stand-by tasks reach an acceptable
> > lag? If you did consider, could you add a paragraph why you rejected
> > this idea to the "Rejected Alternatives" section.
> >
> > 3. Are tasks that solely contain stores with disabled logging
> > classified as stateful or stateless in the algorithm? I would guess
> > stateless, although if possible they should be assigned to the same
> > instance they had run before the rebalance. As far as I can see this
> > special case is not handled in the algorithm.
> >
> > Best,
> > Bruno
> >
> >
> >
> > On Thu, Aug 8, 2019 at 8:24 AM Guozhang Wang  wrote:
> > >
> > > 1. Sounds good, just wanted to clarify; and it may be worth documenting it
> > so
> > > that users would not be surprised when monitoring their footprint.
> > >
> > > 2. Hmm I see... I think the trade-off can be described as "how much
> > > imbalance would bother you to be willing to pay another rebalance,
> along
> > > with potentially more restoration lag", and the current definition of
> > > rebalance_factor can be considered as a rough measurement of that
> > > imbalance. Of course one can argue that a finer grained measurement
> could
> > > be "resource footprint" like CPU / storage of each instance like we
> have
> > in
> > > Kafka broker auto balancing tools, but I'd prefer not doing that as
> part
> > of
> > > the library but more as an operational tool in the future. On the other
> > > hand, I've seen stateful and stateless tasks having very different
> load,
> > > and sometimes the only bottleneck of a Streams app is just one stateful
> > > sub-topology and whoever gets tasks of that sub-topology becomes a hotspot
> > > (and that's why our algorithm tries to balance per sub-topology as
> well),
> > > so maybe we can just consider stateful tasks when calculating this
> factor
> > > as a very brute force heuristic?
> > >
> > > 3.a. Thinking about 
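Bruno's point 2 above - triggering the probing rebalance when the catching-up standbys reach an acceptable lag, rather than on a fixed interval - might be sketched as a simple readiness check (purely illustrative; none of these names exist in Kafka Streams):

```java
import java.util.Map;

// Hypothetical sketch of a lag-based trigger: rebalance once every
// catching-up standby is within an acceptable lag of its changelog end
// offset. All names are illustrative, not actual Kafka Streams API.
public class CatchUpCheck {
    static boolean readyToRebalance(Map<String, Long> changelogEndOffsets,
                                    Map<String, Long> standbyOffsets,
                                    long acceptableLag) {
        for (Map.Entry<String, Long> e : changelogEndOffsets.entrySet()) {
            long restored = standbyOffsets.getOrDefault(e.getKey(), 0L);
            if (e.getValue() - restored > acceptableLag) {
                return false; // at least one standby is still too far behind
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(readyToRebalance(
                Map.of("store-changelog-0", 1000L),
                Map.of("store-changelog-0", 995L),
                10L)); // true: within acceptable lag
    }
}
```

The trade-off versus interval-based probing is that some member must still observe the lag and kick off the rebalance, which is presumably part of why the KIP discussion leans toward periodic probing.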

Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-08-08 Thread Maulin Vasavada
Hi Harsha

Let me try to write samples and will let you know.

Thanks
Maulin

On Thu, Aug 8, 2019 at 4:00 PM Harsha Ch  wrote:

> Hi Maulin,
>  With Java, security providers can be as custom as you would like them to
> be. If you only want to implement a custom way of loading the
> keystore and truststore and not implement any protocol/encryption handling,
> you can leave those parts empty; there is no need to implement them.
> Have you looked into the links I pasted before?
>
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeKeyStore.java
>
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeTrustManager.java
>
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.java
>
> Can you please tell me which methods in the above are too complex or
> unnecessary to implement? You are not changing anything in the SSL/TLS
> implementations provided by the JDK.
>
> All of the implementations delegate the checks to the default
> implementation anyway.
> The Spire agent is an example: it's nothing but a gRPC server listening on a
> Unix domain socket. The above code makes an RPC call to the local daemon to
> get the certificate and keys. The mechanics are pretty much the same as what
> you are asking for.
>
> Thanks,
> Harsha
>
> On Thu, Aug 8, 2019 at 3:47 PM Maulin Vasavada 
> wrote:
>
> > Imagine a scenario like this: we know we have a custom KMS, and as Kafka
> > owners we want to comply with using that KMS source to load keys/certs.
> > As Kafka owners we know how to integrate with the KMS but don't
> > necessarily have to know anything about cipher suites, algorithms, and
> > the SSL/TLS implementation. Going the Provider way requires knowing a lot
> > more than we should, doesn't it? Not that we would have concerns or shy
> > away from knowing those details - but if we don't have to, why should we?
> >
> > On Thu, Aug 8, 2019 at 3:23 PM Maulin Vasavada <
> maulin.vasav...@gmail.com>
> > wrote:
> >
> > > Hi Harsha
> > >
> > > We don't have spire (or similar) agents and we do not have keys/certs
> > > locally on any brokers. To elaborate more on my previous email,
> > >
> > > I agree that Java security Providers are used in much broader sense -
> to
> > > have a particular implementation of an algorithm, use specific cipher
> > > suites for SSL , OR  in our current team's case have a particular way
> to
> > > leverage pre-generated SSL sessions. However, the scope of our KIP
> (486)
> > is
> > > much more restricted than that. We merely intend to provide a custom
> > > keystore/truststore for our SSL connections and not really worry about
> > > underlying specific SSL/TLS implementation.  This simplifies it a lot
> for
> > > us to keep the concerns separate and clear.
> > >
> > > I feel our approach is more complementary such that it allows for using
> > > keystores of choice while retaining the flexibility to use any
> > > underlying/available Provider for actually making the SSL call.
> > >
> > > We agree with KIP-492's approach based on Providers (and Java's
> > > recommendation), but also strongly believe that our approach can
> > complement
> > > it very effectively for reasons explained above.
> > >
> > > Thanks
> > > Maulin
> > >
> > > On Thu, Aug 8, 2019 at 3:05 PM Harsha Chintalapani 
> > > wrote:
> > >
> > >> Hi Maulin,
> > >>
> > >> On Thu, Aug 08, 2019 at 2:04 PM, Maulin Vasavada <
> > >> maulin.vasav...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi Harsha
> > >> >
> > >> > The reason we rejected the SslProvider route is that - we only
> needed
> > a
> > >> > custom way to load keys/certs. Not touch any policy that existing
> > >> Providers
> > >> > govern like SunJSSE Provider.
> > >> >
> > >>
> > >> We have exactly the same requirements to load certs and keys through
> > spire
> > >> agent. We used security.provider to do that exactly. I am not sure why
> > you
> > >> would be modifying any policies provided by default SunJSSE provider.
> > Can
> > >> you give me an example of having custom provider that will override an
> > >> existing policy in  SunJSSE provider.
> > >>
> > >> As pointed out earlier, this kip
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-492%3A+Add+java+security+providers+in+Kafka+Security+config
> > >> allows
> > >> you to  load security.provider through config.
> > >> Take a look at the examples I gave before
> > >>
> > >>
> >
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.java
> > >> It registers KeyManagerFactory and TrustManagerFactory and Keystore
> > >> algorithm.
> > >> Implement your custom way of loading Keystore in here
> > >>
> > >>
> >
> 

Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-08-08 Thread Harsha Ch
Hi Maulin,
 With Java, security providers can be as custom as you would like them to
be. If you only want to implement a custom way of loading the
keystore and truststore and not implement any protocol/encryption handling,
you can leave those parts empty; there is no need to implement them.
Have you looked into the links I pasted before?
https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeKeyStore.java
https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeTrustManager.java
https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.java

Can you please tell me which methods in the above are too complex or
unnecessary to implement? You are not changing anything in the SSL/TLS
implementations provided by the JDK.

All of the implementations delegate the checks to the default
implementation anyway.
The Spire agent is an example: it's nothing but a gRPC server listening on a
Unix domain socket. The above code makes an RPC call to the local daemon to
get the certificate and keys. The mechanics are pretty much the same as what
you are asking for.

Thanks,
Harsha
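As a rough sketch of the Provider route described above (the `com.example` factory class names are placeholders, not real classes from Kafka or the SPIFFE example), a custom provider would register its own KeyManagerFactory/TrustManagerFactory algorithms and be inserted ahead of the defaults:

```java
import java.security.Provider;
import java.security.Security;

// Minimal sketch of the Provider-based approach: a custom
// java.security.Provider mapping KeyManagerFactory/TrustManagerFactory
// algorithm names to custom implementations. The com.example class names
// are placeholders and do not exist.
public class CustomProvider extends Provider {
    public CustomProvider() {
        // name, version, description
        super("CustomKms", "1.0", "KeyManager/TrustManager backed by a KMS");
        put("KeyManagerFactory.CustomKms", "com.example.KmsKeyManagerFactory");
        put("TrustManagerFactory.CustomKms", "com.example.KmsTrustManagerFactory");
    }

    public static void main(String[] args) {
        // Register ahead of the default providers, much as KIP-492 proposes
        // to do via the security.providers config.
        Security.insertProviderAt(new CustomProvider(), 1);
        System.out.println(Security.getProvider("CustomKms").getName()); // prints: CustomKms
    }
}
```

With such a provider registered, setting the key/trust manager algorithm configs to "CustomKms" would route keystore loading through the custom factories without touching any JSSE policy.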

On Thu, Aug 8, 2019 at 3:47 PM Maulin Vasavada 
wrote:

> Imagine a scenario like this: we know we have a custom KMS, and as Kafka
> owners we want to comply with using that KMS source to load keys/certs. As
> Kafka owners we know how to integrate with the KMS but don't necessarily
> have to know anything about cipher suites, algorithms, and the SSL/TLS
> implementation. Going the Provider way requires knowing a lot more than we
> should, doesn't it? Not that we would have concerns or shy away from
> knowing those details - but if we don't have to, why should we?
>
> On Thu, Aug 8, 2019 at 3:23 PM Maulin Vasavada 
> wrote:
>
> > Hi Harsha
> >
> > We don't have spire (or similar) agents and we do not have keys/certs
> > locally on any brokers. To elaborate more on my previous email,
> >
> > I agree that Java security Providers are used in much broader sense - to
> > have a particular implementation of an algorithm, use specific cipher
> > suites for SSL , OR  in our current team's case have a particular way to
> > leverage pre-generated SSL sessions. However, the scope of our KIP (486)
> is
> > much more restricted than that. We merely intend to provide a custom
> > keystore/truststore for our SSL connections and not really worry about
> > underlying specific SSL/TLS implementation.  This simplifies it a lot for
> > us to keep the concerns separate and clear.
> >
> > I feel our approach is more complementary such that it allows for using
> > keystores of choice while retaining the flexibility to use any
> > underlying/available Provider for actually making the SSL call.
> >
> > We agree with KIP-492's approach based on Providers (and Java's
> > recommendation), but also strongly believe that our approach can
> complement
> > it very effectively for reasons explained above.
> >
> > Thanks
> > Maulin
> >
> > On Thu, Aug 8, 2019 at 3:05 PM Harsha Chintalapani 
> > wrote:
> >
> >> Hi Maulin,
> >>
> >> On Thu, Aug 08, 2019 at 2:04 PM, Maulin Vasavada <
> >> maulin.vasav...@gmail.com>
> >> wrote:
> >>
> >> > Hi Harsha
> >> >
> >> > The reason we rejected the SslProvider route is that - we only needed
> a
> >> > custom way to load keys/certs. Not touch any policy that existing
> >> Providers
> >> > govern like SunJSSE Provider.
> >> >
> >>
> >> We have exactly the same requirements to load certs and keys through
> spire
> >> agent. We used security.provider to do that exactly. I am not sure why
> you
> >> would be modifying any policies provided by default SunJSSE provider.
> Can
> >> you give me an example of having custom provider that will override an
> >> existing policy in  SunJSSE provider.
> >>
> >> As pointed out earlier, this kip
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-492%3A+Add+java+security+providers+in+Kafka+Security+config
> >> allows
> >> you to  load security.provider through config.
> >> Take a look at the examples I gave before
> >>
> >>
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.java
> >> It registers KeyManagerFactory and TrustManagerFactory and Keystore
> >> algorithm.
> >> Implement your custom way of loading Keystore in here
> >>
> >>
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeKeyStore.java
> >>
> >> and Trust manager like here
> >>
> >>
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeTrustManager.java
> >>
> >> In your Kafka client  you can set the security.provider to your custom
> >> implementation and with this fix
> >> https://issues.apache.org/jira/browse/KAFKA-8191 you can set
> 

Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-08-08 Thread Maulin Vasavada
Hi Colin,

To your point - "Also, it seems like most people who want a custom
truststore / keystore will also want a custom SSL provider" - I don't think
so. Take our own example: we have a fairly large Kafka ecosystem (500B+
messages a day flowing through, with a polyglot client base) and strict
InfoSec requirements, but we still do not need a custom SSL Provider for Kafka.

Thanks
Maulin

On Thu, Aug 8, 2019 at 3:47 PM Maulin Vasavada 
wrote:

> Imagine a scenario like this: we know we have a custom KMS, and as Kafka
> owners we want to comply with using that KMS source to load keys/certs. As
> Kafka owners we know how to integrate with the KMS but don't necessarily
> have to know anything about cipher suites, algorithms, and the SSL/TLS
> implementation. Going the Provider way requires knowing a lot more than we
> should, doesn't it? Not that we would have concerns or shy away from
> knowing those details - but if we don't have to, why should we?
>
> On Thu, Aug 8, 2019 at 3:23 PM Maulin Vasavada 
> wrote:
>
>> Hi Harsha
>>
>> We don't have spire (or similar) agents and we do not have keys/certs
>> locally on any brokers. To elaborate more on my previous email,
>>
>> I agree that Java security Providers are used in much broader sense - to
>> have a particular implementation of an algorithm, use specific cipher
>> suites for SSL , OR  in our current team's case have a particular way to
>> leverage pre-generated SSL sessions. However, the scope of our KIP (486) is
>> much more restricted than that. We merely intend to provide a custom
>> keystore/truststore for our SSL connections and not really worry about
>> underlying specific SSL/TLS implementation.  This simplifies it a lot for
>> us to keep the concerns separate and clear.
>>
>> I feel our approach is more complementary such that it allows for using
>> keystores of choice while retaining the flexibility to use any
>> underlying/available Provider for actually making the SSL call.
>>
>> We agree with KIP-492's approach based on Providers (and Java's
>> recommendation), but also strongly believe that our approach can complement
>> it very effectively for reasons explained above.
>>
>> Thanks
>> Maulin
>>
>> On Thu, Aug 8, 2019 at 3:05 PM Harsha Chintalapani 
>> wrote:
>>
>>> Hi Maulin,
>>>
>>> On Thu, Aug 08, 2019 at 2:04 PM, Maulin Vasavada <
>>> maulin.vasav...@gmail.com>
>>> wrote:
>>>
>>> > Hi Harsha
>>> >
>>> > The reason we rejected the SslProvider route is that - we only needed a
>>> > custom way to load keys/certs. Not touch any policy that existing
>>> Providers
>>> > govern like SunJSSE Provider.
>>> >
>>>
>>> We have exactly the same requirements to load certs and keys through
>>> spire
>>> agent. We used security.provider to do that exactly. I am not sure why
>>> you
>>> would be modifying any policies provided by default SunJSSE provider.
>>> Can
>>> you give me an example of having custom provider that will override an
>>> existing policy in  SunJSSE provider.
>>>
>>> As pointed out earlier, this kip
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-492%3A+Add+java+security+providers+in+Kafka+Security+config
>>> allows
>>> you to  load security.provider through config.
>>> Take a look at the examples I gave before
>>>
>>> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.java
>>> It registers KeyManagerFactory and TrustManagerFactory and Keystore
>>> algorithm.
>>> Implement your custom way of loading Keystore in here
>>>
>>> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeKeyStore.java
>>>
>>> and Trust manager like here
>>>
>>> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeTrustManager.java
>>>
>>> In your Kafka client  you can set the security.provider to your custom
>>> implementation and with this fix
>>> https://issues.apache.org/jira/browse/KAFKA-8191 you can set
>>> keyManagerAlgorithm and trustManagerAlgorithm configs.
>>>
>>> All of this is in your clients and broker side and do not need to touch
>>> any
>>> policy changes at JVM level. You'll register the providers in the
>>> priority
>>> order and can still have SunJSSE provider and have your custom provider
>>> to
>>> implement the key and trust managers.
>>>
>>>
>>>
>>> The ask here is different than KIP-492. We don't have any need to
>>> > modify/specify the algorithm parameter. Does that make sense?
>>> >
>>>
>>> The ask in this KIP is introducing new interfaces, while the KIP's
>>> goal/motivation can be achieved through security.provider; we worked
>>> on a similar goal without touching any Keystore or Truststore interfaces.
>>> My advice is against changing or introducing new interfaces when it can
>>> work through security.provider.
>>>
>>> Thanks,
>>> Harsha
>>>
>>>
>>> Thanks
>>> > Maulin
>>> >
>>> > On Thu, Aug 8, 2019 at 7:48 AM 

Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-08-08 Thread Maulin Vasavada
Imagine a scenario like this - we have a custom KMS, and as Kafka owners
we want to comply by loading keys/certs from that KMS source. As Kafka
owners we know how to integrate with the KMS but don't necessarily need to
know anything about cipher suites, algorithms, and SSL/TLS implementations.
Going the Provider way requires knowing a lot more than we should, doesn't it?
Not that we would be concerned or shy away from knowing those details - but if
we don't have to, why should we?
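To make the ask concrete, the kind of pluggable hook KIP-486 has in mind could be sketched roughly as follows. This is purely illustrative - the interface name and method signatures below are assumptions, not the KIP's actual API:

```java
import java.security.KeyStore;

// Hedged sketch only: a pluggable loader in the spirit of KIP-486.
// "KeyStoreLoader" and its methods are illustrative names, not the proposal.
interface KeyStoreLoader {

    // Load private keys/certs from a custom source such as a KMS,
    // without the caller knowing anything about ciphers or TLS internals.
    KeyStore loadKeyStore() throws Exception;

    // Load trusted certs the same way.
    KeyStore loadTrustStore() throws Exception;
}
```

The point of the sketch is the separation of concerns argued above: the implementor only answers "where do keys/certs come from", while the SSL/TLS machinery stays with whatever Provider is in use.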

On Thu, Aug 8, 2019 at 3:23 PM Maulin Vasavada 
wrote:

> Hi Harsha
>
> We don't have spire (or similar) agents and we do not have keys/certs
> locally on any brokers. To elaborate more on my previous email,
>
> I agree that Java security Providers are used in much broader sense - to
> have a particular implementation of an algorithm, use specific cipher
> suites for SSL , OR  in our current team's case have a particular way to
> leverage pre-generated SSL sessions. However, the scope of our KIP (486) is
> much more restricted than that. We merely intend to provide a custom
> keystore/truststore for our SSL connections and not really worry about
> underlying specific SSL/TLS implementation.  This simplifies it a lot for
> us to keep the concerns separate and clear.
>
> I feel our approach is more complementary such that it allows for using
> keystores of choice while retaining the flexibility to use any
> underlying/available Provider for actually making the SSL call.
>
> We agree with KIP-492's approach based on Providers (and Java's
> recommendation), but also strongly believe that our approach can complement
> it very effectively for reasons explained above.
>
> Thanks
> Maulin
>
> On Thu, Aug 8, 2019 at 3:05 PM Harsha Chintalapani 
> wrote:
>
>> Hi Maulin,
>>
>> On Thu, Aug 08, 2019 at 2:04 PM, Maulin Vasavada <
>> maulin.vasav...@gmail.com>
>> wrote:
>>
>> > Hi Harsha
>> >
>> > The reason we rejected the SslProvider route is that - we only needed a
>> > custom way to load keys/certs. Not touch any policy that existing
>> Providers
>> > govern like SunJSSE Provider.
>> >
>>
>> We have exactly the same requirements to load certs and keys through spire
>> agent. We used security.provider to do that exactly. I am not sure why you
>> would be modifying any policies provided by default SunJSSE provider.  Can
>> you give me an example of having custom provider that will override an
>> existing policy in  SunJSSE provider.
>>
>> As pointed out earlier, this kip
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-492%3A+Add+java+security+providers+in+Kafka+Security+config
>> allows
>> you to  load security.provider through config.
>> Take a look at the examples I gave before
>>
>> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.java
>> It registers KeyManagerFactory and TrustManagerFactory and Keystore
>> algorithm.
>> Implement your custom way of loading Keystore in here
>>
>> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeKeyStore.java
>>
>> and Trust manager like here
>>
>> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeTrustManager.java
>>
>> In your Kafka client  you can set the security.provider to your custom
>> implementation and with this fix
>> https://issues.apache.org/jira/browse/KAFKA-8191 you can set
>> keyManagerAlgorithm and trustManagerAlgorithm configs.
>>
>> All of this is in your clients and broker side and do not need to touch
>> any
>> policy changes at JVM level. You'll register the providers in the priority
>> order and can still have SunJSSE provider and have your custom provider to
>> implement the key and trust managers.
>>
>>
>>
>> The ask here is different than KIP-492. We don't have any need to
>> > modify/specify the algorithm parameter. Does that make sense?
>> >
>>
>> The ask in KIP is introducing new interfaces where the KIP's
>> goal/motivation can be achieved through the security.provider and we
>> worked
>> on similar goal without touching any Keystore or Truststore interfaces.
>> My advice is against changing or introducing new interfaces when it can
>> work through security.provider.
>>
>> Thanks,
>> Harsha
>>
>>
>> Thanks
>> > Maulin
>> >
>> > On Thu, Aug 8, 2019 at 7:48 AM Harsha Chintalapani 
>> > wrote:
>> >
>> > In your KIP you added security.provider as a rejected alternative and
>> > specified "its not the correct way". Do you mind explaining why it's
>> not? I
>> > didn't find any evidence in Java docs to say so. Contrary to your
>> statement
>> > it does say in the java docs
>> > " However, please note that a provider can be used to implement any
>> > security service in Java that uses a pluggable architecture with a
>> choice
>> > of implementations that fit underneath."
>> >
>> > Java Security Providers have been used by other projects 

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-08 Thread Guozhang Wang
Regarding 3) above: I think active tasks should still be considered
stateful since the processor would still pay the IO cost of accessing the
store, but they would not have standby tasks?
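Guozhang's distinction could be sketched like this (hypothetical types, not the actual Streams assignor code): a task whose stores all have logging disabled still pays store IO on its active copy, so it stays "stateful" for assignment stickiness, but it has no changelog to replay and therefore gets no standbys.

```java
import java.util.List;

// Illustrative sketch only; "Store" and "Task" here are stand-ins,
// not the real Kafka Streams classes.
record Store(String name, boolean loggingEnabled) {}

record Task(String id, List<Store> stores) {
    // Still pays store IO on the active copy, so treat as stateful.
    boolean isStateful() { return !stores.isEmpty(); }

    // Only changelogged state can be replicated to standby tasks.
    boolean needsStandbys() {
        return stores.stream().anyMatch(Store::loggingEnabled);
    }
}
```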

On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna  wrote:

> Hi,
>
> Thank you for the KIP!
>
> Some questions/comments:
>
> 1. I am wondering if the "stand-by" tasks that catch up state before
> the active task is switched deserve their own name in this KIP and maybe
> in the code. We have already stated that they are not true stand-by
> tasks, they are not configured through `num.standby.replicas`, and
> maybe they have also other properties that distinguish them from true
> stand-by tasks of which we are not aware yet. For example, they may be
> prioritized differently than other tasks. Furthermore, the name
> "stand-by" does not really fit with the planned functionality of those
> tasks. In the following, I will call them false stand-by tasks.
>
> 2. Did you consider to trigger the probing rebalances not at regular
> time intervals but when the false stand-by tasks reach an acceptable
> lag? If you did consider it, could you add a paragraph on why you rejected
> this idea to the "Rejected Alternatives" section.
>
> 3. Are tasks that solely contain stores with disabled logging
> classified as stateful or stateless in the algorithm? I would guess
> stateless, although if possible they should be assigned to the same
> instance they had run before the rebalance. As far as I can see this
> special case is not handled in the algorithm.
>
> Best,
> Bruno
>
>
>
> On Thu, Aug 8, 2019 at 8:24 AM Guozhang Wang  wrote:
> >
> > 1. Sounds good, just wanted to clarify; and it may be worth documenting it
> so
> > that users would not be surprised when monitoring their footprint.
> >
> > 2. Hmm I see... I think the trade-off can be described as "how much
> > imbalance would bother you to be willing to pay another rebalance, along
> > with potentially more restoration lag", and the current definition of
> > rebalance_factor can be considered as a rough measurement of that
> > imbalance. Of course one can argue that a finer grained measurement could
> > be "resource footprint" like CPU / storage of each instance like we have
> in
> > Kafka broker auto balancing tools, but I'd prefer not doing that as part
> of
> > the library but more as an operational tool in the future. On the other
> > hand, I've seen stateful and stateless tasks having very different load,
> > and sometimes the only bottleneck of a Streams app is just one stateful
> > sub-topology, and whoever gets tasks of that sub-topology becomes a hotspot
> > (and that's why our algorithm tries to balance per sub-topology as well),
> > so maybe we can just consider stateful tasks when calculating this factor
> > as a very brute force heuristic?
> >
> > 3.a. Thinking about this a bit more, maybe it's better not to try to tackle
> an
> > unseen enemy just yet, and observe if it really emerges later, and by
> then
> > we may have other ways to avoid starving the standby tasks, for example, by
> > using dedicate threads for standby tasks or even consider having higher
> > priority for standby than active so that we always try to catch up
> standby
> > first, then process active; and if the active's lag compared to
> > log-end-offset is increasing then we should increase capacity, etc etc.
> >
> > 4. Actually with KIP-429 this may not be the case: we may not call
> > onPartitionsRevoked prior to a rebalance any more so would not transition
> state
> > to PARTITIONS_REVOKED, and hence not cause the state of the instance to
> be
> > REBALANCING. In other words, even if an instance is undergoing a rebalance
> > its state may still be RUNNING and it may still be processing records at
> > the same time.
> >
> >
> > On Wed, Aug 7, 2019 at 12:14 PM John Roesler  wrote:
> >
> > > Hey Guozhang,
> > >
> > > Thanks for the review!
> > >
> > > 1. Yes, even with `num.standby.replicas := 0`, we will still
> temporarily
> > > allocate standby tasks to accomplish a no-downtime task migration.
> > > Although, I'd argue that this doesn't really violate the config, as the
> > > task isn't a true hot standby. As soon as it catches up, we'll
> rebalance
> > > again, that task will become active, and the original instance that
> hosted
> > > the active task will no longer have the task assigned at all. Once the
> > > stateDirCleaner kicks in, we'll free the disk space from it, and
> return to
> > > the steady-state of having just one copy of the task in the cluster.
> > >
> > > We can of course do without this, but I feel the current proposal is
> > > operationally preferable, since it doesn't make configuring
> hot-standbys a
> > > pre-requisite for fast rebalances.
> > >
> > > 2. Yes, I think your interpretation is what we intended. The default
> > > balance_factor would be 1, as it is implicitly today. What this does is
> > > allows operators to trade off less balanced assignments against fewer
> > > rebalances. If you have 

Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-08-08 Thread Colin McCabe
Harsha made a good point that you can achieve your goals through KIP-492.  
Security configuration is starting to get pretty complex-- is there a reason 
not to use the existing configurations?

Also, it seems like most people who want a custom truststore / keystore will 
also want a custom SSL provider, so that they can integrate their custom SSL 
infra with more applications than just Kafka.  So I'm not sure how big the 
audience for the proposed new configurations would be.

regards,
Colin


On Thu, Aug 8, 2019, at 15:29, Maulin Vasavada wrote:
> Hi Harsha
> 
> We don't have spire (or similar) agents and we do not have keys/certs
> locally on any brokers. To elaborate more on my previous email,
> 
> I agree that Java security Providers are used in much broader sense - to
> have a particular implementation of an algorithm, use specific cipher
> suites for SSL , OR  in our current team's case have a particular way to
> leverage pre-generated SSL sessions. However, the scope of our KIP (486) is
> much more restricted than that. We merely intend to provide a custom
> keystore/truststore for our SSL connections and not really worry about
> underlying specific SSL/TLS implementation.  This simplifies it a lot for
> us to keep the concerns separate and clear.
> 
> I feel our approach is more complementary such that it allows for using
> keystores of choice while retaining the flexibility to use any
> underlying/available Provider for actually making the SSL call.
> 
> We agree with KIP-492's approach based on Providers (and Java's
> recommendation), but also strongly believe that our approach can complement
> it very effectively for reasons explained above.
> 
> Thanks
> Maulin
> 
> On Thu, Aug 8, 2019 at 3:05 PM Harsha Chintalapani  wrote:
> 
> > Hi Maulin,
> >
> > On Thu, Aug 08, 2019 at 2:04 PM, Maulin Vasavada <
> > maulin.vasav...@gmail.com>
> > wrote:
> >
> > > Hi Harsha
> > >
> > > The reason we rejected the SslProvider route is that - we only needed a
> > > custom way to load keys/certs. Not touch any policy that existing
> > Providers
> > > govern like SunJSSE Provider.
> > >
> >
> > We have exactly the same requirements to load certs and keys through spire
> > agent. We used security.provider to do that exactly. I am not sure why you
> > would be modifying any policies provided by default SunJSSE provider.  Can
> > you give me an example of having custom provider that will override an
> > existing policy in  SunJSSE provider.
> >
> > As pointed out earlier, this kip
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-492%3A+Add+java+security+providers+in+Kafka+Security+config
> > allows
> > you to  load security.provider through config.
> > Take a look at the examples I gave before
> >
> > https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.java
> > It registers KeyManagerFactory and TrustManagerFactory and Keystore
> > algorithm.
> > Implement your custom way of loading Keystore in here
> >
> > https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeKeyStore.java
> >
> > and Trust manager like here
> >
> > https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeTrustManager.java
> >
> > In your Kafka client  you can set the security.provider to your custom
> > implementation and with this fix
> > https://issues.apache.org/jira/browse/KAFKA-8191 you can set
> > keyManagerAlgorithm and trustManagerAlgorithm configs.
> >
> > All of this is in your clients and broker side and do not need to touch any
> > policy changes at JVM level. You'll register the providers in the priority
> > order and can still have SunJSSE provider and have your custom provider to
> > implement the key and trust managers.
> >
> >
> >
> > The ask here is different than KIP-492. We don't have any need to
> > > modify/specify the algorithm parameter. Does that make sense?
> > >
> >
> > The ask in KIP is introducing new interfaces where the KIP's
> > goal/motivation can be achieved through the security.provider and we worked
> > on similar goal without touching any Keystore or Truststore interfaces.
> > My advice is against changing or introducing new interfaces when it can
> > work through security.provider.
> >
> > Thanks,
> > Harsha
> >
> >
> > Thanks
> > > Maulin
> > >
> > > On Thu, Aug 8, 2019 at 7:48 AM Harsha Chintalapani 
> > > wrote:
> > >
> > > In your KIP you added security.provider as a rejected alternative and
> > > specified "its not the correct way". Do you mind explaining why it's not?
> > I
> > > didn't find any evidence in Java docs to say so. Contrary to your
> > statement
> > > it does say in the java docs
> > > " However, please note that a provider can be used to implement any
> > > security service in Java that uses a pluggable architecture with a choice
> > > of 

Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-08-08 Thread Don Bosco Durai
Hi Rajini

Thanks for clarifying. This is very helpful...

#1 - On the Ranger side, we should be able to handle multiple requests at the 
same time. I was just not sure how much processing overhead there will be on 
the Broker side to split and then consolidate the results. If it is negligible, 
then this is the better way. It will make it future proof.
#2 -  I agree, having a single "start" call makes it cleaner. The Authorization 
implementation will only have to do the initialization only once.
#3.1 - Thanks for the clarification. I think it makes sense to have this. The 
term "Mode" might not be explicit enough. Essentially it seems you want the 
Authorizer to know the Intent/Purpose of the authorize call and let the 
Authorizer decide what to log as an audit event. Changing the class/field name or 
giving more documentation will do.
#3.2 - Regarding the option "OPTIONAL": are you thinking of chaining multiple 
Authorizers? If so, I am not sure whether the Broker would have enough 
information to make that decision. I feel the Authorizer will be the one that 
has that knowledge. E.g. in Ranger we have explicit deny, which means no 
matter what, the request should be denied for the user/group or condition. So 
if you are thinking of chaining Authorizers, then responses should have a 
third state, e.g. "DENIED_FINAL", in which case if there is an Authorization 
chain, it will stop and the request will be denied; if it is just DENIED, 
then you might fall back to the next authorizer. If we don't have chaining of 
Authorizers in mind, then we should reconsider OPTIONAL for now, or clarify 
under which scenario OPTIONAL will be called.
#3.3 Regarding FILTER v/s LIST_AUTHORIZED, isn't LIST_AUTHORIZED a special 
case of "FILTER"?
#3.4 KafkaRequestContext.requestType() v/s Action.authorizationMode - I am not 
sure about the overlap or ambiguity.
#4 - Cool. This is good, it will be less stress on the Authorizer. Ranger 
already supports the "count" concept and also has batching capability to 
aggregate similar requests to reduce the number of audit logs to write. We 
should be able to pass this through.
#5 - Assuming the object instance goes out of scope, we should be fine. 
Not a super important ask. Ranger is already catching the KILL signal for clean-up.

Thanks again. These are good enhancements. Appreciate your efforts here.

Bosco
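Bosco's chaining idea in #3.2 could look roughly like this. To be clear, this is a hypothetical sketch of the suggestion, not anything in KIP-504: a third result state lets an explicit deny short-circuit the chain, while a plain DENIED falls through to the next authorizer.

```java
import java.util.List;

// Hypothetical sketch of chained authorizers with an explicit-deny state.
// None of these names come from KIP-504 itself.
enum ChainResult { ALLOWED, DENIED, DENIED_FINAL }

interface ChainableAuthorizer {
    ChainResult authorize(String principal, String resource);
}

final class AuthorizerChain {
    static ChainResult authorize(List<ChainableAuthorizer> chain,
                                 String principal, String resource) {
        for (ChainableAuthorizer a : chain) {
            ChainResult r = a.authorize(principal, resource);
            if (r == ChainResult.ALLOWED) return ChainResult.ALLOWED;
            // Explicit deny: stop the chain immediately.
            if (r == ChainResult.DENIED_FINAL) return ChainResult.DENIED_FINAL;
            // Plain DENIED: fall through to the next authorizer.
        }
        return ChainResult.DENIED;
    }
}
```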



On 8/8/19, 2:03 AM, "Rajini Sivaram"  wrote:

Hi Don,

Thanks for reviewing the KIP.

1. I had this originally as a single Action, but thought it may be useful
to support batched authorize calls as well and keep it consistent with
other methods. Single requests can contain multiple topics. For example a
produce request can contain records for several partitions of different
topics. Broker could potentially authorize these together. For
SimpleAclAuthorizer, batched authorize methods don't provide any
optimisation since lookup is based on resources followed by the matching
logic. But some authorizers may manage ACLs by user principal rather than
resource and may be able to optimize batched requests. I am ok with using
a single Action if this is likely to cause issues.
2. If you have two listeners, one for inter-broker traffic and another for
external clients, start method is invoked twice, once for each listener. On
second thought, that may be confusing and a single start() invocation that
provides all listener information and returns multiple futures would be
better. Will update the KIP.
3. A typical example is a consumer subscribing to a regex pattern. We
request all topic metadata from the broker in order to decide whether the
pattern matches, expecting to receive a list of authorised topics. The user
is not asking to subscribe to an unauthorized topic. If there are 10,000
topics in the cluster and the user has access to 100 of them, at the moment
we log 9900 DENIED log entries at INFO level in SimpleAclAuthorizer. The
proposal is to authorize this request with AuthorizationMode.FILTER, so
that authorizers can log resources that are filtered out at lower level
like DEBUG since this is not an attempt to access unauthorized resources.
Brokers already handle these differently since no authorization error is
returned to the client in these cases. Providing authorization mode to
authorizers enables authorizer implementations to generate better audit
logs.
4. Each request may contain multiple instances of the same authorizable
resource. For example a produce request may contain records for 10
partitions of the same topic. At the moment, we invoke authorize method 10
times. The proposal is to invoke it once with count=10. The count is
provided to authorizer just for audit logging purposes.
5. Authorizer implements Closeable, so you could use close() to flush
audits?
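Putting Rajini's points 1, 3, and 4 together, the proposed shape could be sketched roughly like this (illustrative names only, not the final KIP-504 interface): one batched call per request, where each action carries a count for duplicate resources and a mode the authorizer can use to pick an audit log level.

```java
import java.util.List;

// Hedged sketch of the batched authorize call described above; these
// names are assumptions, not the final KIP-504 API.
enum AuthResult { ALLOWED, DENIED }
enum AuthMode { MANDATORY, OPTIONAL, FILTER, LIST_AUTHORIZED }

// count: how many times the same resource appears in the request (point 4);
// mode: lets the authorizer log filtered-out resources at DEBUG (point 3).
record AuthAction(String operation, String resource, int count, AuthMode mode) {}

interface BatchAuthorizer {
    // One call per request; result i corresponds to actions.get(i) (point 1).
    List<AuthResult> authorize(List<AuthAction> actions);
}
```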

On Thu, Aug 8, 2019 at 7:01 AM Don 

Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-08-08 Thread Maulin Vasavada
Hi Harsha

We don't have spire (or similar) agents and we do not have keys/certs
locally on any brokers. To elaborate more on my previous email,

I agree that Java security Providers are used in much broader sense - to
have a particular implementation of an algorithm, use specific cipher
suites for SSL, or, in our current team's case, have a particular way to
leverage pre-generated SSL sessions. However, the scope of our KIP (486) is
much more restricted than that. We merely intend to provide a custom
keystore/truststore for our SSL connections and not really worry about
the underlying specific SSL/TLS implementation. This simplifies it a lot for
us to keep the concerns separate and clear.

I feel our approach is more complementary such that it allows for using
keystores of choice while retaining the flexibility to use any
underlying/available Provider for actually making the SSL call.

We agree with KIP-492's approach based on Providers (and Java's
recommendation), but also strongly believe that our approach can complement
it very effectively for reasons explained above.

Thanks
Maulin

On Thu, Aug 8, 2019 at 3:05 PM Harsha Chintalapani  wrote:

> Hi Maulin,
>
> On Thu, Aug 08, 2019 at 2:04 PM, Maulin Vasavada <
> maulin.vasav...@gmail.com>
> wrote:
>
> > Hi Harsha
> >
> > The reason we rejected the SslProvider route is that - we only needed a
> > custom way to load keys/certs. Not touch any policy that existing
> Providers
> > govern like SunJSSE Provider.
> >
>
> We have exactly the same requirements to load certs and keys through spire
> agent. We used security.provider to do that exactly. I am not sure why you
> would be modifying any policies provided by default SunJSSE provider.  Can
> you give me an example of having custom provider that will override an
> existing policy in  SunJSSE provider.
>
> As pointed out earlier, this kip
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-492%3A+Add+java+security+providers+in+Kafka+Security+config
> allows
> you to  load security.provider through config.
> Take a look at the examples I gave before
>
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.java
> It registers KeyManagerFactory and TrustManagerFactory and Keystore
> algorithm.
> Implement your custom way of loading Keystore in here
>
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeKeyStore.java
>
> and Trust manager like here
>
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeTrustManager.java
>
> In your Kafka client  you can set the security.provider to your custom
> implementation and with this fix
> https://issues.apache.org/jira/browse/KAFKA-8191 you can set
> keyManagerAlgorithm and trustManagerAlgorithm configs.
>
> All of this is in your clients and broker side and do not need to touch any
> policy changes at JVM level. You'll register the providers in the priority
> order and can still have SunJSSE provider and have your custom provider to
> implement the key and trust managers.
>
>
>
> The ask here is different than KIP-492. We don't have any need to
> > modify/specify the algorithm parameter. Does that make sense?
> >
>
> The ask in KIP is introducing new interfaces where the KIP's
> goal/motivation can be achieved through the security.provider and we worked
> on similar goal without touching any Keystore or Truststore interfaces.
> My advice is against changing or introducing new interfaces when it can
> work through security.provider.
>
> Thanks,
> Harsha
>
>
> Thanks
> > Maulin
> >
> > On Thu, Aug 8, 2019 at 7:48 AM Harsha Chintalapani 
> > wrote:
> >
> > In your KIP you added security.provider as a rejected alternative and
> > specified "its not the correct way". Do you mind explaining why it's not?
> I
> > didn't find any evidence in Java docs to say so. Contrary to your
> statement
> > it does say in the java docs
> > " However, please note that a provider can be used to implement any
> > security service in Java that uses a pluggable architecture with a choice
> > of implementations that fit underneath."
> >
> > Java Security Providers have been used by other projects to provide such
> > integration . I am not sure if you looked into Spiffe project to
> > efficiently distribute certificates but here is an example of Java
> provider
> >
> > https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/
> >
> spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.
> > java which
> > obtains certificates from local daemons.
> > These integrations are being used in Tomcat, Jetty etc.. We are also
> using
> > Security provider to do the same in our Kafka clusters. So unless I see
> > more evidence why security.provider doesn't work for you adding new
> > interfaces while there exists more cleaner way of achieving 

Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-08-08 Thread Harsha Chintalapani
Hi Maulin,

On Thu, Aug 08, 2019 at 2:04 PM, Maulin Vasavada 
wrote:

> Hi Harsha
>
> The reason we rejected the SslProvider route is that - we only needed a
> custom way to load keys/certs. Not touch any policy that existing Providers
> govern like SunJSSE Provider.
>

We have exactly the same requirement to load certs and keys through a spire
agent. We used security.provider to do exactly that. I am not sure why you
would be modifying any policies provided by the default SunJSSE provider. Can
you give me an example of a custom provider that will override an
existing policy in the SunJSSE provider?

As pointed out earlier, this KIP
https://cwiki.apache.org/confluence/display/KAFKA/KIP-492%3A+Add+java+security+providers+in+Kafka+Security+config
allows
you to load security.provider through config.
Take a look at the examples I gave before
https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.java
It registers the KeyManagerFactory, TrustManagerFactory, and KeyStore
algorithms.
Implement your custom way of loading the KeyStore here:
https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeKeyStore.java

and the trust manager like here:
https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeTrustManager.java

In your Kafka client you can set security.provider to your custom
implementation, and with this fix
https://issues.apache.org/jira/browse/KAFKA-8191 you can set the
keyManagerAlgorithm and trustManagerAlgorithm configs.

All of this is on your client and broker side and does not require any
policy changes at the JVM level. You'll register the providers in priority
order and can still have the SunJSSE provider alongside your custom provider
implementing the key and trust managers.
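For reference, the registration pattern described here looks roughly like the following. The class and SPI names are placeholders - see the SpiffeProvider link above for a real example:

```java
import java.security.Provider;
import java.security.Security;

// Illustrative sketch of a custom security provider in the style of
// SpiffeProvider; the com.example SPI class names are placeholders only.
final class CustomProvider extends Provider {
    CustomProvider() {
        super("CustomProvider", 1.0, "Loads keys/certs from a custom agent");
        // Map algorithm names to SPI implementations (loaded lazily on use).
        put("KeyManagerFactory.Custom", "com.example.CustomKeyManagerFactorySpi");
        put("TrustManagerFactory.Custom", "com.example.CustomTrustManagerFactorySpi");
        put("KeyStore.Custom", "com.example.CustomKeyStoreSpi");
    }

    static void install() {
        // Highest priority; SunJSSE stays registered after it.
        Security.insertProviderAt(new CustomProvider(), 1);
    }
}
```

A client would then select the "Custom" algorithms via the key/trust manager algorithm configs mentioned above.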



The ask here is different than KIP-492. We don't have any need to
> modify/specify the algorithm parameter. Does that make sense?
>

The ask in this KIP is to introduce new interfaces, while the KIP's
goal/motivation can be achieved through security.provider; we worked
on a similar goal without touching any KeyStore or TrustStore interfaces.
My advice is against changing or introducing new interfaces when it can
work through security.provider.

Thanks,
Harsha


Thanks
> Maulin
>
> On Thu, Aug 8, 2019 at 7:48 AM Harsha Chintalapani 
> wrote:
>
> In your KIP you added security.provider as a rejected alternative and
> specified "its not the correct way". Do you mind explaining why it's not? I
> didn't find any evidence in Java docs to say so. Contrary to your statement
> it does say in the java docs
> " However, please note that a provider can be used to implement any
> security service in Java that uses a pluggable architecture with a choice
> of implementations that fit underneath."
>
> Java Security Providers have been used by other projects to provide such
> integration . I am not sure if you looked into Spiffe project to
> efficiently distribute certificates but here is an example of Java provider
>
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/
> spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.
> java which
> obtains certificates from local daemons.
> These integrations are being used in Tomcat, Jetty etc.. We are also using
> Security provider to do the same in our Kafka clusters. So unless I see
> more evidence why security.provider doesn't work for you adding new
> interfaces while there exists more cleaner way of achieving the goals of
> this KIP is unnecessary and breaks the well known security interfaces
> provided by Java itself.
>
> Thanks,
> Harsha
>
> On Thu, Aug 08, 2019 at 6:54 AM, Harsha Chintalapani 
> wrote:
>
> Hi Maulin,
> Not sure if you looked at my previous replies. These changes
> are not required as there is already a security Provider to do what you are
> proposing. This KIP https://cwiki.apache.org/confluence/display/KAFKA/
> KIP-492%3A+Add+java+security+providers+in+Kafka+Security+config also
> addresses easy registration of such providers.
>
> Thanks,
> Harsha
>
> On Wed, Aug 07, 2019 at 11:31 PM, Maulin Vasavada  com> wrote:
>
> Bump! Can somebody please review this?
>
> On Tue, Jul 16, 2019 at 1:51 PM Maulin Vasavada <
>
> maulin.vasav...@gmail.com>
>
> wrote:
>
> Bump! Can somebody please review this?
>
>


Re: [DISCUSS] KIP-499 - Unify connection name flag for command line tool

2019-08-08 Thread Colin McCabe
I agree that limiting the scope of the KIP would be good.

The configuration is actually bootstrap.servers with an S, though.

I actually like --bootstrap-servers slightly better than --bootstrap-server, 
although I don't feel that strongly about either. ;)

best,
Colin
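As an aside, a tool migrating to the unified flag could keep backwards compatibility along these lines. This is a hand-rolled sketch under the assumption that both flags are accepted during a deprecation period; the real tools use jOptSimple or argparse4j rather than manual parsing:

```java
// Minimal sketch: prefer the unified --bootstrap-server flag, but fall
// back to the deprecated --broker-list so existing scripts keep working.
final class ConnectionFlag {
    static String bootstrapServers(String[] args) {
        String deprecated = null;
        for (int i = 0; i + 1 < args.length; i++) {
            if (args[i].equals("--bootstrap-server")) return args[i + 1];
            if (args[i].equals("--broker-list")) deprecated = args[i + 1];
        }
        return deprecated; // null when neither flag was given
    }
}
```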


On Thu, Aug 8, 2019, at 14:25, Jason Gustafson wrote:
> @Dongjin Thanks, you raise some good points. I think the intent here is to
> try and fix one of the more egregious inconsistencies without increasing
> the scope too much. We tried the big KIP approach with KIP-14 before and I
> don't think we made much progress. I think it's reasonable to do this on a
> case by case basis to avoid a lot of bikeshedding. The name
> `--bootstrap-server` is the one the project has generally settled on. It is
> consistent with the `bootstrap.server` configuration which is used by all
> the clients.
> 
> @Mitch I'd suggest taking this to a vote.
> 
> -Jason
> 
> On Fri, Aug 2, 2019 at 5:37 AM Dongjin Lee  wrote:
> 
> > Hello Mitchel,
> >
> > Thanks for the KIP. Sure, This inconsistency is really annoying and causing
> > a lot of confusion. Here are some comments:
> >
> > First, there is already a Jira issue about this problem, created by Ismael.
> > https://issues.apache.org/jira/browse/KAFKA-8507 I added the link to the
> > KIP.
> >
> > Add to this, it seems like this problem is a little bit more complicated
> > than it appears at first glance - you can see more in the comment I left in the Jira
> > issue 2 weeks ago.
> >
> > *1. Location and parameter parser*
> >
> > Most tools are located in kafka.tools or kafka.admin package in the core
> > module, with jOptSimple as an argument parser. However,
> > kafka-verifiable-consumer.sh (VerifiableConsumer) and
> > kafka-verifiable-producer.sh (VerifiableProducer) are located in
> > org.apache.kafka.tools package in tools module, with argparse4j as a
> > parameter parser. For this reason, they do not provide a standardized
> > 'version' parameter (see KAFKA-8292).
> >
> > You can find the implementation of jOptSimple parameter parsing and testing
> > from the following:
> >
> > -
> >
> > https://github.com/dongjinleekr/kafka/blob/feature%2FKAFKA-8292/core/src/main/scala/kafka/tools/ProducerPerformance.scala
> > -
> >
> > https://github.com/dongjinleekr/kafka/blob/feature%2FKAFKA-8292/core/src/test/scala/kafka/tools/ProducerPerformanceTest.scala
> >
> > *2. Connection name flag*
> >
> > Currently, there are three co-existing alternatives in the flags:
> >
> > a. broker-list: ConsumerPerformance, ConsoleProducer, etc.
> > b. bootstrap-server: ConfigCommand, etc.
> > c. bootstrap-servers: StreamsResetter.
> >
> > Before progressing this issue, it would be better to agree on which name
> > should be the standard. It seems like most of the community agrees on
> > bootstrap-server (b), but bootstrap-servers (c) deserves consideration - it
> > is consistent with [ProducerConfig,
> > ConsumerConfig].BOOTSTRAP_SERVERS_CONFIG.
> >
> > What do you think?
> >
> > *3. Other inconsistencies*
> >
> > As you can see in the comment, there are more inconsistency problems in the
> > tools. May I take on the tasks for the tools other than the ones you are
> > working on, as a follow-up KIP?
> >
> > @committers Is this okay?
> >
> > Thanks,
> > Dongjin
> >
> >
> > On Fri, Aug 2, 2019 at 7:15 AM Gwen Shapira  wrote:
> >
> > > +1 for the KIP
> > >
> > > On Tue, Jul 30, 2019 at 5:25 PM Mitchell  wrote:
> > > >
> > > > Hello,
> > > > I have written a proposal to add the command line argument
> > > > `--bootstrap-server` to 5 of the existing command line tools that do
> > not
> > > > currently use `--broker-list` for passing cluster connection
> > information.
> > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-499+-+Unify+connection+name+flag+for+command+line+tool
> > > >
> > > > Please take a look and let me know what you think.
> > > > Thanks,
> > > > -Mitch
> > >
> > >
> > >
> > > --
> > > Gwen Shapira
> > > Product Manager | Confluent
> > > 650.450.2760 | @gwenshap
> > > Follow us: Twitter | blog
> > >
> >
> >
> > --
> > *Dongjin Lee*
> >
> > *A hitchhiker in the mathematical world.*
> > *github:  github.com/dongjinleekr
> > linkedin: kr.linkedin.com/in/dongjinleekr
> > speakerdeck:
> > speakerdeck.com/dongjin
> > *
> >
>


Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-08-08 Thread Colin McCabe
Hi Rajini,

Thanks for the KIP.  This will be a great improvement.

Why not just pass the cluster ID directly to Authorizer#start, rather than 
dealing with the ClusterResourceListener interface?  That seems like it would 
be simpler.  If authorizers don't need that information, they can ignore that 
parameter.

Do we really need to change the configuration key name from 
authorizer.class.name to authorizer.class?  We should be able to use 
introspection to see whether it's a subclass of the old or the new interface, 
right?  This would avoid having two configuration keys that were very similar.

The Authorizer JavaDoc should have a comment about thread safety.  I assume 
that all of the Authorizer methods need to be thread-safe, right?

For CreateAcls and DeleteAcls, it seems like we would be better off just using 
AclBinding objects.  An AclBinding object contains a ResourcePattern and an 
AccessControlEntry.  There is no need to pass them separately.

It seems like we don't need all the different overloads of deleteAcls.  We can 
just pass in an AclBindingFilter, and that can delete whatever is needed.  For 
example, if you want to delete all the ACLs for a specific resource, you could 
pass in an AclBindingFilter with AccessControlEntryFilter.ANY.  It would be 
nice to have deleteAcls return the number of entries that were deleted.

I like the way KafkaRequestContext is split from Action.  That will be helpful.

We need a way of listing all the ACLs, because that will be needed for the 
AdminClient.  The simplest API would be just passing in an AclBindingFilter.  
Listing the ACLs for a specific resource is just a special case, which doesn't 
need its own function.  Since the number of ACLs could be quite large, we 
probably want to return an iterator rather than a full collection.  We can 
eventually make the iterator load results lazily if needed.

AuthorizationMode doesn't really make sense to me.  It seems to be designed to 
allow us to log different kinds of authorization failures differently.  But it 
would be simpler to just split the API into two parts: getting an authorization 
result, and logging it.  The Authorizer#authorize method should return a result 
that we can later pass to an Authorizer#logFailure method.  Then, when code 
wants to try to authorize against X, and then if that fails authorize against 
Y, it can simply only pass the second failure result to Authorizer#logFailure 
to avoid creating extra log messages.  Or we could create a method 
AuthorizationResult#logFailure().

For requestType, I don't think this should be a string.  The 16-bit API key is 
clearer and more concise.  The request names currently aren't part of the 
stable API, and it would be nice to keep it that way.  To distinguish between 
follower fetches and consumer fetches, we should just have a separate way of 
knowing that the broker made the request.  Maybe something like 
KafkaRequestContext#Originator.  Then we can have Originator#CLIENT and 
Originator#BROKER.

It's hard to understand what Action#count is for.  If we want to log something 
like "failed to authorize deleting 123 partitions", it would be better to just 
let the caller supply some custom text in Authorizer#logFailure.

Thanks again for the KIP.

best,
Colin
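Colin's filter-based suggestion can be sketched with a toy model (the class and
method names below are illustrative only, not the actual KIP-504 interfaces): a
single filter-taking method, with null fields acting as ANY wildcards, subsumes
both the per-resource delete overloads and listing, and can report the number of
entries deleted.

```java
import java.util.*;

// Illustrative model only -- names do not match the final KIP-504 interfaces.
public class AclFilterSketch {
    record AclBinding(String resourceType, String resourceName,
                      String principal, String operation) {}

    // Null fields behave like the ANY wildcard in an AclBindingFilter.
    record AclBindingFilter(String resourceType, String resourceName,
                            String principal, String operation) {
        boolean matches(AclBinding b) {
            return (resourceType == null || resourceType.equals(b.resourceType()))
                && (resourceName == null || resourceName.equals(b.resourceName()))
                && (principal == null || principal.equals(b.principal()))
                && (operation == null || operation.equals(b.operation()));
        }
    }

    private final Set<AclBinding> store = new HashSet<>();

    void createAcls(Collection<AclBinding> bindings) { store.addAll(bindings); }

    // One filter-based method covers every deleteAcls overload; it returns the
    // number of bindings removed, as suggested above.
    int deleteAcls(AclBindingFilter filter) {
        List<AclBinding> doomed = store.stream().filter(filter::matches).toList();
        store.removeAll(doomed);
        return doomed.size();
    }

    // Listing ACLs for a specific resource is just the same filter, no removal.
    List<AclBinding> describeAcls(AclBindingFilter filter) {
        return store.stream().filter(filter::matches).toList();
    }
}
```

For example, deleting all ACLs for one topic is simply a filter with the
resource fields set and the entry fields left as ANY (null here).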

On Thu, Aug 8, 2019, at 02:03, Rajini Sivaram wrote:
> Hi Don,
> 
> Thanks for reviewing the KIP.
> 
> 1. I had this originally as a single Action, but thought it may be useful
> to support batched authorize calls as well and keep it consistent with
> other methods. Single requests can contain multiple topics. For example a
> produce request can contain records for several partitions of different
> topics. Broker could potentially authorize these together. For
> SimpleAclAuthorizer, batched authorize methods don't provide any
> optimisation since lookup is based on resources followed by the matching
> logic. But some authorizers may manage ACLs by user principal rather than
> resource and may be able to optimize batched requests. I am ok with using
> single Action if this is likely to cause issues.
> 2. If you have two listeners, one for inter-broker traffic and another for
> external clients, start method is invoked twice, once for each listener. On
> second thought, that may be confusing and a single start() invocation that
> provides all listener information and returns multiple futures would be
> better. Will update the KIP.
> 3. A typical example is a consumer subscribing to a regex pattern. We
> request all topic metadata from the broker in order to decide whether the
> pattern matches, expecting to receive a list of authorised topics. The user
> is not asking to subscribe to an unauthorized topic. If there are 10,000
> topics in the cluster and the user has access to 100 of them, at the moment
> we log 9900 DENIED log entries at INFO level in SimpleAclAuthorizer. The
> proposal is to authorize this request with AuthorizationMode.FILTER, so
> that authorizers can log resources that 
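The FILTER mode Rajini describes can be sketched as follows (an illustrative
toy model, not the proposed KIP-504 interfaces): when the broker is filtering a
large set of resources on the client's behalf, denials are expected and are
returned without each producing a DENIED log line, while an explicit access
attempt still logs its denial.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative sketch only: contrasts MANDATORY vs FILTER authorization modes.
public class FilterModeSketch {
    enum AuthorizationMode { MANDATORY, FILTER }

    static int deniedLogLines = 0; // stand-in for INFO-level DENIED log entries

    static boolean authorize(Set<String> allowedTopics, String topic,
                             AuthorizationMode mode) {
        boolean ok = allowedTopics.contains(topic);
        if (!ok && mode == AuthorizationMode.MANDATORY) {
            deniedLogLines++; // only explicit access attempts are logged
        }
        return ok;
    }

    // E.g. a regex subscription asking "which of these topics can I see?":
    // denials here are expected, so FILTER mode keeps the log quiet.
    static List<String> filterAuthorized(Set<String> allowedTopics,
                                         List<String> requested) {
        return requested.stream()
                        .filter(t -> authorize(allowedTopics, t, AuthorizationMode.FILTER))
                        .collect(Collectors.toList());
    }
}
```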

[jira] [Resolved] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-08-08 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4600.
--
Resolution: Fixed

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Braedon Vickers
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.4.0
>
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (KAFKA-8493) Add PartitionsLost API in RebalanceListener (part 3)

2019-08-08 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8493.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

> Add PartitionsLost API in RebalanceListener (part 3)
> 
>
> Key: KAFKA-8493
> URL: https://issues.apache.org/jira/browse/KAFKA-8493
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.4.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-499 - Unify connection name flag for command line tool

2019-08-08 Thread Jason Gustafson
@Dongjin Thanks, you raise some good points. I think the intent here is to
try and fix one of the more egregious inconsistencies without increasing
the scope too much. We tried the big KIP approach with KIP-14 before and I
don't think we made much progress. I think it's reasonable to do this on a
case by case basis to avoid a lot of bikeshedding. The name
`--bootstrap-server` is the one the project has generally settled on. It is
consistent with the `bootstrap.server` configuration which is used by all
the clients.

@Mitch I'd suggest taking this to a vote.

-Jason

On Fri, Aug 2, 2019 at 5:37 AM Dongjin Lee  wrote:

> Hello Mitchel,
>
> Thanks for the KIP. This inconsistency is really annoying and causes a
> lot of confusion. Here are some comments:
>
> First, there is already a Jira issue about this problem, created by Ismael.
> https://issues.apache.org/jira/browse/KAFKA-8507 I added the link to the
> KIP.
>
> In addition, this problem seems a little more complicated than it appears
> at first glance - see the comment I left in the Jira issue 2 weeks ago.
>
> *1. Location and parameter parser*
>
> Most tools are located in kafka.tools or kafka.admin package in the core
> module, with jOptSimple as an argument parser. However,
> kafka-verifiable-consumer.sh (VerifiableConsumer) and
> kafka-verifiable-producer.sh (VerifiableProducer) are located in
> org.apache.kafka.tools package in tools module, with argparse4j as a
> parameter parser. For this reason, they do not provide a standardized
> 'version' parameter (see KAFKA-8292).
>
> You can find the implementation of jOptSimple parameter parsing and testing
> from the following:
>
> -
>
> https://github.com/dongjinleekr/kafka/blob/feature%2FKAFKA-8292/core/src/main/scala/kafka/tools/ProducerPerformance.scala
> -
>
> https://github.com/dongjinleekr/kafka/blob/feature%2FKAFKA-8292/core/src/test/scala/kafka/tools/ProducerPerformanceTest.scala
>
> *2. Connection name flag*
>
> Currently, there are three co-existing alternatives in the flags:
>
> a. broker-list: ConsumerPerformance, ConsoleProducer, etc.
> b. bootstrap-server: ConfigCommand, etc.
> c. bootstrap-servers: StreamsResetter.
>
> Before progressing this issue, it would be better to agree on which name
> should be the standard. It seems like most of the community agrees on
> bootstrap-server (b), but bootstrap-servers (c) deserves consideration - it
> is consistent with [ProducerConfig,
> ConsumerConfig].BOOTSTRAP_SERVERS_CONFIG.
>
> What do you think?
>
> *3. Other inconsistencies*
>
> As you can see in the comment, there are more inconsistency problems in the
> tools. May I take on the tasks for the tools other than the ones you are
> working on, as a follow-up KIP?
>
> @committers Is this okay?
>
> Thanks,
> Dongjin
>
>
> On Fri, Aug 2, 2019 at 7:15 AM Gwen Shapira  wrote:
>
> > +1 for the KIP
> >
> > On Tue, Jul 30, 2019 at 5:25 PM Mitchell  wrote:
> > >
> > > Hello,
> > > I have written a proposal to add the command line argument
> > > `--bootstrap-server` to 5 of the existing command line tools that do
> not
> > > currently use `--broker-list` for passing cluster connection
> information.
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-499+-+Unify+connection+name+flag+for+command+line+tool
> > >
> > > Please take a look and let me know what you think.
> > > Thanks,
> > > -Mitch
> >
> >
> >
> > --
> > Gwen Shapira
> > Product Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
> >
>
>
> --
> *Dongjin Lee*
>
> *A hitchhiker in the mathematical world.*
> *github:  github.com/dongjinleekr
> linkedin: kr.linkedin.com/in/dongjinleekr
> speakerdeck:
> speakerdeck.com/dongjin
> *
>


Re: KIP-352: Distinguish URPs caused by reassignment

2019-08-08 Thread George Li
 Hi Jason,

Can KIP-352 split the two metrics MaxLag and TotalLag for reassignment 
replication as well?  From the dashboard of these 2 metrics, one can see 
whether the replication is stuck (flat line) and estimate how long the 
reassignments will complete (how fast the Lag line is going down).

Thanks,
George


On Thursday, August 8, 2019, 01:41:49 PM PDT, Jason Gustafson 
 wrote:  
 
 Hey Stan,

Thanks for the suggestion. I have updated the proposal to include two new
meters for reassignment traffic inbound and outbound.

-Jason

On Thu, Aug 8, 2019 at 12:07 PM Stanislav Kozlovski 
wrote:

> Agreed on not totally splitting the replication incoming/outgoing metric -
> that could cause confusion. Just adding a new metric sounds good to me!
>
> The throttle follow-up is mentioned as part of future work in KIP-455 and I
> agree that it is way out of scope for this one.
>
>
> On Thu, Aug 8, 2019 at 8:03 PM Jason Gustafson  wrote:
>
> > Hi Stan,
> >
> > That's an interesting thought. I'm wondering if it would be better to
> leave
> > the current replication metrics counting for the total replication
> traffic
> > and add a new metric for reassignment traffic?
> >
> > By the way, a further KIP-455 follow-up that I won't attempt here would
> be
> > to have a separate throttle for reassignment traffic.
> >
> > -Jason
> >
> > On Thu, Aug 8, 2019 at 11:34 AM Stanislav Kozlovski <
> > stanis...@confluent.io>
> > wrote:
> >
> > > Hi Jason,
> > >
> > > I like the new ReassigningPartitions metric. Would it be easy to expand
> > the
> > > scope of the KIP to split the ReplicationIncoming/Outgoing metric to
> > > distinguish between reassigning/non-reassigning traffic, or do you
> > prefer
> > > to keep this KIP nice and small?
> > >
> > > On Thu, Aug 8, 2019 at 12:08 AM Jason Gustafson 
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > Since KIP-455 is passed, I would like to revive this proposal. I have
> > > > reduced the scope so that it is just changing the way we compute URP
> > and
> > > > adding a new metric for the number of reassigning partitions. Please
> > > take a
> > > > look:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-352%3A+Distinguish+URPs+caused+by+reassignment
> > > > .
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Sun, Aug 5, 2018 at 3:41 PM Dong Lin  wrote:
> > > >
> > > > > Hey Jason,
> > > > >
> > > > > Yes, I also think the right solution is probably to include the
> > > > > expected_replication_factor in the per-topic znode and include this
> > > > > information in the UpdateMetadataRequest.
> > > > >
> > > > > And I agree with Gwen and Ismael that it is reasonable to redefine
> > URP
> > > as
> > > > > those partitions whose ISR set size < expected replication factor.
> > > Given
> > > > > that URP is being used for monitoring cluster availability and
> > > > > reassignment progress, it seems that one of them will break
> depending
> > > on
> > > > > the URP definition. It seems better to keep the legitimate
> use-case,
> > > i.e.
> > > > > monitoring cluster availability. Users who want to monitor
> > reassignment
> > > > > progress probably should use the "kafka-reassign-partitions.sh
> > > --verify".
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Aug 3, 2018 at 2:57 PM, Jason Gustafson <
> ja...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > @Dong
> > > > > >
> > > > > > Ugh, you are right. This is indeed trickier than I imagined. You
> > > could
> > > > > > argue that the problem here is that there is no single source of
> > > truth
> > > > > for
> > > > > > the expected replication factor. While a reassignment is in
> > progress,
> > > > the
> > > > > > reassignment itself has the expected replica set. Otherwise, it
> is
> > > > stored
> > > > > > partition metadata itself. This is why manually deleting the
> > > > reassignment
> > > > > > is problematic. We lose the expected replica set and we depend on
> > > users
> > > > > to
> > > > > > reinstall it. I guess I'm starting to think that the way we track
> > > > > > reassignment state in the controller is problematic. In addition
> to
> > > the
> > > > > > problems caused by deletion, we cannot easily change an existing
> > > > > > reassignment.
> > > > > >
> > > > > > High level what I'm thinking is that we need to move the pending
> > > > > > reassignment state out of the single znode and into the
> individual
> > > > > metadata
> > > > > > of the reassigned partitions so that there is a single source of
> > > truth
> > > > > for
> > > > > > the expected replicas (and hence the replication factor). This
> > would
> > > > also
> > > > > > give us an easier mechanism to manage the batching of multiple
> > > > > > reassignments. Let me think on it a bit and see if I can come up
> > > with a
> > > > > > proposal.
> > > > > >
> > > > > > @Gwen, @Ismael
> > > > > >
> > > > > > That is fair. I also prefer to redefine URP if 
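The definitional question in this thread can be made concrete with a small
sketch (purely illustrative, not broker code): mid-reassignment from replicas
{1,2} to {3,4}, the assigned replica set is temporarily the union {1,2,3,4}, so
"ISR smaller than assigned replicas" flags the partition as under-replicated
even though the ISR fully covers the expected replication factor of 2.

```java
import java.util.Set;

// Illustrative only: contrasts the two URP definitions discussed above.
public class UrpSketch {
    // Current definition: under-replicated if any assigned replica is out of sync.
    static boolean urpByAssignedReplicas(Set<Integer> assignedReplicas, Set<Integer> isr) {
        return isr.size() < assignedReplicas.size();
    }

    // Redefinition: under-replicated only if the ISR is smaller than the
    // expected replication factor (hypothetically kept in topic metadata).
    static boolean urpByExpectedRf(int expectedReplicationFactor, Set<Integer> isr) {
        return isr.size() < expectedReplicationFactor;
    }

    public static void main(String[] args) {
        // Mid-reassignment from {1,2} to {3,4}: assigned is the union, ISR is healthy.
        Set<Integer> assigned = Set.of(1, 2, 3, 4);
        Set<Integer> isr = Set.of(1, 2, 3);
        System.out.println(urpByAssignedReplicas(assigned, isr)); // true: a false alarm
        System.out.println(urpByExpectedRf(2, isr));              // false: partition is fine
    }
}
```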

[jira] [Created] (KAFKA-8773) Static membership protocol borks on re-used group id

2019-08-08 Thread Raman Gupta (JIRA)
Raman Gupta created KAFKA-8773:
--

 Summary: Static membership protocol borks on re-used group id
 Key: KAFKA-8773
 URL: https://issues.apache.org/jira/browse/KAFKA-8773
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.3.0
Reporter: Raman Gupta


I am using the new static group membership protocol in 2.3.0. I have a 
situation in which an application defines multiple consumers, let's call them:

consumer-1
consumer-2

Each consumer uses the same group id "app-x", as they all belong to the same 
application. With dynamic group membership, this is no problem at all. However, 
with static membership, starting a single instance of this application (in
which both consumers therefore have the same group.instance.id) fails
consistently with errors like:

```
2019-08-08 16:56:47,223 ERROR --- org.apa.kaf.cli.con.int.AbstractCoordinator   
: [Consumer instanceId=x-1, clientId=consumer-2, groupId=x] Received fatal 
exception: group.instance.id gets fenced
2019-08-08 16:56:47,229 ERROR --- org.apa.kaf.cli.con.int.AbstractCoordinator   
: [Consumer instanceId=x-1, clientId=consumer-1, groupId=x] Received fatal 
exception: group.instance.id gets fenced
2019-08-08 16:56:47,234 ERROR 
---red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling 
thread. Will die for safety. [[EXCEPTION: 
org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected 
this static consumer since another consumer with the same group.instance.id has 
registered with a different member.id.
]]
2019-08-08 16:56:47,229 ERROR --- 
red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling 
thread. Will die for safety. [[EXCEPTION: 
org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected 
this static consumer since another consumer with the same group.instance.id has 
registered with a different member.id.
]]
```

and to top it off, I also get this obviously incorrect error:

```
2019-08-08 16:56:47,235 ERROR --- 
red.mic.kaf.AbstractKafkaAutoCommitConsumerService: Exception in polling 
thread. Will die for safety. [[EXCEPTION: 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'version': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110) 
~[kafka-clients-2.3.0.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:106)
 ~[kafka-clients-2.3.0.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:262)
 ~[kafka-clients-2.3.0.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
 ~[kafka-clients-2.3.0.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
 ~[kafka-clients-2.3.0.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
 ~[kafka-clients-2.3.0.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
 ~[kafka-clients-2.3.0.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
~[kafka-clients-2.3.0.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) 
~[kafka-clients-2.3.0.jar:?]
at 
com.redock.microservice.kafka.BasicCommitAfterProcessingConsumer.run(BasicCommitAfterProcessingConsumer.kt:51)
 ~[classes/:?]
at 
com.redock.microservice.kafka.AbstractKafkaAutoCommitConsumerService$start$2.invokeSuspend(AbstractKafkaAutoCommitConsumerService.kt:44)
 [classes/:?]
... suppressed 2 lines
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) 
[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
]]
```

The broker logs contain this error:

ERROR given member.id x-1-1565298855983 is identified as a known static member 
x-1,but not matching the expected member.id x-1-1565298855984 
(kafka.coordinator.group.GroupMetadata)

It seems like the client id is not taken into account by the server when
determining static group membership.

While the workaround is simple -- change the group id of each consumer to 
include the client id -- I don't believe this 

Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-08-08 Thread Maulin Vasavada
Hi Harsha

The reason we rejected the SslProvider route is that we only need a custom
way to load keys/certs, without touching any policy that existing providers
like the SunJSSE Provider govern.

The ask here is different than KIP-492. We don't have any need to
modify/specify the algorithm parameter. Does that make sense?

Thanks
Maulin
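For context, the Provider route debated in this thread (modeled loosely on the
Spiffe example Harsha cites) looks roughly like the following minimal sketch.
The provider name, algorithm name, and SPI class are invented for illustration;
a real provider would register an actual KeyManagerFactory/TrustManagerFactory
implementation backed by custom key/cert loading.

```java
import java.security.Provider;
import java.security.Security;

// Minimal sketch of a custom java.security.Provider. Registering it makes the
// custom key/cert machinery discoverable through the standard JCA lookup APIs,
// which is the "no new Kafka interface needed" argument in this thread.
public class CustomProviderSketch extends Provider {
    public CustomProviderSketch() {
        super("CustomKeyStoreProvider", "1.0",
              "Illustrative provider for custom key/cert loading");
        // A real provider would register services here, e.g.:
        // put("KeyManagerFactory.CustomAlg", "com.example.CustomKeyManagerFactorySpi");
    }

    public static void main(String[] args) {
        Security.addProvider(new CustomProviderSketch());
        Provider p = Security.getProvider("CustomKeyStoreProvider");
        System.out.println(p.getName()); // CustomKeyStoreProvider
    }
}
```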

On Thu, Aug 8, 2019 at 7:48 AM Harsha Chintalapani  wrote:

> In your KIP you added security.provider as a rejected alternative and
> specified "its not the correct way". Do you mind explaining why it's not? I
> didn't find any evidence in Java docs to say so. Contrary to your statement
> it does say in the java docs
> " However, please note that a provider can be used to implement any
> security service in Java that uses a pluggable architecture with a choice
> of implementations that fit underneath."
>
> Java Security Providers have been used by other projects to provide such
> integration . I am not sure if you looked into Spiffe project to
> efficiently distribute certificates but here is an example of Java provider
>
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.java
> which
> obtains certificates from local daemons.
> These integrations are being used in Tomcat, Jetty etc..  We are also using
> Security provider to do the same in our Kafka clusters. So unless I see
> more evidence of why security.provider doesn't work for you,
> adding new interfaces while there exists a cleaner way of achieving the
> goals of this KIP is unnecessary and breaks the well-known security
> interfaces provided by Java itself.
>
> Thanks,
> Harsha
>
>
> On Thu, Aug 08, 2019 at 6:54 AM, Harsha Chintalapani 
> wrote:
>
> > Hi Maulin,
> > Not sure if you looked at my previous replies. These changes
> > are not required, as there is already a security Provider to do what you are
> > proposing.  This KIP https://cwiki.apache.org/confluence/display/KAFKA/
> > KIP-492%3A+Add+java+security+providers+in+Kafka+Security+config also
> > addresses easy registration of such providers.
> >
> > Thanks,
> > Harsha
> >
> >
> > On Wed, Aug 07, 2019 at 11:31 PM, Maulin Vasavada  > com> wrote:
> >
> > Bump! Can somebody please review this?
> >
> > On Tue, Jul 16, 2019 at 1:51 PM Maulin Vasavada <
> maulin.vasav...@gmail.com>
> > wrote:
> >
> > Bump! Can somebody please review this?
> >
> >
>


Build failed in Jenkins: kafka-trunk-jdk8 #3836

2019-08-08 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Update dependencies for Kafka 2.4 (#7126)

--
[...truncated 2.60 MB...]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at 
org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
at java.lang.Thread.run(Thread.java:748)

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithRelativeSymlinkForwards STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithRelativeSymlinkForwards PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testOrderOfPluginUrlsWithJars STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testOrderOfPluginUrlsWithJars PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithClasses STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithClasses PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testConnectFrameworkClasses STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testConnectFrameworkClasses PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testThirdPartyClasses STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testThirdPartyClasses PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testClientConfigProvider STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testClientConfigProvider PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testAllowedConnectFrameworkClasses STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testAllowedConnectFrameworkClasses PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testJavaLibraryClasses STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testJavaLibraryClasses PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testEmptyPluginUrls STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testEmptyPluginUrls PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testConnectorClientConfigOverridePolicy STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testConnectorClientConfigOverridePolicy PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithJars STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithJars PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithZips STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithZips PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithRelativeSymlinkBackwards STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithRelativeSymlinkBackwards PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithAbsoluteSymlink STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithAbsoluteSymlink PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testEmptyStructurePluginUrls STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testEmptyStructurePluginUrls PASSED

org.apache.kafka.connect.runtime.AbstractHerderTest > connectorStatus STARTED

org.apache.kafka.connect.runtime.AbstractHerderTest > connectorStatus PASSED

org.apache.kafka.connect.runtime.AbstractHerderTest > testConnectors STARTED

org.apache.kafka.connect.runtime.AbstractHerderTest > testConnectors PASSED

org.apache.kafka.connect.runtime.AbstractHerderTest > testConnectorStatus 
STARTED

org.apache.kafka.connect.runtime.AbstractHerderTest > testConnectorStatus PASSED

org.apache.kafka.connect.runtime.AbstractHerderTest > 
testConfigValidationEmptyConfig STARTED

org.apache.kafka.connect.runtime.AbstractHerderTest > 
testConfigValidationEmptyConfig PASSED

org.apache.kafka.connect.runtime.AbstractHerderTest > 
testConfigValidationMissingName STARTED

org.apache.kafka.connect.runtime.AbstractHerderTest > 
testConfigValidationMissingName PASSED

org.apache.kafka.connect.runtime.AbstractHerderTest > 
testConfigValidationInvalidTopics STARTED

org.apache.kafka.connect.runtime.AbstractHerderTest > 
testConfigValidationInvalidTopics PASSED

org.apache.kafka.connect.runtime.AbstractHerderTest > 
testConfigValidationTransformsExtendResults STARTED

org.apache.kafka.connect.runtime.AbstractHerderTest > 
testConfigValidationTransformsExtendResults PASSED

org.apache.kafka.connect.runtime.AbstractHerderTest > 
testConfigValidationPrincipalOnlyOverride STARTED


Re: KIP-352: Distinguish URPs caused by reassignment

2019-08-08 Thread Jason Gustafson
Hey Stan,

Thanks for the suggestion. I have updated the proposal to include two new
meters for reassignment traffic inbound and outbound.

-Jason

On Thu, Aug 8, 2019 at 12:07 PM Stanislav Kozlovski 
wrote:

> Agreed on not totally splitting the replication incoming/outgoing metric -
> that could cause confusion. Just adding a new metric sounds good to me!
>
> The throttle follow-up is mentioned as part of future work in KIP-455 and I
> agree that it is way out of scope for this one.
>
>
> On Thu, Aug 8, 2019 at 8:03 PM Jason Gustafson  wrote:
>
> > Hi Stan,
> >
> > That's an interesting thought. I'm wondering if it would be better to
> leave
> > the current replication metrics counting for the total replication
> traffic
> > and add a new metric for reassignment traffic?
> >
> > By the way, a further KIP-455 follow-up that I won't attempt here would
> be
> > to have a separate throttle for reassignment traffic.
> >
> > -Jason
> >
> > On Thu, Aug 8, 2019 at 11:34 AM Stanislav Kozlovski <
> > stanis...@confluent.io>
> > wrote:
> >
> > > Hi Jason,
> > >
> > > I like the new ReassigningPartitions metric. Would it be easy to expand
> > the
> > > scope of the KIP to split the ReplicationIncoming/Outgoing metric to
> > > distinguish between reassigning/non-reassigning traffic, or do you
> > prefer
> > > to keep this KIP nice and small?
> > >
> > > On Thu, Aug 8, 2019 at 12:08 AM Jason Gustafson 
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > Since KIP-455 has passed, I would like to revive this proposal. I have
> > > > reduced the scope so that it is just changing the way we compute URP
> > and
> > > > adding a new metric for the number of reassigning partitions. Please
> > > take a
> > > > look:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-352%3A+Distinguish+URPs+caused+by+reassignment
> > > > .
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Sun, Aug 5, 2018 at 3:41 PM Dong Lin  wrote:
> > > >
> > > > > Hey Jason,
> > > > >
> > > > > Yes, I also think the right solution is probably to include the
> > > > > expected_replication_factor in the per-topic znode and include this
> > > > > information in the UpdateMetadataRequest.
> > > > >
> > > > > And I agree with Gwen and Ismael that it is reasonable to redefine
> > URP
> > > as
> > > > > those partitions whose ISR set size < expected replication factor.
> > > Given
> > > > > that URP is being used for monitoring cluster availability and
> > > > > reassignment progress, it seems that one of them will break
> depending
> > > on
> > > > > the URP definition. It seems better to keep the legitimate
> use-case,
> > > i.e.
> > > > > monitoring cluster availability. Users who want to monitor
> > reassignment
> > > > > progress probably should use the "kafka-reassign-partitions.sh
> > > --verify".
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Aug 3, 2018 at 2:57 PM, Jason Gustafson <
> ja...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > @Dong
> > > > > >
> > > > > > Ugh, you are right. This is indeed trickier than I imagined. You
> > > could
> > > > > > argue that the problem here is that there is no single source of
> > > truth
> > > > > for
> > > > > > the expected replication factor. While a reassignment is in
> > progress,
> > > > the
> > > > > > reassignment itself has the expected replica set. Otherwise, it
> is
> > > > stored
> > > > > > partition metadata itself. This is why manually deleting the
> > > > reassignment
> > > > > > is problematic. We lose the expected replica set and we depend on
> > > users
> > > > > to
> > > > > > reinstall it. I guess I'm starting to think that the way we track
> > > > > > reassignment state in the controller is problematic. In addition
> to
> > > the
> > > > > > problems caused by deletion, we cannot easily change an existing
> > > > > > reassignment.
> > > > > >
> > > > > > High level what I'm thinking is that we need to move the pending
> > > > > > reassignment state out of the single znode and into the
> individual
> > > > > metadata
> > > > > > of the reassigned partitions so that there is a single source of
> > > truth
> > > > > for
> > > > > > the expected replicas (and hence the replication factor). This
> > would
> > > > also
> > > > > > give us an easier mechanism to manage the batching of multiple
> > > > > > reassignments. Let me think on it a bit and see if I can come up
> > > with a
> > > > > > proposal.
> > > > > >
> > > > > > @Gwen, @Ismael
> > > > > >
> > > > > > That is fair. I also prefer to redefine URP if we think the
> > > > compatibility
> > > > > > impact is a lower concern than continued misuse.
> > > > > >
> > > > > > -Jason
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 3, 2018 at 12:25 PM, Ismael Juma 
> > > > wrote:
> > > > > >
> > > > > > > Right, that was my thinking too.
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Fri, Aug 3, 2018 at 12:04 PM Gwen 
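The URP redefinition discussed in this thread — counting a partition as under-replicated only when its ISR is smaller than the expected replica set, with the reassignment target (not the temporarily enlarged replica list) as the source of truth while a reassignment is in flight — can be sketched roughly as below. This is an illustrative model only; the class and method names are hypothetical and do not reflect Kafka's actual controller code.

```java
import java.util.List;
import java.util.Set;

public class UrpSketch {
    // A partition is "under-replicated" when its ISR has fewer members than
    // the expected replication factor. During a reassignment the target
    // replica set defines that expectation; otherwise the partition's own
    // replica assignment does.
    static boolean isUnderReplicated(Set<Integer> isr,
                                     List<Integer> assignedReplicas,
                                     boolean reassigning,
                                     List<Integer> targetReplicas) {
        int expectedReplicationFactor =
            reassigning ? targetReplicas.size() : assignedReplicas.size();
        return isr.size() < expectedReplicationFactor;
    }

    public static void main(String[] args) {
        // Mid-reassignment: replicas temporarily [1,2,3,4], target is [3,4],
        // and the ISR has caught up on both targets -> not a URP.
        System.out.println(isUnderReplicated(
            Set.of(3, 4), List.of(1, 2, 3, 4), true, List.of(3, 4)));
        // No reassignment, one of two replicas out of sync -> a URP.
        System.out.println(isUnderReplicated(
            Set.of(1), List.of(1, 2), false, List.of()));
    }
}
```

Under this definition, the extra replicas that exist only for the duration of a reassignment no longer inflate the URP count, which is what makes the metric usable for availability alerting.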

Jenkins build is back to normal : kafka-trunk-jdk11 #739

2019-08-08 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-8772) Flaky Test kafka.api.DelegationTokenEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-08-08 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8772:
--

 Summary: Flaky Test 
kafka.api.DelegationTokenEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
 Key: KAFKA-8772
 URL: https://issues.apache.org/jira/browse/KAFKA-8772
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Sophie Blee-Goldman


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/24011/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationTest/testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl/]
h3. Stacktrace

org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout 
instead of the expected 1 records at 
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:530)
 at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at 
org.scalatest.Assertions$class.fail(Assertions.scala:1091) at 
org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:822) at 
kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:781) at 
kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1308) at 
kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1317) at 
kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:522)
 at 
kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:361)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:412) at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
 at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
 at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
 at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
 at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
 at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
 at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
 at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 

[jira] [Resolved] (KAFKA-8763) Flaky Test SaslSslAdminClientIntegrationTest.testIncrementalAlterConfigsForLog4jLogLevels

2019-08-08 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-8763.

Resolution: Duplicate

Duplicate of KAFKA-8756

> Flaky Test 
> SaslSslAdminClientIntegrationTest.testIncrementalAlterConfigsForLog4jLogLevels
> -
>
> Key: KAFKA-8763
> URL: https://issues.apache.org/jira/browse/KAFKA-8763
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core, unit tests
>Reporter: Sophie Blee-Goldman
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6740/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testIncrementalAlterConfigsForLog4jLogLevels/]
>  
> h3. Error Message
> org.junit.ComparisonFailure: expected:<[OFF]> but was:<[INFO]>
> h3. Stacktrace
> org.junit.ComparisonFailure: expected:<[OFF]> but was:<[INFO]> at 
> org.junit.Assert.assertEquals(Assert.java:117) at 
> org.junit.Assert.assertEquals(Assert.java:146) at 
> kafka.api.AdminClientIntegrationTest.testIncrementalAlterConfigsForLog4jLogLevels(AdminClientIntegrationTest.scala:1850)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
> java.base/java.lang.Thread.run(Thread.java:834)
> h3. Standard Output
> Debug is true storeKey true useTicketCache false useKeyTab true doNotPrompt 
> false ticketCache is null isInitiator true KeyTab is 
> /tmp/kafka3326803823781197290.tmp refreshKrb5Config is false principal is 
> kafka/localh...@example.com tryFirstPass is false useFirstPass is false 
> storePass is false clearPass is false principal is 
> kafka/localh...@example.com Will use keytab Commit Succeeded



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8771) Flaky Test SaslClientsWithInvalidCredentialsTest.testManualAssignmentConsumerWithAuthenticationFailure

2019-08-08 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8771:
--

 Summary: Flaky Test 
SaslClientsWithInvalidCredentialsTest.testManualAssignmentConsumerWithAuthenticationFailure
 Key: KAFKA-8771
 URL: https://issues.apache.org/jira/browse/KAFKA-8771
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Sophie Blee-Goldman


h3. 
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/24018/testReport/junit/kafka.api/SaslClientsWithInvalidCredentialsTest/testManualAssignmentConsumerWithAuthenticationFailure/]
h3. Error Message

java.lang.AssertionError: expected:<1> but was:<0>
h3. Stacktrace

java.lang.AssertionError: expected:<1> but was:<0> at 
org.junit.Assert.fail(Assert.java:89) at 
org.junit.Assert.failNotEquals(Assert.java:835) at 
org.junit.Assert.assertEquals(Assert.java:647) at 
org.junit.Assert.assertEquals(Assert.java:633) at 
kafka.api.SaslClientsWithInvalidCredentialsTest$$anonfun$verifyConsumerWithAuthenticationFailure$4.apply$mcV$sp(SaslClientsWithInvalidCredentialsTest.scala:128)
 at 
kafka.api.SaslClientsWithInvalidCredentialsTest$$anonfun$verifyWithRetry$1.apply$mcZ$sp(SaslClientsWithInvalidCredentialsTest.scala:233)
 at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:819) at 
kafka.api.SaslClientsWithInvalidCredentialsTest.verifyWithRetry(SaslClientsWithInvalidCredentialsTest.scala:230)
 at 
kafka.api.SaslClientsWithInvalidCredentialsTest.verifyConsumerWithAuthenticationFailure(SaslClientsWithInvalidCredentialsTest.scala:128)
 at 
kafka.api.SaslClientsWithInvalidCredentialsTest.testManualAssignmentConsumerWithAuthenticationFailure(SaslClientsWithInvalidCredentialsTest.scala:109)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:412) at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
 at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
 at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
 at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
 at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
 at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
 at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
 at 

Re: KIP-352: Distinguish URPs caused by reassignment

2019-08-08 Thread Stanislav Kozlovski
Agreed on not totally splitting the replication incoming/outgoing metric -
that could cause confusion. Just adding a new metric sounds good to me!

The throttle follow-up is mentioned as part of future work in KIP-455 and I
agree that it is way out of scope for this one.


On Thu, Aug 8, 2019 at 8:03 PM Jason Gustafson  wrote:

> Hi Stan,
>
> That's an interesting thought. I'm wondering if it would be better to leave
> the current replication metrics counting for the total replication traffic
> and add a new metric for reassignment traffic?
>
> By the way, a further KIP-455 follow-up that I won't attempt here would be
> to have a separate throttle for reassignment traffic.
>
> -Jason
>
> On Thu, Aug 8, 2019 at 11:34 AM Stanislav Kozlovski <
> stanis...@confluent.io>
> wrote:
>
> > Hi Jason,
> >
> > I like the new ReassigningPartitions metric. Would it be easy to expand
> the
> > scope of the KIP to split the ReplicationIncoming/Outgoing metric to
> > distinguish between reassigning/non-reassigning traffic, or do you
> prefer
> > to keep this KIP nice and small?
> >
> > On Thu, Aug 8, 2019 at 12:08 AM Jason Gustafson 
> > wrote:
> >
> > > Hi All,
> > >
> > > Since KIP-455 has passed, I would like to revive this proposal. I have
> > > reduced the scope so that it is just changing the way we compute URP
> and
> > > adding a new metric for the number of reassigning partitions. Please
> > take a
> > > look:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-352%3A+Distinguish+URPs+caused+by+reassignment
> > > .
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Sun, Aug 5, 2018 at 3:41 PM Dong Lin  wrote:
> > >
> > > > Hey Jason,
> > > >
> > > > Yes, I also think the right solution is probably to include the
> > > > expected_replication_factor in the per-topic znode and include this
> > > > information in the UpdateMetadataRequest.
> > > >
> > > > And I agree with Gwen and Ismael that it is reasonable to redefine
> URP
> > as
> > > > those partitions whose ISR set size < expected replication factor.
> > Given
> > > > that URP is being used for monitoring cluster availability and
> > > > reassignment progress, it seems that one of them will break depending
> > on
> > > > the URP definition. It seems better to keep the legitimate use-case,
> > i.e.
> > > > monitoring cluster availability. Users who want to monitor
> reassignment
> > > > progress probably should use the "kafka-reassign-partitions.sh
> > --verify".
> > > >
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > >
> > > > On Fri, Aug 3, 2018 at 2:57 PM, Jason Gustafson 
> > > > wrote:
> > > >
> > > > > @Dong
> > > > >
> > > > > Ugh, you are right. This is indeed trickier than I imagined. You
> > could
> > > > > argue that the problem here is that there is no single source of
> > truth
> > > > for
> > > > > the expected replication factor. While a reassignment is in
> progress,
> > > the
> > > > > reassignment itself has the expected replica set. Otherwise, it is
> > > stored
> > > > > partition metadata itself. This is why manually deleting the
> > > reassignment
> > > > > is problematic. We lose the expected replica set and we depend on
> > users
> > > > to
> > > > > reinstall it. I guess I'm starting to think that the way we track
> > > > > reassignment state in the controller is problematic. In addition to
> > the
> > > > > problems caused by deletion, we cannot easily change an existing
> > > > > reassignment.
> > > > >
> > > > > High level what I'm thinking is that we need to move the pending
> > > > > reassignment state out of the single znode and into the individual
> > > > metadata
> > > > > of the reassigned partitions so that there is a single source of
> > truth
> > > > for
> > > > > the expected replicas (and hence the replication factor). This
> would
> > > also
> > > > > give us an easier mechanism to manage the batching of multiple
> > > > > reassignments. Let me think on it a bit and see if I can come up
> > with a
> > > > > proposal.
> > > > >
> > > > > @Gwen, @Ismael
> > > > >
> > > > > That is fair. I also prefer to redefine URP if we think the
> > > compatibility
> > > > > impact is a lower concern than continued misuse.
> > > > >
> > > > > -Jason
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Aug 3, 2018 at 12:25 PM, Ismael Juma 
> > > wrote:
> > > > >
> > > > > > Right, that was my thinking too.
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Fri, Aug 3, 2018 at 12:04 PM Gwen Shapira 
> > > > wrote:
> > > > > >
> > > > > > > On Fri, Aug 3, 2018 at 11:23 AM, Jason Gustafson <
> > > ja...@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey Ismael,
> > > > > > > >
> > > > > > > > Yeah, my initial inclination was to redefine URP as well. My
> > only
> > > > > doubt
> > > > > > > was
> > > > > > > > how it would affect existing tools which might depend on URPs
> > to
> > > > > track
> > > > > > > the
> > > > > > > > progress of a reassignment. I decided to be conservative 

Re: KIP-352: Distinguish URPs caused by reassignment

2019-08-08 Thread Jason Gustafson
Hi Stan,

That's an interesting thought. I'm wondering if it would be better to leave
the current replication metrics counting for the total replication traffic
and add a new metric for reassignment traffic?

By the way, a further KIP-455 follow-up that I won't attempt here would be
to have a separate throttle for reassignment traffic.

-Jason

On Thu, Aug 8, 2019 at 11:34 AM Stanislav Kozlovski 
wrote:

> Hi Jason,
>
> I like the new ReassigningPartitions metric. Would it be easy to expand the
> scope of the KIP to split the ReplicationIncoming/Outgoing metric to
> distinguish between reassigning/non-reassigning traffic, or do you prefer
> to keep this KIP nice and small?
>
> On Thu, Aug 8, 2019 at 12:08 AM Jason Gustafson 
> wrote:
>
> > Hi All,
> >
> > Since KIP-455 has passed, I would like to revive this proposal. I have
> > reduced the scope so that it is just changing the way we compute URP and
> > adding a new metric for the number of reassigning partitions. Please
> take a
> > look:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-352%3A+Distinguish+URPs+caused+by+reassignment
> > .
> >
> > Thanks,
> > Jason
> >
> > On Sun, Aug 5, 2018 at 3:41 PM Dong Lin  wrote:
> >
> > > Hey Jason,
> > >
> > > Yes, I also think the right solution is probably to include the
> > > expected_replication_factor in the per-topic znode and include this
> > > information in the UpdateMetadataRequest.
> > >
> > > And I agree with Gwen and Ismael that it is reasonable to redefine URP
> as
> > > those partitions whose ISR set size < expected replication factor.
> Given
> > > that URP is being used for monitoring cluster availability and
> > > reassignment progress, it seems that one of them will break depending
> on
> > > the URP definition. It seems better to keep the legitimate use-case,
> i.e.
> > > monitoring cluster availability. Users who want to monitor reassignment
> > > progress probably should use the "kafka-reassign-partitions.sh
> --verify".
> > >
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >
> > > On Fri, Aug 3, 2018 at 2:57 PM, Jason Gustafson 
> > > wrote:
> > >
> > > > @Dong
> > > >
> > > > Ugh, you are right. This is indeed trickier than I imagined. You
> could
> > > > argue that the problem here is that there is no single source of
> truth
> > > for
> > > > the expected replication factor. While a reassignment is in progress,
> > the
> > > > reassignment itself has the expected replica set. Otherwise, it is
> > stored
> > > > partition metadata itself. This is why manually deleting the
> > reassignment
> > > > is problematic. We lose the expected replica set and we depend on
> users
> > > to
> > > > reinstall it. I guess I'm starting to think that the way we track
> > > > reassignment state in the controller is problematic. In addition to
> the
> > > > problems caused by deletion, we cannot easily change an existing
> > > > reassignment.
> > > >
> > > > High level what I'm thinking is that we need to move the pending
> > > > reassignment state out of the single znode and into the individual
> > > metadata
> > > > of the reassigned partitions so that there is a single source of
> truth
> > > for
> > > > the expected replicas (and hence the replication factor). This would
> > also
> > > > give us an easier mechanism to manage the batching of multiple
> > > > reassignments. Let me think on it a bit and see if I can come up
> with a
> > > > proposal.
> > > >
> > > > @Gwen, @Ismael
> > > >
> > > > That is fair. I also prefer to redefine URP if we think the
> > compatibility
> > > > impact is a lower concern than continued misuse.
> > > >
> > > > -Jason
> > > >
> > > >
> > > >
> > > > On Fri, Aug 3, 2018 at 12:25 PM, Ismael Juma 
> > wrote:
> > > >
> > > > > Right, that was my thinking too.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Fri, Aug 3, 2018 at 12:04 PM Gwen Shapira 
> > > wrote:
> > > > >
> > > > > > On Fri, Aug 3, 2018 at 11:23 AM, Jason Gustafson <
> > ja...@confluent.io
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Ismael,
> > > > > > >
> > > > > > > Yeah, my initial inclination was to redefine URP as well. My
> only
> > > > doubt
> > > > > > was
> > > > > > > how it would affect existing tools which might depend on URPs
> to
> > > > track
> > > > > > the
> > > > > > > progress of a reassignment. I decided to be conservative in the
> > > end,
> > > > > but
> > > > > > > I'd reconsider if we think it is not a major concern. It is
> > > annoying
> > > > to
> > > > > > > need a new category.
> > > > > > >
> > > > > >
> > > > > > There are existing tools that use URP to track reassignment, but
> > > there
> > > > > are
> > > > > > many more tools that use URP for monitoring and alerting. If I
> > > > understand
> > > > > > Ismael's suggestion correctly, a re-definition will improve the
> > > > > reliability
> > > > > > of the monitoring tools (since there won't be false alerts in
> case
> > of
> > > > > > re-assignment) without having to switch to a new metric.
> 
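The resolution of this thread — keep the existing replication byte meters counting total replication traffic, and add new meters that count only the reassignment share — can be modeled as below. This is a toy sketch, not Kafka's actual Yammer-metrics code; the class and field names are illustrative.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ReassignmentTrafficSketch {
    // Existing metric: counts *all* replication fetch traffic, unchanged.
    final AtomicLong replicationBytesIn = new AtomicLong();
    // New metric: counts only traffic for partitions being reassigned.
    final AtomicLong reassignmentBytesIn = new AtomicLong();

    void recordFetch(long bytes, boolean forReassigningPartition) {
        replicationBytesIn.addAndGet(bytes);      // total stays inclusive
        if (forReassigningPartition) {
            reassignmentBytesIn.addAndGet(bytes); // counted in both meters
        }
    }

    public static void main(String[] args) {
        ReassignmentTrafficSketch m = new ReassignmentTrafficSketch();
        m.recordFetch(100, false); // normal follower fetch
        m.recordFetch(40, true);   // fetch for a reassigning partition
        System.out.println(m.replicationBytesIn.get());  // 140
        System.out.println(m.reassignmentBytesIn.get()); // 40
    }
}
```

Counting reassignment traffic in both meters avoids the confusion Stanislav raised: existing dashboards on total replication traffic keep working, while the new meter isolates the reassignment share.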

Re: [DISCUSS] KIP-487: Automatic Topic Creation on Producer

2019-08-08 Thread Jun Rao
Hi, Justine,

Thanks for the KIP. Overall, it seems to be a good improvement.

However, I think Harsha's point seems reasonable. We had
auto.create.topics.enable config on the broker to allow admins to disable
topic creation from the producer/consumer clients before we had the
security feature. The need for that config is reduced with the security
feature, but may still be present since not all places have security
enabled. It's true that a non-secured environment is vulnerable to some
additional attacks, but producer/consumer are the most common way for a
user to interact with the broker. So, keeping that config for backward
compatibility could still be useful if it's not introducing too much effort
or extra confusion.

Here is one potential alternative that I was thinking about. We add a new
field in the CreateTopicRequest to indicate whether it's from the producer
or not. If auto.create.topics.enable is false, CreateTopicRequest from the
producer will be rejected. We probably don't need to introduce the new
config (which seems a bit hard to explain) in the producer. Instead, the
new producer always uses MetadataRequest with AllowAutoTopicCreation set to
false to get the metadata, and if the metadata is not present, sends a new
CreateTopicRequest (assuming the broker supports it) to try to create the
topic automatically.
Whether the creation is allowed or not will be determined by the broker.
This will make the behavior backward compatible and we can still achieve
the main goal of the KIP, which is not relying on MetadataRequest for topic
creation. What do you think?

Thanks,

Jun

On Thu, Aug 8, 2019 at 1:34 AM M. Manna  wrote:

> Hi,
>
> If I may, perhaps you could simplify everything by using
> 'auto.create.topics.enable' itself as a value, alongside true and false. In
> other words, the public interfaces section should only have [true,
> auto.create.topics.enable, false].
>
> The reason for this is that auto.create.topics.enable is already known to
> users as a "Server-SIde" config. So all you are saying is
>
> a) To avoid day 1 impact, it will follow whatever auto.create.topics.enable
> value is set.
> b) False means - no client side topic creation
> c) True means client side topic creation.
>
> It saves creating 2 more new strings :). But not too expensive anyway.
>
> Also, when you deprecate auto.create.topics.enable - you must provide
> sufficient logic to ensure that things like rolling upgrade doesn't
> temporarily break anything. I apologise if you have already accounted for
> this, but wanted to mention since I didn't notice this on the KIP.
>
> Let me know how this sounds.
>
> Regards,
>
> On Wed, 7 Aug 2019 at 19:10, Justine Olshan  wrote:
>
> > Hi Harsha,
> >
> > I think my message may have gotten lost in all the others.
> >
> > Two of the goals of this KIP are to 1) allow auto-creation on specific
> > clients when the broker default is false and 2) eventually replace the
> > broker config.
> >
> > In order to accomplish these two goals, we need the producer to be able
> to
> > create topics despite the broker config. (How can we replace a function
> > when we rely on it?)
> > I think at this point we have a fundamental disagreement in what we
> should
> > allow the producer to do.
> > In my previous message I mentioned a config that would allow for the
> broker
> > to prevent producer auto-creation. (It would be disabled by default.) It
> > would fix your issue for now, but could lead to more complications later.
> >
> > Thank you,
> > Justine
> >
> >
> > On Wed, Aug 7, 2019 at 10:56 AM Harsha Chintalapani 
> > wrote:
> >
> > > On Wed, Aug 07, 2019 at 9:50 AM, Colin McCabe 
> wrote:
> > >
> > > > On Wed, Aug 7, 2019, at 09:24, Harsha Ch wrote:
> > > >
> > > > On Tue, Aug 06, 2019 at 11:46 PM, Colin McCabe < cmcc...@apache.org
> >
> > > > wrote:
> > > >
> > > > On Tue, Aug 6, 2019, at 21:38, Harsha Ch wrote:
> > > >
> > > > Hi Colin,
> > > > "Hmm... I'm not sure I follow. Users don't have to build their own
> > > > tooling, right? They can use any of the shell scripts that we've
> > shipped
> > > in
> > > > the last few releases. For example, if any of your users run it, this
> > > shell
> > > > script will delete all of the topics from your non-security-enabled
> > > > cluster:
> > > >
> > > > ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list 2>/dev/null
> > > > | xargs -l ./bin/kafka-topics.sh --bootstrap-server localhost:9092
> > > > --delete --topic
> > > >
> > > > They will need to fill in the correct bootstrap servers list, of
> > course,
> > > > not localhost. This deletion script will work on some pretty old
> > brokers,
> > > > even back to the 0.10 releases. It seems a little odd to trust your
> > users
> > > > with this power, but not trust them to avoid changing a particular
> > > > configuration key."
> > > >
> > > > The above will be blocked by the server if we set delete.topic.enable to
> > > > 

Re: KIP-352: Distinguish URPs caused by reassignment

2019-08-08 Thread Stanislav Kozlovski
Hi Jason,

I like the new ReassigningPartitions metric. Would it be easy to expand the
scope of the KIP to split the ReplicationIncoming/Outgoing metric to
distinguish between reassigning/non-reassigning traffic, or do you prefer
to keep this KIP nice and small?
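For readers following the thread, the URP-computation change under discussion (counting a partition as under-replicated only when its ISR falls below the expected replication factor, rather than below the temporarily enlarged assigned replica set) can be sketched as follows. The class and method names are illustrative, not Kafka internals:

```java
import java.util.List;

public class UrpSketch {
    // Current reckoning: ISR smaller than the assigned replica set. During a
    // reassignment the assigned set is temporarily enlarged (OAR + RAR), so
    // healthy partitions show up as under-replicated.
    static boolean underReplicatedOld(List<Integer> assignedReplicas, List<Integer> isr) {
        return isr.size() < assignedReplicas.size();
    }

    // Proposed reckoning: compare the ISR against the expected replication
    // factor, so an in-progress reassignment does not trigger false alerts.
    static boolean underReplicatedNew(List<Integer> isr, int expectedReplicationFactor) {
        return isr.size() < expectedReplicationFactor;
    }

    public static void main(String[] args) {
        List<Integer> assigned = List.of(1, 2, 3, 4, 5, 6); // mid-reassignment: OAR + RAR
        List<Integer> isr = List.of(1, 2, 3);               // original replicas, all in sync
        System.out.println(underReplicatedOld(assigned, isr)); // true  (false alert)
        System.out.println(underReplicatedNew(isr, 3));        // false (no alert)
    }
}
```

This is why the redefinition keeps availability monitoring reliable while a reassignment is in flight.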

On Thu, Aug 8, 2019 at 12:08 AM Jason Gustafson  wrote:

> Hi All,
>
> Since KIP-455 is passed, I would like to revive this proposal. I have
> reduced the scope so that it is just changing the way we compute URP and
> adding a new metric for the number of reassigning partitions. Please take a
> look:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-352%3A+Distinguish+URPs+caused+by+reassignment
> .
>
> Thanks,
> Jason
>
> On Sun, Aug 5, 2018 at 3:41 PM Dong Lin  wrote:
>
> > Hey Jason,
> >
> > Yes, I also think the right solution is probably to include the
> > expected_replication_factor in the per-topic znode and include this
> > information in the UpdateMetadataRequest.
> >
> > And I agree with Gwen and Ismael that it is reasonable to redefine URP as
> > those partitions whose ISR set size < expected replication factor. Given
> > that URP is being used for monitoring cluster availability and
> > reassignment progress, it seems that one of them will break depending on
> > the URP definition. It seems better to keep the legitimate use-case, i.e.
> > monitoring cluster availability. Users who want to monitor reassignment
> > progress probably should use the "kafka-reassign-partitions.sh --verify".
> >
> >
> > Thanks,
> > Dong
> >
> >
> >
> > On Fri, Aug 3, 2018 at 2:57 PM, Jason Gustafson 
> > wrote:
> >
> > > @Dong
> > >
> > > Ugh, you are right. This is indeed trickier than I imagined. You could
> > > argue that the problem here is that there is no single source of truth
> > for
> > > the expected replication factor. While a reassignment is in progress,
> the
> > > reassignment itself has the expected replica set. Otherwise, it is
> stored
> > > in the partition metadata itself. This is why manually deleting the
> reassignment
> > > is problematic. We lose the expected replica set and we depend on users
> > to
> > > reinstall it. I guess I'm starting to think that the way we track
> > > reassignment state in the controller is problematic. In addition to the
> > > problems caused by deletion, we cannot easily change an existing
> > > reassignment.
> > >
> > > High level what I'm thinking is that we need to move the pending
> > > reassignment state out of the single znode and into the individual
> > metadata
> > > of the reassigned partitions so that there is a single source of truth
> > for
> > > the expected replicas (and hence the replication factor). This would
> also
> > > give us an easier mechanism to manage the batching of multiple
> > > reassignments. Let me think on it a bit and see if I can come up with a
> > > proposal.
> > >
> > > @Gwen, @Ismael
> > >
> > > That is fair. I also prefer to redefine URP if we think the
> compatibility
> > > impact is a lower concern than continued misuse.
> > >
> > > -Jason
> > >
> > >
> > >
> > > On Fri, Aug 3, 2018 at 12:25 PM, Ismael Juma 
> wrote:
> > >
> > > > Right, that was my thinking too.
> > > >
> > > > Ismael
> > > >
> > > > On Fri, Aug 3, 2018 at 12:04 PM Gwen Shapira 
> > wrote:
> > > >
> > > > > On Fri, Aug 3, 2018 at 11:23 AM, Jason Gustafson <
> ja...@confluent.io
> > >
> > > > > wrote:
> > > > >
> > > > > > Hey Ismael,
> > > > > >
> > > > > > Yeah, my initial inclination was to redefine URP as well. My only
> > > doubt
> > > > > was
> > > > > > how it would affect existing tools which might depend on URPs to
> > > track
> > > > > the
> > > > > > progress of a reassignment. I decided to be conservative in the
> > end,
> > > > but
> > > > > > I'd reconsider if we think it is not a major concern. It is
> > annoying
> > > to
> > > > > > need a new category.
> > > > > >
> > > > >
> > > > > There are existing tools that use URP to track reassignment, but
> > there
> > > > are
> > > > > many more tools that use URP for monitoring and alerting. If I
> > > understand
> > > > > Ismael's suggestion correctly, a re-definition will improve the
> > > > reliability
> > > > > of the monitoring tools (since there won't be false alerts in case
> of
> > > > > re-assignment) without having to switch to a new metric.
> > > > >
> > > > > I think we should choose the proposal that improves the more common
> > > usage
> > > > > of the metric, in this case, failure monitoring rather than
> > > reassignment.
> > > > >
> > > > >
> > > > > >
> > > > > > About your question about storage in ZK, I can't think of
> anything
> > > > > > additional that we need. Probably the main difficulty is getting
> > > access
> > > > > to
> > > > > > the replication factor in the topic utility. My basic thought was
> > > just
> > > > to
> > > > > > collect the URPs (as we know them today) and use the config API
> to
> > > > > > partition them based on the replication factor. Do you see any
> > > problems
> > > > > > with 

Re: [VOTE] KIP-455: Create an Administrative API for Replica Reassignment

2019-08-08 Thread Colin McCabe
Hi Koushik,

The vote for this KIP already passed.

See https://www.mail-archive.com/dev@kafka.apache.org/msg99636.html

best,
Colin

On Thu, Aug 8, 2019, at 10:50, Koushik Chitta wrote:
> Thanks Colin, George.   Can we restart the voting for this KIP.
> 
> Thanks,
> Koushik 
> 
> -Original Message-
> From: Colin McCabe  
> Sent: Wednesday, August 7, 2019 5:17 PM
> To: dev@kafka.apache.org
> Subject: Re: [VOTE] KIP-455: Create an Administrative API for Replica 
> Reassignment
> 
> On Wed, Aug 7, 2019, at 15:41, George Li wrote:
> > This email seemed to get lost in the dev email server.  Resending. 
> > 
> > 
> > On Tuesday, August 6, 2019, 10:16:57 PM PDT, George Li 
> >  wrote:
> > 
> > 
> > The pending reassignment partitions would be reported as URP (Under 
> > Replicated Partitions), or maybe as a separate metric, RURP 
> > (Reassignment URP), since it can now be derived from the new 
> > AddingReplicas. An alert could be triggered based on this.
> > 
> 
> Hi George,
> 
> I agree that this would be a great idea for follow up work.  Check out 
> KIP-352, which discusses creating such a metric. :)
> 
> > 
> > 
> > It would be nice if ListPartitionReassignmentResult could return the 
> > "elapsed time/duration" of the current pending reassignments, the 
> > calling client can flag those current long running reassignments and 
> > alert.  However, what I would be interested is probably the total # of 
> > pending reassignments because I will submit reassignments in batches, 
> > e.g. 50 reassignments per batch.  If the pending reassignments # is 
> > below that per batch #, submit more new reassignments = (per_batch_# - 
> > pending_#).
> > 
> 
> It is definitely useful to know what reassignments exist.  If you call 
> ListPartitionReassignments, you can count how many results you get, in 
> order to implement a policy like that.
> 
> I'm not sure if knowing how long reassignments have been in progress 
> will be important or not.  I think we should give people some time to 
> try out the new APIs and see what could be improved based on their 
> experience.
> 
> > 
> > 
> > It seems currently, the ReplicaFetcher threads could quite easily crash 
> > because of some exceptions. e.g. Java Out Of Memory, and would just 
> > remain dead (jstack to dump threads to check the # of running 
> > ReplicaFetcher threads) without getting restarted automatically, so 
> > one needs to bounce the broker.  It would be nice to make the 
> > ReplicaFetcher more robust/resilient of catching more exceptions, and 
> > if crashed, get restarted after some time. 
> > 
> 
> This has definitely been an issue in the past, I agree.  Thankfully, we 
> recently did improve the robustness of the ReplicaFetcher.  Check out 
> "KIP-461: Improve Replica Fetcher behavior at handling partition 
> failure."
> 
> cheers,
> Colin
> 
> > 
> > 
> > Thanks,
> > 
> > George
> > 
> > 
> > 
> > On 2019/08/06 23:07:19, "Colin McCabe"  wrote: 
> > > Hi Koushik,
> > > 
> > > Thanks for the idea.  This KIP is already pretty big, so I think we'll 
> > > have to consider ideas like this in follow-on KIPs.
> > > 
> > > In general, figuring out what's wrong with replication is a pretty tough 
> > > problem.  If we had an API for this, we'd probably want it to be unified, 
> > > and not specific to reassigning partitions.
> > > 
> > > regards,
> > > Colin
> > > 
> > > 
> > > On Tue, Aug 6, 2019, at 10:57, Koushik Chitta wrote:
> > > > Hey Colin,
> > > > 
> > > > Can the ListPartitionReassignmentsResult include the status of the 
> > > > current reassignment progress of each partition? A reassignment can be 
> > > > in progress for different reasons and the status can give the option to 
> > > > alter the current reassignment.
> > > > 
> > > > Example -  A leaderISRRequest of a new assigned replicas can be 
> > > > ignored/errored because of a storage exception.  And reassignment batch 
> > > > will be waiting indefinitely for the new assigned replicas to be in 
> > > > sync with the leader of the partition.  
> > > >   Showing the status will give an option to alter the 
> > > > affected 
> > > > partitions and allow the batch to complete reassignment.
> > > > 
> > > > OAR = {1, 2, 3} and RAR = {4,5,6}
> > > > 
> > > >  AR leader/isr
> > > > {1,2,3,4,5,6}1/{1,2,3,4,6}   =>  LeaderISRRequest 
> > > > was lost/skipped for 5 and the reassignment operation will be waiting 
> > > > indefinitely for the 5 to be insync.
> > > > 
> > > > 
> > > > 
> > > > Thanks,
> > > > Koushik
> > > > 
> > > > -Original Message-
> > > > From: Jun Rao  
> > > > Sent: Friday, August 2, 2019 10:04 AM
> > > > To: dev 
> > > > Subject: Re: [VOTE] KIP-455: Create an Administrative API for Replica 
> > > > Reassignment
> > > > 
> > > > Hi, Colin,
> > > > 
> > > > First, since we are changing the format of LeaderAndIsrRequest, which 
> > > > is an inter broker request, it seems that we will need IBP during 
> > > 

RE: [VOTE] KIP-455: Create an Administrative API for Replica Reassignment

2019-08-08 Thread Koushik Chitta
Thanks Colin, George.  Can we restart the voting for this KIP?

Thanks,
Koushik 

-Original Message-
From: Colin McCabe  
Sent: Wednesday, August 7, 2019 5:17 PM
To: dev@kafka.apache.org
Subject: Re: [VOTE] KIP-455: Create an Administrative API for Replica 
Reassignment

On Wed, Aug 7, 2019, at 15:41, George Li wrote:
> This email seemed to get lost in the dev email server.  Resending. 
> 
> 
> On Tuesday, August 6, 2019, 10:16:57 PM PDT, George Li 
>  wrote:
> 
> 
> The pending reassignment partitions would be reported as URP (Under 
> Replicated Partitions), or maybe as a separate metric, RURP 
> (Reassignment URP), since it can now be derived from the new 
> AddingReplicas. An alert could be triggered based on this.
> 

Hi George,

I agree that this would be a great idea for follow-up work.  Check out KIP-352, 
which discusses creating such a metric. :)

> 
> 
> It would be nice if ListPartitionReassignmentResult could return the 
> "elapsed time/duration" of the current pending reassignments, the 
> calling client can flag those current long running reassignments and 
> alert.  However, what I would be interested is probably the total # of 
> pending reassignments because I will submit reassignments in batches, 
> e.g. 50 reassignments per batch.  If the pending reassignments # is 
> below that per batch #, submit more new reassignments = (per_batch_# - 
> pending_#).
> 

It is definitely useful to know what reassignments exist.  If you call 
ListPartitionReassignments, you can count how many results you get, in order to 
implement a policy like that.
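The batch-refill policy described above (submit per_batch_# minus pending_# new reassignments) amounts to a clamped difference; a trivial sketch, with illustrative names, where the pending count would in practice come from counting the results of ListPartitionReassignments:

```java
public class ReassignmentBatcher {
    // How many new reassignments to submit so that (pending + submitted)
    // tops back up to the per-batch limit, per the policy described above.
    static int toSubmit(int perBatchLimit, int currentlyPending) {
        return Math.max(0, perBatchLimit - currentlyPending);
    }

    public static void main(String[] args) {
        // e.g. a batch limit of 50, with 12 reassignments still in flight
        System.out.println(toSubmit(50, 12)); // 38
        System.out.println(toSubmit(50, 50)); // 0
    }
}
```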

I'm not sure if knowing how long reassignments have been in progress will be 
important or not.  I think we should give people some time to try out the new 
APIs and see what could be improved based on their experience.

> 
> 
> It seems currently, the ReplicaFetcher threads could quite easily crash 
> because of some exceptions. e.g. Java Out Of Memory, and would just 
> remain dead (jstack to dump threads to check the # of running 
> ReplicaFetcher threads) without getting restarted automatically, so 
> one needs to bounce the broker.  It would be nice to make the 
> ReplicaFetcher more robust/resilient of catching more exceptions, and 
> if crashed, get restarted after some time. 
> 

This has definitely been an issue in the past, I agree.  Thankfully, we 
recently did improve the robustness of the ReplicaFetcher.  Check out "KIP-461: 
Improve Replica Fetcher behavior at handling partition failure."

cheers,
Colin

> 
> 
> Thanks,
> 
> George
> 
> 
> 
> On 2019/08/06 23:07:19, "Colin McCabe"  wrote: 
> > Hi Koushik,
> > 
> > Thanks for the idea.  This KIP is already pretty big, so I think we'll have 
> > to consider ideas like this in follow-on KIPs.
> > 
> > In general, figuring out what's wrong with replication is a pretty tough 
> > problem.  If we had an API for this, we'd probably want it to be unified, 
> > and not specific to reassigning partitions.
> > 
> > regards,
> > Colin
> > 
> > 
> > On Tue, Aug 6, 2019, at 10:57, Koushik Chitta wrote:
> > > Hey Colin,
> > > 
> > > Can the ListPartitionReassignmentsResult include the status of the 
> > > current reassignment progress of each partition? A reassignment can be 
> > > in progress for different reasons and the status can give the option to 
> > > alter the current reassignment.
> > > 
> > > Example - a LeaderAndIsrRequest for newly assigned replicas can be 
> > > ignored/errored because of a storage exception, and the reassignment 
> > > batch will then wait indefinitely for the new replicas to be in 
> > > sync with the leader of the partition.  
> > > Showing the status would give an option to alter the affected 
> > > partitions and allow the batch to complete reassignment.
> > > 
> > > OAR = {1, 2, 3} and RAR = {4, 5, 6}
> > > 
> > >       AR              leader/isr
> > > {1,2,3,4,5,6}   1/{1,2,3,4,6}   =>  the LeaderAndIsrRequest 
> > > was lost/skipped for 5, so the reassignment operation will wait 
> > > indefinitely for 5 to be in sync.
> > > 
> > > 
> > > 
> > > Thanks,
> > > Koushik
> > > 
> > > -Original Message-
> > > From: Jun Rao  
> > > Sent: Friday, August 2, 2019 10:04 AM
> > > To: dev 
> > > Subject: Re: [VOTE] KIP-455: Create an Administrative API for Replica 
> > > Reassignment
> > > 
> > > Hi, Colin,
> > > 
> > > First, since we are changing the format of LeaderAndIsrRequest, which 
> > > is an inter broker request, it seems that we will need IBP during 
> > > rolling upgrade. Could we add that to the compatibility section?
> > > 
> > > Regarding UnsupportedVersionException, even without ZK node version 
> > > bump, we probably want to only use the new ZK value fields after all 
> > > brokers have been upgraded to the new binary. Otherwise, the 
> > > reassignment task may not be completed if the controller changes to a 
> > > broker still on the old binary.
> > > IBP is one way to achieve that. The main thing is 

[DISCUSS] Modularization of kafka client separating server related classes/interfaces

2019-08-08 Thread Satish Duggana
Hi,

There are many classes in the client module that are not really related to
the client. It would be good to have a common module structure with the
respective classes/interfaces. Implementors/providers would then need a
dependency only on those modules instead of on the whole client module.

The common module structure below may be a good starting point for the
discussion.

kafka-common:
  Common classes which can be used by any of the other modules, like
  client, kafka-server-common, server (core), streams, etc.

kafka-server-common (or kafka-core-common):
  Classes required only for the server (core) and for the
  implementors/providers.

Below are some of the common server interfaces/classes which are required
only on the server (core). These are not related to the client module.

org.apache.kafka.server.policy
  AlterConfigPolicy
  CreateTopicPolicy

org.apache.kafka.server.quota
  ClientQuotaCallback
  ClientQuotaEntity
  ClientQuotaType

org.apache.kafka.common.replica
  ClientMetadata
  PartitionView
  RackAwareReplicaSelector
  ReplicaSelector
  ReplicaView

One more example: the Java Authorizer interface introduced with KIP-504. Its
implementations are applicable only to the server (core) module, so it could
be moved to a kafka-server-common module instead of being pushed into the kafka
client module.
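For concreteness, the proposed split might look roughly like this in the build; the module names follow the proposal above and are hypothetical, not an agreed layout:

```groovy
// settings.gradle (hypothetical layout for the proposed split)
include 'clients'              // public client APIs only
include 'kafka-common'         // shared by clients, core, streams
include 'kafka-server-common'  // server-side SPIs: policies, quotas, replica selectors, Authorizer
include 'core'                 // broker; depends on kafka-common + kafka-server-common
include 'streams'
```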

Please let me know your thoughts/opinions.

Thanks,

Satish.


Re: [DISCUSS] KIP-503: deleted topics metric

2019-08-08 Thread Stanislav Kozlovski
What do people think if we exposed:
* eligible topics/replicas pending delete
* ineligible topics/replicas pending delete
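Since topicsIneligibleForDeletion is tracked as a subset of topicsToBeDeleted, the eligible-pending gauge is just a set difference. A minimal sketch with illustrative names, not the controller's actual code:

```java
import java.util.HashSet;
import java.util.Set;

public class DeletionMetricsSketch {
    // topicsIneligibleForDeletion is a subset of topicsToBeDeleted, so the
    // "eligible topics pending delete" gauge is the set difference.
    static int eligiblePendingDelete(Set<String> toBeDeleted, Set<String> ineligible) {
        Set<String> eligible = new HashSet<>(toBeDeleted);
        eligible.removeAll(ineligible);
        return eligible.size();
    }

    public static void main(String[] args) {
        Set<String> toBeDeleted = Set.of("a", "b", "c");
        Set<String> ineligible = Set.of("c"); // e.g. "c" is mid-reassignment
        System.out.println(eligiblePendingDelete(toBeDeleted, ineligible)); // 2
        System.out.println(ineligible.size());                              // 1
    }
}
```

The same difference would apply at replica granularity for the replica-level gauges.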

On Thu, Aug 8, 2019 at 5:16 PM David Arthur  wrote:

> It looks like topicsIneligibleForDeletion is a subset of topicsToBeDeleted
> in the controller.
>
> On Thu, Aug 8, 2019 at 11:16 AM Stanislav Kozlovski <
> stanis...@confluent.io>
> wrote:
>
> > ineligible replicas/topics are not included in the pending metrics,
> right?
> > If so, sounds good to me.
> >
> > On Thu, Aug 8, 2019 at 4:12 PM David Arthur  wrote:
> >
> > > Yes I think exposing ineligible topics would be useful as well. The
> > > controller also tracks this ineligible state for replicas. Would that
> be
> > > useful to expose as well?
> > >
> > > In that case, we'd be up to four new metrics:
> > > * topics pending delete
> > > * replicas pending delete
> > > * ineligible topics
> > > * ineligible replicas
> > >
> > > Thoughts?
> > >
> > >
> > > On Wed, Aug 7, 2019 at 5:16 PM Jason Gustafson 
> > wrote:
> > >
> > > > Thanks for the KIP. This is useful. The controller also maintains a
> set
> > > for
> > > > topics which are awaiting deletion, but currently ineligible. A topic
> > > which
> > > > is undergoing reassignment, for example, is ineligible for deletion.
> > > Would
> > > > it make sense to have a metric for this as well?
> > > >
> > > > -Jason
> > > >
> > > > On Wed, Aug 7, 2019 at 1:52 PM David Arthur 
> wrote:
> > > >
> > > > > Updated the KIP with a count of replicas awaiting deletion.
> > > > >
> > > > > On Wed, Aug 7, 2019 at 9:37 AM David Arthur 
> > wrote:
> > > > >
> > > > > > Thanks for the feedback, Stan. That's a good point about the
> > > partition
> > > > > > count -- I'll poke around and see if I can surface this value in
> > the
> > > > > > Controller.
> > > > > >
> > > > > > On Tue, Aug 6, 2019 at 8:13 AM Stanislav Kozlovski <
> > > > > stanis...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > >> Thanks for the KIP David,
> > > > > >>
> > > > > >> As you mentioned in the KIP - "when a large number of topics
> > > > > (partitions,
> > > > > >> really) are deleted at once, it can take significant time for
> the
> > > > > >> Controller to process everything."
> > > > > >> In that sense, does it make sense to have the metric expose the
> > > number
> > > > > of
> > > > > >> partitions that are pending deletion, as opposed to topics?
> > Perhaps
> > > > even
> > > > > >> both?
> > > > > >> My reasoning is that this metric alone wouldn't say much if we
> had
> > > one
> > > > > >> topic with 1000 partitions versus a topic with 1 partition
> > > > > >>
> > > > > >> On Mon, Aug 5, 2019 at 8:19 PM Harsha Chintalapani <
> > ka...@harsha.io
> > > >
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Thanks for the KIP.  Its useful metric to have.  LGTM.
> > > > > >> > -Harsha
> > > > > >> >
> > > > > >> >
> > > > > >> > On Mon, Aug 05, 2019 at 11:24 AM, David Arthur <
> > > > > davidart...@apache.org>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > Hello all, I'd like to start a discussion for
> > > > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > > > >> > >
> KIP-503%3A+Add+metric+for+number+of+topics+marked+for+deletion
> > > > > >> > >
> > > > > >> > > Thanks!
> > > > > >> > > David
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> Best,
> > > > > >> Stanislav
> > > > > >>
> > > > > >
> > > > > >
> > > > > > --
> > > > > > David Arthur
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > David Arthur
> > > > >
> > > >
> > >
> > >
> > > --
> > > David Arthur
> > >
> >
> >
> > --
> > Best,
> > Stanislav
> >
>
>
> --
> David Arthur
>


-- 
Best,
Stanislav


Re: [DISCUSS] KIP-503: deleted topics metric

2019-08-08 Thread David Arthur
It looks like topicsIneligibleForDeletion is a subset of topicsToBeDeleted
in the controller.

On Thu, Aug 8, 2019 at 11:16 AM Stanislav Kozlovski 
wrote:

> ineligible replicas/topics are not included in the pending metrics, right?
> If so, sounds good to me.
>
> On Thu, Aug 8, 2019 at 4:12 PM David Arthur  wrote:
>
> > Yes I think exposing ineligible topics would be useful as well. The
> > controller also tracks this ineligible state for replicas. Would that be
> > useful to expose as well?
> >
> > In that case, we'd be up to four new metrics:
> > * topics pending delete
> > * replicas pending delete
> > * ineligible topics
> > * ineligible replicas
> >
> > Thoughts?
> >
> >
> > On Wed, Aug 7, 2019 at 5:16 PM Jason Gustafson 
> wrote:
> >
> > > Thanks for the KIP. This is useful. The controller also maintains a set
> > for
> > > topics which are awaiting deletion, but currently ineligible. A topic
> > which
> > > is undergoing reassignment, for example, is ineligible for deletion.
> > Would
> > > it make sense to have a metric for this as well?
> > >
> > > -Jason
> > >
> > > On Wed, Aug 7, 2019 at 1:52 PM David Arthur  wrote:
> > >
> > > > Updated the KIP with a count of replicas awaiting deletion.
> > > >
> > > > On Wed, Aug 7, 2019 at 9:37 AM David Arthur 
> wrote:
> > > >
> > > > > Thanks for the feedback, Stan. That's a good point about the
> > partition
> > > > > count -- I'll poke around and see if I can surface this value in
> the
> > > > > Controller.
> > > > >
> > > > > On Tue, Aug 6, 2019 at 8:13 AM Stanislav Kozlovski <
> > > > stanis...@confluent.io>
> > > > > wrote:
> > > > >
> > > > >> Thanks for the KIP David,
> > > > >>
> > > > >> As you mentioned in the KIP - "when a large number of topics
> > > > (partitions,
> > > > >> really) are deleted at once, it can take significant time for the
> > > > >> Controller to process everything."
> > > > >> In that sense, does it make sense to have the metric expose the
> > number
> > > > of
> > > > >> partitions that are pending deletion, as opposed to topics?
> Perhaps
> > > even
> > > > >> both?
> > > > >> My reasoning is that this metric alone wouldn't say much if we had
> > one
> > > > >> topic with 1000 partitions versus a topic with 1 partition
> > > > >>
> > > > >> On Mon, Aug 5, 2019 at 8:19 PM Harsha Chintalapani <
> ka...@harsha.io
> > >
> > > > >> wrote:
> > > > >>
> > > > >> > Thanks for the KIP.  Its useful metric to have.  LGTM.
> > > > >> > -Harsha
> > > > >> >
> > > > >> >
> > > > >> > On Mon, Aug 05, 2019 at 11:24 AM, David Arthur <
> > > > davidart...@apache.org>
> > > > >> > wrote:
> > > > >> >
> > > > >> > > Hello all, I'd like to start a discussion for
> > > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > > >> > > KIP-503%3A+Add+metric+for+number+of+topics+marked+for+deletion
> > > > >> > >
> > > > >> > > Thanks!
> > > > >> > > David
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Best,
> > > > >> Stanislav
> > > > >>
> > > > >
> > > > >
> > > > > --
> > > > > David Arthur
> > > > >
> > > >
> > > >
> > > > --
> > > > David Arthur
> > > >
> > >
> >
> >
> > --
> > David Arthur
> >
>
>
> --
> Best,
> Stanislav
>


-- 
David Arthur


[jira] [Created] (KAFKA-8770) Either switch to or add an option for emit-on-change

2019-08-08 Thread John Roesler (JIRA)
John Roesler created KAFKA-8770:
---

 Summary: Either switch to or add an option for emit-on-change
 Key: KAFKA-8770
 URL: https://issues.apache.org/jira/browse/KAFKA-8770
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, Streams offers two emission models:
* emit-on-window-close: (using Suppression)
* emit-on-update: (i.e., emit a new result whenever a new record is processed, 
regardless of whether the result has changed)

There is also an option to drop some intermediate results, either using caching 
or suppression.

However, there is no support for emit-on-change, in which results would be 
forwarded only if the result has changed. This has been reported to be 
extremely valuable as a performance optimization for some high-traffic 
applications: it reduces the computational burden both for downstream Streams 
operations and for external systems that consume the results, which currently 
have to deal with a lot of "no-op" changes.

It would be pretty straightforward to implement this by loading the prior 
result before a stateful operation and comparing it with the new result before 
persisting or forwarding. In many cases, we load the prior result anyway, so 
this may not add significant overhead.
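A minimal sketch of that load-compare-forward idea, using a plain map in place of a state store and Objects.equals as the result comparison (both stand-ins, not Streams internals):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;

public class EmitOnChangeSketch<K, V> {
    private final Map<K, V> store = new HashMap<>(); // stands in for the operator's state store
    private final BiConsumer<K, V> downstream;

    public EmitOnChangeSketch(BiConsumer<K, V> downstream) {
        this.downstream = downstream;
    }

    // Load the prior result, persist the new one, and forward only on change.
    public void process(K key, V newResult) {
        V prior = store.put(key, newResult);
        if (!Objects.equals(prior, newResult)) {
            downstream.accept(key, newResult); // first or changed result: emit
        }
        // equal result: suppress the no-op update
    }

    public static void main(String[] args) {
        var emitted = new java.util.ArrayList<String>();
        var op = new EmitOnChangeSketch<String, Integer>((k, v) -> emitted.add(k + "=" + v));
        op.process("foo", 1); // emitted
        op.process("foo", 1); // suppressed no-op
        op.process("foo", 2); // emitted
        System.out.println(emitted); // [foo=1, foo=2]
    }
}
```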

One design challenge is what to do with timestamps. If we get one record at 
time 1 that produces a result, and then another at time 2 that produces a 
no-op, what should be the timestamp of the result, 1 or 2? emit-on-change would 
require us to say 1.

Clearly, we'd need to do some serious benchmarks to evaluate any potential 
implementation of emit-on-change.

Another design challenge is to decide if we should just automatically provide 
emit-on-change for stateful operators, or if it should be configurable. 
Configuration increases complexity, so unless the performance impact is high, 
we may just want to change the emission model without a configuration.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8769) Consider computing stream time independently per key

2019-08-08 Thread John Roesler (JIRA)
John Roesler created KAFKA-8769:
---

 Summary: Consider computing stream time independently per key
 Key: KAFKA-8769
 URL: https://issues.apache.org/jira/browse/KAFKA-8769
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Currently, Streams uses a concept of "stream time", which is computed as the 
highest timestamp observed by stateful operators, per partition. This concept 
of time backs grace period, retention time, and suppression.

For use cases in which data is produced to topics in roughly chronological 
order (as in db change capture), this reckoning is fine.

Some use cases have a different pattern, though. For example, in IOT 
applications, it's common for sensors to save up quite a bit of data and then 
dump it all at once into the topic. See 
https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
 for a concrete example of the use case.

I have heard of cases where each sensor dumps 24 hours' worth of data at a time 
into the topic. This results in a pattern in which, when reading a single 
partition, the operators observe a lot of consecutive records for one key that 
increase in timestamp for 24 hours, then a bunch of consecutive records for 
another key that are also increasing in timestamp over the same 24 hour period. 
With our current stream-time definition, this means that the partition's stream 
time increases while reading the first key's data, but then stays paused while 
reading the second key's data, since the second batch of records all have 
timestamps in the "past".

E.g:
{noformat}
A@t0 (stream time: 0)
A@t1 (stream time: 1)
A@t2 (stream time: 2)
A@t3 (stream time: 3)
B@t0 (stream time: 3)
B@t1 (stream time: 3)
B@t2 (stream time: 3)
B@t3 (stream time: 3)
{noformat}

This pattern results in an unfortunate compromise in which folks are required 
to set the grace period to the max expected time skew, for example 24 hours, or 
Streams will just drop the second key's data (since it is late). But, this 
means that if they want to use Suppression for "final results", they have to 
wait 24 hours for the result.

This tradeoff is not strictly necessary, though, because each key represents a 
logically independent sequence of events. Tracking by partition is simply 
convenient, but typically not logically meaningful. That is, the partitions are 
just physically independent sequences of events, so it's convenient to track 
stream time at this granularity. It would be just as correct, and more useful 
for IOT-like use cases, to track time independently for each key.
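A sketch of the difference between the two reckonings, with illustrative names (not Streams internals); grace-period lateness is checked against per-partition vs. per-key stream time:

```java
import java.util.HashMap;
import java.util.Map;

public class StreamTimeSketch {
    private long partitionStreamTime = Long.MIN_VALUE;                   // current reckoning
    private final Map<String, Long> perKeyStreamTime = new HashMap<>();  // proposed reckoning

    // Advance stream time, then report whether the record is "late"
    // (behind stream time by more than the grace period).
    public boolean latePerPartition(long ts, long graceMs) {
        partitionStreamTime = Math.max(partitionStreamTime, ts);
        return ts < partitionStreamTime - graceMs;
    }

    public boolean latePerKey(String key, long ts, long graceMs) {
        long st = Math.max(perKeyStreamTime.getOrDefault(key, Long.MIN_VALUE), ts);
        perKeyStreamTime.put(key, st);
        return ts < st - graceMs;
    }

    public static void main(String[] args) {
        StreamTimeSketch s = new StreamTimeSketch();
        // A's dump (t0..t3) advances partition stream time; B's subsequent dump
        // of "old" timestamps is then dropped per-partition but not per-key.
        for (long t = 0; t <= 3; t++) s.latePerPartition(t, 0);
        System.out.println(s.latePerPartition(0, 0)); // true:  B@t0 is late per partition
        for (long t = 0; t <= 3; t++) s.latePerKey("A", t, 0);
        System.out.println(s.latePerKey("B", 0, 0));  // false: B's stream time is independent
    }
}
```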

However, before considering this change, we need to solve the 
testing/low-traffic problem. This is the opposite issue, where a partition 
doesn't get enough traffic to advance stream time and results remain "stuck" in 
the suppression buffers. We can provide some mechanism to force the advancement 
of time across all partitions, for use in testing when you want to flush out 
all results, or in production when some topic is low volume. We shouldn't 
consider tracking time _more_ granularly until this problem is solved, since it 
would just make the low-traffic problem worse.





Re: [DISCUSS] KIP-503: deleted topics metric

2019-08-08 Thread Stanislav Kozlovski
ineligible replicas/topics are not included in the pending metrics, right?
If so, sounds good to me.

On Thu, Aug 8, 2019 at 4:12 PM David Arthur  wrote:

> Yes I think exposing ineligible topics would be useful as well. The
> controller also tracks this ineligible state for replicas. Would that be
> useful to expose as well?
>
> In that case, we'd be up to four new metrics:
> * topics pending delete
> * replicas pending delete
> * ineligible topics
> * ineligible replicas
>
> Thoughts?
>
>
> On Wed, Aug 7, 2019 at 5:16 PM Jason Gustafson  wrote:
>
> > Thanks for the KIP. This is useful. The controller also maintains a set
> for
> > topics which are awaiting deletion, but currently ineligible. A topic
> which
> > is undergoing reassignment, for example, is ineligible for deletion.
> Would
> > it make sense to have a metric for this as well?
> >
> > -Jason
> >
> > On Wed, Aug 7, 2019 at 1:52 PM David Arthur  wrote:
> >
> > > Updated the KIP with a count of replicas awaiting deletion.
> > >
> > > On Wed, Aug 7, 2019 at 9:37 AM David Arthur  wrote:
> > >
> > > > Thanks for the feedback, Stan. That's a good point about the
> partition
> > > > count -- I'll poke around and see if I can surface this value in the
> > > > Controller.
> > > >
> > > > On Tue, Aug 6, 2019 at 8:13 AM Stanislav Kozlovski <
> > > stanis...@confluent.io>
> > > > wrote:
> > > >
> > > >> Thanks for the KIP David,
> > > >>
> > > >> As you mentioned in the KIP - "when a large number of topics
> > > (partitions,
> > > >> really) are deleted at once, it can take significant time for the
> > > >> Controller to process everything."
> > > >> In that sense, does it make sense to have the metric expose the
> number
> > > of
> > > >> partitions that are pending deletion, as opposed to topics? Perhaps
> > even
> > > >> both?
> > > >> My reasoning is that this metric alone wouldn't say much if we had
> one
> > > >> topic with 1000 partitions versus a topic with 1 partition
> > > >>
> > > >> On Mon, Aug 5, 2019 at 8:19 PM Harsha Chintalapani  >
> > > >> wrote:
> > > >>
> > > >> > Thanks for the KIP.  Its useful metric to have.  LGTM.
> > > >> > -Harsha
> > > >> >
> > > >> >
> > > >> > On Mon, Aug 05, 2019 at 11:24 AM, David Arthur <
> > > davidart...@apache.org>
> > > >> > wrote:
> > > >> >
> > > >> > > Hello all, I'd like to start a discussion for
> > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > >> > > KIP-503%3A+Add+metric+for+number+of+topics+marked+for+deletion
> > > >> > >
> > > >> > > Thanks!
> > > >> > > David
> > > >> > >
> > > >> >
> > > >>
> > > >>
> > > >> --
> > > >> Best,
> > > >> Stanislav
> > > >>
> > > >
> > > >
> > > > --
> > > > David Arthur
> > > >
> > >
> > >
> > > --
> > > David Arthur
> > >
> >
>
>
> --
> David Arthur
>


-- 
Best,
Stanislav


Re: [DISCUSS] KIP-503: deleted topics metric

2019-08-08 Thread David Arthur
Yes I think exposing ineligible topics would be useful as well. The
controller also tracks this ineligible state for replicas. Would that be
useful to expose as well?

In that case, we'd be up to four new metrics:
* topics pending delete
* replicas pending delete
* ineligible topics
* ineligible replicas

Thoughts?
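For illustration, the four gauges above could simply read the sizes of the controller's tracked sets. The following is a hypothetical sketch with a Supplier-free plain-Java stand-in, not the controller's actual Yammer-metrics code (class and method names are illustrative):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for the controller's deletion bookkeeping; the real
// controller keeps its own state classes and registers Yammer gauges over them.
public class DeletionMetricsSketch {
    private final Set<String> topicsPendingDelete = ConcurrentHashMap.newKeySet();
    private final Set<String> ineligibleTopics = ConcurrentHashMap.newKeySet();

    // The proposed gauges would just report the size of the tracked sets.
    public int topicsPendingDeleteCount() { return topicsPendingDelete.size(); }
    public int ineligibleTopicsCount() { return ineligibleTopics.size(); }

    public void markForDeletion(String topic) { topicsPendingDelete.add(topic); }
    public void markIneligible(String topic) { ineligibleTopics.add(topic); }

    public static void main(String[] args) {
        DeletionMetricsSketch m = new DeletionMetricsSketch();
        m.markForDeletion("topic-a");
        m.markIneligible("topic-b"); // e.g. a topic undergoing reassignment
        System.out.println(m.topicsPendingDeleteCount() + " " + m.ineligibleTopicsCount());
    }
}
```

The replica-level gauges would follow the same pattern over sets of (topic, partition, broker) replica identifiers.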


On Wed, Aug 7, 2019 at 5:16 PM Jason Gustafson  wrote:

> Thanks for the KIP. This is useful. The controller also maintains a set for
> topics which are awaiting deletion, but currently ineligible. A topic which
> is undergoing reassignment, for example, is ineligible for deletion. Would
> it make sense to have a metric for this as well?
>
> -Jason
>
> On Wed, Aug 7, 2019 at 1:52 PM David Arthur  wrote:
>
> > Updated the KIP with a count of replicas awaiting deletion.
> >
> > On Wed, Aug 7, 2019 at 9:37 AM David Arthur  wrote:
> >
> > > Thanks for the feedback, Stan. That's a good point about the partition
> > > count -- I'll poke around and see if I can surface this value in the
> > > Controller.
> > >
> > > On Tue, Aug 6, 2019 at 8:13 AM Stanislav Kozlovski <
> > stanis...@confluent.io>
> > > wrote:
> > >
> > >> Thanks for the KIP David,
> > >>
> > >> As you mentioned in the KIP - "when a large number of topics
> > (partitions,
> > >> really) are deleted at once, it can take significant time for the
> > >> Controller to process everything."
> > >> In that sense, does it make sense to have the metric expose the number
> > of
> > >> partitions that are pending deletion, as opposed to topics? Perhaps
> even
> > >> both?
> > >> My reasoning is that this metric alone wouldn't say much if we had one
> > >> topic with 1000 partitions versus a topic with 1 partition
> > >>
> > >> On Mon, Aug 5, 2019 at 8:19 PM Harsha Chintalapani 
> > >> wrote:
> > >>
> > >> > Thanks for the KIP.  It's a useful metric to have.  LGTM.
> > >> > -Harsha
> > >> >
> > >> >
> > >> > On Mon, Aug 05, 2019 at 11:24 AM, David Arthur <
> > davidart...@apache.org>
> > >> > wrote:
> > >> >
> > >> > > Hello all, I'd like to start a discussion for
> > >> > > https://cwiki.apache.org/confluence/display/KAFKA/
> > >> > > KIP-503%3A+Add+metric+for+number+of+topics+marked+for+deletion
> > >> > >
> > >> > > Thanks!
> > >> > > David
> > >> > >
> > >> >
> > >>
> > >>
> > >> --
> > >> Best,
> > >> Stanislav
> > >>
> > >
> > >
> > > --
> > > David Arthur
> > >
> >
> >
> > --
> > David Arthur
> >
>


-- 
David Arthur


[jira] [Resolved] (KAFKA-8578) Add Functionality to Expose RocksDB Metrics

2019-08-08 Thread Bruno Cadonna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-8578.
--
Resolution: Done

> Add Functionality to Expose RocksDB Metrics
> ---
>
> Key: KAFKA-8578
> URL: https://issues.apache.org/jira/browse/KAFKA-8578
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.4.0
>
>
> To expose RocksDB metrics as specified in KIP-471, functionality to create 
> and record metrics in the Kafka metrics registry is required. 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-08 Thread Bruno Cadonna
Hi,

Thank you for the KIP!

Some questions/comments:

1. I am wondering if the "stand-by" tasks that catch up state before
the active task is switched deserve their own name in this KIP and maybe
in the code. We have already stated that they are not true stand-by
tasks, they are not configured through `num.standby.replicas`, and
maybe they have also other properties that distinguish them from true
stand-by tasks of which we are not aware yet. For example, they may be
prioritized differently than other tasks. Furthermore, the name
"stand-by" does not really fit with the planned functionality of those
tasks. In the following, I will call them false stand-by tasks.

2. Did you consider triggering the probing rebalances not at regular
time intervals but when the false stand-by tasks reach an acceptable
lag? If you did, could you add a paragraph to the "Rejected
Alternatives" section explaining why you rejected this idea.

3. Are tasks that solely contain stores with disabled logging
classified as stateful or stateless in the algorithm? I would guess
stateless, although if possible they should be assigned to the same
instance they had run before the rebalance. As far as I can see this
special case is not handled in the algorithm.
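As a sketch of the lag-based trigger asked about in point 2 — note that the acceptableLag threshold here is an assumed, illustrative config, not an existing Streams setting:

```java
// Hypothetical sketch: instead of rebalancing on a fixed interval, a
// warming-up ("false stand-by") task would signal readiness once its
// restoration lag drops below a configured threshold.
public class CatchUpCheck {
    // acceptableLag is an assumed config value for illustration only.
    public static boolean readyForProbingRebalance(long changelogEndOffset,
                                                   long restoredOffset,
                                                   long acceptableLag) {
        return changelogEndOffset - restoredOffset <= acceptableLag;
    }

    public static void main(String[] args) {
        System.out.println(readyForProbingRebalance(1_000, 990, 50));  // true
        System.out.println(readyForProbingRebalance(1_000, 100, 50));  // false
    }
}
```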

Best,
Bruno



On Thu, Aug 8, 2019 at 8:24 AM Guozhang Wang  wrote:
>
> 1. Sounds good, just wanted to clarify; and it may be worth documenting it so
> that users would not be surprised when monitoring their footprint.
>
> 2. Hmm I see... I think the trade-off can be described as "how much
> imbalance would bother you to be willing to pay another rebalance, along
> with potentially more restoration lag", and the current definition of
> rebalance_factor can be considered as a rough measurement of that
> imbalance. Of course one can argue that a finer grained measurement could
> be "resource footprint" like CPU / storage of each instance like we have in
> Kafka broker auto balancing tools, but I'd prefer not doing that as part of
> the library but more as an operational tool in the future. On the other
> hand, I've seen stateful and stateless tasks having very different load,
> and sometimes the only bottleneck of a Streams app is just one stateful
> sub-topology and whoever gets tasks of that sub-topology becomes a hotspot
> (and that's why our algorithm tries to balance per sub-topology as well),
> so maybe we can just consider stateful tasks when calculating this factor
> as a very brute force heuristic?
>
> 3.a. Thinking about this a bit more, maybe it's better not to try to tackle
> an unseen enemy just yet, and observe if it really emerges later, and by then
> we may have other ways to avoid starving the standby tasks, for example, by
> using dedicated threads for standby tasks or even consider having higher
> priority for standby than active so that we always try to catch up standby
> first, then process active; and if active's lag compared to
> log-end-offset is increasing then we should increase capacity, etc.
>
> 4. Actually with KIP-429 this may not be the case: we may not call
> onPartitionsRevoked prior to rebalance any more so would not transit state
> to PARTITIONS_REVOKED, and hence not cause the state of the instance to be
> REBALANCING. In other words, even if a instance is undergoing a rebalance
> it's state may still be RUNNING and it may still be processing records at
> the same time.
>
>
> On Wed, Aug 7, 2019 at 12:14 PM John Roesler  wrote:
>
> > Hey Guozhang,
> >
> > Thanks for the review!
> >
> > 1. Yes, even with `num.standby.replicas := 0`, we will still temporarily
> > allocate standby tasks to accomplish a no-downtime task migration.
> > Although, I'd argue that this doesn't really violate the config, as the
> > task isn't a true hot standby. As soon as it catches up, we'll rebalance
> > again, that task will become active, and the original instance that hosted
> > the active task will no longer have the task assigned at all. Once the
> > stateDirCleaner kicks in, we'll free the disk space from it, and return to
> > the steady-state of having just one copy of the task in the cluster.
> >
> > We can of course do without this, but I feel the current proposal is
> > operationally preferable, since it doesn't make configuring hot-standbys a
> > pre-requisite for fast rebalances.
> >
> > 2. Yes, I think your interpretation is what we intended. The default
> > balance_factor would be 1, as it is implicitly today. What this does is
> > allows operators to trade off less balanced assignments against fewer
> > rebalances. If you have lots of spare capacity in your instances, this may
> > be a perfectly fine tradeoff, and you may prefer for Streams not to bother
> > streaming GBs of data from the broker in pursuit of perfect balance. Not
> > married to this configuration, though. It was inspired by the related work
> > research we did.
> >
> > 3. I'll take a look
> >
> > 3a. I think this is a good idea. I'd classify it as a type of grey failure
> > detection. It may make 

Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-08-08 Thread Harsha Chintalapani
In your KIP you added security.provider as a rejected alternative and
specified "its not the correct way". Do you mind explaining why it's not? I
didn't find any evidence in Java docs to say so. Contrary to your statement
it does say in the java docs
" However, please note that a provider can be used to implement any
security service in Java that uses a pluggable architecture with a choice
of implementations that fit underneath."

Java Security Providers have been used by other projects to provide such
integration . I am not sure if you looked into Spiffe project to
efficiently distribute certificates but here is an example of Java provider
https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.java
which
obtains certificates from local daemons.
These integrations are being used in Tomcat, Jetty etc..  We are also using
Security provider to do the same in our Kafka clusters. So unless I see
more evidence why security.provider doesn't work for you,
adding new interfaces while there exists a cleaner way of achieving the
goals of this KIP is unnecessary and breaks the well-known security
interfaces provided by Java itself.
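As a minimal sketch of that provider mechanism — the DemoProvider name and class below are purely illustrative and register no real services, unlike the SPIFFE provider linked above:

```java
import java.security.Provider;
import java.security.Security;

// Minimal sketch of the pluggable Java Security Provider mechanism.
// A real provider would register actual services (e.g. KeyManagerFactory /
// TrustManagerFactory implementations); this one only demonstrates
// registration and lookup by name.
public class ProviderRegistrationSketch {
    static class DemoProvider extends Provider {
        DemoProvider() {
            super("DemoProvider", "1.0", "Illustrative provider with no services");
        }
    }

    public static void main(String[] args) {
        Security.addProvider(new DemoProvider());
        // Once registered, the provider is discoverable by name, which is how
        // security.provider entries plug implementations in underneath JSSE.
        System.out.println(Security.getProvider("DemoProvider") != null);
    }
}
```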

Thanks,
Harsha


On Thu, Aug 08, 2019 at 6:54 AM, Harsha Chintalapani 
wrote:

> Hi Maulin,
> Not sure if you looked at my previous replies. These changes
> are not required as there is already a security Provider to do what you are
> proposing.  This KIP https://cwiki.apache.org/confluence/display/KAFKA/
> KIP-492%3A+Add+java+security+providers+in+Kafka+Security+config also
> addresses easy registration of such providers.
>
> Thanks,
> Harsha
>
>
> On Wed, Aug 07, 2019 at 11:31 PM, Maulin Vasavada  com> wrote:
>
> Bump! Can somebody please review this?
>
> On Tue, Jul 16, 2019 at 1:51 PM Maulin Vasavada 
> wrote:
>
> Bump! Can somebody please review this?
>
>


Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-08-08 Thread Harsha Chintalapani
Hi Maulin,
   Not sure if you looked at my previous replies. These changes
are not required as there is already a security Provider to do what you are
proposing.  This KIP
https://cwiki.apache.org/confluence/display/KAFKA/KIP-492%3A+Add+java+security+providers+in+Kafka+Security+config
also
addresses easy registration of such providers.

Thanks,
Harsha


On Wed, Aug 07, 2019 at 11:31 PM, Maulin Vasavada  wrote:

> Bump! Can somebody please review this?
>
> On Tue, Jul 16, 2019 at 1:51 PM Maulin Vasavada 
> wrote:
>
> Bump! Can somebody please review this?
>
>


Re: Alternative of poll(0) without pulling records

2019-08-08 Thread Viktor Somogyi-Vass
Hey Jungtaek,

Thanks for your interest; sometimes I also think such an API would be a
good thing.
I don't see any strong reasons in either KIP-288 or KIP-266 why such
an API shouldn't be created, so go ahead with it, although you'll need to
create a short KIP for this as the KafkaConsumer class is considered to be a
public API.

Best,
Viktor
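As a toy model of the semantic gap being discussed — no Kafka client is involved here, and the "metadata update cost" is an arbitrary stand-in for the real network round trips:

```java
// Toy model of why poll(Duration.ZERO) is not a drop-in replacement for the
// deprecated poll(0) hack: with poll(long), a zero timeout still completed
// the metadata update; with poll(Duration), the update itself is bounded by
// the same timeout.
public class PollSemanticsSketch {
    static final long METADATA_UPDATE_COST_MS = 5; // arbitrary assumed cost
    boolean metadataUpdated = false;

    // Deprecated poll(long): blocks for the metadata update regardless of timeout.
    void pollOld(long timeoutMs) {
        metadataUpdated = true; // update always completes, even for poll(0)
    }

    // poll(Duration): the timeout also caps the metadata update.
    void pollNew(long timeoutMs) {
        if (timeoutMs >= METADATA_UPDATE_COST_MS) {
            metadataUpdated = true;
        } // otherwise the call returns empty with metadata still stale
    }

    public static void main(String[] args) {
        PollSemanticsSketch old = new PollSemanticsSketch();
        old.pollOld(0);
        System.out.println("poll(0) updated metadata: " + old.metadataUpdated);

        PollSemanticsSketch fresh = new PollSemanticsSketch();
        fresh.pollNew(0);
        System.out.println("poll(Duration.ZERO) updated metadata: " + fresh.metadataUpdated);
    }
}
```

This is exactly why a dedicated metadata-only call (like the proposed `waitForAssignment`) would close the gap.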

On Wed, Aug 7, 2019 at 9:26 AM Jungtaek Lim  wrote:

> If we just wanted to remove deprecation and let both co-exist, that would
> be also viable, though `poll(0)` is still a hack and it would be ideal to
> provide official approach to do so.
>
> On Wed, Aug 7, 2019 at 4:24 PM Jungtaek Lim  wrote:
>
> > Hi devs,
> >
> > I'm trying to replace deprecated poll(long) with poll(Duration), and
> > realized there's no alternative which behaves exactly the same as poll(0), as
> > poll(0) has been used as a hack to only update metadata instead of
> pulling
> > records. poll(Duration.ZERO) wouldn't behave same since even updating
> > metadata will be timed-out. So now end users would need to give more
> > timeout and even pull some records even though they're only interested in
> metadata.
> >
> > I looked back some KIPs which brought the change, and "discarded" KIP
> > (KIP-288 [1]) actually proposed a new API which only pulls metadata.
> > KIP-266 [2] is picked up instead but it didn't cover all the things what
> > KIP-288 proposed. I'm seeing some doc explaining poll(0) hasn't been
> > supported officially, but the hack has been widely used and they can't be
> > ignored.
> >
> > Kafka test code itself relies on either deprecated poll(0),
> > or updateAssignmentMetadataIfNeeded, which seems to be private API only
> for
> > testing.
> > (Btw, I'd try out replacing poll(0) to updateAssignmentMetadataIfNeeded
> as
> > avoiding deprecated method - if it works I'll submit a PR.)
> >
> > I'm feeling that it would be ideal to expose
> > `updateAssignmentMetadataIfNeeded` to the public API, maybe with renaming
> > as `waitForAssignment` which was proposed in KIP-288 if it feels too
> long.
> >
> > What do you think? If it sounds feasible I'd like to try out contribution
> > on this. I'm new to contribute Kafka community, so not sure it would
> > require a new KIP or not.
> >
> > Thanks,
> > Jungtaek Lim (HeartSaVioR)
> >
> > 1.
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-288%3A+%5BDISCARDED%5D+Consumer.poll%28%29+timeout+semantic+change+and+new+waitForAssignment+method
> > 2.
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior
> >
> >
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>


Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-08-08 Thread Satish Duggana
Hi Rajini,
Sure, I will start a discussion thread soon on dev mailing list.

Thanks,
Satish.

On Thu, Aug 8, 2019 at 1:29 AM Rajini Sivaram  wrote:
>
> Hi Ron/Harsha/Satish,
>
> Thanks for reviewing the KIP!
>
> We should perhaps have a wider discussion outside this KIP for refactoring
> clients so that others who are not following this KIP also notice the
> discussion. Satish, would you like to start a discussion thread on dev?
>
> Regards,
>
> Rajini
>
>
> On Wed, Aug 7, 2019 at 6:21 PM Satish Duggana 
> wrote:
>
> > I felt the same need when we want to add a pluggable API for core
> > server functionality. This does not need to be part of this KIP, it
> > can be a separate KIP. I can contribute those refactoring changes if
> > others are OK with that.
> >
> > It is better to have a structure like below.
> >
> > kafka-common:
> > common classes which can be used in any of the other modules in Kafka
> > like client, Kafka-server-common and server etc.
> >
> > kafka-client-common:
> > common classes which can be used in the client module. This can be
> > part of client module itself.
> >
> > kafka-server-common:
> > classes required only for kafka-server.
> >
> > Thanks.
> > Satish.
> >
> > On Wed, Aug 7, 2019 at 9:28 PM Harsha Chintalapani 
> > wrote:
> > >
> > > Thanks for the KIP Rajini.
> > > Quick thought, it would be good to have a common module outside of
> > clients
> > > that only applies to server side interfaces & changes. It looks like we
> > are
> > > increasingly in favor of using Java interface for pluggable modules  on
> > the
> > > broker side.
> > >
> > > Thanks,
> > > Harsha
> > >
> > >
> > > On Tue, Aug 06, 2019 at 2:31 PM, Rajini Sivaram  > >
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have created a KIP to replace the Scala Authorizer API with a new
> > Java
> > > > API:
> > > >
> > > > -
> > > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > > KIP-504+-+Add+new+Java+Authorizer+Interface
> > > >
> > > > This is replacement for KIP-50 which was accepted but never merged.
> > Apart
> > > > from moving to a Java API consistent with other pluggable interfaces
> > in the
> > > > broker, KIP-504 also attempts to address known limitations in the
> > > > authorizer. If you have come across other limitations that you would
> > like
> > > > to see addressed in the new API, please raise these on the discussion
> > > > thread so that we can consider those too. All suggestions and feedback
> > are
> > > > welcome.
> > > >
> > > > Thank you,
> > > >
> > > > Rajini
> > > >
> >


[jira] [Created] (KAFKA-8768) Replace DeleteRecords request/response with automated protocol

2019-08-08 Thread Mickael Maison (JIRA)
Mickael Maison created KAFKA-8768:
-

 Summary: Replace DeleteRecords request/response with automated 
protocol
 Key: KAFKA-8768
 URL: https://issues.apache.org/jira/browse/KAFKA-8768
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison
Assignee: Mickael Maison






--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-08-08 Thread Rajini Sivaram
Hi Don,

Thanks for reviewing the KIP.

1. I had this originally as a single Action, but thought it may be useful
to support batched authorize calls as well and keep it consistent with
other methods. Single requests can contain multiple topics. For example a
produce request can contain records for several partitions of different
topics. Broker could potentially authorize these together. For
SimpleAclAuthorizer, batched authorize methods don't provide any
optimisation since lookup is based on resources followed by the matching
logic. But some authorizers may manage ACLs by user principal rather than
resource and may be able to optimize batched requests. I am ok with using
single Action if this is likely to cause issues.
2. If you have two listeners, one for inter-broker traffic and another for
external clients, start method is invoked twice, once for each listener. On
second thought, that may be confusing and a single start() invocation that
provides all listener information and returns multiple futures would be
better. Will update the KIP.
3. A typical example is a consumer subscribing to a regex pattern. We
request all topic metadata from the broker in order to decide whether the
pattern matches, expecting to receive a list of authorised topics. The user
is not asking to subscribe to an unauthorized topic. If there are 10000
topics in the cluster and the user has access to 100 of them, at the moment
we log 9900 DENIED log entries at INFO level in SimpleAclAuthorizer. The
proposal is to authorize this request with AuthorizationMode.FILTER, so
that authorizers can log resources that are filtered out at lower level
like DEBUG since this is not an attempt to access unauthorized resources.
Brokers already handle these differently since no authorization error is
returned to the client in these cases. Providing authorization mode to
authorizers enables authorizer implementations to generate better audit
logs.
4. Each request may contain multiple instances of the same authorizable
resource. For example a produce request may contain records for 10
partitions of the same topic. At the moment, we invoke authorize method 10
times. The proposal is to invoke it once with count=10. The count is
provided to authorizer just for audit logging purposes.
5. Authorizer implements Closeable, so you could use close() to flush
audits?
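The batched shape and the per-action count from points 1 and 4 can be sketched as follows. These types are loosely modeled on this discussion and are hypothetical; the real interface lives in the KIP and differs in detail (request contexts, resource types, authorization modes):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shapes for the batched authorize call; illustrative only.
public class BatchedAuthorizeSketch {
    enum AuthorizationResult { ALLOWED, DENIED }

    // 'count' mirrors point 4: one Action can stand for N occurrences of the
    // same resource in a request (e.g. 10 partitions of one topic), so the
    // authorizer is invoked once instead of 10 times and can audit the count.
    record Action(String operation, String resourceName, int count) {}

    interface Authorizer {
        List<AuthorizationResult> authorize(String principal, List<Action> actions);
    }

    // Toy authorizer that allows everything for "admin": per point 1, an
    // implementation keyed by principal can resolve a whole batch with a
    // single principal lookup rather than one lookup per action.
    static class PrincipalKeyedAuthorizer implements Authorizer {
        public List<AuthorizationResult> authorize(String principal, List<Action> actions) {
            boolean allowed = "admin".equals(principal); // one lookup for the batch
            List<AuthorizationResult> results = new ArrayList<>();
            for (Action a : actions) {
                results.add(allowed ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED);
            }
            return results;
        }
    }

    public static void main(String[] args) {
        Authorizer az = new PrincipalKeyedAuthorizer();
        List<Action> batch = List.of(new Action("WRITE", "topic-a", 10),
                                     new Action("WRITE", "topic-b", 3));
        System.out.println(az.authorize("admin", batch));
    }
}
```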

On Thu, Aug 8, 2019 at 7:01 AM Don Bosco Durai  wrote:

> Rajini
>
> Thanks for putting this together. It is looking good. I have a few
> questions...
>
> 1. List<AuthorizationResult> authorize(..., List<Action> actions).  Do you
> see a scenario where the broker will call authorize for multiple topics at
> the same time? I can understand that during creating/deleting ACLs,
> multiple permissions for multiple resources might be done. For the authorize
> call, would this be the case? And will the Authorizer implementation be
> able to do performance optimizations because of this? Or should we just keep
> it simple? I don't see it as an issue from the Apache Ranger side, but just
> checking to see whether we need to be aware of something.
> 2. Should I assume that the SecurityProtocol passed during start and the
> one returned by KafkaRequestContext.securityProtocol() will be the same?
> CompletableFuture<Void> start(String listenerName, SecurityProtocol
> securityProtocol);
> KafkaRequestContext.securityProtocol()
> 3. What is the purpose of AuthorizationMode? How does the broker decide
> what mode to use when the authorize() method is called?
> 4. Can we clarify "count" in Action a bit more? How is it used?
> 5. Do you feel having "stop" along with "start" would be helpful? E.g. In Ranger
> we try to optimize the Audit writing by caching the logs for a fixed
> interval. But when the Broker terminates, we do a forced flush. Having an
> explicit "stop" might give us a formal way to flush our audits.
>
> Thanks
>
> Bosco
>
> On 8/7/19, 3:59 PM, "Rajini Sivaram"  wrote:
>
> Hi Ron/Harsha/Satish,
>
> Thanks for reviewing the KIP!
>
> We should perhaps have a wider discussion outside this KIP for
> refactoring
> clients so that others who are not following this KIP also notice the
> discussion. Satish, would you like to start a discussion thread on dev?
>
> Regards,
>
> Rajini
>
>
> On Wed, Aug 7, 2019 at 6:21 PM Satish Duggana <
> satish.dugg...@gmail.com>
> wrote:
>
> > I felt the same need when we want to add a pluggable API for core
> > server functionality. This does not need to be part of this KIP, it
> > can be a separate KIP. I can contribute those refactoring changes if
> > others are OK with that.
> >
> > It is better to have a structure like below.
> >
> > kafka-common:
> > common classes which can be used in any of the other modules in Kafka
> > like client, Kafka-server-common and server etc.
> >
> > kafka-client-common:
> > common classes which can be used in the client module. This can be
> > part of client module itself.
> >
> > 

Re: [DISCUSS] KIP-487: Automatic Topic Creation on Producer

2019-08-08 Thread M. Manna
Hi,

If I may, perhaps you could simplify everything by allowing
'auto.create.topics.enable' as a value alongside true and false. In other
words, the public interfaces section should only have [true,
auto.create.topics.enable, false].

The reason for this is that auto.create.topics.enable is already known to
users as a "Server-Side" config. So all you are saying is

a) To avoid day 1 impact, it will follow whatever auto.create.topics.enable
value is set.
b) False means - no client side topic creation
c) True means client side topic creation.

It saves creating 2 more new strings :). But not too expensive anyway.
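As a sketch of the resolution logic in (a)-(c) above — the method name and setting values here are illustrative, taken from this discussion rather than from the KIP itself:

```java
// Hypothetical sketch of resolving the proposed client-side setting against
// the broker's auto.create.topics.enable config.
public class AutoCreateResolution {
    static boolean shouldAutoCreate(String clientSetting, boolean brokerAutoCreateEnable) {
        switch (clientSetting) {
            case "true":  return true;   // (c) client-side topic creation
            case "false": return false;  // (b) no client-side topic creation
            case "auto.create.topics.enable":
                return brokerAutoCreateEnable; // (a) day-1: follow the broker config
            default:
                throw new IllegalArgumentException("Unknown value: " + clientSetting);
        }
    }

    public static void main(String[] args) {
        System.out.println(shouldAutoCreate("auto.create.topics.enable", true));  // true
        System.out.println(shouldAutoCreate("false", true));                      // false
    }
}
```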

Also, when you deprecate auto.create.topics.enable - you must provide
sufficient logic to ensure that things like a rolling upgrade don't
temporarily break anything. I apologise if you have already accounted for
this, but wanted to mention since I didn't notice this on the KIP.

Let me know how this sounds.

Regards,

On Wed, 7 Aug 2019 at 19:10, Justine Olshan  wrote:

> Hi Harsha,
>
> I think my message may have gotten lost in all the others.
>
> Two of the goals of this KIP are to 1) allow auto-creation on specific
> clients when the broker default is false and 2) eventually replace the
> broker config.
>
> In order to accomplish these two goals, we need the producer to be able to
> create topics despite the broker config. (How can we replace a function
> when we rely on it?)
> I think at this point we have a fundamental disagreement in what we should
> allow the producer to do.
> In my previous message I mentioned a config that would allow for the broker
> to prevent producer auto-creation. (It would be disabled by default.) It
> would fix your issue for now, but could lead to more complications later.
>
> Thank you,
> Justine
>
>
> On Wed, Aug 7, 2019 at 10:56 AM Harsha Chintalapani 
> wrote:
>
> > On Wed, Aug 07, 2019 at 9:50 AM, Colin McCabe  wrote:
> >
> > > On Wed, Aug 7, 2019, at 09:24, Harsha Ch wrote:
> > >
> > > On Tue, Aug 06, 2019 at 11:46 PM, Colin McCabe < cmcc...@apache.org >
> > > wrote:
> > >
> > > On Tue, Aug 6, 2019, at 21:38, Harsha Ch wrote:
> > >
> > > Hi Colin,
> > > "Hmm... I'm not sure I follow. Users don't have to build their own
> > > tooling, right? They can use any of the shell scripts that we've
> shipped
> > in
> > > the last few releases. For example, if any of your users run it, this
> > shell
> > > script will delete all of the topics from your non-security-enabled
> > > cluster:
> > >
> > > ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list 2>/dev/null
> > > | xargs -l ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete
> > > --topic
> > >
> > > They will need to fill in the correct bootstrap servers list, of
> course,
> > > not localhost. This deletion script will work on some pretty old
> brokers,
> > > even back to the 0.10 releases. It seems a little odd to trust your
> users
> > > with this power, but not trust them to avoid changing a particular
> > > configuration key."
> > >
> > > The above will be blocked by the server if we set delete.topic.enable to
> > > false and thats exactly what I am asking for.
> > >
> > > Hi Harsha,
> > >
> > > I was wondering if someone was going to bring up that configuration :)
> > >
> > > it's an interesting complication, but globally disabling topic deletion
> > is
> > > not very practical for most use-cases.
> > >
> > > In any case, there are plenty of other bad things that users with full
> > > permissions can do that aren't blocked by any server configuration. For
> > > example, they can delete every record in every topic. I can write a
> > script
> > > for that too, and there's no server configuration you can set to
> disable
> > > it. Or I could simply create hundreds of thousands of topics, until
> > cluster
> > > performance becomes unacceptable (this will be even more of a problem
> if
> > > someone configured delete.topic.enable as false). Or publish bad data
> to
> > > every topic, etc. etc.
> > >
> > > The point I'm trying to make here is that you can't rely on these kind
> of
> > > server-side configurations for security. At most, they're a way to set
> up
> > > certain very simple policies. But the policies are so simple that
> they're
> > > hardly ever useful any more.
> > >
> > > For example, if the problem you want to solve is that you want a user
> to
> > > only be able to create 50 topics and not delete anyone else's topics,
> you
> > > can solve that with a CreateTopicsPolicy that limits the number of
> > topics,
> > > and some ACLs. There's no combination of auto.create.topics.enable and
> > > delete.topic.enable that will help here.
> > >
> > > Hi Colin,
> > >
> > > Well you gave the example that a user can delete the topics
> > > just by running that script  :).
> > >
> > > I understand there are open APIs in Kafka and can lead to rogue clients
> > > taking advantage of it without proper 

Re: [DISCUSS] KIP-317: Transparent Data Encryption

2019-08-08 Thread Sönke Liebau
Thanks for your feedback both of you!

I've commented inline below.


On Thu, 8 Aug 2019 at 08:38, Jörn Franke  wrote:

> If you are doing batch encryption then you are closer to a scenario
> of file encryption. The more frequent the messages are, the closer you are
> to the SSL/HTTPS scenarios. You may learn from those protocols how they
> handle keys, how long they keep them, etc., to implement your E2E solution.
>
> > Am 08.08.2019 um 08:11 schrieb Maulin Vasavada <
> maulin.vasav...@gmail.com>:
> >
> > Hi Sönke Liebau
> >
> >
> > Thanks for the great detailed documentation. However, I feel that leaving
> the
> > KMS outside of Kafka might simplify the whole thing to a great extent. If
> > the broker is not going to touch the encrypted messages, why would we put
> > any dependency of KMS interfaces on the Broker. We have experimented
> doing
> > end-to-end message encryption and we used topic level keys and message
> > encryption with serializer wrapper which encrypts each message before
> > serializing. The serializer wrapper has to integrate with required KMS
> we
> > use internally and that was all.
>
My idea behind having the broker manage topic keys was to keep the option
of actually making the encryption transparent to the clients. This way you
could configure a topic as encrypted on the broker and the broker would
then push everything to the client that it needs to know to encrypt
messages on startup - but still be unable to decrypt messages itself.

However, this is only one possible scenario. Another valid scenario is of
course that you want to configure clients directly with keys, which I hope
my proposal also covers, as everything is pluggable. And in this case the
broker would not need a dependency on the KMS, as it doesn't need to handle
keys.

Basically by making this pluggable I hope to be able to cover a wide
variety of use cases, the two described in the KIP are just the ones that
I'd implement initially.



> >
> > However one key observation we had was - if we could do encryption at
> > 'batch' level instead of 'per-message' it can perform much better
> > (depending upon batch sizing). We didn't experiment with that though.
>

I agree, batch encryption would make this perform much better, but it has
downsides as well. I am unsure of the security implications of larger vs
smaller payload to be honest, but will investigate this.
In addition however, we do not want to decrypt the batch on the broker, so
this will be handed to consumers as a batch as well, which has the same
implications as end-to-end compression like more complicated offset
committing for consumers. I have not looked into that in a long time and
that may not even be an issue anymore. I'll do some digging here as well.
Bottom line: I agree, but I think we should offer both modes of operation.
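As a minimal sketch of the batch-level mode with a per-topic key, assuming AES-GCM. It only illustrates the "encrypt once per batch" idea from this thread; the KIP's pluggable design, key distribution, and on-the-wire format are not modeled here:

```java
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Batch-level encryption sketch: one cipher operation per producer batch
// instead of one per message, amortizing the per-operation overhead.
public class BatchEncryptionSketch {
    public static void main(String[] args) throws Exception {
        KeyGenerator kg = KeyGenerator.getInstance("AES");
        kg.init(128);
        SecretKey topicKey = kg.generateKey(); // one key per topic, per the discussion

        // A producer batch: the concatenated records are encrypted in one go.
        byte[] batch = "record-1|record-2|record-3".getBytes(StandardCharsets.UTF_8);

        byte[] iv = new byte[12]; // fresh IV per batch
        new SecureRandom().nextBytes(iv);

        Cipher enc = Cipher.getInstance("AES/GCM/NoPadding");
        enc.init(Cipher.ENCRYPT_MODE, topicKey, new GCMParameterSpec(128, iv));
        byte[] ciphertext = enc.doFinal(batch);

        // The consumer receives the whole encrypted batch and decrypts it once,
        // which is why offset handling resembles the end-to-end compression case.
        Cipher dec = Cipher.getInstance("AES/GCM/NoPadding");
        dec.init(Cipher.DECRYPT_MODE, topicKey, new GCMParameterSpec(128, iv));
        System.out.println(Arrays.equals(batch, dec.doFinal(ciphertext)));
    }
}
```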


> >
> > Thanks
> > Maulin
>


-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany


Re: [DISCUSS] KIP-317: Transparent Data Encryption

2019-08-08 Thread Jörn Franke
If you are doing batch encryption then you are closer to a scenario of 
file encryption. The more frequent the messages are, the closer you are to the 
SSL/HTTPS scenarios. You may learn from those protocols how they handle 
keys, how long they keep them, etc., to implement your E2E solution.

> Am 08.08.2019 um 08:11 schrieb Maulin Vasavada :
> 
> Hi Sönke Liebau
> 
> 
> Thanks for the great detailed documentation. However, I feel that leaving the
> KMS outside of Kafka might simplify the whole thing to a great extent. If
> the broker is not going to touch the encrypted messages, why would we put
> any dependency of KMS interfaces on the Broker. We have experimented doing
> end-to-end message encryption and we used topic level keys and message
> encryption with serializer wrapper which encrypts each message before
> serializing. The serializer wrapper has to integrate with required KMS we
> use internally and that was all.
> 
> However one key observation we had was - if we could do encryption at
> 'batch' level instead of 'per-message' it can perform much better
> (depending upon batch sizing). We didn't experiment with that though.
> 
> Thanks
> Maulin


Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-08-08 Thread Maulin Vasavada
Bump! Can somebody please review this?

On Tue, Jul 16, 2019 at 1:51 PM Maulin Vasavada 
wrote:

> Bump! Can somebody please review this?
>


Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-08-08 Thread Guozhang Wang
1. Sounds good, just wanted to clarify; and it may be worth documenting it so
that users are not surprised when monitoring their footprint.

2. Hmm, I see... I think the trade-off can be described as "how much
imbalance would bother you enough to be willing to pay for another
rebalance, along with potentially more restoration lag", and the current
definition of rebalance_factor can be considered a rough measurement of that
imbalance. Of course, one can argue that a finer-grained measurement could
be the "resource footprint" like CPU / storage of each instance, as we have
in Kafka broker auto-balancing tools, but I'd prefer not doing that as part
of the library and more as an operational tool in the future. On the other
hand, I've seen stateful and stateless tasks having very different load,
and sometimes the only bottleneck of a Streams app is just one stateful
sub-topology, and whoever gets tasks of that sub-topology becomes the
hotspot (that's why our algorithm tries to balance per sub-topology as
well). So maybe we can just consider stateful tasks when calculating this
factor, as a very brute-force heuristic?
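To make the "stateful tasks only" idea above concrete, here is a minimal,
hypothetical sketch (not the actual KIP-441 code; all names are illustrative):
the factor is the spread between the most- and least-loaded instances,
counting only stateful tasks, and a rebalance is only worth triggering when
that spread exceeds the configured acceptable factor.

```java
import java.util.Collection;
import java.util.IntSummaryStatistics;
import java.util.List;

// Illustrative sketch of a balance factor restricted to stateful tasks.
public class BalanceFactorSketch {

    /** statefulTaskCounts: number of stateful tasks assigned to each instance. */
    static int balanceFactor(Collection<Integer> statefulTaskCounts) {
        IntSummaryStatistics s = statefulTaskCounts.stream()
                .mapToInt(Integer::intValue).summaryStatistics();
        // Spread between the most- and least-loaded instance.
        return s.getMax() - s.getMin();
    }

    /** Only pay for another rebalance if the imbalance exceeds the acceptable factor. */
    static boolean shouldRebalance(Collection<Integer> statefulTaskCounts,
                                   int acceptableFactor) {
        return balanceFactor(statefulTaskCounts) > acceptableFactor;
    }

    public static void main(String[] args) {
        List<Integer> counts = List.of(4, 3, 1);              // stateful tasks per instance
        System.out.println(balanceFactor(counts));            // prints 3
        System.out.println(shouldRebalance(counts, 1));       // prints true
        System.out.println(shouldRebalance(List.of(2, 2, 3), 1)); // prints false
    }
}
```

Stateless tasks are deliberately excluded here, since they are cheap to move
and rarely the bottleneck, per the discussion above.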

3.a. Thinking about this a bit more, maybe it's better not to try to tackle
an unseen enemy just yet, but to observe whether it really emerges later. By
then we may have other ways to avoid starving the standby tasks, for example
by using dedicated threads for standby tasks, or even giving standbys higher
priority than actives so that we always try to catch up the standby first,
then process the active; and if the active's lag behind the log-end offset
keeps increasing, then we should increase capacity, etc.

4. Actually, with KIP-429 this may not be the case: we may not call
onPartitionsRevoked prior to a rebalance any more, so we would not
transition to PARTITIONS_REVOKED, and hence the state of the instance would
not become REBALANCING. In other words, even if an instance is undergoing a
rebalance, its state may still be RUNNING and it may still be processing
records at the same time.


On Wed, Aug 7, 2019 at 12:14 PM John Roesler  wrote:

> Hey Guozhang,
>
> Thanks for the review!
>
> 1. Yes, even with `num.standby.replicas := 0`, we will still temporarily
> allocate standby tasks to accomplish a no-downtime task migration.
> Although, I'd argue that this doesn't really violate the config, as the
> task isn't a true hot standby. As soon as it catches up, we'll rebalance
> again, that task will become active, and the original instance that hosted
> the active task will no longer have the task assigned at all. Once the
> stateDirCleaner kicks in, we'll free the disk space from it, and return to
> the steady-state of having just one copy of the task in the cluster.
>
> We can of course do without this, but I feel the current proposal is
> operationally preferable, since it doesn't make configuring hot-standbys a
> pre-requisite for fast rebalances.
>
> 2. Yes, I think your interpretation is what we intended. The default
> balance_factor would be 1, as it is implicitly today. What this does is
> allows operators to trade off less balanced assignments against fewer
> rebalances. If you have lots of spare capacity in your instances, this may
> be a perfectly fine tradeoff, and you may prefer for Streams not to bother
> streaming GBs of data from the broker in pursuit of perfect balance. Not
> married to this configuration, though. It was inspired by the related work
> research we did.
>
> 3. I'll take a look
>
> 3a. I think this is a good idea. I'd classify it as a type of grey failure
> detection. It may make more sense to tackle grey failures as part of the
> heartbeat protocol (as I POCed here:
> https://github.com/apache/kafka/pull/7096/files). WDYT?
>
> 4. Good catch! I didn't think about that before. Looking at it now, though,
> I wonder if we're actually protected already. The stateDirCleaner thread
> only executes if the instance is in RUNNING state, and KIP-441 proposes to
> use "probing rebalances" to report task lag. Hence, during the window
> between when the instance reports a lag and the assignor makes a decision
> about it, the instance should remain in REBALANCING state, right? If so,
> then this should prevent the race condition. If not, then we do indeed need
> to do something about it.
>
> 5. Good idea. I think that today, you can only see the consumer lag, which
> is a poor substitute. I'll add some metrics to the proposal.
>
> Thanks again for the comments!
> -John
>
> On Tue, Aug 6, 2019 at 4:27 PM Guozhang Wang  wrote:
>
> > Hello Sophie,
> >
> > Thanks for the proposed KIP. I left some comments on the wiki itself,
> and I
> > think I'm still not very clear on a couple or those:
> >
> > 1. With this proposal, does that mean with num.standby.replicas == 0, we
> > may sometimes still have some standby tasks which may violate the config?
> >
> > 2. I think I understand the rationale to consider lags that is below the
> > specified threshold to be equal, rather than still considering 5000 is
> > better 

Re: [DISCUSS] KIP-317: Transparent Data Encryption

2019-08-08 Thread Maulin Vasavada
Hi Sönke Liebau


Thanks for the great, detailed documentation. However, I feel that leaving
the KMS outside of Kafka might simplify the whole thing to a great extent.
If the broker is not going to touch the encrypted messages, why would we
put any dependency on KMS interfaces in the broker? We have experimented
with end-to-end message encryption, using topic-level keys and a serializer
wrapper which encrypts each message before serializing. The serializer
wrapper only had to integrate with the KMS we use internally, and that was
all.

However, one key observation we had was that if we could do encryption at
the 'batch' level instead of 'per-message', it could perform much better
(depending upon batch sizing). We didn't experiment with that, though.

Thanks
Maulin
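
A minimal sketch of the serializer-wrapper approach described above,
assuming AES-GCM and a locally generated key as a stand-in for a KMS
lookup (all names are illustrative): serialize first, then encrypt,
prepending the random IV so the consumer side can decrypt. A real
implementation would implement Kafka's Serializer interface and fetch the
per-topic key from the external KMS instead.

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Arrays;

// Hypothetical per-message encrypting "serializer wrapper" sketch.
public class EncryptingSerializerSketch {
    private static final int IV_LEN = 12;    // recommended GCM nonce size
    private static final int TAG_BITS = 128; // GCM authentication tag length

    static byte[] encrypt(SecretKey key, byte[] plaintext) throws Exception {
        byte[] iv = new byte[IV_LEN];
        new SecureRandom().nextBytes(iv);
        Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
        c.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        byte[] ct = c.doFinal(plaintext);
        // Prepend the IV so the decrypting side can recover it.
        return ByteBuffer.allocate(IV_LEN + ct.length).put(iv).put(ct).array();
    }

    static byte[] decrypt(SecretKey key, byte[] payload) throws Exception {
        Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
        c.init(Cipher.DECRYPT_MODE, key,
               new GCMParameterSpec(TAG_BITS, Arrays.copyOfRange(payload, 0, IV_LEN)));
        return c.doFinal(Arrays.copyOfRange(payload, IV_LEN, payload.length));
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a topic-level key fetched from a KMS.
        KeyGenerator kg = KeyGenerator.getInstance("AES");
        kg.init(128);
        SecretKey topicKey = kg.generateKey();

        byte[] serialized = "my-message".getBytes(StandardCharsets.UTF_8);
        byte[] wire = encrypt(topicKey, serialized);
        byte[] roundTrip = decrypt(topicKey, wire);
        System.out.println(new String(roundTrip, StandardCharsets.UTF_8)); // prints "my-message"
    }
}
```

Batch-level encryption would apply the same encrypt step once per record
batch instead of once per message, which is where the performance gain
mentioned above would come from.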


Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-08-08 Thread Don Bosco Durai
Rajini

Thanks for putting this together. It is looking good. I have few questions...

1. List<AuthorizationResult> authorize(..., List<Action> actions). Do you see
a scenario where the broker will call authorize for multiple topics at the same
time? I can understand that during creating/deleting ACLs, multiple permissions
for multiple resources might be handled at once, but would that be the case for
an authorize call? And will the Authorizer implementation be able to do
performance optimizations because of this? Or should we just keep it simple? I
don't see it as an issue from the Apache Ranger side, but am just checking
whether we need to be aware of something.
2. Should I assume that the SecurityProtocol passed during start and the one
returned by KafkaRequestContext.securityProtocol() will be the same?
CompletableFuture<Void> start(String listenerName, SecurityProtocol
securityProtocol);
KafkaRequestContext.securityProtocol()
3. What is the purpose of AuthorizationMode? How does the broker decide what 
mode to use when the authorize() method is called?
4. Can we clarify "count" in Action a bit more? How is it used?
5. Do you feel that having "stop" along with "start" would be helpful? E.g. in 
Ranger we try to optimize the audit writing by caching the logs for a fixed 
interval, but when the broker terminates, we do a forced flush. Having an 
explicit "stop" would give us a formal way to flush our audits.

Thanks

Bosco
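
P.S. Regarding question 1, a hypothetical sketch of how a batched
authorize(...) call could let a plugin amortize work across actions:
resolve the principal's grants once, then answer each action from that
snapshot, returning one result per action in order. The types below
(Action, AuthorizationResult, the grant set) are illustrative, not the
exact KIP-504 interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative batched-authorization sketch (not the KIP-504 API).
public class BatchedAuthorizerSketch {

    enum AuthorizationResult { ALLOWED, DENIED }

    static final class Action {
        final String operation;
        final String resourceName;
        Action(String operation, String resourceName) {
            this.operation = operation;
            this.resourceName = resourceName;
        }
    }

    // "operation:resource" pairs the principal holds; in a real plugin this
    // snapshot would be resolved once from the policy store per batch.
    private final Set<String> grants;

    BatchedAuthorizerSketch(Set<String> grants) {
        this.grants = grants;
    }

    /** One result per action, in order; the whole batch shares one grant lookup. */
    List<AuthorizationResult> authorize(List<Action> actions) {
        List<AuthorizationResult> results = new ArrayList<>(actions.size());
        for (Action a : actions) {
            results.add(grants.contains(a.operation + ":" + a.resourceName)
                    ? AuthorizationResult.ALLOWED
                    : AuthorizationResult.DENIED);
        }
        return results;
    }

    public static void main(String[] args) {
        BatchedAuthorizerSketch authz =
                new BatchedAuthorizerSketch(Set.of("Read:topicA", "Write:topicA"));
        List<AuthorizationResult> r = authz.authorize(List.of(
                new Action("Read", "topicA"),
                new Action("Read", "topicB")));
        System.out.println(r); // prints [ALLOWED, DENIED]
    }
}
```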

On 8/7/19, 3:59 PM, "Rajini Sivaram"  wrote:

Hi Ron/Harsha/Satish,

Thanks for reviewing the KIP!

We should perhaps have a wider discussion outside this KIP for refactoring
clients so that others who are not following this KIP also notice the
discussion. Satish, would you like to start a discussion thread on dev?

Regards,

Rajini


On Wed, Aug 7, 2019 at 6:21 PM Satish Duggana 
wrote:

> I felt the same need when we want to add a pluggable API for core
> server functionality. This does not need to be part of this KIP, it
> can be a separate KIP. I can contribute those refactoring changes if
> others are OK with that.
>
> It is better to have a structure like below.
>
> kafka-common:
> common classes which can be used in any of the other modules in Kafka
> like client, Kafka-server-common and server etc.
>
> kafka-client-common:
> common classes which can be used in the client module. This can be
> part of client module itself.
>
> kafka-server-common:
> classes required only for kafka-server.
>
> Thanks.
> Satish.
>
> On Wed, Aug 7, 2019 at 9:28 PM Harsha Chintalapani 
> wrote:
> >
> > Thanks for the KIP Rajini.
> > Quick thought, it would be good to have a common module outside of
> clients
> > that only applies to server side interfaces & changes. It looks like we
> are
> > increasingly in favor of using Java interface for pluggable modules  on
> the
> > broker side.
> >
> > Thanks,
> > Harsha
> >
> >
> > On Tue, Aug 06, 2019 at 2:31 PM, Rajini Sivaram  >
> > wrote:
> >
> > > Hi all,
> > >
> > > I have created a KIP to replace the Scala Authorizer API with a new
> Java
> > > API:
> > >
> > > -
> > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > KIP-504+-+Add+new+Java+Authorizer+Interface
> > >
> > > This is replacement for KIP-50 which was accepted but never merged.
> Apart
> > > from moving to a Java API consistent with other pluggable interfaces
> in the
> > > broker, KIP-504 also attempts to address known limitations in the
> > > authorizer. If you have come across other limitations that you would
> like
> > > to see addressed in the new API, please raise these on the discussion
> > > thread so that we can consider those too. All suggestions and feedback
> are
> > > welcome.
> > >
> > > Thank you,
> > >
> > > Rajini
> > >
>