Hi Piotr,

Sorry for the late reply and thanks for the proposal, it looks awesome!

In the discussion, you pointed out that it is difficult to build true
distributed traces. AFAIU from FLIP-384 and FLIP-385, the
upcoming OpenTelemetry based TraceReporter will use the same Span
implementation and will not support trace_id and span_id. Does it make
sense to at least add the span_id into the current Span design? The default
implementation could follow your suggestion: jobId#attemptId#checkpointId.
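
To make the suggestion concrete, here is a minimal sketch of such a
deterministic id. The class and method names (SpanIds, checkpointSpanId,
subtaskSpanId) are hypothetical and not part of FLIP-384, and the ids are
passed as plain strings and numbers rather than Flink's own types:

```java
// Hypothetical helper for illustration only; not part of FLIP-384 or any Flink API.
final class SpanIds {
    private SpanIds() {}

    /** Builds jobId#attemptId#checkpointId; any component can recompute it locally. */
    static String checkpointSpanId(String jobId, int attemptId, long checkpointId) {
        return jobId + "#" + attemptId + "#" + checkpointId;
    }

    /** Appends the subtask index, so the parent span id is a prefix of the child id. */
    static String subtaskSpanId(String jobId, int attemptId, long checkpointId, int subtaskId) {
        return checkpointSpanId(jobId, attemptId, checkpointId) + "#" + subtaskId;
    }
}
```

Since the id is derived only from values every component already knows, no
context object would have to be passed between JM and TM.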

Some other NIT questions:
1. The sample code shows that the scope of Span will be the CanonicalName
of a class. If there are other cases that could be used as the scope too, a
javadoc about Span scope would be helpful. If the CanonicalName of a class
is the best practice, removing the scope from the builder constructor and
adding setScope(Class) might ease the API usage. The Span.getScope() can
still return String.
2. The sample code in the FLIP is not consistent. The first example used
Span.builder(..) and the second example used new Span() with setters.
3. I guess the constructor of SpanBuilder class is a typo.
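
Regarding point 1, a rough sketch of what I have in mind with
setScope(Class). SketchSpan and SketchSpanBuilder are simplified stand-ins
that I made up for this mail, not the classes from the FLIP, and the
fallback to Class#getName() is my own assumption:

```java
// Simplified stand-ins for illustration only; not the FLIP-384 Span/SpanBuilder.
final class SketchSpan {
    private final String scope;
    private final String name;

    SketchSpan(String scope, String name) {
        this.scope = scope;
        this.name = name;
    }

    /** The scope is still exposed as a plain String. */
    String getScope() {
        return scope;
    }

    String getName() {
        return name;
    }

    /** The builder takes only the span name; the scope is set separately. */
    static SketchSpanBuilder builder(String name) {
        return new SketchSpanBuilder(name);
    }
}

final class SketchSpanBuilder {
    private final String name;
    private String scope = "";

    SketchSpanBuilder(String name) {
        this.name = name;
    }

    /** Derives the scope from the class, so callers do not build the String themselves. */
    SketchSpanBuilder setScope(Class<?> scopeClass) {
        // getCanonicalName() returns null for anonymous and local classes,
        // so fall back to getName() in that case (an assumption of this sketch).
        String canonical = scopeClass.getCanonicalName();
        this.scope = canonical != null ? canonical : scopeClass.getName();
        return this;
    }

    SketchSpan build() {
        return new SketchSpan(scope, name);
    }
}
```

Usage would then look like
SketchSpan.builder("Checkpoint").setScope(MyTracker.class).build(), where
MyTracker is a placeholder for whatever class reports the span.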

It is really a nice idea to introduce the trace reporter! Thanks again!

Best regards,
Jing

On Tue, Nov 14, 2023 at 3:16 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi All,
>
> Thanks for the answers!
>
> Unless there are some objections or suggestions, I will open a voting
> thread later this
> week.
>
> > My original thought was to show how much time a sampled record is
> processed
> > within each operator in stream processing. By saying 'sampled', I mean we
> > won't generate a trace for every record due to the high cost involved.
> > Instead, we could only trace ONE record from source when the user
> requests
> > it (via REST API or Web UI) or when triggered periodically at a very low
> > frequency.
>
> That would be useful, but another issue is that we cannot reliably measure
> time at the granularity of a single record. Processing a single record
> through the whole operator chain usually takes less time than the syscalls
> needed to measure it.
>
> So I think we are stuck with sample based profilers, like Flame Graphs
> generated by
> the Flink WebUI.
>
> Best, Piotrek
>
> On Thu, Nov 9, 2023 at 05:32 Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hi Piotr:
> >
> > Thanks for your reply!
> >
> > > About structured logging (basically events?) I vaguely remember some
> > > discussions about that. It might be a much larger topic, so I would
> > prefer
> > > to leave it out of the scope of this FLIP.
> >
> > That makes sense to me!
> >
> > > I think those could be indeed useful. If you would like to contribute
> to
> > them
> > > in the future, I would be happy to review the FLIP for it :)
> >
> > Thank you, after this FLIP, I or my colleagues can pick it up!
> >
> > Best,
> > Rui
> >
> > On Thu, Nov 9, 2023 at 11:39 AM Zakelly Lan <zakelly....@gmail.com>
> wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks for your detailed explanation! I could see the challenge of
> > > implementing traces with multiple spans and agree to put it in the
> future
> > > work. I personally prefer the idea of generating multi span traces for
> > > checkpoints on the JM only.
> > >
> > > > I'm not sure if I understand the proposal - I don't know how traces
> > could
> > > > be used for this purpose?
> > > > > Traces are perfect for one-off events (like checkpointing, recovery,
> > etc),
> > > > not for continuous monitoring
> > > > (like processing records). That's what metrics are. Creating trace
> and
> > > > span(s) per each record would
> > > > be prohibitively expensive.
> > >
> > > My original thought was to show how much time a sampled record is
> > processed
> > > within each operator in stream processing. By saying 'sampled', I mean
> we
> > > won't generate a trace for every record due to the high cost involved.
> > > Instead, we could only trace ONE record from source when the user
> > requests
> > > it (via REST API or Web UI) or when triggered periodically at a very
> low
> > > frequency. However after re-thinking my idea, I realized it's hard to
> > > define the whole lifecycle of a record since it is transformed into
> > > different forms among operators. We could discuss this in future after
> > the
> > > basic trace is implemented in Flink.
> > >
> > > > Unless you mean in batch/bounded jobs? Then yes, we could create a
> > > bounded
> > > > job trace, with spans
> > > > for every stage/task/subtask.
> > >
> > > Oh yes, batch jobs could definitely leverage the trace.
> > >
> > > Best,
> > > Zakelly
> > >
> > >
> > > On Wed, Nov 8, 2023 at 9:18 PM Jinzhong Li <lijinzhong2...@gmail.com>
> > > wrote:
> > >
> > > > Hi Piotr,
> > > >
> > > > > Thanks for driving this proposal! I strongly agree that the existing
> > > > > metric APIs are not suitable for monitoring restore/checkpoint behavior!
> > > > >
> > > > > I think the TM-level recovery/checkpointing traces are necessary in the
> > > > > future. In our production environment, we sometimes see very long job
> > > > > recovery times (30min+), caused by heavy disk traffic on several
> > > > > subtasks. The TM-level recovery trace would help troubleshoot such
> > > > > issues.
> > > >
> > > > Best
> > > > Jinzhong
> > > >
> > > > On Wed, Nov 8, 2023 at 5:09 PM Piotr Nowojski <pnowoj...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi Zakelly,
> > > > >
> > > > > Thanks for the comments. Quick answer for both of your questions
> > would
> > > be
> > > > > that it probably should be
> > > > > left as a future work. For more detailed answers please take a look
> > > below
> > > > > :)
> > > > >
> > > > > > Does it mean the inclusion and subdivision relationships of spans
> > > > defined
> > > > > > by "parent_id" are not supported? I think it is a very necessary
> > > > feature
> > > > > > for the trace.
> > > > >
> > > > > Yes exactly, that is the current limitation. This could be solved
> > > somehow
> > > > > one way or another in the future.
> > > > >
> > > > > > Support for reporting multi span traces all at once would be a
> > > > > > relatively easy follow up. For example, `CheckpointStatsTracker`
> > > > > > running on the JM could, upon checkpoint completion, create the whole
> > > > > > structure of parent spans in one place, to have for example one span
> > > > > > per subtask.
> > > > >
> > > > > > However, if we would like to create true distributed traces, with spans
> > > > > > reported from many different components, potentially both on JM and TM,
> > > > > > the problem is a bit deeper. The issue in that case is how to actually
> > > > > > fill out `parent_id` and `trace_id`. Passing some context entity as a
> > > > > > Java object would be infeasible. That would require too many changes in
> > > > > > too many places. I think the only realistic way to do it would be to
> > > > > > have a deterministic generator of `parent_id` and `trace_id` values.
> > > > >
> > > > > For example we could create the parent trace/span of the checkpoint
> > on
> > > > JM,
> > > > > and set those ids to
> > > > > something like: `jobId#attemptId#checkpointId`. Each subtask then
> > could
> > > > > re-generate those ids
> > > > > and subtasks' checkpoint span would have an id of
> > > > > `jobId#attemptId#checkpointId#subTaskId`.
> > > > > > Note that this is just an example, as distributed spans for
> > > > > > checkpointing most likely do not make sense: we can generate them much
> > > > > > more easily on the JM anyway.
> > > > >
> > > > > > In addition to checkpoint and recovery, I believe the trace would
> > > also
> > > > be
> > > > > > valuable for performance tuning. If Flink can trace and visualize
> > the
> > > > > time
> > > > > > cost of each operator and stage for a sampled record, users would
> > be
> > > > able
> > > > > > to easily determine the end-to-end latency and identify
> performance
> > > > > issues
> > > > > > for optimization. Looking forward to seeing these in the future.
> > > > >
> > > > > I'm not sure if I understand the proposal - I don't know how traces
> > > could
> > > > > be used for this purpose?
> > > > > > Traces are perfect for one-off events (like checkpointing, recovery,
> > > etc),
> > > > > not for continuous monitoring
> > > > > (like processing records). That's what metrics are. Creating trace
> > and
> > > > > span(s) per each record would
> > > > > be prohibitively expensive.
> > > > >
> > > > > Unless you mean in batch/bounded jobs? Then yes, we could create a
> > > > bounded
> > > > > job trace, with spans
> > > > > for every stage/task/subtask.
> > > > >
> > > > > Best,
> > > > > Piotrek
> > > > >
> > > > >
> > > > > > On Wed, Nov 8, 2023 at 05:30 Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > >
> > > > > > Hi Piotr,
> > > > > >
> > > > > > Happy to see the trace! Thanks for this proposal.
> > > > > >
> > > > > > One minor question: It is mentioned in the interface of Span:
> > > > > >
> > > > > > > > Currently we don't support traces with multiple spans. Each span is
> > > > > > > > self-contained and represents things like a checkpoint or recovery.
> > > > > >
> > > > > >
> > > > > > Does it mean the inclusion and subdivision relationships of spans
> > > > defined
> > > > > > by "parent_id" are not supported? I think it is a very necessary
> > > > feature
> > > > > > for the trace.
> > > > > >
> > > > > > In addition to checkpoint and recovery, I believe the trace would
> > > also
> > > > be
> > > > > > valuable for performance tuning. If Flink can trace and visualize
> > the
> > > > > time
> > > > > > cost of each operator and stage for a sampled record, users would
> > be
> > > > able
> > > > > > to easily determine the end-to-end latency and identify
> performance
> > > > > issues
> > > > > > for optimization. Looking forward to seeing these in the future.
> > > > > >
> > > > > > Best,
> > > > > > Zakelly
> > > > > >
> > > > > >
> > > > > > On Tue, Nov 7, 2023 at 6:27 PM Piotr Nowojski <
> > pnowoj...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Rui,
> > > > > > >
> > > > > > > Thanks for the comments!
> > > > > > >
> > > > > > > > 1. I see the trace just supports Span? Does it support trace
> > > > events?
> > > > > > > > I'm not sure whether tracing events is reasonable for
> > > > TraceReporter.
> > > > > > > > If it supports, flink can report checkpoint and checkpoint
> path
> > > > > > > proactively.
> > > > > > > > Currently, checkpoint lists or the latest checkpoint can only
> > be
> > > > > > fetched
> > > > > > > > by external components or platforms. And report is more
> timely
> > > and
> > > > > > > > efficient than fetch.
> > > > > > >
> > > > > > > No, currently the `TraceReporter` that I'm introducing supports
> > > only
> > > > > > single
> > > > > > > span traces.
> > > > > > > So currently neither events on their own nor events inside spans are
> > > > > > > supported.
> > > > > > > This is done just for the sake of simplicity, and to test out the basic
> > > > > > > functionality. But I think those currently missing features should be
> > > > > > > added at some point in the future.
> > > > > > >
> > > > > > > About structured logging (basically events?) I vaguely remember
> > > some
> > > > > > > discussions about
> > > > > > > that. It might be a much larger topic, so I would prefer to
> leave
> > > it
> > > > > out
> > > > > > of
> > > > > > > the scope of this
> > > > > > > FLIP.
> > > > > > >
> > > > > > > > 2. This FLIP just monitors the checkpoint and task recovery,
> > > right?
> > > > > > >
> > > > > > > Yes, it only adds single span traces for checkpointing and
> > > > > > > recovery/initialisation - one
> > > > > > > span per whole job per either recovery/initialization process
> or
> > > per
> > > > > each
> > > > > > > checkpoint.
> > > > > > >
> > > > > > > > Could we add more operations in this FLIP? In our production,
> > we
> > > > > > > > added a lot of trace reporters for job starts and scheduler
> > > > > operation.
> > > > > > > > They are useful if some jobs start slowly, because they will
> > > affect
> > > > > > > > the job availability. For example:
> > > > > > > > - From JobManager process is started to JobGraph is created
> > > > > > > > - From JobGraph is created to JobMaster is created
> > > > > > > > - From JobMaster is created to job is running
> > > > > > > > - From start request tm from yarn or kubernetes to all tms
> are
> > > > ready
> > > > > > > > - etc
> > > > > > >
> > > > > > > I think those could be indeed useful. If you would like to
> > > contribute
> > > > > > them
> > > > > > > in the future,
> > > > > > > I would be happy to review the FLIP for it :)
> > > > > > >
> > > > > > > > Of course, it is fine with me if this FLIP doesn't include them. The
> > > > > > > > first version only initializes the interface and common operations,
> > > > > > > > and we can add more operations in the future
> > > > > > >
> > > > > > > Yes, that's exactly my thinking :)
> > > > > > >
> > > > > > > Best,
> > > > > > > Piotrek
> > > > > > >
> > > > > > > On Tue, Nov 7, 2023 at 10:05 Rui Fan <1996fan...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Piotr,
> > > > > > > >
> > > > > > > > Thanks for driving this proposal! The trace reporter is
> useful
> > to
> > > > > > > > check a lot of duration monitors inside of Flink.
> > > > > > > >
> > > > > > > > I have some questions about this proposal:
> > > > > > > >
> > > > > > > > 1. I see the trace just supports Span? Does it support trace
> > > > events?
> > > > > > > > I'm not sure whether tracing events is reasonable for
> > > > TraceReporter.
> > > > > > > > If it supports, flink can report checkpoint and checkpoint
> path
> > > > > > > > proactively.
> > > > > > > > Currently, checkpoint lists or the latest checkpoint can only
> > be
> > > > > > fetched
> > > > > > > > by external components or platforms. And report is more
> timely
> > > and
> > > > > > > > efficient than fetch.
> > > > > > > >
> > > > > > > > 2. This FLIP just monitors the checkpoint and task recovery,
> > > right?
> > > > > > > > Could we add more operations in this FLIP? In our production,
> > we
> > > > > > > > added a lot of trace reporters for job starts and scheduler
> > > > > operation.
> > > > > > > > They are useful if some jobs start slowly, because they will
> > > affect
> > > > > > > > the job availability. For example:
> > > > > > > > - From JobManager process is started to JobGraph is created
> > > > > > > > - From JobGraph is created to JobMaster is created
> > > > > > > > - From JobMaster is created to job is running
> > > > > > > > - From start request tm from yarn or kubernetes to all tms
> are
> > > > ready
> > > > > > > > - etc
> > > > > > > >
> > > > > > > > > Of course, it is fine with me if this FLIP doesn't include them. The
> > > > > > > > > first version only initializes the interface and common operations,
> > > > > > > > > and we can add more operations in the future.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Rui
> > > > > > > >
> > > > > > > > On Tue, Nov 7, 2023 at 4:31 PM Piotr Nowojski <
> > > > pnowoj...@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi all!
> > > > > > > > >
> > > > > > > > > I would like to start a discussion on FLIP-384: Introduce
> > > > > > TraceReporter
> > > > > > > > and
> > > > > > > > > use it to create checkpointing and recovery traces [1].
> > > > > > > > >
> > > > > > > > > This proposal intends to improve observability of Flink's
> > > > > > Checkpointing
> > > > > > > > and
> > > > > > > > > Recovery/Initialization operations, by adding support for
> > > > reporting
> > > > > > > > traces
> > > > > > > > > from Flink. In the future, reporting traces can be of
> course
> > > used
> > > > > for
> > > > > > > > other
> > > > > > > > > use cases and also by users.
> > > > > > > > >
> > > > > > > > > There are also two other follow up FLIPS, FLIP-385 [2] and
> > > > FLIP-386
> > > > > > > [3],
> > > > > > > > > which expand the basic functionality introduced in FLIP-384
> > > [1].
> > > > > > > > >
> > > > > > > > > Please let me know what you think!
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Piotr Nowojski
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
> > > > > > > > > [2]
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-385%3A+Add+OpenTelemetryTraceReporter+and+OpenTelemetryMetricReporter
> > > > > > > > > [3]
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
