Hi Fabian,

Thanks a lot.
I have a better understanding now.

> Operators are never GC'd (unless a job was cancelled)
That's great information.
Maybe this is related to the so-called managed memory.
The documentation would be better if there were detailed documents about memory management.

Thank you,
Yuta

On 2017/09/18 18:03, Fabian Hueske wrote:
Hi Yuta,

you got most things right :-)

3) Sources (such as the Kafka connectors) are also considered operators and start immediately because they are sources.
4) All other operators start when they need to process their first record. Operators are never GC'd (unless a job was cancelled), so the setup cost is a one-time thing that only happens when the job is started.
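To make the one-time setup cost concrete, here is a small toy model. It is not Flink code: `ToyOperator`, `ToyTask`, and `measure` are hypothetical names, and `time.sleep` only stands in for setup work such as class loading and instantiation. The task creates its operator when the first record arrives and keeps the instance afterwards, so only the first record pays the setup cost.

```python
import time
from typing import Iterable, List, Optional


class ToyOperator:
    """Hypothetical stand-in for a user function (not a Flink class)."""

    def __init__(self, setup_seconds: float) -> None:
        # Simulated one-time setup work (class loading, instantiation, ...).
        time.sleep(setup_seconds)

    def process(self, record: int) -> int:
        return record * 2


class ToyTask:
    """Creates its operator lazily on the first record and then keeps it."""

    def __init__(self, setup_seconds: float) -> None:
        self.setup_seconds = setup_seconds
        self.operator: Optional[ToyOperator] = None
        self.setup_count = 0

    def process(self, record: int) -> int:
        if self.operator is None:
            # Only the first record triggers the (slow) setup.
            self.operator = ToyOperator(self.setup_seconds)
            self.setup_count += 1
        # The task holds a reference to the instance, so it is not
        # garbage-collected while the task runs and setup is not repeated.
        return self.operator.process(record)


def measure(task: ToyTask, records: Iterable[int]) -> List[float]:
    """Return the processing time of each record in seconds."""
    latencies = []
    for record in records:
        start = time.perf_counter()
        task.process(record)
        latencies.append(time.perf_counter() - start)
    return latencies


if __name__ == "__main__":
    task = ToyTask(setup_seconds=0.2)
    latencies = measure(task, range(5))
    print(f"first record: {latencies[0]:.3f} s, "
          f"slowest later record: {max(latencies[1:]):.3f} s, "
          f"setups: {task.setup_count}")
```

In this model the first measured time includes the simulated setup, while later records only pay for `process`.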

Best, Fabian

2017-09-15 12:43 GMT+02:00 Yuta Morisawa <yu-moris...@kddi-research.jp>:

    Hi Fabian,

    Thank you for your description.

    This is my understanding.
    1. When the execute() method is called, Flink creates the
    JobGraph, submits it to the JobManager, and deploys tasks to the
    TaskManagers, but DOES NOT execute the operators yet.
    2. Operators are executed when they are needed.
    3. Sources (the Kafka connectors) start before the other operators.
    4. The first time an operator is called, or after the GC removes
    an operator's instance, some initialization happens, such as
    class loading, instantiation, memory allocation, and so on. This
    may take a lot of time.

    If I have misunderstood anything, please point it out.
    If not, my question is answered.

    Regards.
    Yuta

    On 2017/09/15 17:05, Fabian Hueske wrote:

        Hi Yuta,

        when the execute() method is called, a so-called JobGraph is
        constructed from all operators that have been added before by
        calling map(), keyBy() and so on.
        The JobGraph is then submitted to the JobManager which is the
        master process in Flink. Based on the JobGraph, the master
        deploys tasks to the worker processes (TaskManagers).
        These are the tasks that do the actual processing and they are
        subsequently started as I explained before, i.e., the source
        task starts consuming from Kafka before subsequent tasks have
        been started.

        So, there is quite a lot happening when you call execute()
        including network communication and task deployment.
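The "lazy evaluation" part (calls such as map() only describe the program, and nothing is processed until execute()) can be illustrated with a toy builder. This is a hypothetical sketch, not Flink's API: `ToyEnvironment` and its methods are made up, and it runs records synchronously in one process instead of building a JobGraph and deploying tasks to TaskManagers.

```python
from typing import Any, Callable, Iterable, List, Tuple


class ToyEnvironment:
    """Hypothetical stand-in for an execution environment (not Flink's API)."""

    def __init__(self) -> None:
        # Transformations recorded so far, e.g. ("map", fn).
        self.plan: List[Tuple[str, Callable[[Any], Any]]] = []

    def map(self, fn: Callable[[Any], Any]) -> "ToyEnvironment":
        # Only records the step; fn is not called here.
        self.plan.append(("map", fn))
        return self

    def filter(self, pred: Callable[[Any], bool]) -> "ToyEnvironment":
        # Only records the step; pred is not called here.
        self.plan.append(("filter", pred))
        return self

    def execute(self, source: Iterable[Any]) -> List[Any]:
        # "Build the job graph": here just an immutable copy of the plan.
        job_graph = tuple(self.plan)
        # "Run the job": push every source record through the recorded steps.
        results = []
        for record in source:
            keep = True
            for kind, fn in job_graph:
                if kind == "map":
                    record = fn(record)
                elif not fn(record):  # kind == "filter"
                    keep = False
                    break
            if keep:
                results.append(record)
        return results


if __name__ == "__main__":
    calls = []
    env = ToyEnvironment()
    env.map(lambda x: calls.append(x) or x + 1).filter(lambda x: x % 2 == 0)
    print(calls)                   # prints [] (nothing has run yet)
    print(env.execute([1, 2, 3]))  # prints [2, 4]
    print(calls)                   # prints [1, 2, 3]
```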

        Hope this helps,
        Fabian

        2017-09-15 4:25 GMT+02:00 Yuta Morisawa
        <yu-moris...@kddi-research.jp>:


             Hi, Fabian

             > If I understand you correctly, the problem is only for the first events
             > that are processed.
             Yes. More precisely, the first 300 Kafka messages.

             > AFAIK, Flink lazily instantiates its operators which means that a source
             > task starts to consume records from Kafka before the subsequent tasks
             > have been started.
             That's a great hint. It describes the situation well.
             But the documentation says "The operations are actually
             executed when the execution is explicitly triggered by an
             execute() call on the execution environment."
             What does that mean?
             AFAIK, common Flink programs invoke execute() in main().
             Do all operators start at this time? I suspect not.

             - Flink documentation

        
             https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#lazy-evaluation


             > Not sure if or what can be done about this behavior.
             > I'll loop in Till who knows more about the lifecycle of tasks.
             Thank you very much for your kindness.

             Regards, Yuta

             On 2017/09/14 19:32, Fabian Hueske wrote:

                 Hi,

                 If I understand you correctly, the problem is only for the
                 first events that are processed.

                 AFAIK, Flink lazily instantiates its operators, which means
                 that a source task starts to consume records from Kafka
                 before the subsequent tasks have been started.
                 That's why the latency of the first records is higher.
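A rough way to see why the first records are slower is a single-queue model: records that the source emits while the downstream task is still starting up wait in a buffer, and each one is handled in order. This is only a toy sketch, not how Flink actually schedules or buffers data; the function name and all numbers are hypothetical, and times are in milliseconds.

```python
from typing import List


def fifo_latencies(arrivals_ms: List[int], ready_at_ms: int,
                   process_ms: int) -> List[int]:
    """Latency of each record in a toy single-threaded FIFO model.

    arrivals_ms: times at which the source emits records (sorted).
    ready_at_ms: time at which the downstream task can start processing.
    process_ms:  fixed processing time per record.
    """
    free_at = ready_at_ms  # the task cannot start before it is ready
    latencies = []
    for arrival in arrivals_ms:
        # Wait until the task is ready and the previous record is done.
        start = max(free_at, arrival)
        free_at = start + process_ms
        latencies.append(free_at - arrival)
    return latencies


if __name__ == "__main__":
    # Records emitted before the task is ready queue up; later ones do not.
    print(fifo_latencies([0, 10, 20, 1000], ready_at_ms=500, process_ms=1))
    # prints [501, 492, 483, 1]
```

The early records absorb the start-up delay as queueing time, while a record that arrives after the task is ready only pays its own processing time.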

                 Not sure if or what can be done about this behavior.
                 I'll loop in Till, who knows more about the lifecycle of
                 tasks.

                 Best, Fabian


                 2017-09-12 11:02 GMT+02:00 Yuta Morisawa
                 <yu-moris...@kddi-research.jp>:

                      Hi,

                      I am worried about the latency of the Streaming API.
                      My application reads data with the Kafka connectors,
                      processes it, and then writes the results with Kafka
                      producers.
                      The problem is that the app suffers a long delay when
                      the first data arrives in the cluster.
                      It takes about 1000 ms to process data (I measure the
                      time with Kafka timestamps). On the other hand, it
                      works well 2-3 seconds after the first data arrives
                      (the delay is then about 200 ms).
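One way to quantify this warm-up effect is to compare the mean latency of the first N records with that of the rest. A minimal sketch, assuming the input Kafka timestamp of each record and the timestamp of its corresponding output record are both available in milliseconds and come from comparable clocks. `latency_summary` is a hypothetical helper, not part of any Kafka or Flink API, and the default of 300 records is only illustrative.

```python
from statistics import mean
from typing import List, Optional, Tuple


def latency_summary(
    pairs: List[Tuple[int, int]], warmup: int = 300
) -> Tuple[Optional[float], Optional[float]]:
    """Compare warm-up latency with steady-state latency.

    pairs:  (input_timestamp_ms, output_timestamp_ms) per record,
            in the order the records were consumed.
    warmup: number of leading records treated as the warm-up phase.
    Returns the mean latency (ms) of the warm-up records and of the rest;
    None for a group that is empty.
    """
    latencies = [out_ts - in_ts for in_ts, out_ts in pairs]
    head, tail = latencies[:warmup], latencies[warmup:]
    return (mean(head) if head else None, mean(tail) if tail else None)
```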

                      The application is so latency-sensitive that I want
                      to solve this problem.
                      I suspect this is a JVM issue, but I have no idea how
                      to investigate it.
                      Is there any way to avoid this delay?



                      Thank you for your attention
                      Yuta



