Thanks Guozhang and John.

@Guozhang:

> I'd suggest to provide a WrapperProcessorSupplier for the users than
> modifying InternalStreamsTopology: more specifically, you can provide an
> `abstract WrapperProcessorSupplier implements ProcessorSupplier` and then
> let users to instantiate this class instead of the "bare-metal" interface.
> WDYT?

Yes, in the gist, I have a class implementing `ProcessorSupplier`:

```
public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
  final KafkaTracing kafkaTracing;
  final String name;
  final ProcessorSupplier<K, V> delegate;

  public TracingProcessorSupplier(KafkaTracing kafkaTracing,
      String name, ProcessorSupplier<K, V> delegate) {
    this.kafkaTracing = kafkaTracing;
    this.name = name;
    this.delegate = delegate;
  }

  // Wraps each processor created by the delegate in a tracing decorator.
  @Override public Processor<K, V> get() {
    return new TracingProcessor<>(kafkaTracing, name, delegate.get());
  }
}
```

My challenge is how to wrap Topology Processors created by
`StreamsBuilder#build` to make this instrumentation easy to adopt by Kafka
Streams users.
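
To make the decorator idea concrete, here is a minimal, dependency-free sketch of what the wrapped processor could do around each record. It is only an illustration: `SimpleProcessor`, `SimpleProcessorSupplier`, `RecordingProcessor` and `RecordingProcessorSupplier` are hypothetical stand-ins (not the Kafka Streams or Brave types), and a plain list of strings takes the place of real spans.

```java
import java.util.List;

// Simplified stand-ins for Kafka Streams' Processor and ProcessorSupplier;
// the real Processor interface also has init(ProcessorContext) and close().
interface SimpleProcessor<K, V> {
  void process(K key, V value);
}

interface SimpleProcessorSupplier<K, V> {
  SimpleProcessor<K, V> get();
}

// Hypothetical decorator: records one entry (name and duration) per record,
// standing in for starting and finishing a span around the delegate call.
class RecordingProcessor<K, V> implements SimpleProcessor<K, V> {
  private final String name;
  private final SimpleProcessor<K, V> delegate;
  private final List<String> finishedSpans;

  RecordingProcessor(String name, SimpleProcessor<K, V> delegate,
      List<String> finishedSpans) {
    this.name = name;
    this.delegate = delegate;
    this.finishedSpans = finishedSpans;
  }

  @Override public void process(K key, V value) {
    long start = System.nanoTime();
    try {
      delegate.process(key, value);
    } finally {
      // The entry is recorded even if the delegate throws.
      finishedSpans.add(name + ":" + (System.nanoTime() - start) + "ns");
    }
  }
}

// Same shape as TracingProcessorSupplier: every processor the delegate
// creates is wrapped before it is handed out.
class RecordingProcessorSupplier<K, V> implements SimpleProcessorSupplier<K, V> {
  private final String name;
  private final SimpleProcessorSupplier<K, V> delegate;
  private final List<String> finishedSpans;

  RecordingProcessorSupplier(String name, SimpleProcessorSupplier<K, V> delegate,
      List<String> finishedSpans) {
    this.name = name;
    this.delegate = delegate;
    this.finishedSpans = finishedSpans;
  }

  @Override public SimpleProcessor<K, V> get() {
    return new RecordingProcessor<K, V>(name, delegate.get(), finishedSpans);
  }
}
```

The open question remains the same: users can apply such a wrapper by hand in the Processor API, but not to the processors that the DSL creates internally.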

@John:

> The diff you posted only contains the library-side changes, and it's not
> obvious how you would use this to insert the desired tracing code.
> Perhaps you could provide a snippet demonstrating how you want to use this
> change to enable tracing?

My first approach was something like this:

```
final StreamsBuilder builder = kafkaStreamsTracing.builder();
```

Where `KafkaStreamsTracing#builder` looks like this:

```
  public StreamsBuilder builder() {
    return new StreamsBuilder(
        new Topology(new TracingInternalTopologyBuilder(kafkaTracing)));
  }
```

Then, once the builder creates a topology, its processors will be wrapped by
the `TracingProcessorSupplier` described above.

This approach is probably too naive, but it works as an initial proof of
concept.

> Off the top of my head, here are some other approaches you might evaluate:
> * you mentioned interceptors. Perhaps we could create a
> ProcessorInterceptor interface and add a config to set it.

This sounds very interesting to me. Then we wouldn't need to touch internal
APIs, and could just provide some configs. One challenge here is how to define
the hooks. In the consumer/producer interceptors the lifecycle is clear:
`onConsume`/`onSend` and then `onCommit`/`onAcknowledgement`. For Stream
processors, what would this look like? Maybe
`beforeProcess(context, key, value)` and `afterProcess(context, key, value)`.
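
A rough sketch of how such hooks might look, just to have something concrete to discuss. Everything here is an assumption: `ProcessorInterceptor` does not exist in Kafka Streams, `RecordHandler` is a simplified stand-in for a processor, and the context is reduced to a processor name.

```java
// Stand-in for a stream processor; Kafka's real Processor also has
// init(ProcessorContext) and close().
interface RecordHandler<K, V> {
  void process(K key, V value);
}

// Hypothetical hook interface following the method names proposed above.
// The context is simplified to a String holding the processor name.
interface ProcessorInterceptor<K, V> {
  void beforeProcess(String context, K key, V value);
  void afterProcess(String context, K key, V value);
}

// Invokes the hooks around the delegate. afterProcess runs in a finally
// block, so it is called even when the delegate throws.
class InterceptedHandler<K, V> implements RecordHandler<K, V> {
  private final String context;
  private final RecordHandler<K, V> delegate;
  private final ProcessorInterceptor<K, V> interceptor;

  InterceptedHandler(String context, RecordHandler<K, V> delegate,
      ProcessorInterceptor<K, V> interceptor) {
    this.context = context;
    this.delegate = delegate;
    this.interceptor = interceptor;
  }

  @Override public void process(K key, V value) {
    interceptor.beforeProcess(context, key, value);
    try {
      delegate.process(key, value);
    } finally {
      interceptor.afterProcess(context, key, value);
    }
  }
}
```

A tracing implementation would start a span in `beforeProcess` and finish it in `afterProcess`, which means the two calls need a way to share state for the same record; that is one of the details the hook design would have to settle.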

> * perhaps we could simply build the tracing headers into Streams. Is there
> a benefit to making it customizable?

I don't fully understand this option. Do you mean something like KIP-159 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
)?
Having headers available in the Streams DSL would allow users to create
"custom" traces, for instance:

```
stream.map((headers, k, v) -> {
  Span span = kafkaTracing.nextSpan(headers).start();
  doSomething(k, v);
  span.finish();
  return KeyValue.pair(k, v);
});
```

but it won't be possible to instrument the existing processors exposed by
the DSL just by enabling headers on the Streams DSL.

If we can define a way to pass a `ProcessorSupplier` to be used by
`StreamsBuilder#internalTopology` (not sure if via constructor or some
other way), that would be enough to support this use case.
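
As an illustration of the "via constructor" option, here is a toy builder that accepts a wrapping function and applies it to every supplier it registers. This is a sketch under stated assumptions, not Kafka code: `MiniTopologyBuilder` is hypothetical, and `Supplier<Consumer<String>>` stands in for `ProcessorSupplier`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical miniature builder; it only mimics the addProcessor hook point
// and is not Kafka's InternalTopologyBuilder.
class MiniTopologyBuilder {
  // Receives the processor name and the original supplier, returns the
  // supplier that should actually be registered.
  private final BiFunction<String, Supplier<Consumer<String>>,
      Supplier<Consumer<String>>> wrapper;
  private final Map<String, Supplier<Consumer<String>>> processors =
      new LinkedHashMap<>();

  MiniTopologyBuilder(BiFunction<String, Supplier<Consumer<String>>,
      Supplier<Consumer<String>>> wrapper) {
    this.wrapper = wrapper;
  }

  // Every supplier passes through the wrapper, including those that a DSL
  // layer on top of this builder would register on the user's behalf.
  void addProcessor(String name, Supplier<Consumer<String>> supplier) {
    processors.put(name, wrapper.apply(name, supplier));
  }

  Supplier<Consumer<String>> processor(String name) {
    return processors.get(name);
  }
}
```

With something along these lines the tracing library would only supply the wrapping function, and the internal builder class would not need to be subclassed or exposed.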

> Also, as Matthias said, you would need to create a KIP to propose this
> change, but of course we can continue this preliminary discussion until
> you feel confident to create the KIP.

Happy to do it once the approach is clearer.

Cheers,
Jorge.

On Mon, Sep 17, 2018 at 17:09, John Roesler (<j...@confluent.io>)
wrote:

> If I understand the request, it's about tracking the latencies for a
> specific record, not the aggregated latencies for each processor.
>
> Jorge,
>
> The diff you posted only contains the library-side changes, and it's not
> obvious how you would use this to insert the desired tracing code.
> Perhaps you could provide a snippet demonstrating how you want to use this
> change to enable tracing?
>
> Also, as Matthias said, you would need to create a KIP to propose this
> change, but of course we can continue this preliminary discussion until you
> feel confident to create the KIP.
>
> Off the top of my head, here are some other approaches you might evaluate:
> * you mentioned interceptors. Perhaps we could create a
> ProcessorInterceptor interface and add a config to set it.
> * perhaps we could simply build the tracing headers into Streams. Is there
> a benefit to making it customizable?
>
> Thanks for considering this problem!
> -John
>
> On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Jorge,
> >
> > From the TracingProcessor implementation it seems you want to track
> > per-processor processing latency, is that right? If this is the case you
> > can actually use the per-processor metrics which include latency sensors.
> >
> > If you do want to track, for a certain record, what's the latency of
> > processing it, then you'd probably need the processor implementation in
> > your repo. In this case, though, I'd suggest to provide a
> > WrapperProcessorSupplier for the users than modifying
> > InternalStreamsTopology: more specifically, you can provide an
> > `abstract WrapperProcessorSupplier implements ProcessorSupplier` and then
> > let users to instantiate this class instead of the "bare-metal"
> > interface. WDYT?
> >
> >
> > Guozhang
> >
> > On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> > > Thanks for your answer, Matthias!
> > >
> > > What I'm looking for is something similar to interceptors, but for
> > > Stream Processors.
> > >
> > > In Zipkin -and probably other tracing implementations as well- we are
> > > using Headers to propagate the context of a trace (i.e. adding metadata
> > > to the Kafka Record, so we can create references to a trace).
> > > Now that Headers are part of Kafka Streams Processor API, we can
> > > propagate context from input (Consumers) to outputs (Producers) by using
> > > `KafkaClientSupplier` (e.g. <
> > > https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java
> > > >).
> > >
> > > "Input to Output" traces could be enough for some use-cases, but we are
> > > looking for a more detailed trace -that could cover cases like
> > > side-effects (e.g. for each processor), where input/output and
> > > processors latencies can be recorded. This is why I have been looking
> > > for how to decorate the `ProcessorSupplier` and all the changes shown in
> > > the comparison. Here is a gist of how we are planning to decorate the
> > > `addProcessor` method:
> > > https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> > >
> > > Hope this makes a bit more sense now :)
> > >
> > > On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax (<
> > > matth...@confluent.io>)
> > > wrote:
> > >
> > > > >> I'm experimenting on how to add tracing to Kafka Streams.
> > > >
> > > > What do you mean by this exactly? Is there a JIRA? I am fine removing
> > > > `final` from `InternalTopologyBuilder#addProcessor()` -- it's an
> > > > internal class.
> > > >
> > > > However, the diff also shows
> > > >
> > > > > public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
> > > >
> > > > This has two impacts: first, it modifies `Topology` what is part of
> > > > public API and would require a KIP. Second, it exposes
> > > > `InternalTopologyBuilder` as part of the public API -- something we
> > > > should not do.
> > > >
> > > > I am also not sure, why you want to do this (btw: also public API
> > > > change requiring a KIP). However, this should not be necessary.
> > > >
> > > > >     public StreamsBuilder(final Topology topology)  {
> > > >
> > > >
> > > > I think I am lacking some context what you try to achieve. Maybe you
> > > > can elaborate in the problem you try to solve?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > > Hi everyone,
> > > > >
> > > > > I'm experimenting on how to add tracing to Kafka Streams.
> > > > >
> > > > > One option is to override and access
> > > > > `InternalTopologyBuilder#addProcessor`. Currently this method is final,
> > > > > and the builder is not exposed as part of `StreamsBuilder`:
> > > > >
> > > > > ```
> > > > > public class StreamsBuilder {
> > > > >
> > > > >     /** The actual topology that is constructed by this StreamsBuilder. */
> > > > >     private final Topology topology = new Topology();
> > > > >
> > > > >     /** The topology's internal builder. */
> > > > >     final InternalTopologyBuilder internalTopologyBuilder =
> > > > >         topology.internalTopologyBuilder;
> > > > >
> > > > >     private final InternalStreamsBuilder internalStreamsBuilder =
> > > > >         new InternalStreamsBuilder(internalTopologyBuilder);
> > > > > ```
> > > > >
> > > > > The goal is that if `builder#addProcessor` is exposed, we could
> > > > > decorate every `ProcessorSupplier` and capture traces from it:
> > > > >
> > > > > ```
> > > > > @Override
> > > > > public void addProcessor(String name, ProcessorSupplier supplier,
> > > > >     String... predecessorNames) {
> > > > >   super.addProcessor(name,
> > > > >       new TracingProcessorSupplier(tracer, name, supplier), predecessorNames);
> > > > > }
> > > > > ```
> > > > >
> > > > > Would it make sense to propose this as a change:
> > > > > https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology ?
> > > > > Or maybe there is a better way to do this?
> > > > > TopologyWrapper does something similar:
> > > > > https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
> > > > >
> > > > > Thanks in advance for any help.
> > > > >
> > > > > Cheers,
> > > > > Jorge.
> > > > >
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
