Yeah, regarding ControllerChannelManager, it is one of the synchronous I/O use cases (along with 2 others: KafkaProducer, via Sender; and ReplicaFetcherBlockingSend, via ReplicaFetcherThread) where the assumption is complete ownership of the connection. The PR's approach to dealing with that assumption is to inject the re-authentication requests into the owner's existing flow so that they are simply sent with a callback that NetworkClient executes. The alternative approach, which is what I think you are investigating, is to allow the connection to be temporarily taken offline and having any attempt by ControllerChannelManager (or other synchronous use case owner) to use the connection while it is in this "offline" state result in some kind of pause until the connection comes back online. One issue with this approach might be the length of time that the connection is unavailable; will it be offline for all authentication requests and responses (ApiVersionsRequest/Response, SaslHandshakeRequest/Response, and SaslAuthenticateRequest/Response)? Note the last one could actually be invoked multiple times, so there could be 4 or more round-trips before the authentication "dance" is finished. Will the connection be "offline" the entire time, or will it be placed back "online" in between each request/response pair to allow the owner of the connection to use it -- in which case the authentication process would have to wait to get ownership again? The approach I took interleaves the authentication requests/responses with whatever the owner is doing, so it is conceivable that use of the connection jumps back and forth between the two purposes. Such jumping back and forth minimizes any added latency due to the re-authentication, of course.
Anyway, I'll look forward to finding out what you are able to conclude. Good luck :-) Ron On Thu, Sep 6, 2018 at 5:17 AM Rajini Sivaram <rajinisiva...@gmail.com> wrote: > Hi Ron, > > Disconnections on the broker-side: I think we should do disconnections as a > separate KIP and PR as you originally intended. But that one could be done > separately without requiring KIP-368 as a pre-req. As a simpler > implementation and one that can be used without KIP-368 in some cases, we > could commit that first since this one may take longer. > > I wasn't suggesting that we do move re-authentication to NetworkClient, I > was thinking of the lower layers, handling authentication and > reauthentication at the same layer in a similar way. Let me look into the > code and come up with a more detailed explanation to avoid confusion. > > I am not too concerned about the imports in KafkaChannel. As you said, we > can improve that if we need to. KafkaChannels are aware of > network/authentication states and if that becomes a bit more complex, I > don't think it would matter so much. My concern is about changes like > > https://github.com/apache/kafka/pull/5582/files#diff-987fef43991384a3ebec5fb55e53b577 > in ControllerChannelManager. Those classes shouldn't have deal with SASL or > reauthentication. Anyway, I will get back with more detail on what I had in > mind since that may not be viable at all. > > > > On Thu, Sep 6, 2018 at 1:44 AM, Ron Dagostino <rndg...@gmail.com> wrote: > > > I just thought of another alternative if the imports are the concern. > > KafkaChannel could expose the fact that it can create an additional > > Authenticator instance on the side (what I referred to as > > notYetAuthenticatedAuthenticator in the PR) and it could let > > kafka.server.KafkaApis drive the whole thing -- create the instance on > the > > side, clean it up if it fails, move it into place and close the old one > if > > it succeeds, etc. Then KafkaChannel wouldn't need to import anything new > > -- it just exposes its Authenticator and the ability to perform the swap > > upon success, etc. > > > > Ron > > > > On Wed, Sep 5, 2018 at 5:01 PM Ron Dagostino <rndg...@gmail.com> wrote: > > > > > Hi again, Rajini, I realized a couple of potential concerns with using > > > the TransportLayer directly during re-authentication. First, in the > > > blocking I/O use case, the owner of the NetworkClient instance calls > > > NetworkClientUtils.sendAndReceive() to send requests. This method > > > assumes the caller has exclusive access to the NetworkClient, so it > does > > > not check to see if the node is ready; it just sends the request and > > > repeatedly calls poll() until the response arrives. if we were to take > > > the connection temporarily offline that method currently has no > mechanism > > > for checking for such a state before it sends the request; we could add > > it, > > > but we would have to put in some kind of sleep loop to keep checking > for > > it > > > to come back "online" before it could send. Adding such a sleep loop > > could > > > be done, of course, but it doesn't sound ideal. > > > > > > A similar sleep loop situation exists in the async use case. The owner > > > repeatedly calls poll() in a loop, but if the connection to the node is > > > temporarily offline then the poll() method would have to enter a sleep > > > loop until either the connection comes back online or the timeout > elapses > > > (whichever comes first). > > > > > > I don't know if there is an aversion to adding sleep loops like that, > so > > > maybe it isn't a big issue, but I wanted to raise it as a potential > > concern > > > with this approach. > > > > > > Also, is the import of SASL-specific classes in KafkaChannel a major > > > objection to the current implementation? I could eliminate that by > > > replacing the 2 offending methods in KafkaChannel with this one and > > > having the implementation delegate to the authenticator: > > > > > > /** > > > * Respond to a re-authentication request. This occurs on the > > > * Server side of the re-authentication dance (i.e. on the broker). > > > * > > > * @param requestHeader > > > * the request header > > > * @param request > > > * the request to process > > > * @return the response to return to the client > > > */ > > > public AbstractResponse respondToReauthenticationReque > > st(RequestHeader > > > requestHeader, > > > AbstractRequest request) > > > > > > There is practically no work being done in the KafkaChannel instance > > > anyway -- it does some sanity checking but otherwise delegates to the > > > authenticator. We could just add a method to the Authenticator > interface > > > and delegate the whole thing. > > > > > > Ron > > > > > > On Wed, Sep 5, 2018 at 2:07 PM Ron Dagostino <rndg...@gmail.com> > wrote: > > > > > >> Hi Rajini. I'm now skeptical of my "ConnectionState.REAUTHENTICATING" > > idea. > > >> The concept of a connection being "READY" or not can impact > > >> ConsumerCoordinator (see, for example, > > >> https://github.com/apache/kafka/blob/trunk/clients/src/ > > main/java/org/apache/kafka/clients/consumer/internals/ > > ConsumerCoordinator.java#L352). > > >> The semantics of a connection being "READY" and the > > >> implications/assumptions are not clear, and I suspect there will be > some > > >> unintended consequences of this approach that may not be immediately > > >> apparent. > > >> > > >> I will confess that when I was implementing re-authentication I > thought > > >> it might be worthwhile to unify the authentication-related code bases > -- > > >> except I suspected it would be good to go in the other direction: have > > the > > >> current code that directly uses the TransportLayer instead use > > >> AuthenticationSuccessOrFailureReceiver and > > AuthenticationRequestEnqueuer. > > >> I'm not advocating that we do it -- I decided to not go there when > > creating > > >> the PR, after all -- but I did get a strong feeling that directly > using > > the > > >> TransportLayer as is currently done is really only viable before > anybody > > >> else starts using the connection. If we want to use the > TransportLayer > > again > > >> after that point then it is up to us to somehow take the connection > > >> "temporarily offline" so that we have exclusive rights to it again, > and > > I > > >> wonder if the concept of a connection being "temporarily offline" is > > >> something the existing code is able to handle -- probably not, and I > > >> suspect there are unstated assumptions that would be invalidated. > > >> > > >> Do you think this particular "ConnectionState.REAUTHENTICATING" idea > is > > >> worth pursuing? How about the general idea of trying to use the > > >> TransportLayer directly -- are you still feeling like it is viable? > > >> > > >> Ron > > >> > > >> > > >> > > >> Ron > > >> > > >> On Wed, Sep 5, 2018 at 11:40 AM Ron Dagostino <rndg...@gmail.com> > > wrote: > > >> > > >>> <<<in favor of implementing server-side kill in addition to > > >>> re-authentication, not as a replacement. > > >>> <<<I think Rajini suggested the same thing > > >>> Oh, ok, I misunderstood. Then I think we are on the same page: if we > > >>> are going to do re-authentication, then we should also do > > >>> server-side-kill-upon-expiration as part of the same implementation. > > I'm > > >>> good with that. > > >>> > > >>> I am also looking into Rajini's idea of doing re-authentication at > the > > >>> NetworkClient level and reusing the existing authentication code > > path. I > > >>> was skeptical when she suggested it, but now that I look closer I see > > >>> something that I can try. NetworkClient has logic to recognize the > > >>> state "ConnectionState.CONNECTING" as meaning "you can't do anything > > >>> with this connection at the moment; please wait." I'm going to try > > adding > > >>> a new state "ConnectionState.REAUTHENTICATING" that would be > > recognized > > >>> in a similar way. Then the challenge becomes inserting myself into > any > > >>> existing flow that might be going on. I'll probably add the request > > to set > > >>> the state to "REAUTHENTICATING" to a queue if I can't grab the state > > >>> immediately and have the network client's poll() method check at the > > >>> end to see if any such requests can be granted; there would be a > > callback > > >>> associated with the request, and that way I can be assured I would be > > >>> granted the request in a reasonable amount of time (assuming the > > connection > > >>> doesn't close in the meantime). Then it would be up the callback > > >>> implementation to perform the re-authentication dance and set the > state > > >>> back to "ConnectionState.READY". I don't know if this will work, and > > >>> I'm probably missing some subtleties at the moment, but I'll give it > a > > shot > > >>> and see what happens. > > >>> > > >>> Ron > > >>> > > >>> On Wed, Sep 5, 2018 at 11:23 AM Colin McCabe <cmcc...@apache.org> > > wrote: > > >>> > > >>>> On Wed, Sep 5, 2018, at 07:34, Ron Dagostino wrote: > > >>>> > I added a "How To Support Re-Authentication for Other SASL > > Mechanisms" > > >>>> > section to the KIP as Rajini suggested. I also added a "Rejected > > >>>> > Alternative" for the idea of forcibly closing connections on the > > >>>> client > > >>>> > side upon token refresh or on the server side upon token > expiration. > > >>>> It > > >>>> > may be a bit premature to reject the server-side kill scenario > given > > >>>> that > > >>>> > Colin and Rajini are partial to it, but see below for what I said > > >>>> about it, > > >>>> > and I think it makes sense -- server-side kill without an ability > > for > > >>>> the > > >>>> > client to re-authenticate to avoid it may be useful in certain > > >>>> specific > > >>>> > cases, but as a general feature it doesn't really work. I would > be > > >>>> willing > > >>>> > to add server-side-kill to the scope of this KIP if that is > desired. > > >>>> > > >>>> Hi Ron, > > >>>> > > >>>> To clarify, I am in favor of implementing server-side kill in > addition > > >>>> to re-authentication, not as a replacement. I think Rajini > suggested > > the > > >>>> same thing. > > >>>> > > >>>> It seems clear that server-side kill is needed to provide security. > > >>>> Otherwise a bad client can simply decide not to re-authenticate, and > > >>>> continue using server resources indefinitely. Neither > authentication > > nor > > >>>> re-authentication should be optional, or else the bad guys will > > simply take > > >>>> the option not to authenticate. > > >>>> > > >>>> best, > > >>>> Colin > > >>>> > > >>>> > > >>>> > > > >>>> > A brute-force alternative is to simply kill the connection on the > > >>>> client > > >>>> > > side when the background login thread refreshes the credential. > > The > > >>>> > > advantage is that we don't need a code path for > re-authentication > > – > > >>>> the > > >>>> > > client simply connects again to replace the connection that was > > >>>> killed. > > >>>> > > There are many disadvantages, though. The approach is harsh – > > >>>> having > > >>>> > > connections pulled out from underneath the client will introduce > > >>>> latency > > >>>> > > while the client reconnects; it introduces non-trivial resource > > >>>> utilization > > >>>> > > on both the client and server as TLS is renegotiated; and it > > forces > > >>>> the > > >>>> > > client to periodically "recover" from what essentially looks > like > > a > > >>>> failure > > >>>> > > scenario. While these are significant disadvantages, the most > > >>>> significant > > >>>> > > disadvantage of all is that killing connections on the client > side > > >>>> adds no > > >>>> > > security – trusting the client to kill its connection in a > timely > > >>>> fashion > > >>>> > > is a blind and unjustifiable trust. > > >>>> > > > > >>>> > > > >>>> > > > >>>> > > We could kill the connection from the server side instead, when > > the > > >>>> token > > >>>> > > expires. But in this case, if there is no ability for the > client > > to > > >>>> > > re-authenticate to avoid the killing of the connection in the > > first > > >>>> place, > > >>>> > > then we still have all of the harsh approach disadvantages > > >>>> mentioned above. > > >>>> > > > >>>> > > > >>>> > Ron > > >>>> > > > >>>> > On Wed, Sep 5, 2018 at 10:25 AM Colin McCabe <cmcc...@apache.org> > > >>>> wrote: > > >>>> > > > >>>> > > On Wed, Sep 5, 2018, at 01:41, Rajini Sivaram wrote: > > >>>> > > > *Re-authentication vs disconnection:* > > >>>> > > > In a vast number of secure Kafka deployments, SASL_SSL is the > > >>>> security > > >>>> > > > protocol (this is the recommended config for OAUTHBEARER). If > we > > >>>> require > > >>>> > > > disconnections on token expiry, we would need new connections > to > > >>>> be > > >>>> > > > established with an expensive SSL handshake. This adds load on > > >>>> the broker > > >>>> > > > and will result in a latency spike. For OAUTHBEARER in > > >>>> particular, when > > >>>> > > > tokens are used to make authorisation decisions, we want to > be a > > >>>> able to > > >>>> > > > support short-lived tokens where token lifetime (granting > > >>>> authorisation) > > >>>> > > is > > >>>> > > > small. To make this usable in practice, I believe we need to > > >>>> support > > >>>> > > > re-authentication of existing connections. > > >>>> > > > > >>>> > > Hi Rajini, > > >>>> > > > > >>>> > > Thanks for the explanation. That makes sense. > > >>>> > > > > >>>> > > > > > >>>> > > > *Also explicitly out-of-scope for this proposal is the ability > > for > > >>>> > > brokers > > >>>> > > > to close connections that continue to use expired credentials. > > >>>> This > > >>>> > > > ability is a natural next step, but it will be addressed via a > > >>>> separate > > >>>> > > KIP > > >>>> > > > if/when this one is adopted.* > > >>>> > > > Perhaps we could do this the other way round? I don't think we > > >>>> would ever > > >>>> > > > want to close connections on the client-side to support > expired > > >>>> > > credentials > > >>>> > > > because that doesn't add any security guarantees. But we do > > >>>> require the > > >>>> > > > ability for brokers to close connections in order to enforce > > >>>> credential > > >>>> > > > expiry. Disconnection on the broker-side may be sufficient for > > >>>> some > > >>>> > > > deployments and could be useful on its own. It would also be > the > > >>>> easier > > >>>> > > > implementation. So maybe that could be the first step? > > >>>> > > > > >>>> > > +1 for doing disconnection first. Otherwise, as you noted, > there > > >>>> are no > > >>>> > > security guarantees -- the client can just decide not to > > >>>> re-authenticate > > >>>> > > and keep using the old credentials. You don't even need to > modify > > >>>> the > > >>>> > > source code -- older clients would behave this way. > > >>>> > > > > >>>> > > best, > > >>>> > > Colin > > >>>> > > > > >>>> > > > > > >>>> > > > *The implementation is designed in such a way that it does not > > >>>> preclude > > >>>> > > > adding support for re-authentication of other SASL mechanism > > >>>> (e.g. PLAIN, > > >>>> > > > SCRAM-related, and GSSAPI), but doing so is explicitly > > >>>> out-of-scope for > > >>>> > > > this proposal. * > > >>>> > > > Isn't re-authentication driven by ExpiringCredential? We don't > > >>>> need to > > >>>> > > > support re-authentication by default for the other mechanisms > in > > >>>> this > > >>>> > > KIP, > > >>>> > > > but any mechanism could enable this by adding a custom login > > >>>> callback > > >>>> > > > handler to provide an ExpiringCredential? For disconnection as > > >>>> well as > > >>>> > > > re-authentication, it will be good if we can specify exactly > how > > >>>> it can > > >>>> > > be > > >>>> > > > implemented for each of the SASL mechanisms, even if we > actually > > >>>> > > implement > > >>>> > > > it only for OAUTHBEARER. > > >>>> > > > > > >>>> > > > > > >>>> > > > On Wed, Sep 5, 2018 at 2:43 AM, Colin McCabe < > > cmcc...@apache.org> > > >>>> wrote: > > >>>> > > > > > >>>> > > > > On Tue, Sep 4, 2018, at 17:43, Ron Dagostino wrote: > > >>>> > > > > > Hi Colin. Different organizations will rely on different > > >>>> token > > >>>> > > > > lifetimes, > > >>>> > > > > > but anything shorter than an hour feels like it would be > > >>>> pretty > > >>>> > > > > > aggressive. An hour or more will probably be most common. > > >>>> > > > > > > >>>> > > > > Thanks. That's helpful to give me a sense of what the > > >>>> performance > > >>>> > > impact > > >>>> > > > > might be. > > >>>> > > > > > > >>>> > > > > > > > >>>> > > > > > <<<alternate solution of terminating connections when the > > >>>> bearer > > >>>> > > token > > >>>> > > > > > changed > > >>>> > > > > > I may be mistaken, but I think you are suggesting here > that > > we > > >>>> > > forcibly > > >>>> > > > > > kill connections from the client side whenever the > > background > > >>>> Login > > >>>> > > > > refresh > > >>>> > > > > > thread refreshes the token (which it currently does so > that > > >>>> the > > >>>> > > client > > >>>> > > > > can > > >>>> > > > > > continue to make new connections). Am I correct? > > >>>> > > > > > > >>>> > > > > Yes, this is what I'm thinking about. We could also > terminate > > >>>> the > > >>>> > > > > connection on the server, if that is more convenient. > > >>>> > > > > > > >>>> > > > > > If that is what you are > > >>>> > > > > > referring to, my sense is that it would be a very crude > way > > of > > >>>> > > dealing > > >>>> > > > > with > > >>>> > > > > > the issue that would probably lead to dissatisfaction in > > some > > >>>> sense > > >>>> > > > > (though > > >>>> > > > > > I can't be sure). > > >>>> > > > > > > >>>> > > > > What information should we gather so that we can be sure? > > >>>> > > > > > > >>>> > > > > > I do know that when I implemented SASL/OAUTHBEARER it > > >>>> > > > > > was communicated that leaving existing connections intact > -- > > >>>> as is > > >>>> > > done > > >>>> > > > > for > > >>>> > > > > > GSSAPI -- was the appropriate path forward. > > >>>> > > > > > > >>>> > > > > Thanks, that's good background information. Can someone > chime > > >>>> in with > > >>>> > > the > > >>>> > > > > reasoning behind this? > > >>>> > > > > > > >>>> > > > > My best guess is that terminating connections might cause a > > >>>> temporary > > >>>> > > > > increase in latency as they are re-established. > > >>>> > > > > > > >>>> > > > > In any case, we should figure out what the reasoning is so > > that > > >>>> we can > > >>>> > > > > make a decision. It seems worthwhile including this as a > > >>>> "rejected > > >>>> > > > > alternative," at least. > > >>>> > > > > > > >>>> > > > > thanks, > > >>>> > > > > Colin > > >>>> > > > > > > >>>> > > > > > > >>>> > > > > > > > >>>> > > > > > Ron > > >>>> > > > > > > > >>>> > > > > > On Tue, Sep 4, 2018 at 8:31 PM Colin McCabe < > > >>>> cmcc...@apache.org> > > >>>> > > wrote: > > >>>> > > > > > > > >>>> > > > > > > Hi Ron, > > >>>> > > > > > > > > >>>> > > > > > > Thanks for the KIP. > > >>>> > > > > > > > > >>>> > > > > > > What is the frequency at which you envision bearer > tokens > > >>>> changing > > >>>> > > at? > > >>>> > > > > > > > > >>>> > > > > > > Did you consider the alternate solution of terminating > > >>>> connections > > >>>> > > when > > >>>> > > > > > > the bearer token changed? > > >>>> > > > > > > > > >>>> > > > > > > best, > > >>>> > > > > > > Colin > > >>>> > > > > > > > > >>>> > > > > > > > > >>>> > > > > > > On Tue, Aug 28, 2018, at 07:28, Ron Dagostino wrote: > > >>>> > > > > > > > Hi everyone. I created KIP 368: Allow SASL Connections > > to > > >>>> > > > > Periodically > > >>>> > > > > > > > Re-Authenticate > > >>>> > > > > > > > < > > >>>> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > >>>> > > > > > 368%3A+Allow+SASL+Connections+to+Periodically+Re-Authenticate > > >>>> > > > > > > > > > >>>> > > > > > > > ( > > >>>> > > > > > > > > > >>>> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > >>>> > > > > > 368%3A+Allow+SASL+Connections+to+Periodically+Re-Authenticate > > >>>> > > > > > > ). > > >>>> > > > > > > > The motivation for this KIP is as follows: > > >>>> > > > > > > > > > >>>> > > > > > > > The adoption of KIP-255: OAuth Authentication via > > >>>> > > SASL/OAUTHBEARER > > >>>> > > > > > > > < > > >>>> > > > > > > https://cwiki.apache.org/confluence/pages/viewpage. > > >>>> > > > > action?pageId=75968876> > > >>>> > > > > > > > > >>>> > > > > > > > in > > >>>> > > > > > > > release 2.0.0 creates the possibility of using > > >>>> information in the > > >>>> > > > > bearer > > >>>> > > > > > > > token to make authorization decisions. Unfortunately, > > >>>> however, > > >>>> > > Kafka > > >>>> > > > > > > > connections are long-lived, so there is no ability to > > >>>> change the > > >>>> > > > > bearer > > >>>> > > > > > > > token associated with a particular connection. > Allowing > > >>>> SASL > > >>>> > > > > > > > connections > > >>>> > > > > > > > to periodically re-authenticate would resolve this. > In > > >>>> addition > > >>>> > > to > > >>>> > > > > this > > >>>> > > > > > > > motivation there are two others that are > > security-related. > > >>>> > > First, to > > >>>> > > > > > > > eliminate access to Kafka for connected clients, the > > >>>> current > > >>>> > > > > requirement > > >>>> > > > > > > > is > > >>>> > > > > > > > to remove all authorizations (i.e. remove all ACLs). > > >>>> This is > > >>>> > > > > necessary > > >>>> > > > > > > > because of the long-lived nature of the connections. > It > > >>>> is > > >>>> > > > > > > > operationally > > >>>> > > > > > > > simpler to shut off access at the point of > > >>>> authentication, and > > >>>> > > with > > >>>> > > > > the > > >>>> > > > > > > > release of KIP-86: Configurable SASL Callback Handlers > > >>>> > > > > > > > < > > >>>> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > >>>> > > > > 86%3A+Configurable+SASL+callback+handlers > > >>>> > > > > > > > > > >>>> > > > > > > > it > > >>>> > > > > > > > is going to become more and more likely that > > >>>> installations will > > >>>> > > > > > > > authenticate users against external directories (e.g. > > via > > >>>> > > LDAP). The > > >>>> > > > > > > > ability to stop Kafka access by simply disabling an > > >>>> account in an > > >>>> > > > > LDAP > > >>>> > > > > > > > directory (for example) is desirable. The second > > >>>> motivating > > >>>> > > factor > > >>>> > > > > for > > >>>> > > > > > > > re-authentication related to security is that the use > of > > >>>> > > short-lived > > >>>> > > > > > > > tokens > > >>>> > > > > > > > is a common OAuth security recommendation, but > issuing a > > >>>> > > short-lived > > >>>> > > > > > > > token > > >>>> > > > > > > > to a Kafka client (or a broker when OAUTHBEARER is the > > >>>> > > inter-broker > > >>>> > > > > > > > protocol) currently has no benefit because once a > client > > >>>> is > > >>>> > > > > connected to > > >>>> > > > > > > > a > > >>>> > > > > > > > broker the client is never challenged again and the > > >>>> connection > > >>>> > > may > > >>>> > > > > > > > remain > > >>>> > > > > > > > intact beyond the token expiration time (and may > remain > > >>>> intact > > >>>> > > > > > > > indefinitely > > >>>> > > > > > > > under perfect circumstances). This KIP proposes > adding > > >>>> the > > >>>> > > ability > > >>>> > > > > for > > >>>> > > > > > > > clients (and brokers when OAUTHBEARER is the > > inter-broker > > >>>> > > protocol) > > >>>> > > > > to > > >>>> > > > > > > > re-authenticate their connections to brokers and have > > the > > >>>> new > > >>>> > > bearer > > >>>> > > > > > > > token > > >>>> > > > > > > > appear on their session rather than the old one. > > >>>> > > > > > > > > > >>>> > > > > > > > The description of this KIP is actually quite > > >>>> straightforward > > >>>> > > from a > > >>>> > > > > > > > functionality perspective; from an implementation > > >>>> perspective, > > >>>> > > > > though, > > >>>> > > > > > > the > > >>>> > > > > > > > KIP is not so straightforward, so it includes a pull > > >>>> request > > >>>> > > with a > > >>>> > > > > > > > proposed implementation. See > > https://github.com/apache/ > > >>>> > > > > kafka/pull/5582. > > >>>> > > > > > > > > > >>>> > > > > > > > Ron > > >>>> > > > > > > > > >>>> > > > > > > >>>> > > > > >>>> > > >>> > > >