Hello Jorge,

From the TracingProcessor implementation, it seems you want to track
per-processor processing latency, is that right? If so, you can use the
built-in per-processor metrics, which already include latency sensors.
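
For illustration, here is a minimal sketch of turning those metrics on. It
assumes that the processor-node metrics are only recorded at the DEBUG
recording level (worth double-checking against the monitoring docs for your
version); the application id and bootstrap servers are placeholders:

```java
import java.util.Properties;

public class MetricsConfigSketch {
    // Builds the properties you would pass to KafkaStreams.
    static Properties streamsProps() {
        final Properties props = new Properties();
        props.put("application.id", "tracing-demo");        // placeholder value
        props.put("bootstrap.servers", "localhost:9092");   // placeholder value
        // Assumption: per-processor latency sensors need DEBUG-level recording.
        props.put("metrics.recording.level", "DEBUG");
        return props;
    }
}
```

The resulting sensors can then be read via JMX or `KafkaStreams#metrics()`.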

If you instead want to track, for a given record, the latency of
processing it, then you'd probably need the processor implementation in
your repo. In that case, though, I'd suggest providing a
WrapperProcessorSupplier for users rather than modifying
InternalTopologyBuilder: more specifically, you can provide an
`abstract class WrapperProcessorSupplier implements ProcessorSupplier`
and then let users extend this class instead of implementing the
"bare-metal" interface. WDYT?
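
A rough sketch of what I have in mind is below. Note that `Processor` and
`ProcessorSupplier` here are simplified stand-ins so the snippet compiles on
its own (the real Streams interfaces also have init/close and a
ProcessorContext), and `createProcessor` and `latenciesNanos` are made-up
names; a real version would report spans to your tracer instead of collecting
numbers in a list:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Kafka Streams interfaces (assumption, not the real API).
interface Processor<K, V> {
    void process(K key, V value);
}

interface ProcessorSupplier<K, V> {
    Processor<K, V> get();
}

// Hypothetical wrapper: users extend this instead of implementing ProcessorSupplier.
abstract class WrapperProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
    // Per-record latencies in nanoseconds; not thread-safe, for illustration only.
    final List<Long> latenciesNanos = new ArrayList<>();

    // Users return their actual processor here.
    protected abstract Processor<K, V> createProcessor();

    @Override
    public final Processor<K, V> get() {
        final Processor<K, V> delegate = createProcessor();
        // Decorate the user's processor so every process() call is timed.
        return (key, value) -> {
            final long start = System.nanoTime();
            try {
                delegate.process(key, value);
            } finally {
                latenciesNanos.add(System.nanoTime() - start);
            }
        };
    }
}
```

Because `get()` is final, every processor handed to the topology is decorated,
and nothing in the internal builder has to change.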


Guozhang

On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thanks for your answer, Matthias!
>
> What I'm looking for is something similar to interceptors, but for Stream
> Processors.
>
> In Zipkin, and probably other tracing implementations as well, we are using
> Headers to propagate the context of a trace (i.e. adding metadata to the
> Kafka Record, so we can create references to a trace).
> Now that Headers are part of the Kafka Streams Processor API, we can propagate
> context from input (Consumers) to outputs (Producers) by using
> `KafkaClientSupplier` (e.g.
> <https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java>).
>
> "Input to Output" traces could be enough for some use cases, but we are
> looking for a more detailed trace that could also cover cases like side
> effects (e.g. for each processor), where input/output and processor
> latencies can be recorded. This is why I have been looking at how to
> decorate the `ProcessorSupplier`, which led to all the changes shown in the
> comparison. Here is a gist of how we are planning to decorate the
> `addProcessor` method:
> https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
>
> Hope this makes a bit more sense now :)
>
> On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax (<matth...@confluent.io>)
> wrote:
>
> > >> I'm experimenting on how to add tracing to Kafka Streams.
> >
> > What do you mean by this exactly? Is there a JIRA? I am fine removing
> > `final` from `InternalTopologyBuilder#addProcessor()` -- it's an
> > internal class.
> >
> > However, the diff also shows
> >
> > > public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
> >
> > This has two impacts: first, it modifies `Topology`, which is part of the
> > public API and would require a KIP. Second, it exposes
> > `InternalTopologyBuilder` as part of the public API -- something we
> > should not do.
> >
> > I am also not sure, why you want to do this (btw: also public API change
> > requiring a KIP). However, this should not be necessary.
> >
> > >     public StreamsBuilder(final Topology topology)  {
> >
> >
> > I think I am lacking some context on what you are trying to achieve. Maybe
> > you can elaborate on the problem you are trying to solve?
> >
> >
> > -Matthias
> >
> > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > > Hi everyone,
> > >
> > > I'm experimenting on how to add tracing to Kafka Streams.
> > >
> > > One option is to override and access
> > > `InternalTopologyBuilder#addProcessor`. Currently this method is final,
> > > and the builder is not exposed as part of `StreamsBuilder`:
> > >
> > > ```
> > > public class StreamsBuilder {
> > >
> > >     /** The actual topology that is constructed by this StreamsBuilder. */
> > >     private final Topology topology = new Topology();
> > >
> > >     /** The topology's internal builder. */
> > >     final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
> > >
> > >     private final InternalStreamsBuilder internalStreamsBuilder =
> > >         new InternalStreamsBuilder(internalTopologyBuilder);
> > > ```
> > >
> > > The goal is that, if `builder#addProcessor` is exposed, we could decorate
> > > every `ProcessorSupplier` and capture traces from it:
> > >
> > > ```
> > > @Override
> > > public void addProcessor(String name, ProcessorSupplier supplier, String... predecessorNames) {
> > >     super.addProcessor(name, new TracingProcessorSupplier(tracer, name, supplier), predecessorNames);
> > > }
> > > ```
> > >
> > > Would it make sense to propose this as a change
> > > (https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology),
> > > or is there maybe a better way to do this?
> > > TopologyWrapper does something similar:
> > > https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
> > >
> > > Thanks in advance for any help.
> > >
> > > Cheers,
> > > Jorge.
> > >
> >
> >
>



-- 
-- Guozhang
