Thanks, everyone!

@Bill, the main issue with using `KStream#peek()` is that, AFAIK, each `peek`
processor potentially runs on a different thread, so passing the trace
between them could be challenging. It would also require users to add these
operators themselves, which could be too cumbersome.

@Guozhang and @John: I will first focus on creating the
`TracingProcessorSupplier` for instrumenting custom `Processors`, and I will
keep the idea of a `ProcessorInterceptor` in the back of my head to see if
it makes sense to propose a KIP for it.
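
As a rough illustration of that first step, here is a minimal, self-contained
sketch of the wrapping idea. It deliberately does not use the real Kafka
Streams or Brave types: `Processor`, `ProcessorSupplier` and `SpanRecorder`
below are simplified stand-ins (assumptions for illustration only), and timing
each `process` call is just one possible thing a tracing wrapper could record.

```java
import java.util.ArrayList;
import java.util.List;

public class TracingSketch {

  /** Stand-in for the Kafka Streams Processor, reduced to a single method (assumption). */
  interface Processor<K, V> {
    void process(K key, V value);
  }

  /** Stand-in for the Kafka Streams ProcessorSupplier (assumption). */
  interface ProcessorSupplier<K, V> {
    Processor<K, V> get();
  }

  /** Hypothetical replacement for KafkaTracing: remembers the names of finished spans. */
  static final class SpanRecorder {
    final List<String> finished = new ArrayList<>();

    void record(String name, long elapsedNanos) {
      finished.add(name);
    }
  }

  /** Wraps a delegate processor and records one "span" per processed record. */
  static final class TracingProcessor<K, V> implements Processor<K, V> {
    private final SpanRecorder recorder;
    private final String name;
    private final Processor<K, V> delegate;

    TracingProcessor(SpanRecorder recorder, String name, Processor<K, V> delegate) {
      this.recorder = recorder;
      this.name = name;
      this.delegate = delegate;
    }

    @Override
    public void process(K key, V value) {
      long start = System.nanoTime();
      try {
        delegate.process(key, value);
      } finally {
        // Record the span even if the delegate throws.
        recorder.record(name, System.nanoTime() - start);
      }
    }
  }

  /** Supplier that hands out traced versions of whatever the delegate supplies. */
  static final class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
    private final SpanRecorder recorder;
    private final String name;
    private final ProcessorSupplier<K, V> delegate;

    TracingProcessorSupplier(SpanRecorder recorder, String name,
        ProcessorSupplier<K, V> delegate) {
      this.recorder = recorder;
      this.name = name;
      this.delegate = delegate;
    }

    @Override
    public Processor<K, V> get() {
      return new TracingProcessor<>(recorder, name, delegate.get());
    }
  }

  public static void main(String[] args) {
    SpanRecorder recorder = new SpanRecorder();
    List<String> seen = new ArrayList<>();
    // A user's custom processor, wrapped explicitly by the user.
    ProcessorSupplier<String, String> custom = () -> (k, v) -> seen.add(k + "=" + v);
    Processor<String, String> traced =
        new TracingProcessorSupplier<>(recorder, "my-processor", custom).get();
    traced.process("a", "1");
    System.out.println(seen + " " + recorder.finished);
  }
}
```

With this shape, users opt in per processor by wrapping their own supplier,
so nothing in the internal topology builder needs to change; the trade-off is
that processors created by the DSL are not covered.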

Thanks again for your feedback!

Cheers,
Jorge.
On Wed, Sep 19, 2018 at 1:55, Bill Bejeck (<bbej...@gmail.com>)
wrote:

> Jorge:
>
> I have a crazy idea off the top of my head.
>
> Would something as low-tech as using KStream.peek calls on either side of
> certain processors to record start and end times work?
>
> Thanks,
> Bill
>
> On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Jorge:
> >
> > My suggestion was to let your users implement
> > TracingProcessorSupplier / TracingProcessor directly instead of the
> > baseline ProcessorSupplier / Processor. Would that work for you?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> > > Thanks Guozhang and John.
> > >
> > > @Guozhang:
> > >
> > > > I'd suggest providing a
> > > > WrapperProcessorSupplier for the users rather than modifying
> > > > InternalStreamsTopology: more specifically, you can provide an
> > > > `abstract WrapperProcessorSupplier
> > > > implements ProcessorSupplier` and then let users instantiate this
> > > > class instead of the "bare-metal" interface. WDYT?
> > >
> > > Yes, in the gist, I have a class implementing `ProcessorSupplier`:
> > >
> > > ```
> > > public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
> > >   final KafkaTracing kafkaTracing;
> > >   final String name;
> > >   final ProcessorSupplier<K, V> delegate;
> > >
> > >   public TracingProcessorSupplier(KafkaTracing kafkaTracing,
> > >       String name, ProcessorSupplier<K, V> delegate) {
> > >     this.kafkaTracing = kafkaTracing;
> > >     this.name = name;
> > >     this.delegate = delegate;
> > >   }
> > >
> > >   @Override public Processor<K, V> get() {
> > >     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
> > >   }
> > > }
> > > ```
> > >
> > > My challenge is how to wrap Topology Processors created by
> > > `StreamsBuilder#build` to make this instrumentation easy to adopt by
> > > Kafka Streams users.
> > >
> > > @John:
> > >
> > > > The diff you posted only contains the library-side changes, and it's
> > > > not obvious how you would use this to insert the desired tracing code.
> > > > Perhaps you could provide a snippet demonstrating how you want to use
> > > > this change to enable tracing?
> > >
> > > My first approach was something like this:
> > >
> > > ```
> > > final StreamsBuilder builder = kafkaStreamsTracing.builder();
> > > ```
> > >
> > > Where `KafkaStreamsTracing#builder` looks like this:
> > >
> > > ```
> > >   public StreamsBuilder builder() {
> > >     return new StreamsBuilder(new Topology(new
> > > TracingInternalTopologyBuilder(kafkaTracing)));
> > >   }
> > > ```
> > >
> > > Then, once the builder creates a topology, `processors` will be wrapped
> > > by the `TracingProcessorSupplier` described above.
> > >
> > > Probably this approach is too naive but works as an initial proof of
> > > concept.
> > >
> > > > Off the top of my head, here are some other approaches you might
> > > > evaluate:
> > > > * you mentioned interceptors. Perhaps we could create a
> > > > ProcessorInterceptor interface and add a config to set it.
> > >
> > > This sounds very interesting to me. Then we won't need to touch
> > > internal APIs, and can just provide some configs. One challenge here is
> > > how to define the hooks. In the consumer/producer, the lifecycle is
> > > clear: `onConsume`/`onSend` and then `onCommit`/`onAcknowledgement`
> > > methods. For Stream processors, what would this look like? Maybe
> > > `beforeProcess(context, key, value)` and
> > > `afterProcess(context, key, value)`.
> > >
> > > > * perhaps we could simply build the tracing headers into Streams. Is
> > > > there a benefit to making it customizable?
> > >
> > > I don't understand this option completely. Do you mean something like
> > > KIP-159 (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
> > > )?
> > > Headers available in the Streams DSL would allow users to create
> > > "custom" traces, for instance:
> > >
> > > ```
> > > stream.map( (headers, k, v) -> {
> > >   Span span = kafkaTracing.nextSpan(headers).start();
> > >   doSomething(k, v);
> > >   span.finish();
> > > });
> > > ```
> > >
> > > but it won't be possible to instrument the existing processors exposed
> > > by the DSL only by enabling headers on the Streams DSL.
> > >
> > > If we can define a way to pass a `ProcessorSupplier` to be used by
> > > `StreamsBuilder#internalTopology` (not sure if via constructor or some
> > > other way), that would be enough to support this use case.
> > >
> > > > Also, as Matthias said, you would need to create a KIP to propose
> > > > this change, but of course we can continue this preliminary discussion
> > > > until you feel confident to create the KIP.
> > >
> > > Happy to do it once the approach is clearer.
> > >
> > > Cheers,
> > > Jorge.
> > >
> > > On Mon, Sep 17, 2018 at 17:09, John Roesler (<j...@confluent.io>)
> > > wrote:
> > >
> > > > If I understand the request, it's about tracking the latencies for a
> > > > specific record, not the aggregated latencies for each processor.
> > > >
> > > > Jorge,
> > > >
> > > > The diff you posted only contains the library-side changes, and it's
> > > > not obvious how you would use this to insert the desired tracing code.
> > > > Perhaps you could provide a snippet demonstrating how you want to use
> > > > this change to enable tracing?
> > > >
> > > > Also, as Matthias said, you would need to create a KIP to propose
> > > > this change, but of course we can continue this preliminary discussion
> > > > until you feel confident to create the KIP.
> > > >
> > > > Off the top of my head, here are some other approaches you might
> > > > evaluate:
> > > > * you mentioned interceptors. Perhaps we could create a
> > > > ProcessorInterceptor interface and add a config to set it.
> > > > * perhaps we could simply build the tracing headers into Streams. Is
> > > > there a benefit to making it customizable?
> > > >
> > > > Thanks for considering this problem!
> > > > -John
> > > >
> > > > On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Jorge,
> > > > >
> > > > > From the TracingProcessor implementation it seems you want to track
> > > > > per-processor processing latency, is that right? If this is the
> > > > > case, you can actually use the per-processor metrics, which include
> > > > > latency sensors.
> > > > >
> > > > > If you do want to track, for a certain record, the latency of
> > > > > processing it, then you'd probably need the processor implementation
> > > > > in your repo. In this case, though, I'd suggest providing a
> > > > > WrapperProcessorSupplier for the users rather than modifying
> > > > > InternalStreamsTopology: more specifically, you can provide an
> > > > > `abstract WrapperProcessorSupplier
> > > > > implements ProcessorSupplier` and then let users instantiate this
> > > > > class instead of the "bare-metal" interface. WDYT?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
> > > > > quilcate.jo...@gmail.com> wrote:
> > > > >
> > > > > > Thanks for your answer, Matthias!
> > > > > >
> > > > > > What I'm looking for is something similar to interceptors, but
> > > > > > for Stream Processors.
> > > > > >
> > > > > > In Zipkin (and probably other tracing implementations as well) we
> > > > > > are using Headers to propagate the context of a trace (i.e. adding
> > > > > > metadata to the Kafka Record, so we can create references to a
> > > > > > trace).
> > > > > > Now that Headers are part of the Kafka Streams Processor API, we
> > > > > > can propagate context from inputs (Consumers) to outputs
> > > > > > (Producers) by using `KafkaClientSupplier` (e.g. <
> > > > > > https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java
> > > > > > >).
> > > > > >
> > > > > > "Input to Output" traces could be enough for some use cases, but
> > > > > > we are looking for a more detailed trace that could cover cases
> > > > > > like side effects (e.g. for each processor), where input/output
> > > > > > and processor latencies can be recorded. This is why I have been
> > > > > > looking at how to decorate the `ProcessorSupplier`, with all the
> > > > > > changes shown in the comparison. Here is a gist of how we are
> > > > > > planning to decorate the `addProcessor` method:
> > > > > > https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> > > > > >
> > > > > > Hope this makes a bit more sense now :)
> > > > > >
> > > > > > On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax
> > > > > > (<matth...@confluent.io>) wrote:
> > > > > >
> > > > > > > >> I'm experimenting on how to add tracing to Kafka Streams.
> > > > > > >
> > > > > > > What do you mean by this exactly? Is there a JIRA? I am fine
> > > > > > > removing `final` from `InternalTopologyBuilder#addProcessor()` --
> > > > > > > it's an internal class.
> > > > > > >
> > > > > > > However, the diff also shows
> > > > > > >
> > > > > > > > public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
> > > > > > >
> > > > > > > This has two impacts: first, it modifies `Topology`, which is
> > > > > > > part of the public API and would require a KIP. Second, it
> > > > > > > exposes `InternalTopologyBuilder` as part of the public API --
> > > > > > > something we should not do.
> > > > > > >
> > > > > > > I am also not sure why you want to do this (btw: also a public
> > > > > > > API change requiring a KIP). However, this should not be
> > > > > > > necessary.
> > > > > > >
> > > > > > > >     public StreamsBuilder(final Topology topology)  {
> > > > > > >
> > > > > > >
> > > > > > > I think I am lacking some context on what you are trying to
> > > > > > > achieve. Maybe you can elaborate on the problem you are trying
> > > > > > > to solve?
> > > > > > >
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > I'm experimenting on how to add tracing to Kafka Streams.
> > > > > > > >
> > > > > > > > One option is to override and access
> > > > > > > > `InternalTopologyBuilder#addProcessor`. Currently this method
> > > > > > > > is final, and the builder is not exposed as part of
> > > > > > > > `StreamsBuilder`:
> > > > > > > >
> > > > > > > > ```
> > > > > > > > public class StreamsBuilder {
> > > > > > > >
> > > > > > > >     /** The actual topology that is constructed by this StreamsBuilder. */
> > > > > > > >     private final Topology topology = new Topology();
> > > > > > > >
> > > > > > > >     /** The topology's internal builder. */
> > > > > > > >     final InternalTopologyBuilder internalTopologyBuilder =
> > > > > > > >         topology.internalTopologyBuilder;
> > > > > > > >
> > > > > > > >     private final InternalStreamsBuilder internalStreamsBuilder =
> > > > > > > >         new InternalStreamsBuilder(internalTopologyBuilder);
> > > > > > > > ```
> > > > > > > >
> > > > > > > > The goal is that if `builder#addProcessor` is exposed, we
> > > > > > > > could decorate every `ProcessorSupplier` and capture traces
> > > > > > > > from it:
> > > > > > > >
> > > > > > > > ```
> > > > > > > > @Override
> > > > > > > > public void addProcessor(String name, ProcessorSupplier supplier,
> > > > > > > >     String... predecessorNames) {
> > > > > > > >   super.addProcessor(name,
> > > > > > > >       new TracingProcessorSupplier(tracer, name, supplier),
> > > > > > > >       predecessorNames);
> > > > > > > > }
> > > > > > > > ```
> > > > > > > >
> > > > > > > > Would it make sense to propose this as a change:
> > > > > > > > https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology
> > > > > > > > ? Or maybe there is a better way to do this?
> > > > > > > > TopologyWrapper does something similar:
> > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
> > > > > > > >
> > > > > > > > Thanks in advance for any help.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Jorge.
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
