Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-11-18 Thread Guozhang Wang
Hello Almog,

I left a comment in the PR before I got to read the newest updates
from this thread. My 2c:

1. I liked the idea of delaying the instantiation of StoreBuilder from
suppliers until after the Topology is created. It has been a bit annoying
for many other features we were trying back then. The only thing is,
we need to check whether Topology.describe(), which returns a
TopologyDescription, already reveals anything about the source-of-truth
store impl types; if it does not, then we are good to go.
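The delayed-instantiation idea can be sketched roughly as follows; the types and names here are hypothetical stand-ins, not the actual PR's API:

```java
import java.util.function.Function;

// Hypothetical sketch: instead of materializing a StoreBuilder eagerly from a
// supplier at topology-wiring time, the topology keeps a deferred factory and
// only invokes it once the final, merged configuration is known.
class DeferredStoreSketch {

    // Stand-in for the real StoreBuilder type.
    record StoreBuilder(String storeType) {}

    // The deferral: a function from the resolved store type to a builder.
    static Function<String, StoreBuilder> deferred = StoreBuilder::new;

    public static void main(String[] args) {
        // Nothing is instantiated when the topology is wired; only here,
        // after config resolution, is the concrete builder created.
        StoreBuilder builder = deferred.apply("rocksDB");
        System.out.println(builder.storeType()); // prints "rocksDB"
    }
}
```

The point of the indirection is that a later `KafkaStreams#new(Topology, StreamsConfig)` call can still influence which store implementation the factory produces.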

2. I originally thought (and commented in the PR) that maybe we can
just add this new func "resolveDslStoreSuppliers" into StreamsConfig
directly and mark it as EVOLVING, because I was not clear that we are
trying to do 1) above. Now I'm leaning more towards what you proposed.
But I still have a question in mind: even after we've done
https://github.com/apache/kafka/pull/14548 later, don't we still need
some interface that users can call to get the actual instantiated
store supplier? One case is external custom logic, like an external
controller / scheduler developed by a different group of people than
the Streams app developers themselves, that can only turn on certain
features after learning the actual store impl suppliers used.

Guozhang

On Sat, Nov 18, 2023 at 2:46 PM Almog Gavra  wrote:
>
> Hello Everyone - one more minor change to the KIP that came up during
> implementation (reflected in the KIP itself). I will be adding the method
> below to TopologyConfig. This allows us to determine whether or not the
> DslStoreSuppliers was explicitly passed in via either
> DSL_STORE_SUPPLIERS_CLASS_CONFIG or DEFAULT_DSL_STORE_CONFIG (if it was not
> explicitly passed in, we will use the one that is configured in
> StreamsConfig, or the default value of RocksDBDslStoreSuppliers).
>
> See the discussion on the PR (
> https://github.com/apache/kafka/pull/14648#discussion_r1394939779) for more
> context. Ideally this would be an internal utility method but there's no
> clean way to get that done now, so the goal was to minimize the surface
> area of what's being exposed (as opposed to exposing a generic method like
> isOverridden(String config)).
>
> /**
>  * @return the DslStoreSuppliers if the value was explicitly configured
>  * (either by {@link StreamsConfig#DEFAULT_DSL_STORE} or
>  * {@link StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG})
>  */
> public Optional<DslStoreSuppliers> resolveDslStoreSuppliers();
>
> On Fri, Nov 3, 2023 at 10:48 AM Matthias J. Sax  wrote:
>
> > Thanks. Will take a look into the PR.
> >
> > I don't have any objection to the goal; contrary! It's very annoying
> > what we have right now, and if we can improve it, I am totally in favor
> > of it.
> >
> >
> > -Matthias
> >
> > On 11/3/23 8:47 AM, Almog Gavra wrote:
> > > Good question :) I have a PR for it already here:
> > > https://github.com/apache/kafka/pull/14659. The concept is to wrap the
> > > suppliers with an interface that allows for delayed creation of the
> > > StoreBuilder instead of creating the StoreBuilder from the suppliers
> > right
> > > away. Happy to get on a quick call to outline the implementation strategy
> > > if you'd like, but hopefully you have no objections to the goal!
> > >
> > > On Thu, Nov 2, 2023 at 8:44 PM Matthias J. Sax  wrote:
> > >
> > >> Almog,
> > >>
> > >> can you explain how you intend to implement this change? It's not clear
> > >> to me how we could do this.
> > >>
> > >> When we call `StreamsBuilder.build()` it will give us an already fully
> > >> wired `Topology`, including all store suppliers needed. I don't see a
> > >> clean way how we could change the store supplier after the fact?
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 11/2/23 5:11 PM, Almog Gavra wrote:
> > >>> Hello everyone - I updated the KIP to also include the following
> > >>> modification:
> > >>>
> > >>> Both the new dsl.store.suppliers.class  and the old default.dsl.store
> > >> will
> > >>> now respect the configurations when passed in via
> > >> KafkaStreams#new(Topology,
> > >>> StreamsConfig)  (and other related constructors) instead of only being
> > >>> respected when passed in to the initial
> > StoreBuilder#new(TopologyConfig)
> > >>> (though it will be respected if passed in via the original path as
> > well).
> > >>>
> > >>> I was honestly a bit shocked this wasn't the case with the original KIP
> > >>> that introduced default.dsl.store!
> > >>>
> > >>> On Fri, Jul 28, 2023 at 4:55 PM Almog Gavra 
> > >> wrote:
> > >>>
> >  OK! I think I got everything, but I'll give the KIP another read with
> >  fresh eyes later. Just a reminder that the voting is open, so go out
> > and
> >  exercise your civic duty! ;)
> > 
> >  - Almog
> > 
> >  On Fri, Jul 28, 2023 at 10:38 AM Almog Gavra 
> >  wrote:
> > 
> > > Thanks Guozhang & Sophie:
> > >
> > > A2. Will clarify in the KIP
> > > A3. Will change back to the deprecated version!
> > > A5. 

Re: [VOTE] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-18 Thread Guozhang Wang
Thanks Alieh,

I read through the wiki page and the DISCUSS thread, all LGTM except a
minor thing in javadoc:

"The query returns the records with a global ascending order of keys.
The records with the same key are ordered based on their insertion
timestamp in ascending order. Both the global and partial ordering are
modifiable with the corresponding methods defined for the class."

Since this KIP is only for a single key, there's no key ordering but
only timestamp ordering right? Maybe the javadoc can be updated
accordingly.

Otherwise, LGTM.

On Fri, Nov 17, 2023 at 2:36 AM Alieh Saeedi
 wrote:
>
> Hi all,
> Following my recent message in the discussion thread, I am opening the
> voting for KIP-968. Thanks for your votes in advance.
>
> Cheers,
> Alieh


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2023-11-18 Thread Guozhang Wang
Hi Rohan,

I took another look at the updated wiki page and do not have any major
questions. Regarding returning a plugin object v.s. configuring a
plugin object, I do not have a strong opinion except that the latter
seems more consistent with existing patterns. Just curious, any other
motivations to go with the latter from you?


Guozhang

On Thu, Nov 9, 2023 at 11:19 PM Rohan Desai  wrote:
>
> Thanks for the feedback so far! I think pretty much all of it is
> reasonable. I'll reply to it inline:
>
> > 1. All the API logic is granular at the Task level, except the
> previousOwnerForPartition func. I’m not clear what’s the motivation behind
> it, does our controller also want to change how the partitions->tasks
> mapping is formed?
> You're right that this is out of place. I've removed this method as it's
> not needed by the task assignor.
>
> > 2. Just on the API layering itself: it feels a bit weird to have the
> three built-in functions (defaultStandbyTaskAssignment etc) sitting in the
> ApplicationMetadata class. If we consider them as some default util
> functions, how about introducing moving those into their own static util
> methods to separate from the ApplicationMetadata “fact objects” ?
> Agreed. Updated in the latest revision of the kip. These have been moved to
> TaskAssignorUtils
>
> > 3. I personally prefer `NodeAssignment` to be a read-only object
> containing the decisions made by the assignor, including the
> requestFollowupRebalance flag. For manipulating the half-baked results
> inside the assignor itself, maybe we can just be flexible to let users use
> whatever structs / their own classes even, if they like. WDYT?
> Agreed. Updated in the latest version of the kip.
>
> > 1. For the API, thoughts on changing the method signature to return a
> (non-Optional) TaskAssignor? Then we can either have the default
> implementation return new HighAvailabilityTaskAssignor or just have a
> default implementation class that people can extend if they don't want to
> implement every method.
> Based on some other discussion, I actually decided to get rid of the plugin
> interface, and instead use config to specify individual plugin behaviour.
> So the method you're referring to is no longer part of the proposal.
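The "configure a plugin" pattern mentioned above usually means naming a class in config and instantiating it reflectively. A minimal sketch, assuming a placeholder config key and interface (not the final KIP names):

```java
import java.util.Map;

// Illustrative sketch of config-based plugin selection: the application names
// a class in config, and the runtime instantiates it reflectively. The config
// key "task.assignor.class" and the interface are placeholders, not the
// final KIP-924 names.
interface TaskAssignorPlugin {
    String name();
}

class DemoAssignor implements TaskAssignorPlugin {
    public String name() { return "demo"; }
}

class PluginLoader {
    static TaskAssignorPlugin load(Map<String, String> config) {
        String className = config.get("task.assignor.class"); // placeholder key
        try {
            return (TaskAssignorPlugin) Class.forName(className)
                .getDeclaredConstructor()
                .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot load assignor: " + className, e);
        }
    }

    public static void main(String[] args) {
        TaskAssignorPlugin plugin =
            load(Map.of("task.assignor.class", "DemoAssignor"));
        System.out.println(plugin.name()); // prints "demo"
    }
}
```

This is also why config-based selection is consistent with existing Kafka patterns such as partitioners and interceptors, which are likewise specified by class name.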
>
> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only but
> theres methods that return void on it? It's not totally clear to me how
> that interface is supposed to be used by the assignor. It'd be nice if we
> could flip that interface such that it becomes part of the output instead
> of an input to the plugin.
> I've moved those methods to a util class. They're really utility methods
> the assignor might want to call to do some default or optimized assignment
> for some cases like rack-awareness.
>
> > 4. We should consider wrapping UUID in a ProcessID class so that we
> control
> the interface (there are a few places where UUID is directly used).
> I like it. Updated the proposal.
>
> > 5. What does NodeState#newAssignmentForNode() do? I thought the point was
> for the plugin to make the assignment? Is that the result of the default
> logic?
> It doesn't need to be part of the interface. I've removed it.
>
> > re 2/6:
>
> I generally agree with these points, but I'd rather hash that out in a PR
> than in the KIP review, as it'll be clearer what gets used how. It seems to
> me (committers please correct me if I'm wrong) that as long as we're on the
> same page about what information the interfaces are returning, that's ok at
> this level of discussion.
>
> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai  wrote:
>
> > Hello All,
> >
> > I'd like to start a discussion on KIP-924 (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
> > which proposes an interface to allow users to plug into the streams
> > partition assignor. The motivation section in the KIP goes into some more
> > detail on why we think this is a useful addition. Thanks in advance for
> > your feedback!
> >
> > Best Regards,
> >
> > Rohan
> >
> >


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2393

2023-11-18 Thread Apache Jenkins Server
See 



[jira] [Created] (KAFKA-15854) Move Java classes from kafka.server to the server module

2023-11-18 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-15854:
---

 Summary: Move Java classes from kafka.server to the server module
 Key: KAFKA-15854
 URL: https://issues.apache.org/jira/browse/KAFKA-15854
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-11-18 Thread Almog Gavra
Hello Everyone - one more minor change to the KIP that came up during
implementation (reflected in the KIP itself). I will be adding the method
below to TopologyConfig. This allows us to determine whether or not the
DslStoreSuppliers was explicitly passed in via either
DSL_STORE_SUPPLIERS_CLASS_CONFIG or DEFAULT_DSL_STORE_CONFIG (if it was not
explicitly passed in, we will use the one that is configured in
StreamsConfig, or the default value of RocksDBDslStoreSuppliers).

See the discussion on the PR (
https://github.com/apache/kafka/pull/14648#discussion_r1394939779) for more
context. Ideally this would be an internal utility method but there's no
clean way to get that done now, so the goal was to minimize the surface
area of what's being exposed (as opposed to exposing a generic method like
isOverridden(String config)).

/**
 * @return the DslStoreSuppliers if the value was explicitly configured
 * (either by {@link StreamsConfig#DEFAULT_DSL_STORE} or
 * {@link StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG})
 */
public Optional<DslStoreSuppliers> resolveDslStoreSuppliers();
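A minimal caller-side sketch of how this method would be used, with stand-in types rather than the real Kafka Streams classes:

```java
import java.util.Optional;

// Stand-in for the real DslStoreSuppliers interface.
interface DslStoreSuppliers { String name(); }

// Sketch of the proposed TopologyConfig method: the Optional is present only
// if the user explicitly configured a supplier on the TopologyConfig itself.
class TopologyConfigSketch {
    private final Optional<DslStoreSuppliers> explicit;

    TopologyConfigSketch(Optional<DslStoreSuppliers> explicit) {
        this.explicit = explicit;
    }

    public Optional<DslStoreSuppliers> resolveDslStoreSuppliers() {
        return explicit;
    }

    public static void main(String[] args) {
        TopologyConfigSketch unset = new TopologyConfigSketch(Optional.empty());
        // If nothing was explicitly configured, the caller falls back to the
        // StreamsConfig-level value (here just a named placeholder for the
        // RocksDB default).
        DslStoreSuppliers resolved = unset.resolveDslStoreSuppliers()
            .orElse(() -> "rocksDB-default");
        System.out.println(resolved.name()); // prints "rocksDB-default"
    }
}
```

The empty `Optional` is what lets KafkaStreams distinguish "user explicitly chose a store type" from "use whatever StreamsConfig resolves to".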

On Fri, Nov 3, 2023 at 10:48 AM Matthias J. Sax  wrote:

> Thanks. Will take a look into the PR.
>
> I don't have any objection to the goal; contrary! It's very annoying
> what we have right now, and if we can improve it, I am totally in favor
> of it.
>
>
> -Matthias
>
> On 11/3/23 8:47 AM, Almog Gavra wrote:
> > Good question :) I have a PR for it already here:
> > https://github.com/apache/kafka/pull/14659. The concept is to wrap the
> > suppliers with an interface that allows for delayed creation of the
> > StoreBuilder instead of creating the StoreBuilder from the suppliers
> right
> > away. Happy to get on a quick call to outline the implementation strategy
> > if you'd like, but hopefully you have no objections to the goal!
> >
> > On Thu, Nov 2, 2023 at 8:44 PM Matthias J. Sax  wrote:
> >
> >> Almog,
> >>
> >> can you explain how you intend to implement this change? It's not clear
> >> to me how we could do this.
> >>
> >> When we call `StreamsBuilder.build()` it will give us an already fully
> >> wired `Topology`, including all store suppliers needed. I don't see a
> >> clean way how we could change the store supplier after the fact?
> >>
> >>
> >> -Matthias
> >>
> >> On 11/2/23 5:11 PM, Almog Gavra wrote:
> >>> Hello everyone - I updated the KIP to also include the following
> >>> modification:
> >>>
> >>> Both the new dsl.store.suppliers.class  and the old default.dsl.store
> >> will
> >>> now respect the configurations when passed in via
> >> KafkaStreams#new(Topology,
> >>> StreamsConfig)  (and other related constructors) instead of only being
> >>> respected when passed in to the initial
> StoreBuilder#new(TopologyConfig)
> >>> (though it will be respected if passed in via the original path as
> well).
> >>>
> >>> I was honestly a bit shocked this wasn't the case with the original KIP
> >>> that introduced default.dsl.store!
> >>>
> >>> On Fri, Jul 28, 2023 at 4:55 PM Almog Gavra 
> >> wrote:
> >>>
>  OK! I think I got everything, but I'll give the KIP another read with
>  fresh eyes later. Just a reminder that the voting is open, so go out
> and
>  exercise your civic duty! ;)
> 
>  - Almog
> 
>  On Fri, Jul 28, 2023 at 10:38 AM Almog Gavra 
>  wrote:
> 
> > Thanks Guozhang & Sophie:
> >
> > A2. Will clarify in the KIP
> > A3. Will change back to the deprecated version!
> > A5. Seems like I'm outnumbered... DslStoreSuppliers it is.
> >
> > Will update the KIP today.
> >
> > - Almog
> >
> > On Thu, Jul 27, 2023 at 12:42 PM Guozhang Wang <
> > guozhang.wang...@gmail.com> wrote:
> >
> >> Yes, that sounds right to me. Thanks Sophie.
> >>
> >> On Thu, Jul 27, 2023 at 12:35 PM Sophie Blee-Goldman
> >>  wrote:
> >>>
> >>> A2: Guozhang, just to close the book on the ListValue store thing,
> I
> >> fully
> >>> agree it seems like overreach
> >>> to expose/force this on users, especially if it's fully internal
> >> today. But
> >>> just to make sure we're on the same
> >>> page here, you're still ok with this KIP fixing the API gap that
> >> exists
> >>> today, in which these stores cannot be
> >>> customized by the user at all?
> >>>
> >>> In other words, after this KIP, the new behavior for the ListValue
> >> store in
> >>> a stream join will be:
> >>>
> >>> S1: First, check if the user passed in a `DSLStoreSuppliers` (or
> >> whatever
> >>> the name will be) to the
> >>>  StreamJoined config object, and use that to obtain the
> >>> KVStoreSupplier for this ListValue store
> >>>
> >>> S2: If none was provided, check if the user has set a default
> >>> DSLStoreSuppliers via the new config,
> >>>  and use that to get the KVStoreSupplier if so
> >>>
> >>> S3: If neither is set, fall back to the original logic as it is
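The S1 → S2 → S3 fallback order above can be sketched with placeholder types (not the real StreamJoined / StreamsConfig classes):

```java
import java.util.Optional;

// Sketch of the S1 -> S2 -> S3 lookup order for the ListValue store's
// KV supplier, using plain strings in place of real supplier objects.
class StoreResolution {
    static String resolve(Optional<String> streamJoinedSupplier,
                          Optional<String> configDefaultSupplier) {
        // S1: a supplier passed directly on the StreamJoined config wins.
        return streamJoinedSupplier
            // S2: otherwise, the default configured via the new
            // dsl.store.suppliers.class config.
            .or(() -> configDefaultSupplier)
            // S3: otherwise, fall back to the original built-in logic.
            .orElse("built-in");
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.of("in-memory"), Optional.of("rocksDB"))); // in-memory
        System.out.println(resolve(Optional.empty(), Optional.of("rocksDB")));         // rocksDB
        System.out.println(resolve(Optional.empty(), Optional.empty()));               // built-in
    }
}
```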
> >> 

[jira] [Created] (KAFKA-15853) Move KafkaConfig to server module

2023-11-18 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-15853:
---

 Summary: Move KafkaConfig to server module
 Key: KAFKA-15853
 URL: https://issues.apache.org/jira/browse/KAFKA-15853
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma


The server module is a Java-only module, so this also requires converting from 
Scala to Java.





[jira] [Created] (KAFKA-15852) Move broker code from `core` to `broker` module

2023-11-18 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-15852:
---

 Summary: Move broker code from `core` to `broker` module
 Key: KAFKA-15852
 URL: https://issues.apache.org/jira/browse/KAFKA-15852
 Project: Kafka
  Issue Type: Bug
Reporter: Ismael Juma


The relevant packages would be `kafka.server`, `kafka.cluster`, etc.

See KAFKA-14524 for more context.





Re: [DISCUSS] KIP-1000: List Client Metrics Configuration Resources

2023-11-18 Thread Andrew Schofield
Hi Jun,
This is an example of inconsistency between the tools. I do agree that
it would be nice to have `--list` on the `kafka-client-metrics.sh` tool.
However, `kafka-config.sh` has the convention that `--describe` with no
entity name supplied first gets a list of all the entities, and then describes
them all (ConfigCommand.scala/describeResourceConfig). `kafka-topics.sh`
also describes all topics if you use `--describe` with no topic name.

So, I have added a new `--list` option to `kafka-client-metrics.sh` in this KIP.
The `--describe` option remains as before - if you supply a name, it describes
just that resource, and if you do not, it describes all resources.

Thanks,
Andrew

> On 17 Nov 2023, at 22:13, Jun Rao  wrote:
>
> Hi, Andrew,
>
> Thanks for the reply. KIP-714 proposes to use the following command for
> listing metric subscriptions.
>
> kafka-client-metrics.sh --bootstrap-server $BROKERS --describe
> kafka-configs.sh --bootstrap-server $BROKERS --describe --entity-type
> client-metrics
>
> Should we use --list instead to be more consistent with other tools like
> (kafka-topics)?
>
> Thanks,
>
> Jun
>
> On Thu, Nov 16, 2023 at 10:23 AM Andrew Schofield <
> andrew_schofield_j...@outlook.com> wrote:
>
>> Hi Jun,
>> KIP-714 includes `kafka-client-metrics.sh` which provides an easier way to
>> work with client metrics config
>> than the general-purpose `kafka-configs.sh`. So, this new RPC will
>> actually be used in the
>> `kafka-client-metrics.sh` tool.
>>
>> Thanks,
>> Andrew
>>
>>> On 16 Nov 2023, at 18:00, Jun Rao  wrote:
>>>
>>> Hi, Andrew,
>>>
>>> Thanks for the KIP. Just one comment.
>>>
>>> Should we extend ConfigCommand or add a new tool to list client metrics?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Tue, Nov 7, 2023 at 9:42 AM Andrew Schofield <
>>> andrew_schofield_j...@outlook.com> wrote:
>>>
 Hi,
 I would like to start discussion of a small KIP which fills a gap in the
 administration of client metrics configuration.



>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1000%3A+List+Client+Metrics+Configuration+Resources

 Thanks,
 Andrew
>>
>>