I originally meant startNewChain(), but disableChaining() should work too.

Can you rerun the job with the logging level set to DEBUG and check for any messages from org.apache.flink.runtime.metrics?

Also looping in Robert, maybe he has an idea.

On 17.07.2017 14:23, Paolo Cristofanelli wrote:
Hi Chesnay,

thanks for your answer. I could not find the method createNewChain(), so I used disableChaining() instead, but it had no effect:

    DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer08<>("MyTopic", new SimpleStringSchema(), properties));

    stream.map(new ConsumerMap()).disableChaining();

    env.execute();



Best Regards,
Paolo

On 17 July 2017 at 13:10, Chesnay Schepler <ches...@apache.org> wrote:

    Hello,

    As for 1), my suspicion is that this is caused by chaining. If the
    map function is chained to the Kafka source then the latency
    markers are always immediately forwarded, regardless of what your
    map function is doing.
    If the map function is indeed chained to the source, could you try
    again after disabling the chain by calling
    `X.map(...).createNewChain()` and report back?

    As for 2), I don't think this is possible right now.

    Regards,
    Chesnay


    On 17.07.2017 12:42, Paolo Cristofanelli wrote:

        Hi,

        I would like to understand how to measure the latency of a record.
        I have set up a simple project with a Kafka consumer that
        reads from a topic and performs a simple map (with a thread
        sleep inside).

        In order to measure the latency of this mapper I have added
        env.getConfig().setLatencyTrackingInterval(10);

        After that, I was planning to view the latency in the web UI,
        but the related graph does not show any values, and I do not
        understand why. I expected the graph to show at least the
        sleep duration.

        I also have another question:

        I am using a count window, aggregating every 100 input records
        and then I perform a map. I want to see the latency as the
        difference between the time at which the output record is
        emitted and the arrival time of the earliest input record.

        For example, the first value arrives at time x. By x+5, all
        100 values have arrived and the system can aggregate them.
        The map operation then runs, and the output record is emitted
        at time x+15.
        I would like to obtain 15 as the latency.
        Do you have any suggestions on how to proceed?

        Thanks for your time,
        Paolo Cristofanelli
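
A note on the second question above (latency of a count window, measured from the arrival of the earliest input record): one possible manual approach is to stamp each record with its arrival time, carry the minimum stamp through the aggregation, and subtract it from the emit time. The sketch below shows only that arithmetic in plain Java. It does not use the Flink API, and the names WindowLatencySketch, Stamped, Aggregate, aggregate and latencyMillis are hypothetical, introduced for illustration. In a real job the stamp would presumably be attached in the source or the first operator and carried through the window function.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch (plain Java, no Flink): latency of a count window measured
// from the arrival of its earliest record. All names here are hypothetical.
final class WindowLatencySketch {

    /** An input value tagged with the time at which it arrived. */
    static final class Stamped {
        final long value;
        final long arrivalMillis;

        Stamped(long value, long arrivalMillis) {
            this.value = value;
            this.arrivalMillis = arrivalMillis;
        }
    }

    /** Result of aggregating one window: the sum plus the earliest arrival time. */
    static final class Aggregate {
        final long sum;
        final long earliestArrivalMillis;

        Aggregate(long sum, long earliestArrivalMillis) {
            this.sum = sum;
            this.earliestArrivalMillis = earliestArrivalMillis;
        }
    }

    /** Sums the values and keeps the smallest arrival timestamp in the window. */
    static Aggregate aggregate(List<Stamped> window) {
        if (window.isEmpty()) {
            throw new IllegalArgumentException("window must not be empty");
        }
        long sum = 0;
        long earliest = Long.MAX_VALUE;
        for (Stamped s : window) {
            sum += s.value;
            earliest = Math.min(earliest, s.arrivalMillis);
        }
        return new Aggregate(sum, earliest);
    }

    /** Latency = time the output is emitted minus arrival of the earliest input. */
    static long latencyMillis(Aggregate agg, long emitMillis) {
        return emitMillis - agg.earliestArrivalMillis;
    }

    public static void main(String[] args) {
        long x = 1_000L;                       // arrival time of the first record
        List<Stamped> window = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            // 100 records arriving between x and x+5
            window.add(new Stamped(i, x + (i * 5L) / 99));
        }
        Aggregate agg = aggregate(window);
        long emitMillis = x + 15;              // output record emitted at x+15
        System.out.println("latency = " + latencyMillis(agg, emitMillis));
    }
}
```

With the numbers from the example in the question (first arrival at x, output emitted at x+15), this prints "latency = 15". The design choice is simply to make the earliest arrival time part of the aggregate, so that whatever operator emits the final record can compute the difference itself.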




