Hi everyone, after fixing an issue with a regular expression in Connect's
class loading isolation of the new component type ConfigProvider here:

https://github.com/apache/kafka/pull/5177

I noticed that the new interface ConfigProvider, along with its first
implementation FileConfigProvider, have been placed in the package:

org.apache.kafka.common.config

This specific package is mentioned in KIP-297 is a few places, but not in
any code snippets. I'd like to suggest moving the interface and any current
of future implementation classes in a new package named:

org.apache.kafka.common.config.provider

and update the KIP document accordingly.

This seems to make sense in general. But, specifically, in Connect it is
desired since we treat ConfigProvider implementations as Connect components
that are loaded in isolation. Having a package for config providers will
allow us to avoid making any assumptions with respect to the name of a
class that implements `ConfigProvider` and is included in Apache Kafka. It
will suffice for this class to reside in the package
org.apache.kafka.common.config.provider.

Let me know if this is a reasonable request and if you agree on amending
the KIP description.

- Konstantine



On Wed, May 16, 2018 at 10:33 AM, Rajini Sivaram <rajinisiva...@gmail.com>
wrote:

> Thanks for the update, Robert. Looks good to me.
>
> Regards,
>
> Rajini
>
> On Wed, May 16, 2018 at 4:43 PM, Robert Yokota <rayok...@gmail.com> wrote:
>
> > Hi Rajini,
> >
> > Thanks for the excellent feedback!
> >
> > I've made the API changes that you've requested in the KIP.
> >
> >
> > > 1. Are we expecting one provider instance with different contexts
> > > provided to `ConfigProvider.get()`? If we created a different provider
> > > instance for each context, we could deal with scheduling reloads in the
> > > provider implementation?
> >
> > Yes, there would be one provider instance.  I've collapsed the
> > ConfigContext and the ConfigChangeCallback by adding a parameter delayMs
> to
> > indicate when the change will happen.  When a particular ConfigProvider
> > retrieves a lease duration along with a key, it can either 1)  schedule a
> > background thread to push out the change when it happens (at which time
> the
> > delayMs will be 0), or invoke the callback immediately with the lease
> > duration set as delayMs (of course, in this case the values for the keys
> > will be the old values).  A ConfProvider could be parameterized to do one
> > or the other.
> >
> >
> > > 2. Couldn't ConfigData  be an interface that just returns a map of
> > > key-value pairs. Providers that return metadata could extend it to
> > provide
> > > metadata in a meaningful format instead of Map<String, String>.
> >
> > I've replaced ConfigData with Map<String, String> as you suggested.
> >
> >
> > > 3. For ZK, we would use ConfigProvider.get() without `keys` to get all
> > > keys in the path. Do we have two get() methods since some providers
> need
> > > keys to be specified and some don't? How do we decide which one to use?
> >
> > The ConfigProvider should be thought of like a Map interface and does not
> > require that one signature of get() be preferred over the other.  KIP-226
> > can use get(String path) while Connect will use get(String path,
> > Set<String>) since it knows which keys it is interested in.
> >
> >
> > A few more updates to the KIP:
> >
> > - I've elided the ConfigTransformer implementation as Colin suggested.
> > - The variable reference now looks like ${provider:[path:]key} where the
> > path is optional.
> >
> >
> > Thanks!
> > Robert
> >
> >
> >
> >
> > On Wed, May 16, 2018 at 4:30 AM, Rajini Sivaram <rajinisiva...@gmail.com
> >
> > wrote:
> >
> > > Hi Robert,
> > >
> > > Thanks for the KIP updates.
> > >
> > > The interfaces look suitable for brokers, with some small changes. If
> we
> > > can adapt the interface to implement the existing DynamicBrokerConfig,
> > then
> > > we are good.
> > >
> > > With broker configs:
> > >
> > >    1. We don't know what configs are in ZK since we allow custom
> configs.
> > >    So we would use `ConfigProvider.get()` without specifying keys.
> > >    2. We want to see all changes (i.e. changes under a path). We can
> deal
> > >    with this internally by ignoring `keys` and subscribing to
> everything
> > >    3. We have two paths (one for per-broker config and another for
> > default
> > >    config shared by all brokers). All methods should ideally provide
> > path -
> > >    see changes suggested below.
> > >    4. Keys are not independent. We update in batches (e.g keystore +
> > >    password). We want to see batches of changes, not individual
> changes.
> > We
> > >    retrieve all values from a path when a change is detected. We can do
> > > this
> > >    by ignoring values from the callback, but it would be better if the
> > >    callback interface could be changed - see below.
> > >
> > >
> > > public interface ConfigProvider extends Configurable, Closeable {
> > >
> > >     *//** KIP-226 will use this*
> > >     ConfigData get(ConfigContext ctx, String path);
> > >
> > >     *// **KIP-226 will never use this, we don't know what keys are in
> ZK
> > > since we allow custom configs*
> > >     ConfigData get(ConfigContext ctx, String path, Set<String> keys);
> > >
> > > *    // KIP-226 will ignore `key` and subscribe to all changes.*
> > > *    // But based on the above method, this should perhaps be:*
> > > *    //  subscribe(String path, Set<String> keys,
> > > ConfigurationChangeCallback callback)?*
> > >     void subscribe(String key, ConfigurationChangeCallback callback);
> > >
> > >      *<== As above, un**subscribe(String path, Set<String> keys)**?*
> > >     void unsubscribe(String key);
> > > }
> > >
> > > public interface ConfigurationChangeCallback {
> > >     *// **For brokers, we want to process all updated keys in a single
> > > callback. P**erhaps this could be: *
> > >
> > > *    //   onChange(String path, Map<String, String> values)?*
> > >
> > >     void onChange(String key, String value);
> > > }
> > >
> > > A few other questions (I read your response to Colin, but still didn't
> > get
> > > it. Could be because I am not familiar with the interfaces required for
> > > vaults, sorry):
> > >
> > >    1. Are we expecting one provider instance with different contexts
> > >    provided to `ConfigProvider.get()`? If we created a different
> provider
> > >    instance for each context, we could deal with scheduling reloads in
> > the
> > >    provider implementation?
> > >    2. Couldn't ConfigData  be an interface that just returns a map of
> > >    key-value pairs. Providers that return metadata could extend it to
> > > provide
> > >    metadata in a meaningful format instead of Map<String, String>.
> > >    3. For ZK, we would use ConfigProvider.get() without `keys` to get
> all
> > >    keys in the path. Do we have two get() methods since some providers
> > need
> > >    keys to be specified and some don't? How do we decide which one to
> > use?
> > >
> > > Thanks,
> > >
> > > Rajini
> > >
> > >
> > > On Wed, May 16, 2018 at 2:40 AM, Robert Yokota <rayok...@gmail.com>
> > wrote:
> > >
> > > > Thanks, Ron!  I will take a look.
> > > >
> > > > Regards,
> > > > Robert
> > > >
> > > > On Tue, May 15, 2018 at 5:59 PM, Ron Dagostino <rndg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Robert.  Regarding your comment "use the lease duration to
> > schedule
> > > a
> > > > > configuration reload in the future", you might be interested in the
> > > code
> > > > > that does refresh for OAuth Bearer Tokens in KIP-255; specifically,
> > the
> > > > > class
> > > > > org.apache.kafka.common.security.oauthbearer.internal.expiring.
> > > > > ExpiringCredentialRefreshingLogin.
> > > > > The class performs JAAS logins/relogins based on the expiration
> time
> > > of a
> > > > > retrieved expiring credential.  The implementation of that class is
> > > > > inspired by the code that currently does refresh for Kerberos
> tickets
> > > but
> > > > > is more reusable.  I don't know if you will leverage JAAS for
> > defining
> > > > how
> > > > > to go get a credential (you could since you have to provide
> > credentials
> > > > to
> > > > > authenticate to the remote systems anyway), but regardless, that
> > class
> > > > > should be useful at least in some minimal sense if not more than
> > that.
> > > > See
> > > > > https://github.com/apache/kafka/pull/4994.
> > > > >
> > > > > Ron
> > > > >
> > > > > Ron
> > > > >
> > > > > On Tue, May 15, 2018 at 8:01 PM, Robert Yokota <rayok...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Hi Colin,
> > > > > >
> > > > > > Thanks for the feedback!
> > > > > >
> > > > > >
> > > > > > > The KIP says that "Vault is very popular and has been described
> > as
> > > > 'the
> > > > > > current gold standard in secret management and provisioning'."  I
> > > think
> > > > > > this might be a bit too much detail -- we don't really need to
> > > > > > > favorites, right? :)
> > > > > >
> > > > > > I've removed this line :)
> > > > > >
> > > > > >
> > > > > > > I think we should make the substitution part of the generic
> > > > > configuration
> > > > > > code, rather than specific to individual ConfigProviders.  We
> don't
> > > > > really
> > > > > > want it to work differently for Vault vs. KeyWhiz vs.
> > > > > > > AWS secrets, etc. etc.
> > > > > >
> > > > > > Yes, the ConfigProviders merely serve up key-value pairs.  A
> helper
> > > > class
> > > > > > like ConfigTransformer would perform the variable substitutions
> if
> > > > > desired.
> > > > > >
> > > > > >
> > > > > > > We should also spell out exactly how substitution works.
> > > > > >
> > > > > > By one-level of indirection I just meant a simple replacement of
> > > > > variables
> > > > > > (which are the indirect references).  So if you have foo=${bar}
> and
> > > > > > bar=${baz} and your file contains bar=hello, baz=world, then the
> > > final
> > > > > > result would be foo=hello and bar=world.  I've added this example
> > to
> > > > the
> > > > > > KIP.
> > > > > >
> > > > > > You can see this as the DEFAULT_PATTERN in the ConfigTransformer.
> > > The
> > > > > > ConfigTransformer only provides one level of indirection.
> > > > > >
> > > > > >
> > > > > > > We should also spell out how this interacts with KIP-226
> > > > > configurations.
> > > > > >
> > > > > > Yes, I mention at the beginning that KIP-226 could use the
> > > > ConfigProvider
> > > > > > but not the ConfigTransformer.
> > > > > >
> > > > > >
> > > > > > > Maybe a good generic interface would be like this:
> > > > > >
> > > > > > I've added the subscription APIs but would like to keep the other
> > > APIs
> > > > > as I
> > > > > > will need them for integration with Vault.  With Vault I obtain
> the
> > > > lease
> > > > > > duration at the time the key is obtained, so at that time I would
> > > want
> > > > to
> > > > > > use the lease duration to schedule a configuration reload in the
> > > > future.
> > > > > > This is similar to how the integration between Vault and the
> Spring
> > > > > > Framework works.   Also, the lease duration would be included in
> > the
> > > > > > metadata map vs. the data map.  Finally, I need an additional
> > "path"
> > > or
> > > > > > "bucket" parameter, which is used by Vault to indicate which set
> of
> > > > > > key-values are to be retrieved.
> > > > > >
> > > > > >
> > > > > > > With regard to ConfigTransformer: do we need to include all
> this
> > > code
> > > > > in
> > > > > > the KIP?  Seems like an implementation detail.
> > > > > >
> > > > > > I use the ConfigTransformer to show how the pattern
> ${provider:key}
> > > is
> > > > > > defined and how the substitution only involves one level of
> > > > indirection.
> > > > > > If you feel it does not add anything to the text, I can remove
> it.
> > > > > >
> > > > > >
> > > > > > > Is there a way to avoid this couping?
> > > > > >
> > > > > > I'd have to look into it and get back to you.  However, I assume
> > that
> > > > the
> > > > > > answer is not relevant for this KIP :)
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Robert
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, May 15, 2018 at 4:04 PM, Colin McCabe <
> cmcc...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Robert,
> > > > > > >
> > > > > > > Thanks for posting this.  In the past we've been kind of
> > reluctant
> > > to
> > > > > add
> > > > > > > more complexity to configuration.  I think Connect does have a
> > > clear
> > > > > need
> > > > > > > for this kind of functionality, though.  As you mention,
> Connect
> > > > > > integrates
> > > > > > > with external systems, which are very likely to have passwords
> > > stored
> > > > > in
> > > > > > > Vault, KeyWhiz or some other external system.
> > > > > > >
> > > > > > > The KIP says that "Vault is very popular and has been described
> > as
> > > > 'the
> > > > > > > current gold standard in secret management and
> provisioning'."  I
> > > > think
> > > > > > > this might be a bit too much detail -- we don't really need to
> > pick
> > > > > > > favorites, right? :)
> > > > > > >
> > > > > > > I think we should make configuration consistent between the
> > broker
> > > > and
> > > > > > > Connect.  If people can use constructs like
> > > > > > jdbc.config.key="${vault:jdbc.user}${vault:jdbc.password}"
> > > > > > > in Connect, they'll want to do it on the broker too, in a
> > > consistent
> > > > > way.
> > > > > > >
> > > > > > > If I understand correctly, ConfigProvider represents an
> external
> > > > > > > configuration source, such as VaultConfigProvider,
> > > > > KeyWhizConfigProvider,
> > > > > > > etc.
> > > > > > >
> > > > > > > I think we should make the substitution part of the generic
> > > > > configuration
> > > > > > > code, rather than specific to individual ConfigProviders.  We
> > don't
> > > > > > really
> > > > > > > want it to work differently for Vault vs. KeyWhiz vs. AWS
> > secrets,
> > > > etc.
> > > > > > etc.
> > > > > > >
> > > > > > > We should also spell out exactly how substitution works.  For
> > > > example,
> > > > > is
> > > > > > > substitution limited to 1 level deep?  In other words, If I
> have
> > > > > > > foo="${bar}" and bar=${baz}, probably foo should just be set
> > equal
> > > to
> > > > > > > "${baz}" rather than chasing more than one level of
> indirection.
> > > > > > >
> > > > > > > We should also spell out how this interacts with KIP-226
> > > > > configurations.
> > > > > > > I would suggest that KIP-226 variables not be subjected to
> > > > > substitution.
> > > > > > > The reason is because in theory substitution could lead to
> > > different
> > > > > > > results on different brokers, since the different brokers may
> not
> > > > have
> > > > > > the
> > > > > > > same ConfigProviders configured.  Also, having substitutions in
> > the
> > > > > > KIP-226
> > > > > > > configuration makes it more difficult for the admin to
> understand
> > > > what
> > > > > > the
> > > > > > > centrally managed configuration is.
> > > > > > >
> > > > > > > It seems the main goal is the ability to load a batch of
> > key/value
> > > > > pairs
> > > > > > > from the ConfigProvider, and the ability to subscribe to
> > > > notifications
> > > > > > > about changes to certain parameters.  Maybe a good generic
> > > interface
> > > > > > would
> > > > > > > be like this:
> > > > > > >
> > > > > > >  > public interface ConfigProvider extends Closeable {
> > > > > > > >      // batched get is potentially more efficient
> > > > > > >  >     Map<String, String> get(Collection<String> keys);
> > > > > > > >
> > > > > > > >    // The ConfigProvider is responsible for making this
> > callback
> > > > > > > whenever the key changes.
> > > > > > > >    // Some ConfigProviders may want to have a background
> thread
> > > > with
> > > > > a
> > > > > > > configurable update interval.
> > > > > > >  >     void subscribe(String key, ConfigurationChangeCallback
> > > > > callback);
> > > > > > > >
> > > > > > > >        // Inverse of subscribe
> > > > > > >  >     void unsubscribe(String key);
> > > > > > > >
> > > > > > > >    // Close all subscriptions and clean up all resources
> > > > > > >  >     void close();
> > > > > > >  > }
> > > > > > >  >
> > > > > > >  > interface ConfigurationChangeCallback {
> > > > > > >  >     void onChange(String key, String value);
> > > > > > >  > }
> > > > > > >
> > > > > > > With regard to ConfigTransformer: do we need to include all
> this
> > > code
> > > > > in
> > > > > > > the KIP?  Seems like an implementation detail.
> > > > > > >
> > > > > > > > Other connectors such as the S3 connector are tightly coupled
> > > with
> > > > a
> > > > > > > particular secret manager, and may
> > > > > > > > wish to handle rotation on their own.
> > > > > > >
> > > > > > > Is there a way to avoid this couping?  Seems like some users
> > might
> > > > want
> > > > > > to
> > > > > > > use their own secret manager here.
> > > > > > >
> > > > > > > best,
> > > > > > > Colin
> > > > > > >
> > > > > > >
> > > > > > > On Wed, May 9, 2018, at 16:32, Robert Yokota wrote:
> > > > > > > > Hi Magesh,
> > > > > > > >
> > > > > > > > I updated the KIP with a link to a PR for a working
> prototype.
> > > The
> > > > > > > > prototype does not yet use the Connect plugin machinery for
> > class
> > > > > > loader
> > > > > > > > isolation, but should give you an idea of what the final
> > > > > implementation
> > > > > > > > will look like.  Here is the link:
> > > > > > > > https://github.com/apache/kafka/pull/4990/files.
> > > > > > > >
> > > > > > > > I also added an example of a FileConfigProvider to the KIP.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Robert
> > > > > > > >
> > > > > > > > On Wed, May 9, 2018 at 10:04 AM, Robert Yokota <
> > > rayok...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Magesh,
> > > > > > > > >
> > > > > > > > > Thanks for the feedback!
> > > > > > > > >
> > > > > > > > > I will put together a PR to demonstrate what the
> > implementation
> > > > > might
> > > > > > > look
> > > > > > > > > like, as well as a reference FileConfigProvider.
> > > > > > > > >
> > > > > > > > > 1.  The delayMs for a (potentially) scheduled reload is
> > > > determined
> > > > > by
> > > > > > > the
> > > > > > > > > ConfigProvider.  For example, a (hypothetical)
> > > > VaultConfigProvider,
> > > > > > > upon
> > > > > > > > > contacting Vault for a particular secret, might also
> obtain a
> > > > lease
> > > > > > > > > duration indicating that the secret expires in 1 hour. The
> > > > > > > > > VaultConfigProvider could then call scheduleConfigReload
> with
> > > > > delayMs
> > > > > > > set
> > > > > > > > > to 3600000ms (1 hour).  This would cause the Connector to
> > > restart
> > > > > in
> > > > > > an
> > > > > > > > > hour, forcing it to reload the configs and re-resolve all
> > > > indirect
> > > > > > > > > references.
> > > > > > > > >
> > > > > > > > > 2. Yes, the start() methods in SourceTask and SinkTask
> would
> > > get
> > > > > the
> > > > > > > > > configs with all the indirect references resolved.   Those
> > > > config()
> > > > > > > methods
> > > > > > > > > are for Connectors that want to get the latest configs
> (with
> > > all
> > > > > > > indirect
> > > > > > > > > references re-resolved) at some time after start().  For
> > > example,
> > > > > if
> > > > > > a
> > > > > > > Task
> > > > > > > > > encountered some security exception because a secret
> expired,
> > > it
> > > > > > could
> > > > > > > call
> > > > > > > > > config() to get the config with the latest values.  This is
> > > > > assuming
> > > > > > > that
> > > > > > > > > the Task can gracefully recover from the security
> exception.
> > > > > > > > >
> > > > > > > > > 3. Yes, that is up to the ConfigProvider implementation and
> > is
> > > > out
> > > > > of
> > > > > > > > > scope.  If the ConfigProvider also needs some kind of
> secrets
> > > or
> > > > > > other
> > > > > > > > > data, it could possibly be passed in through the param
> > > properties
> > > > > > > > > ("config.providers.vault.param.auth=/run/myauth").  For
> > > example
> > > > > > Docker
> > > > > > > > > might generate the auth info for Vault in an in-memory
> tmpfs
> > > file
> > > > > > that
> > > > > > > > > could then be passed as a param.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Robert
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, May 8, 2018 at 10:10 PM, Magesh Nandakumar <
> > > > > > > mage...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi Robert,
> > > > > > > > >>
> > > > > > > > >> Thanks for the KIP. I think, this will be a great addition
> > to
> > > > the
> > > > > > > > >> framework. I think, will be great if the KIP can
> elaborate a
> > > > > little
> > > > > > > bit
> > > > > > > > >> more on how implementations would look like with an
> example.
> > > > > > > > >> Also, would be good to provide a reference implementation
> as
> > > > well.
> > > > > > > > >>
> > > > > > > > >> The other questions I had were
> > > > > > > > >>
> > > > > > > > >> 1.  How would the framework get the delayMs for void
> > > > > > > scheduleConfigReload(
> > > > > > > > >> long delayMs);
> > > > > > > > >> 2. Would the start methods in SourceTask and SinkTask get
> > the
> > > > > > configs
> > > > > > > with
> > > > > > > > >> all the indirect references resolved. If so, trying to
> > > > understand
> > > > > > > > >> the intent of the config() in SourceTaskContext and the
> > > > > > > SinkTaskContext
> > > > > > > > >> 3. What if the provider itself needs some kind of secrets
> to
> > > be
> > > > > > > configured
> > > > > > > > >> to connect to it? I assume that's out of scope for this
> > > proposal
> > > > > but
> > > > > > > > >> wanted
> > > > > > > > >> to clarify it.
> > > > > > > > >>
> > > > > > > > >> Thanks
> > > > > > > > >> Magesh
> > > > > > > > >>
> > > > > > > > >> On Tue, May 8, 2018 at 1:52 PM, Robert Yokota <
> > > > rayok...@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > > > >>
> > > > > > > > >> > Hi,
> > > > > > > > >> >
> > > > > > > > >> > I would like to start a discussion for KIP-297 to
> > > externalize
> > > > > > > secrets
> > > > > > > > >> from
> > > > > > > > >> > Kafka Connect configurations.  Any feedback is
> > appreciated.
> > > > > > > > >> > <
> > > > > > > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > > > >> > 297%3A+Externalizing+Secrets+for+Connect+Configurations
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> > JIRA: <https://issues.apache.org/jira/browse/KAFKA-6886
> >
> > > > > > > > >> >
> > > > > > > > >> > Thanks in advance,
> > > > > > > > >> > Robert
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to