I agree with Alieh, there is definitely some inconsistency in what we do or
don't want to support/allow. However, imo we should lean into the more
flexible approach of allowing an alternative consumer to be constructed and
returned, rather than trying to force the interceptor to strictly wrap the
provided consumer. Mainly because the ability to bypass the "real"
KafkaConsumer and mock/stub methods is pretty essential to the test/mock
consumer use cases. Personally I feel this is a very advanced feature and
we should let people who know what they're doing implement any kind of
consumer for this, and if they misuse the API and break things then it's on
them. I feel we can document things clearly enough that it will be
difficult to shoot yourself in the foot by accident. Plus, if they do end up
breaking things, e.g. by constructing a new Consumer and just returning that
instead, it will at least break quickly and obviously (though figuring out
that the problem was in your interceptor might be challenging). Whatever
we end up doing, if there's a way for someone to implement it incorrectly,
then it would be good to figure out how we can detect this and warn the
user. I'm not quite sure how much of a safety net we can construct here, but
we could try to cover some of the obvious missteps, for example by
asserting that the returned Consumer is not an instance of KafkaConsumer
unless it's the exact same KafkaConsumer object that was passed into the
#getMainConsumer call. Not sure if there are other checks we could do?
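Just to make that last check concrete, here's a rough sketch of what it could look like. Everything in it is hypothetical: `InterceptorResultCheck` is a made-up name, and `Consumer`/`KafkaConsumer` are minimal local stand-ins for the real kafka-clients types so the snippet compiles on its own.

```java
import java.util.Objects;

// Minimal local stand-ins for the kafka-clients types (NOT the real classes),
// so this sketch compiles on its own.
interface Consumer {
    void close();
}

class KafkaConsumer implements Consumer {
    @Override
    public void close() {
    }
}

// Hypothetical check Streams could run on whatever the interceptor returns.
final class InterceptorResultCheck {

    private InterceptorResultCheck() {
    }

    static Consumer validate(final KafkaConsumer provided, final Consumer returned) {
        Objects.requireNonNull(returned, "interceptor returned a null Consumer");
        // Any Consumer implementation is allowed (wrappers, mocks, ...), but a
        // KafkaConsumer must be the exact instance Streams passed in; otherwise
        // Streams' own calls and the public calls would hit two different clients.
        if (returned instanceof KafkaConsumer && returned != provided) {
            throw new IllegalStateException(
                "Interceptor returned a different KafkaConsumer than the one it was given; "
                    + "wrap the provided consumer instead of constructing a new one");
        }
        return returned;
    }
}
```

Note the `instanceof` check would also reject a wrapper that extends KafkaConsumer, and it can't catch a custom Consumer that internally builds its own fresh KafkaConsumer, so it would only cover the most obvious misstep.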

I do suspect this problem just goes away if we pivot to the StreamsConsumer
approach we've discussed a bit before. The real issue with constructing and
returning a different KafkaConsumer instead of wrapping the one passed in
is that Streams will be invoking some methods on the user's Consumer while
invoking other methods on the original KafkaConsumer. This is another reason
why splitting the consumer into a public interface and a hidden internal one
seems to complicate things for no clear reason. If every consumer method
call is made directly on the Consumer returned from #getMainConsumer,
it doesn't really matter whether the user constructed a different one. In
fact, we could probably just skip having Streams construct its own consumer
and let the user construct it or provide an alternative implementation
instead. As I understand it, the only reason we want to create the
underlying consumer within Streams is to access an internal constructor
that puts it into "Streams" mode. But that could be solved in a different
way: for example, if/when Streams invokes a Streams-specific method like
#setProcessId, that call could automatically put the consumer into Streams
mode. Not sure if this works since I don't have the full picture of this
internal constructor; it just feels like that doesn't have to be the only
possible way to do things (not necessarily better ways, just different).
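Roughly what I have in mind for the #setProcessId idea, as a sketch only: `ModeSwitchingConsumer`, its `Mode` enum, and the rule that the mode has to be picked before subscribe() are all hypothetical and not based on the actual internal constructor.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical sketch: none of these names exist in kafka-clients.
final class ModeSwitchingConsumer {

    enum Mode { CLASSIC, STREAMS }

    private Mode mode = Mode.CLASSIC;
    private UUID processId;
    private boolean subscribed;

    // Stand-in for Consumer#subscribe; only records that the group was joined.
    synchronized void subscribe() {
        subscribed = true;
    }

    // The first Streams-specific call switches the consumer into Streams mode,
    // so Streams would not need a special constructor to get a Streams-mode client.
    synchronized void setProcessId(final UUID id) {
        Objects.requireNonNull(id, "id");
        // Assumption: the protocol can't change once the consumer has joined a group.
        if (subscribed) {
            throw new IllegalStateException("Streams mode must be selected before subscribe()");
        }
        if (processId != null && !processId.equals(id)) {
            throw new IllegalStateException("process id already set to " + processId);
        }
        processId = id;
        mode = Mode.STREAMS;
    }

    synchronized Mode mode() {
        return mode;
    }
}
```

Calling it again with the same id is a no-op, so Streams wouldn't have to track whether it already switched the consumer.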

On a somewhat related note, I had a question about the deprecation plan. As
I understand it, the stated goal of KIP-1071 is to eventually deprecate and
then remove support for the current rebalancing protocol, but the timeline
is undefined and nothing is being deprecated right now. If this KIP is
going to immediately deprecate the old KafkaClientSupplier, then we need to
support implementing an interceptor while configured to use the current
rebalancing protocol. Is this reading of the KIP correct? That is, can we
assume that the KafkaConsumer passed into #getMainConsumer will only be
constructed via the internal constructor/put into "Streams group mode" if
the app is configured to use the new protocol, and otherwise Streams will
just pass in a regular KafkaConsumer? Just want to make sure my
understanding is correct here.
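In code, my reading would amount to something like the sketch below. The `group.protocol` key, its `classic`/`streams` values, and `classic` being the default are my assumptions about how an app opts into the new protocol; `MainConsumerFactory` and `ConsumerMode` are made-up names for illustration.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of how Streams could pick the consumer flavor it hands to the interceptor.
final class MainConsumerFactory {

    enum ConsumerMode { REGULAR, STREAMS_GROUP }

    // Assumed config key for opting into the new rebalance protocol.
    static final String GROUP_PROTOCOL_CONFIG = "group.protocol";

    private MainConsumerFactory() {
    }

    static ConsumerMode modeFor(final Map<String, ?> streamsConfig) {
        Object value = streamsConfig.get(GROUP_PROTOCOL_CONFIG);
        if (value == null) {
            value = "classic"; // assumed default: the current protocol
        }
        switch (value.toString().trim().toLowerCase(Locale.ROOT)) {
            case "classic":
                // plain KafkaConsumer, exactly as today
                return ConsumerMode.REGULAR;
            case "streams":
                // built via the internal constructor, i.e. "Streams group mode"
                return ConsumerMode.STREAMS_GROUP;
            default:
                throw new IllegalArgumentException("Unknown " + GROUP_PROTOCOL_CONFIG + ": " + value);
        }
    }
}
```

If that's right, an interceptor written today against the classic protocol would keep receiving a regular KafkaConsumer until the app explicitly opts in.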

On Fri, Sep 6, 2024 at 8:29 AM Alieh Saeedi <asae...@confluent.io.invalid>
wrote:

> Sorry for the typo:
> contracts = contradicts*
>
> On Fri, Sep 6, 2024 at 5:20 PM Alieh Saeedi <asae...@confluent.io> wrote:
>
> > Thanks Matthias for the KIP.
> >
> > A quick question regarding the sentence `It would be invalid to create a
> > new client instance`: What would happen if the implemented class creates
> > a new instance, or, in other words, how do we prevent it? Considering that
> > `Config` is going to be passed in as well (the 2nd point raised by Sophie),
> > Also, the new Consumer object (`mockConsumer`) you created in your example
> > here <https://lists.apache.org/thread/l6dhq1rfl3xkq8g9wfqsvw89yjrgzbn8>
> > confused me since it contracts with the above sentence.
> >
> > Thanks,
> > Alieh
> >
> > On Fri, Sep 6, 2024 at 5:08 AM Sophie Blee-Goldman <sop...@responsive.dev>
> > wrote:
> >
> >> 1. Fair enough. In the end it doesn't matter that much to us, so I just
> >> figured from basic principles if this is a config then it should go in the
> >> StreamsConfig. Also happy to hear what others think.
> >>
> >> 2. We need the client-specific configs (for example to extract the client
> >> ids for monitoring, resource management, etc.). However, I checked our
> >> implementation and realized it's not enough to just pass them in -- we
> >> actually want to modify some configs on the fly, before they are used to
> >> construct the client. If absolutely necessary I'm sure we could find a
> >> workaround for this by overriding the configs in the original
> >> StreamsConfig via the client prefixes, but that would be kind of a hassle,
> >> so I'd be happy if we found a way to maintain the read-write mode for
> >> client configs as well.
> >>
> >> 3. Perhaps we do need a PR -- not for the feature implementation itself,
> >> but an example of how this would be used. As I understand it, the whole
> >> point is to be able to cast the client to KafkaConsumer (or some consumer
> >> class?) to access its internal methods. What is the specific plan for how
> >> this will work with the interceptors proposed in this KIP?
> >>
> >> I was imagining the idea was that Streams would retain a reference to the
> >> underlying KafkaConsumer (i.e. what's passed in to the interceptor
> >> callback) and it would be invoking those internal methods on that
> >> KafkaConsumer, while all the public methods are to be invoked on the
> >> consumer returned by the interceptor. But as long as some internal methods
> >> are being invoked on a consumer, then there's a part of Streams that is
> >> inherently dependent on having a valid KafkaConsumer implementation, which
> >> is dependent on having a valid cluster it can connect to...no?
> >>
> >> I really think it would help to at least outline what these internal
> >> methods are. Right now I'm assuming they are going to result in the
> >> consumer sending an RPC or taking some other action that would be
> >> problematic if there was no valid cluster, but maybe that's not the case?
> >> Don't want to waste your time because my assumptions are wrong, so some
> >> specifics would help.  :)
> >>
> >> On Thu, Sep 5, 2024 at 4:15 PM Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >>
> >> > Thanks for the input Sophie.
> >> >
> >> > About (1) -- I am personally happy either way. Let's wait to see what
> >> > others prefer (constructor only, config only, both), and I can update
> >> > the KIP later. This does not seem to be a core issue for the KIP.
> >> >
> >> >
> >> > For (2), I guess we can do this, and I am open to adding it. I'm
> >> > wondering if you would need access to the client-specific configs KS
> >> > computes, or if we should let `KafkaClientInterceptor implement
> >> > Configurable` and thus add a single `configure(Map)` call, passing in
> >> > the user's provided config only once?
> >> >
> >> >
> >> > About (3), I don't follow your concern. As pointed out in the KIP
> >> > itself, we cannot enforce that no MockClient is returned, and for a mock
> >> > case, one would not have a broker cluster and not have a fully fledged
> >> > KS runtime; thus, just switching out the client should still work as
> >> > before (to be fair: I don't have a full PR for this yet -- if we think it
> >> > would be helpful, I can work more on it and push to GitHub so we get a
> >> > clearer picture):
> >> >
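> >> > class MyInterceptor implements KafkaClientInterceptor {
> >> >    // MockConsumer (3.x) requires an OffsetResetStrategy
> >> >    final MockConsumer<byte[], byte[]> mockConsumer =
> >> >        new MockConsumer<>(OffsetResetStrategy.EARLIEST);
> >> >
> >> >    @Override
> >> >    public Consumer<byte[], byte[]> wrapConsumer(final KafkaConsumer<byte[], byte[]> consumer) {
> >> >      // this would never make any call into `consumer` at all
> >> >      return mockConsumer;
> >> >    }
> >> > }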
> >> >
> >> > Note that the return type is `Consumer`, i.e., the top-level interface,
> >> > and not `KafkaConsumer`, so you can return any `Consumer` implementation.
> >> >
> >> > Similarly, you can intercept any call transparently:
> >> >
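> >> > import java.time.Duration;
> >> > import java.util.ArrayList;
> >> > import java.util.Collection;
> >> > import java.util.HashMap;
> >> > import java.util.List;
> >> > import java.util.Map;
> >> > import org.apache.kafka.clients.consumer.Consumer;
> >> > import org.apache.kafka.clients.consumer.ConsumerRecord;
> >> > import org.apache.kafka.clients.consumer.ConsumerRecords;
> >> > import org.apache.kafka.clients.consumer.KafkaConsumer;
> >> > import org.apache.kafka.common.TopicPartition;
> >> >
> >> > class MyInterceptor implements KafkaClientInterceptor {
> >> >
> >> >    @Override
> >> >    public Consumer<byte[], byte[]> wrapConsumer(final KafkaConsumer<byte[], byte[]> consumer) {
> >> >      return new Consumer<byte[], byte[]>() {
> >> >        @Override
> >> >        public ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
> >> >          // add 500ms to the timeout, and forward the call
> >> >          final ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout.plusMillis(500));
> >> >
> >> >          // apply some filtering to modify the result
> >> >          final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> filtered = new HashMap<>();
> >> >          for (final ConsumerRecord<byte[], byte[]> record : records) {
> >> >            if (shouldKeep(record)) { // hypothetical user-defined filter
> >> >              filtered
> >> >                .computeIfAbsent(new TopicPartition(record.topic(), record.partition()), tp -> new ArrayList<>())
> >> >                .add(record);
> >> >            }
> >> >          }
> >> >          return new ConsumerRecords<>(filtered);
> >> >        }
> >> >
> >> >        @Override
> >> >        public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
> >> >          // don't forward the call at all
> >> >          final Map<TopicPartition, Long> offsets = new HashMap<>();
> >> >          // add any offsets you want, e.g. offsets.put(partition, offset)
> >> >          return offsets;
> >> >        }
> >> >
> >> >        // ... all remaining Consumer methods must be implemented too,
> >> >        // e.g. by delegating to `consumer`
> >> >      };
> >> >    }
> >> > }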
> >> >
> >> > Of course, for a real deployment, one must be careful about what to
> >> > intercept and what to forward (or not forward) into the actual client,
> >> > but you folks know what you are doing, so I am not worried about it. In
> >> > general, yes, if calls are intercepted incorrectly, one could break
> >> > Kafka Streams, but this is true right now, too, so I don't think
> >> > anything really changes.
> >> >
> >> > I guess, in the end, the new interface allows you to do everything you
> >> > did before, but we still change the API contract a little bit, as Kafka
> >> > Streams provides a client instance now.
> >> >
> >> >
> >> > Does this help?
> >> >
> >> >
> >> > -Matthias
> >> >
> >> >
> >> >
> >> >
> >> > On 9/5/24 1:58 PM, Sophie Blee-Goldman wrote:
> >> > > I have one more thing to add/emphasize around point 3) -- I should
> >> > > clarify that the need to modify return values and skip delegated calls
> >> > > is essential to our own client wrappers. In other words, this isn't
> >> > > something specific to mock/test clients. Just wanted to point that out
> >> > > so I didn't accidentally cause you to waste time looking for a
> >> > > workaround for testing specifically; I was just using mock/test clients
> >> > > as an example case.
> >> > >
> >> > > For a specific example, we do some things around intercepting seek
> >> > > calls in order to set offsets correctly for our remote stores, such as
> >> > > overriding #seekToBeginning so it instead seeks to specific offsets.
> >> > > This isn't the only thing, but I think it clearly shows how performing
> >> > > the call being intercepted (in this case a seekToBeginning) would end
> >> > > up completely undoing the interceptor's actions (the seek to a specific
> >> > > offset).
> >> > >
> >> > > Hope this makes sense! Thanks
> >> > >
> >> > > On Thu, Sep 5, 2024 at 1:18 PM Sophie Blee-Goldman <sop...@responsive.dev>
> >> > > wrote:
> >> > >
> >> > >> Thanks Matthias!
> >> > >>
> >> > >> 1. Imo it makes more sense for the new client interceptor to be
> >> > >> configurable via config and not via the KafkaStreams constructor.
> >> > >> Let's take the opportunity to reduce the API surface area of the
> >> > >> already massively overloaded KafkaStreams constructor and fix the
> >> > >> inclusion of the KafkaClientSupplier/Interceptor in the primary
> >> > >> KafkaStreams interface. We already started moving in this direction
> >> > >> with KIP-884, which added the *default.client.supplier* config. I think
> >> > >> it was just an oversight that we didn't deprecate the constructors in
> >> > >> that same KIP (I didn't see this mentioned in the rejected alternatives
> >> > >> section).
> >> > >>
> >> > >> 2. We need to continue passing in the config map to the
> >> > >> interceptors/wrappers. Happy to elaborate if need be, but this is
> >> > >> absolutely essential to us :)
> >> > >>
> >> > >> 3. I'm a bit confused about how injecting mock/test clients would work
> >> > >> with these interceptors. If I understand the proposed API correctly,
> >> > >> these are simply transparent wrappers that don't allow one to skip the
> >> > >> delegated call or mock the returned value. There are many users who
> >> > >> plug in mock clients to unit test their code without spinning up a
> >> > >> broker (especially since the EmbeddedKafkaCluster isn't even a public
> >> > >> API) and verify things outside the bounds of the TTD
> >> > >> (TopologyTestDriver). There are many examples of this in Kafka Streams
> >> > >> itself -- perhaps you could show an example?
> >> > >>
> >> > >> 4. FYI I am reaching out about whether there are any true custom client
> >> > >> alternatives out there or in the planning. So far I'm not aware of any,
> >> > >> and we can/should proceed assuming there are none, but I'll update the
> >> > >> thread if I learn about something new here.
> >> > >>
> >> > >> On Wed, Sep 4, 2024 at 5:59 PM Matthias J. Sax <mj...@apache.org>
> >> > >> wrote:
> >> > >>
> >> > >>> Hi,
> >> > >>>
> >> > >>> I would like to start a discussion on KIP-1088:
> >> > >>>
> >> > >>>
> >> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1088%3A+Replace+KafkaClientSupplier+with+KafkaClientInterceptor
> >> > >>>
> >> > >>> We consider this KIP a requirement/blocker for KIP-1071.
> >> > >>>
> >> > >>> For the beginning of the discussion, I would like to stay focused on
> >> > >>> the _how_ and not necessarily talk about names... I am happy to change
> >> > >>> any class/method names, but it might be distracting in the beginning
> >> > >>> of the discussion.
> >> > >>>
> >> > >>> Looking forward to your feedback.
> >> > >>>
> >> > >>>
> >> > >>> -Matthias
> >> > >>>
> >> > >>
> >> > >
> >> >
> >>
> >
>
