Thanks for your feedback Alieh and Sophie.

For everybody: we synced about the open questions in person, and I'll try to add enough context below about what we discussed. If anything is missing/unclear, please let us know.


Based on the in-person discussion, we agree to add a new interface

    public interface StreamsConsumer extends Consumer { }

and a new class

    public class KafkaStreamsConsumer
        extends KafkaConsumer implements StreamsConsumer { }

and let the new `KafkaClientInterceptor` use this new interface/class instead of `Consumer`/`KafkaConsumer` for the main consumer.

Note that we don't add any new methods to this interface at this point. The reason is that the design for KIP-1071 is still too early-stage to know exactly what we will need; we just want to set up the code early, such that we can add new methods easily in the future.

The alternative to adding `StreamsConsumer` now would be to add the interface later when we need it; however, we opted to introduce it right away to avoid future "deprecation noise".

We also decided to add a config parameter to the callbacks, allowing interceptor code to inspect the configuration used for a client. This will be read-only, as the client is already created before the interceptor is called for wrapping. If there is demand in the future, we can always extend the interceptor interface to also allow changing configs.
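
To sketch what this could look like (method and parameter names below are just placeholders, not the final API):

    public interface KafkaClientInterceptor {
        // `config` is read-only; the client was already created from this
        // config before the callback is invoked
        StreamsConsumer wrapMainConsumer(
            KafkaStreamsConsumer mainConsumer,
            Map<String, Object> config);
    }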


I also have a rough POC PR for reference. It does not yet reflect the latest points as mentioned in this email, but I hope it helps to clarify a few things: https://github.com/apache/kafka/pull/17333


In addition, I updated the KIP with a proposal to add predefined "interceptor classes" ([Admin|StreamsConsumer|Consumer|Producer]Interceptor) to improve backward compatibility. Details are on the KIP.
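
The rough idea (just an illustration; cf. the KIP for the actual class definitions) is that each predefined class implements the corresponding client interface and delegates all calls to the wrapped client by default, so users only override the methods they want to intercept:

    public class ConsumerInterceptor<K, V> implements Consumer<K, V> {
        protected final Consumer<K, V> delegate;

        public ConsumerInterceptor(final Consumer<K, V> delegate) {
            this.delegate = delegate;
        }

        @Override
        public ConsumerRecords<K, V> poll(final Duration timeout) {
            return delegate.poll(timeout); // default: just forward the call
        }

        // ... all other `Consumer` methods delegate the same way ...
    }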


I also added a new config for the interceptor class, similar to the existing config for the current client supplier.
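
For illustration, usage would be similar to the following (the config name below is a placeholder; the KIP defines the actual name):

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    // placeholder config name -- cf. the KIP for the actual definition
    props.put("client.interceptor", MyClientInterceptor.class.getName());
    final KafkaStreams streams = new KafkaStreams(topology, props);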




Below are more detailed replies for Alieh and Sophie.


What would happen if the implemented class creates a
new instance

If KIP-1071 is not used, nothing; it would just work. However, if KIP-1071 is used, it would most likely break KS. -- That said, given that we now introduce the new `XxxInterceptor` classes, it becomes a little harder to return new instances (even if still not impossible...).


in other words, how do we prevent it?

As I tried to point out on the KIP, I believe we cannot completely prevent it; we can only make it an API contract, and if users violate that contract, it's their own fault if KS breaks. That is why we need to move off `KafkaClientSupplier`, so we can change the API contract w/o breaking anybody. Using the `XxxInterceptor` classes also helps with this.


About the MockConsumer: as said, we cannot prevent it, and for our own tests we might still need to create a new instance (we would need to let `MockConsumer` extend `ConsumerInterceptor`, but that's easy). For most (all?) tests in which we mock a client, we don't really connect to a broker cluster; it's some unit test, and for unit tests nothing should break. And for actual integration tests, I don't think we would mock clients...
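
As a rough sketch of what such a test setup could look like (assuming the delegating `ConsumerInterceptor` class shape from above; name and constructor are hypothetical):

    // hypothetical test-only interceptor; it never touches the wrapped
    // client and thus needs no broker
    public class MockConsumerInterceptor extends ConsumerInterceptor<byte[], byte[]> {
        public MockConsumerInterceptor(final Consumer<byte[], byte[]> consumer) {
            super(consumer);
        }

        @Override
        public ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
            return ConsumerRecords.empty(); // stubbed result
        }
    }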



Whatever
we end up doing, if there's a way for someone to implement it incorrectly
then it would be good to figure out how we can detect this and warn the
user. I'm not quite sure how much of a safety net we can construct here but
we could try to cover some of the obvious missteps, for example by
asserting that the returned Consumer is not an instance of KafkaConsumer
unless it's the same exact KafkaConsumer object passed into the
#getMainConsumer call. Not sure if there are other checks we could do?

I don't think that is possible, by definition. We pass in a `KafkaConsumer` but expect a `ConsumerInterceptor` object back. The fundamental problem is that we cannot know whether the object we get back is actually a wrapper/interceptor for the passed-in client, or not a wrapper at all but a `new MyKafkaConsumer extends ConsumerInterceptor {...}` object. -- But I think it's ok if we cannot guard against this fully; we also cannot guard against users messing with KS via reflection...



The real issue with constructing and
returning a different KafkaConsumer instead of wrapping the one passed in,
is that Streams will be invoking some methods on the user's Consumer while
invoking other methods on the original KafkaConsumer.

No. All methods would be invoked on the user's Consumer. KS only _constructs_ the clients (to hook into some internals, and to "connect" KS runtime code to consumer internals for KIP-1071); otherwise, KS only uses the object returned by the interceptor for all calls on the client. And with the new `StreamsConsumer` interface, we can add new methods in the future.
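
To make the calling pattern concrete (illustrative only; constructor shape and variable names are assumptions):

    // KS constructs the client itself, using a non-public constructor, to
    // hook up the KIP-1071 internals...
    KafkaStreamsConsumer mainConsumer =
        new KafkaStreamsConsumer(consumerConfig, internalStreamsCallbacks);

    // ...but all subsequent calls go through the wrapped client only
    StreamsConsumer consumer =
        interceptor.wrapMainConsumer(mainConsumer, consumerConfig);
    consumer.subscribe(sourceTopics);
    ConsumerRecords records = consumer.poll(pollTime);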



If every consumer method
call is being made directly on the Consumer returned from #getMainConsumer,
it doesn't really matter if they constructed a different one.

It would matter, as KS code would now be "decoupled" from this consumer instance. The KS runtime can only tap into a client it creates itself, by using a non-public constructor to pass in the non-public "callbacks" we need to integrate with KIP-1071. (At least for now...)

As discussed in person: we might want to make parts of this public, but we don't know yet which parts (if any), or how the concrete API should be designed. Thus, we want to keep everything non-public for now; as KIP-1071 evolves, we will know better what we need.



If this KIP is
going to immediately deprecate the old KafkaClientSupplier, then we need to
support implementing an interceptor while configured to use the current
rebalancing protocol.

I don't think we need to do this, but we could make it mutually exclusive. That said, I don't anticipate any issues with actually supporting the interceptor interface w/ the old protocol. -- Note that the KIP explicitly disallows using both supplier and interceptor together, and also disallows combining KIP-1071 with the supplier.
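
A rough sketch of the corresponding check (variable names are just illustrative):

    // supplier and interceptor are mutually exclusive, and the supplier
    // cannot be combined with the new streams protocol (KIP-1071)
    if (clientSupplier != null && clientInterceptor != null) {
        throw new ConfigException(
            "Cannot use a client supplier and a client interceptor at the same time");
    }
    if (clientSupplier != null && streamsProtocolEnabled) {
        throw new ConfigException(
            "A client supplier cannot be used with the new streams rebalance protocol");
    }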



That is, can we
assume that the KafkaConsumer passed into #getMainConsumer will only be
constructed via the internal constructor/put into "Streams group mode" if
the app is configured to use the new protocol, and otherwise it will just
pass in a regular KafkaConsumer?

Yes. That's why I don't see any issue as stated above.




-Matthias




On 9/9/24 10:26 PM, Sophie Blee-Goldman wrote:
I agree with Alieh, there is definitely some inconsistency in what we do or
don't want to support/allow. However imo we should lean into the more
flexible approach of allowing an alternative consumer to be constructed and
returned, rather than attempting to enforce the interceptor to strictly
wrap the provided consumer. Mainly because the ability to bypass the "real"
KafkaConsumer and mock/stub methods is pretty essential to the test/mock
consumer use cases. Personally I feel this is a very advanced feature and
we should let people who know what they're doing implement any kind of
consumer for this, and if they misuse the API and break things then it's on
them. I feel we can document things clearly enough so that it will be
difficult to shoot yourself in the foot by accident (plus if they do end up
breaking things eg by constructing a new Consumer and just returning that
instead, it will at least break quickly and obviously). Figuring out that
the problem was in your interceptor might be challenging though. Whatever
we end up doing, if there's a way for someone to implement it incorrectly
then it would be good to figure out how we can detect this and warn the
user. I'm not quite sure how much of a safety net we can construct here but
we could try to cover some of the obvious missteps, for example by
asserting that the returned Consumer is not an instance of KafkaConsumer
unless it's the same exact KafkaConsumer object passed into the
#getMainConsumer call. Not sure if there are other checks we could do?

I do suspect this problem just goes away if we pivot to the StreamsConsumer
approach we've discussed a bit before. The real issue with constructing and
returning a different KafkaConsumer instead of wrapping the one passed in,
is that Streams will be invoking some methods on the user's Consumer while
invoking other methods on the original KafkaConsumer. Another reason that
splitting up the consumer into a public interface and a hidden internal one
seems to complicate things for no clear reason. If every consumer method
call is being made directly on the Consumer returned from #getMainConsumer,
it doesn't really matter if they constructed a different one. In fact we
could probably just skip having Streams construct its own consumer and
instead let the user construct it or provide an alternative implementation
instead. As I understand it, the only reason we want to create the
underlying consumer within Streams is to access an internal constructor
that puts it into "Streams" mode. But that could be solved in a different
way, for example if/when Streams invokes a Streams-specific method like
#setProcessId this automatically puts the consumer into Streams mode. Not
sure if this works since I don't have the full picture with details on this
internal constructor, just feels like that doesn't have to be the only
possible way to do things (not necessarily better ways, just different)

On a somewhat related note, I had a question about the deprecation plan. As
I understand it, the stated goal of KIP-1071 is to eventually deprecate and
then remove support for the current rebalancing protocol, but the timeline
is undefined and nothing is being deprecated right now. If this KIP is
going to immediately deprecate the old KafkaClientSupplier, then we need to
support implementing an interceptor while configured to use the current
rebalancing protocol. Is this reading of the KIP correct? That is, can we
assume that the KafkaConsumer passed into #getMainConsumer will only be
constructed via the internal constructor/put into "Streams group mode" if
the app is configured to use the new protocol, and otherwise it will just
pass in a regular KafkaConsumer? Just want to make sure my understanding is
correct here

On Fri, Sep 6, 2024 at 8:29 AM Alieh Saeedi <asae...@confluent.io.invalid>
wrote:

Sorry for the typo:
contracts = contradicts*

On Fri, Sep 6, 2024 at 5:20 PM Alieh Saeedi <asae...@confluent.io> wrote:

Thanks Matthias for the KIP.

A quick question regarding the sentence `It would be invalid to create a
new client instance `: What would happen if the implemented class creates
a new instance, or, in other words, how do we prevent it? Considering
that
`Config` is going to be passed in as well (the 2nd point raised by
Sophie),
Also, the new Consumer object (`mockConsumer`) you created in your
example
here <https://lists.apache.org/thread/l6dhq1rfl3xkq8g9wfqsvw89yjrgzbn8>
confused me since it contracts with the above sentence.

Thanks,
Alieh

On Fri, Sep 6, 2024 at 5:08 AM Sophie Blee-Goldman <
sop...@responsive.dev>
wrote:

1. Fair enough. In the end it doesn't matter that much to us, so I just
figured from basic principles if this is a config then it should go in
the
StreamsConfig. Also happy to hear what others think

2. We need the client-specific configs (for example to extract the
client
ids for monitoring, resource management, etc). However, I checked our
implementation and realized it's not enough to just pass them in -- we
actually want to modify some configs on the fly, before they are used to
construct the client. If absolutely necessary I'm sure we could find a
workaround for this by overriding the configs in the original
StreamsConfig
via the client prefixes, but that would be kind of a hassle so I'd be
happy
if we found a way to maintain the read-write mode for client configs as
well

3. Perhaps we do need a PR -- not for the feature implementation itself,
but an example of how this would be used. As I understand it, the whole
point is to be able to cast the client to KafkaConsumer (or some
consumer
class?) to access its internal methods. What is the specific plan for
how
this will work with the interceptors proposed in this KIP?

I was imagining the idea was that Streams would retain a reference to
the
underlying KafkaConsumer (ie what's passed in to the interceptor
callback)
and it would be invoking those internal methods on that KafkaConsumer,
while all the public methods are to be invoked on the consumer returned
by
the interceptor. But as long as some internal methods are being invoked
on
a consumer then there's a part of Streams that is inherently dependent
on
having a valid KafkaConsumer implementation, which is dependent on
having
a
valid cluster it can connect to...no?

I really think it would help to at least outline what these internal
methods are. Right now I'm assuming they are going to result in the
consumer sending an RPC or taking some other action that would be
problematic if there was no valid cluster, but maybe that's not the
case?
Don't want to waste your time because my assumptions are wrong, so some
specifics would help.  :)

On Thu, Sep 5, 2024 at 4:15 PM Matthias J. Sax <mj...@apache.org>
wrote:

Thanks for the input Sophie.

About (1) -- I am personally happy either way. Let's wait what others
prefer (constructor only, config only, both), and I can update the KIP
later. This doesn't seem to be a core issue for the KIP.


For (2), I guess we can do this, and I am open to add it. Wondering if
you would need to get access to the client specific config KS
computes,
or if we should let `KafkaClientInterceptor` implement `Configurable` and
thus add a single `configure(Map)` call passing in the user's provided
config only once?


About (3), I don't follow your concern. As pointed out in the KIP
itself, we cannot enforce that no MockClient is returned, and for a
mock
case, one would not have a broker cluster and not have a fully
fledged
KS runtime, thus, just switching out the client should still work as
before (to be fair: I don't have full PR for this yet -- if we think
it
would be helpful, I can work more on it and push to GitHub so we get a
clearer picture):

class MyInterceptor implements KafkaClientInterceptor {
    MockConsumer<byte[], byte[]> mockConsumer =
        new MockConsumer<>(OffsetResetStrategy.EARLIEST);

    public Consumer<byte[], byte[]> wrapConsumer(KafkaConsumer<byte[], byte[]> consumer) {
      // this would never make any call into `consumer` at all
      return mockConsumer;
    }
}

Note that the return type is `Consumer`, ie, the top level interface,
but not `KafkaConsumer` and thus you can return anything you want.

Similarly, you can intercept any call transparently:

class MyInterceptor<K, V> implements KafkaClientInterceptor {

    public Consumer<K, V> wrapConsumer(KafkaConsumer<K, V> consumer) {
      return new Consumer<K, V>() {
        @Override
        public ConsumerRecords<K, V> poll(final long timeoutMs) {
          // add 500ms to the timeout, and forward the call
          ConsumerRecords<K, V> r1 = consumer.poll(timeoutMs + 500);

          // apply some filtering to modify the result
          Map<TopicPartition, List<ConsumerRecord<K, V>>> r2 = new HashMap<>();
          for (ConsumerRecord<K, V> r : r1) {
            if (...) { // keep only the records of interest
              r2.computeIfAbsent(
                  new TopicPartition(r.topic(), r.partition()),
                  p -> new ArrayList<>()
              ).add(r);
            }
          }
          return new ConsumerRecords<>(r2);
        }

        @Override
        public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
          // don't forward the call at all
          Map<TopicPartition, Long> offsets = new HashMap<>();
          offsets.put(...); // add anything you want

          return offsets;
        }

        // ... all other methods simply delegate to `consumer` ...
      };
    }
}

Of course, for a real deployment, one must be careful what to
intercept,
forward, or not forward into the actual client, but you folks know
what
you are doing, so I am not worried about it. In general, yes, if calls
are intercepted incorrectly, one could break Kafka Streams, but this
is
true right now, too, so I don't think anything really changes.

I guess, in the end, the new interface allows you to do everything you
did before, but we still change the API contract a little bit, as
Kafka
Streams provides a client instance now.


Does this help?


-Matthias




On 9/5/24 1:58 PM, Sophie Blee-Goldman wrote:
I have one more thing to add/emphasize around point 3)  -- I should
clarify
that the need to modify return values and skip delegated calls are
essential to our own client wrappers. In other words, this isn't
something
specific to mock/test clients. Just wanted to point that out so I
didn't
accidentally cause you to waste time looking for a workaround for
testing
specifically, I was just using mock/test clients as an example case.

For a specific example, we do some things around intercepting seek
calls
in
order to set offsets correctly for our remote stores, such as
overriding
the #seekToBeginning so it instead seeks to specific offsets. This
isn't
the only thing but I think it showcases clearly how performing the
call
being intercepted (in this case a seekToBeginning) would end up
completely
undoing the interceptor's actions (the seek to a specific offset).

Hope this makes sense! Thanks

On Thu, Sep 5, 2024 at 1:18 PM Sophie Blee-Goldman <
sop...@responsive.dev>
wrote:

Thanks Matthias!

1. Imo it makes more sense for the new client interceptor to be
configurable via config and not by KafkaStreams constructor. Let's
take
the
opportunity to reduce the API surface area of the already massively
overloaded KafkaStreams constructor and fix the inclusion of the
KafkaClientSupplier/Interceptor in the primary KafkaStreams
interface.
We
already started moving in this direction with KIP-884 which added
the
*default.client.supplier* config. Think it was just an oversight
that we
didn't deprecate the constructors in that same KIP (didn't see this
mentioned in the rejected alternatives section).

2. We need to continue passing in the config map to the
interceptors/wrappers. Happy to elaborate if need be but this is
absolutely
essential to us :)

3. I'm a bit confused about how injecting mock/test clients would
work
with these interceptors. If I understand the proposed API
correctly,
these
are simply transparent wrappers that don't allow one to skip the
delegated
call or mock the returned value. There are many users who plug in
mock
clients to unit test their code without spinning up a broker
(especially
since the EmbeddedKafkaCluster isn't even a public API) and verify
things
outside the bounds of the TTD. There are many examples of this in
Kafka
Streams itself -- perhaps you could show an example?

4. FYI I am reaching out about whether there are any true custom
client
alternatives out there or in the planning. So far I'm not aware of
any
and
we can/should proceed assuming there are none, but I'll update the
thread
if I learn about something new here.

On Wed, Sep 4, 2024 at 5:59 PM Matthias J. Sax <mj...@apache.org>
wrote:

Hi,

I would like to start a discussion on KIP-1088:




https://cwiki.apache.org/confluence/display/KAFKA/KIP-1088%3A+Replace+KafkaClientSupplier+with+KafkaClientInterceptor

We consider this KIP a requirement/blocker for KIP-1071.

For the beginning of the discussion, I would like to stay focused
on
the
_how_ and not necessarily talk about names... I am happy to change
any
class/methods names, but it might be distracting in the beginning
of
the
discussion.

Looking forward to your feedback.


-Matthias







