For most operators, yes. It is certainly the case for all stateless
operators like `mapValues()`. For stateful operators, it depends on
whether data repartitioning is required: if it is, the topology is split
into two sub-topologies and thus both `peek()` operations could run on
different threads.

If you want to double-check, you can describe a topology before
executing it via `Topology.describe()` -- all operators of a single
sub-topology will be executed single-threaded.
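
For example, a minimal sketch (topic names and the lambdas are just
placeholders) that prints the sub-topologies of a
`peek().mapValues().peek()` chain:

```
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class DescribeExample {
  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream = builder.stream("input");
    stream.peek((k, v) -> System.out.println("first peek: " + k))
          .mapValues(v -> v.toUpperCase())
          .peek((k, v) -> System.out.println("second peek: " + k))
          .to("output");

    Topology topology = builder.build();
    // Operators listed under the same "Sub-topology:" share one stream thread.
    System.out.println(topology.describe());
  }
}
```

If you add a key-changing operation plus an aggregation between the two
`peek()`s, `describe()` will show two sub-topologies instead of one.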


-Matthias


On 9/26/18 12:14 AM, Jorge Esteban Quilcate Otoya wrote:
> Good to know, thanks Matthias!
> 
> You've mentioned the previous operator, but what about
> `peek().mapValues().peek()` -- will both `peek`s be in the same thread as
> well?
> 
> On Tue, Sep 25, 2018 at 11:14 PM, Matthias J. Sax (<matth...@confluent.io>)
> wrote:
> 
>> Just for clarification:
>>
>> `peek()` would run on the same thread as the previous operator. Even
>> if---strictly speaking---there is no public contract to guarantee this,
>> it would be the case in the current implementation, and I also don't see
>> any reason why this would change at any point in the future, because
>> it's the most efficient implementation I can think of.
>>
>> -Matthias
>>
>> On 9/22/18 4:51 AM, Jorge Esteban Quilcate Otoya wrote:
>>> Thanks, everyone!
>>>
>>> @Bill, the main issue with using `KStream#peek()` is that AFAIK each
>>> `peek` processor runs on a potentially different thread, so passing the
>>> trace between them could be challenging. It will also require users to
>>> add these operators themselves, which could be too cumbersome to use.
>>>
>>> @Guozhang and @John: I will first focus on creating the
>>> `TracingProcessorSupplier` for instrumenting custom `Processors`, and I
>>> will keep the idea of a `ProcessorInterceptor` in the back of my head to
>>> see if it makes sense to propose a KIP for this.
>>>
>>> Thanks again for your feedback!
>>>
>>> Cheers,
>>> Jorge.
>>> On Wed, Sep 19, 2018 at 1:55 AM, Bill Bejeck (<bbej...@gmail.com>)
>>> wrote:
>>>
>>>> Jorge:
>>>>
>>>> I have a crazy idea off the top of my head.
>>>>
>>>> Would something as low-tech as using `KStream.peek` calls on either side
>>>> of certain processors to record start and end times work?
>>>>
>>>> Thanks,
>>>> Bill
>>>>
>>>> On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>>>
>>>>> Jorge:
>>>>>
>>>>> My suggestion was to let your users implement the
>>>>> TracingProcessorSupplier / TracingProcessor directly instead of the
>>>>> base-line ProcessorSupplier / Processor. Would that work for you?
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <
>>>>> quilcate.jo...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Guozhang and John.
>>>>>>
>>>>>> @Guozhang:
>>>>>>
>>>>>>> I'd suggest to provide a
>>>>>>> WrapperProcessorSupplier for the users rather than modifying
>>>>>>> InternalStreamsTopology: more specifically, you can provide an
>>>>>>> `abstract WrapperProcessorSupplier implements ProcessorSupplier` and
>>>>>>> then let users instantiate this class instead of the "bare-metal"
>>>>>>> interface. WDYT?
>>>>>>
>>>>>> Yes, in the gist, I have a class implementing `ProcessorSupplier`:
>>>>>>
>>>>>> ```
>>>>>> public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
>>>>>>   final KafkaTracing kafkaTracing;
>>>>>>   final String name;
>>>>>>   final ProcessorSupplier<K, V> delegate;
>>>>>>
>>>>>>   public TracingProcessorSupplier(KafkaTracing kafkaTracing,
>>>>>>       String name, ProcessorSupplier<K, V> delegate) {
>>>>>>     this.kafkaTracing = kafkaTracing;
>>>>>>     this.name = name;
>>>>>>     this.delegate = delegate;
>>>>>>   }
>>>>>>
>>>>>>   @Override public Processor<K, V> get() {
>>>>>>     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
>>>>>>   }
>>>>>> }
>>>>>> ```
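>>>>>>
>>>>>> A hypothetical usage sketch (the topic names, `MyProcessor` -- assumed
>>>>>> to implement `Processor<String, String>` with a no-arg constructor --
>>>>>> and the `kafkaTracing` instance from Brave are placeholders), wiring
>>>>>> the wrapper into a Processor API topology:
>>>>>>
>>>>>> ```
>>>>>> ProcessorSupplier<String, String> traced =
>>>>>>     new TracingProcessorSupplier<>(kafkaTracing, "my-processor", MyProcessor::new);
>>>>>>
>>>>>> Topology topology = new Topology();
>>>>>> topology.addSource("source", "input-topic");
>>>>>> topology.addProcessor("my-processor", traced, "source");
>>>>>> topology.addSink("sink", "output-topic", "my-processor");
>>>>>> ```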
>>>>>>
>>>>>> My challenge is how to wrap Topology Processors created by
>>>>>> `StreamsBuilder#build` to make this instrumentation easy for Kafka
>>>>>> Streams users to adopt.
>>>>>>
>>>>>> @John:
>>>>>>
>>>>>>> The diff you posted only contains the library-side changes, and it's
>>>>>>> not obvious how you would use this to insert the desired tracing code.
>>>>>>> Perhaps you could provide a snippet demonstrating how you want to use
>>>>>>> this change to enable tracing?
>>>>>>
>>>>>> My first approach was something like this:
>>>>>>
>>>>>> ```
>>>>>> final StreamsBuilder builder = kafkaStreamsTracing.builder();
>>>>>> ```
>>>>>>
>>>>>> Where `KafkaStreamsTracing#builder` looks like this:
>>>>>>
>>>>>> ```
>>>>>> public StreamsBuilder builder() {
>>>>>>   return new StreamsBuilder(new Topology(new TracingInternalTopologyBuilder(kafkaTracing)));
>>>>>> }
>>>>>> ```
>>>>>>
>>>>>> Then, once the builder creates a topology, processors will be wrapped
>>>>>> by the `TracingProcessorSupplier` described above.
>>>>>>
>>>>>> Probably this approach is too naive, but it works as an initial proof
>>>>>> of concept.
>>>>>>
>>>>>>> Off the top of my head, here are some other approaches you might
>>>>>>> evaluate:
>>>>>>> * you mentioned interceptors. Perhaps we could create a
>>>>>>> ProcessorInterceptor interface and add a config to set it.
>>>>>>
>>>>>> This sounds very interesting to me. Then we won't need to touch
>>>>>> internal APIs, and can just provide some configs. One challenge here is
>>>>>> how to define the hooks. For consumer/producer interceptors, the
>>>>>> lifecycle is clear: `onConsume`/`onSend` and then `onCommit`/`onAck`
>>>>>> methods. For stream processors, what would this look like? Maybe
>>>>>> `beforeProcess(context, key, value)` and
>>>>>> `afterProcess(context, key, value)`.
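>>>>>>
>>>>>> A minimal sketch of such a hypothetical interface (nothing like this
>>>>>> exists in Streams today; the names are purely illustrative):
>>>>>>
>>>>>> ```
>>>>>> import org.apache.kafka.streams.processor.ProcessorContext;
>>>>>>
>>>>>> public interface ProcessorInterceptor<K, V> {
>>>>>>   // called right before Processor#process(key, value)
>>>>>>   void beforeProcess(ProcessorContext context, K key, V value);
>>>>>>   // called right after Processor#process(key, value) returns
>>>>>>   void afterProcess(ProcessorContext context, K key, V value);
>>>>>> }
>>>>>> ```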
>>>>>>
>>>>>>> * perhaps we could simply build the tracing headers into Streams. Is
>>>>>>> there a benefit to making it customizable?
>>>>>>
>>>>>> I don't understand this option completely. Do you mean something like
>>>>>> KIP-159 (
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
>>>>>> )?
>>>>>> Headers available in the Streams DSL would allow users to create
>>>>>> "custom" traces, for instance:
>>>>>>
>>>>>> ```
>>>>>> stream.map((headers, k, v) -> {
>>>>>>   Span span = kafkaTracing.nextSpan(headers).start();
>>>>>>   doSomething(k, v);
>>>>>>   span.finish();
>>>>>> });
>>>>>> ```
>>>>>>
>>>>>> but it won't be possible to instrument the existing processors exposed
>>>>>> by the DSL only by enabling headers in the Streams DSL.
>>>>>>
>>>>>> If we can define a way to pass a `ProcessorSupplier` to be used by
>>>>>> `StreamsBuilder#internalTopology` -- not sure if via a constructor or
>>>>>> some other way -- that would be enough to support this use case.
>>>>>>
>>>>>>> Also, as Matthias said, you would need to create a KIP to propose this
>>>>>>> change, but of course we can continue this preliminary discussion
>>>>>>> until you feel confident to create the KIP.
>>>>>>
>>>>>> Happy to do it once the approach is clearer.
>>>>>>
>>>>>> Cheers,
>>>>>> Jorge.
>>>>>>
>>>>>> On Mon, Sep 17, 2018 at 5:09 PM, John Roesler (<j...@confluent.io>)
>>>>>> wrote:
>>>>>>
>>>>>>> If I understand the request, it's about tracking the latencies for a
>>>>>>> specific record, not the aggregated latencies for each processor.
>>>>>>>
>>>>>>> Jorge,
>>>>>>>
>>>>>>> The diff you posted only contains the library-side changes, and it's
>>>>>>> not obvious how you would use this to insert the desired tracing code.
>>>>>>> Perhaps you could provide a snippet demonstrating how you want to use
>>>>>>> this change to enable tracing?
>>>>>>>
>>>>>>> Also, as Matthias said, you would need to create a KIP to propose this
>>>>>>> change, but of course we can continue this preliminary discussion
>>>>>>> until you feel confident to create the KIP.
>>>>>>>
>>>>>>> Off the top of my head, here are some other approaches you might
>>>>>>> evaluate:
>>>>>>> * you mentioned interceptors. Perhaps we could create a
>>>>>>> ProcessorInterceptor interface and add a config to set it.
>>>>>>> * perhaps we could simply build the tracing headers into Streams. Is
>>>>>>> there a benefit to making it customizable?
>>>>>>>
>>>>>>> Thanks for considering this problem!
>>>>>>> -John
>>>>>>>
>>>>>>> On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Jorge,
>>>>>>>>
>>>>>>>> From the TracingProcessor implementation it seems you want to track
>>>>>>>> per-processor processing latency -- is that right? If this is the
>>>>>>>> case, you can actually use the per-processor metrics, which include
>>>>>>>> latency sensors.
>>>>>>>>
>>>>>>>> If you do want to track, for a certain record, what's the latency of
>>>>>>>> processing it, then you'd probably need the processor implementation
>>>>>>>> in your repo. In this case, though, I'd suggest to provide a
>>>>>>>> WrapperProcessorSupplier for the users rather than modifying
>>>>>>>> InternalStreamsTopology: more specifically, you can provide an
>>>>>>>> `abstract WrapperProcessorSupplier implements ProcessorSupplier` and
>>>>>>>> then let users instantiate this class instead of the "bare-metal"
>>>>>>>> interface. WDYT?
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
>>>>>>>> quilcate.jo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for your answer, Matthias!
>>>>>>>>>
>>>>>>>>> What I'm looking for is something similar to interceptors, but for
>>>>>>>>> Stream Processors.
>>>>>>>>>
>>>>>>>>> In Zipkin -- and probably other tracing implementations as well --
>>>>>>>>> we are using headers to propagate the context of a trace (i.e.
>>>>>>>>> adding metadata to the Kafka record, so we can create references to
>>>>>>>>> a trace).
>>>>>>>>> Now that headers are part of the Kafka Streams Processor API, we can
>>>>>>>>> propagate context from inputs (consumers) to outputs (producers) by
>>>>>>>>> using `KafkaClientSupplier` (e.g.
>>>>>>>>> https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java
>>>>>>>>> ).
>>>>>>>>>
>>>>>>>>> "Input to Output" traces could be enough for some use cases, but we
>>>>>>>>> are looking for a more detailed trace that could cover cases like
>>>>>>>>> side-effects (e.g. for each processor), where input/output and
>>>>>>>>> processor latencies can be recorded. This is why I have been looking
>>>>>>>>> for how to decorate the `ProcessorSupplier`, and all the changes
>>>>>>>>> shown in the comparison. Here is a gist of how we are planning to
>>>>>>>>> decorate the `addProcessor` method:
>>>>>>>>> https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
>>>>>>>>>
>>>>>>>>> Hope this makes a bit more sense now :)
>>>>>>>>>
>>>>>>>>> On Sun, Sep 16, 2018 at 8:51 PM, Matthias J. Sax (<matth...@confluent.io>)
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
>>>>>>>>>>
>>>>>>>>>> What do you mean by this exactly? Is there a JIRA? I am fine
>>>>>>>>>> removing `final` from `InternalTopologyBuilder#addProcessor()` --
>>>>>>>>>> it's an internal class.
>>>>>>>>>>
>>>>>>>>>> However, the diff also shows
>>>>>>>>>>
>>>>>>>>>>> public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
>>>>>>>>>>
>>>>>>>>>> This has two impacts: first, it modifies `Topology`, which is part
>>>>>>>>>> of the public API and would require a KIP. Second, it exposes
>>>>>>>>>> `InternalTopologyBuilder` as part of the public API -- something we
>>>>>>>>>> should not do.
>>>>>>>>>>
>>>>>>>>>> I am also not sure why you want to do this (btw: also a public API
>>>>>>>>>> change requiring a KIP). However, this should not be necessary.
>>>>>>>>>>
>>>>>>>>>>>     public StreamsBuilder(final Topology topology)  {
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I think I am lacking some context on what you are trying to
>>>>>>>>>> achieve. Maybe you can elaborate on the problem you are trying to
>>>>>>>>>> solve?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>
>>>>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
>>>>>>>>>>>
>>>>>>>>>>> One option is to override and access
>>>>>>>>>>> `InternalTopologyBuilder#addProcessor`. Currently this method is
>>>>>>>>>>> final, and the builder is not exposed as part of `StreamsBuilder`:
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>> public class StreamsBuilder {
>>>>>>>>>>>
>>>>>>>>>>>     /** The actual topology that is constructed by this StreamsBuilder. */
>>>>>>>>>>>     private final Topology topology = new Topology();
>>>>>>>>>>>
>>>>>>>>>>>     /** The topology's internal builder. */
>>>>>>>>>>>     final InternalTopologyBuilder internalTopologyBuilder =
>>>>>>>>>>>         topology.internalTopologyBuilder;
>>>>>>>>>>>
>>>>>>>>>>>     private final InternalStreamsBuilder internalStreamsBuilder =
>>>>>>>>>>>         new InternalStreamsBuilder(internalTopologyBuilder);
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> The goal is that if `builder#addProcessor` is exposed, we could
>>>>>>>>>>> decorate every `ProcessorSupplier` and capture traces from it:
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>> @Override
>>>>>>>>>>> public void addProcessor(String name, ProcessorSupplier supplier,
>>>>>>>>>>>     String... predecessorNames) {
>>>>>>>>>>>   super.addProcessor(name,
>>>>>>>>>>>       new TracingProcessorSupplier(tracer, name, supplier),
>>>>>>>>>>>       predecessorNames);
>>>>>>>>>>> }
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> Would it make sense to propose this as a change:
>>>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology ?
>>>>>>>>>>> or maybe there is a better way to do this?
>>>>>>>>>>> TopologyWrapper does something similar:
>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance for any help.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Jorge.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>
>>
>>
> 
