Hi everyone,
After an offline discussion with ZhuZhu, I have some comments on this
investigation.

Regarding the maximum parallelism dropping from 760 to 685, it may be
because the tasks are not scheduled evenly. The result is inconsistent
across multiple experiments, so this phenomenon is probably unrelated to
our changes.

I think what we really care about is the Akka framesize (should users
enlarge it after our change for the same job?). The size of the TDD
after serialization seems to be smaller after the change. I don't know
the root cause of this at the moment. The way I measure it is:
```
// Serialize the TDD with plain Java serialization and log the byte count.
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(deployment);
oos.flush();
LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.size());
```
Please correct me if I'm wrong.
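
To make the measurement easier to reproduce outside Flink, here is a
minimal, self-contained sketch of the same approach. PlainId and
ComposedId are hypothetical stand-ins I made up for illustration; they
are not the real Flink ID classes, and the numbers they produce say
nothing about the actual TDD. It only shows how the byte count is taken
and that Java serialization writes a shared object once per stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Illustration only: hypothetical stand-in classes, not Flink's ID types. */
final class SerializedSizeSketch {

    /** Stand-in for an ID backed by two longs (assumption). */
    static final class PlainId implements Serializable {
        private static final long serialVersionUID = 1L;
        final long lower;
        final long upper;

        PlainId(long lower, long upper) {
            this.lower = lower;
            this.upper = upper;
        }
    }

    /** Stand-in for an ID composed of another ID plus an int index (assumption). */
    static final class ComposedId implements Serializable {
        private static final long serialVersionUID = 1L;
        final PlainId dataSetId;
        final int partitionNum;

        ComposedId(PlainId dataSetId, int partitionNum) {
            this.dataSetId = dataSetId;
            this.partitionNum = partitionNum;
        }
    }

    /** Serializes the object with plain Java serialization and returns the byte count. */
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    public static void main(String[] args) {
        PlainId shared = new PlainId(1L, 2L);
        ComposedId single = new ComposedId(shared, 0);

        // 100 composed IDs that all reference the same PlainId instance.
        ComposedId[] many = new ComposedId[100];
        for (int i = 0; i < many.length; i++) {
            many[i] = new ComposedId(shared, i);
        }

        System.out.println("plain id:        " + serializedSize(shared));
        System.out.println("composed id:     " + serializedSize(single));
        System.out.println("100 composed ids: " + serializedSize(many));
    }
}
```

Because class descriptors and repeated object references are written
only once per stream, the size of one object serialized alone cannot
simply be multiplied to estimate the size of a structure holding many
of them.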

I'll experiment with higher parallelism to see if there is any
regression regarding Akka framesize.

Regarding the TDD building time, the parallelism in my investigation
might be too small. Meanwhile, this time might be influenced by many
factors. Thus, I'll
- experiment with higher parallelism.
- measure the time spent from "Starting scheduling" to the last task
changing state to running.
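
The second measurement above could be sketched roughly as follows.
DeploymentSpanSketch is a hypothetical helper of my own, not an
existing Flink class, and where exactly its methods would be called
from is left open. It records the scheduling start and keeps the latest
timestamp at which any task was seen switching to running.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustration only: a hypothetical helper, not part of Flink. */
final class DeploymentSpanSketch {

    private final long startNanos;
    private final AtomicLong lastRunningNanos;

    /** @param startNanos timestamp taken when "Starting scheduling" is observed */
    DeploymentSpanSketch(long startNanos) {
        this.startNanos = startNanos;
        this.lastRunningNanos = new AtomicLong(startNanos);
    }

    /** Call whenever a task is observed switching to running; safe to call from several threads. */
    void taskRunning(long nowNanos) {
        lastRunningNanos.accumulateAndGet(nowNanos, Math::max);
    }

    /** Time from scheduling start to the latest running transition seen so far, in milliseconds. */
    long spanMillis() {
        return (lastRunningNanos.get() - startNanos) / 1_000_000L;
    }

    public static void main(String[] args) {
        // Fixed timestamps instead of System.nanoTime() to keep the demo deterministic.
        DeploymentSpanSketch span = new DeploymentSpanSketch(0L);
        span.taskRunning(5_000_000L);
        span.taskRunning(12_000_000L);
        span.taskRunning(9_000_000L); // out-of-order arrival does not shrink the span
        System.out.println("span (ms): " + span.spanMillis());
    }
}
```

In a real run the timestamps would come from System.nanoTime() or from
the log lines themselves.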

Best,
Yangze Guo


On Fri, Apr 10, 2020 at 12:53 PM Yangze Guo <karma...@gmail.com> wrote:
>
> Hi there,
>
> Sorry for the belated reply. I just made a preliminary investigation
> of the effect of refactoring IntermediateResultPartitionID.
>
> The key change is composing it of an IntermediateDataSetID
> and a partitionNum:
> public class IntermediateResultPartitionID {
>    private final IntermediateDataSetID intermediateDataSetID;
>    private final int partitionNum;
> }
>
> In this benchmark, I use examples/streaming/WordCount.jar as the test
> job and run Flink on Yarn. All the configurations are kept default
> except for "taskmanager.numberOfTaskSlots", which is set to 2.
>
> The result shows it does have an impact on performance.
> - After this change, the maximum parallelism went from 760 to 685,
> which is limited by the total number of network buffers. For the same
> parallelism, users need more network buffers than before.
> - The netty messages "PartitionRequest" and "TaskEventRequest" increase
> by 4 bytes. For "PartitionRequest", it means a 7% increase.
> - The TDD takes longer to construct. With a parallelism of 600, it takes
> twice as long to construct the TDD as before.
>
> Details are recorded in
> https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
>
> The same issue could happen in ExecutionAttemptID, which will increase
> the "PartitionRequest" and "TaskEventRequest" by 8 bytes (subtaskIndex
> and attemptNumber). But it may not affect the TDD as much as
> IntermediateResultPartitionID, since there is only one
> ExecutionAttemptID per TDD.
>
> After that preliminary investigation, I tend to not refactor
> ExecutionAttemptID and IntermediateResultPartitionID at the moment or
> treat it as future work.
>
> WDYT? @ZhuZhu @Till
>
> Best,
> Yangze Guo
>
> On Wed, Apr 1, 2020 at 11:53 AM Zhu Zhu <reed...@gmail.com> wrote:
> >
> > >> However, it seems the JobVertexID is derived from hashcode ...
> > You are right. JobVertexID is widely used and reworking it may affect the
> > public interfaces, e.g. the REST API. We can take it as a long-term goal
> > and exclude it from this FLIP.
> > The same applies to IntermediateDataSetID, which can also be composed of a
> > JobID and an index as Till proposed.
> >
> > Thanks,
> > Zhu Zhu
> >
> > On Tue, Mar 31, 2020 at 8:36 PM Till Rohrmann <trohrm...@apache.org> wrote:
> >
> > > For the IntermediateDataSetID I was just thinking that it might actually
> > > be interesting to know which job produced the result which, by using
> > > cluster partitions, could be used across different jobs. Not saying that
> > > we have to do it, though.
> > >
> > > A small addition to Zhu Zhu's comment about TDD sizes: For the problem
> > > with too large TDDs there is already an issue [1]. The current suspicion
> > > is that the size of TDDs for jobs with a large parallelism can indeed
> > > become problematic for Flink. Hence, it would be great to investigate the
> > > impacts of the proposed changes.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-16069
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <karma...@gmail.com> wrote:
> > >
> > > > Hi, Zhu,
> > > >
> > > > Thanks for the feedback.
> > > >
> > > > > make JobVertexID a composition of JobID and a topology index
> > > > I think it is a good idea. However, it seems the JobVertexID is
> > > > derived from a hashcode which is used to identify them across
> > > > submissions. I'm not familiar with that component though. I prefer to
> > > > keep this idea out of the scope of this FLIP if no one can help us
> > > > figure it out.
> > > >
> > > > > How about we still keep IntermediateDataSetID independent from
> > > > > JobVertexID, but just print the producing relationships in logs? I
> > > > > think keeping IntermediateDataSetID independent may be better
> > > > > considering the cross job result usages in interactive query cases.
> > > > I think you are right. I'll keep IntermediateDataSetID independent
> > > > from JobVertexID.
> > > >
> > > > > The new IDs will become larger with this rework.
> > > > Yes, I also have the same concern. A benchmark is necessary, I'll try to
> > > > provide one during the implementation phase.
> > > >
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > > >
> > > > > Thanks for proposing this improvement Yangze. Big +1 for the overall
> > > > > proposal. It can help a lot in troubleshooting.
> > > > >
> > > > > Here are a few questions for it:
> > > > > > 1. Shall we make JobVertexID a composition of JobID and a topology
> > > > > > index? This would help in the session cluster case, so that we can
> > > > > > identify which tasks are from which jobs along with the rework of
> > > > > > ExecutionAttemptID.
> > > > > >
> > > > > > 2. You mentioned that "Add the producer info to the string literal of
> > > > > > IntermediateDataSetID". Do you mean to make IntermediateDataSetID a
> > > > > > composition of JobVertexID and a consumer index?
> > > > > > How about we still keep IntermediateDataSetID independent from
> > > > > > JobVertexID, but just print the producing relationships in logs? I
> > > > > > think keeping IntermediateDataSetID independent may be better
> > > > > > considering the cross job result usages in interactive query cases.
> > > > > >
> > > > > > 3. The new IDs will become larger with this rework. The
> > > > > > TaskDeploymentDescriptor can become much larger since it is mainly
> > > > > > composed of a variety of IDs. I'm not sure how much it would be but
> > > > > > there can be more memory and CPU cost for it, resulting in more
> > > > > > frequent GCs, message sizes exceeding akka frame limits, and a longer
> > > > > > blocked time of the main thread.
> > > > > > This should not be a problem in most cases but might be a problem for
> > > > > > large scale jobs. Shall we have a benchmark for it?
> > > > >
> > > > > Thanks,
> > > > > Zhu Zhu
> > > > >
> > > > > > On Tue, Mar 31, 2020 at 2:19 PM Yangze Guo <karma...@gmail.com> wrote:
> > > > >
> > > > > > Thank you all for the feedback! Sorry for the belated reply.
> > > > > >
> > > > > > @Till
> > > > > > > I'm +1 for your two ideas and I'd like to move these two out of the
> > > > > > > scope of this FLIP since the pipelined region scheduling is ongoing
> > > > > > > work now.
> > > > > > > I also agree that we should not make the InstanceID in
> > > > > > > TaskExecutorConnection a composition of the ResourceID plus a
> > > > > > > monotonically increasing value. Thanks a lot for your explanation.
> > > > > >
> > > > > > @Konstantin @Yang
> > > > > > > Regarding the PodName of TaskExecutor on K8s, I second Yang's
> > > > > > > suggestion. It makes sense to me to let users export RESOURCE_ID and
> > > > > > > make the TM respect it. Users need to guarantee there is no collision
> > > > > > > between different TMs.
> > > > > >
> > > > > > Best,
> > > > > > Yangze Guo
> > > > > >
> > > > > >
> > > > > > > On Tue, Mar 31, 2020 at 12:25 AM Steven Wu <stevenz...@gmail.com> wrote:
> > > > > > >
> > > > > > > +1 on allowing user defined resourceId for taskmanager
> > > > > > >
> > > > > > > > On Sun, Mar 29, 2020 at 7:24 PM Yang Wang <danrtsey...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Konstantin,
> > > > > > > >
> > > > > > > > > I think it is a good idea. Currently, our users also report a
> > > > > > > > > similar issue with the resourceId of standalone clusters. When we
> > > > > > > > > start a standalone cluster now, the `TaskManagerRunner` always
> > > > > > > > > generates a uuid for the resourceId. It will be used to register
> > > > > > > > > to the jobmanager and is not convenient to match with the real
> > > > > > > > > taskmanager, especially in container environments.
> > > > > > > > >
> > > > > > > > > I think a possible solution is that we could support a
> > > > > > > > > user-defined resourceId. We could get it from the environment.
> > > > > > > > > For standalone on K8s, we could set the "RESOURCE_ID" env to the
> > > > > > > > > pod name so that it is easier to match the taskmanager with the
> > > > > > > > > K8s pod.
> > > > > > > > >
> > > > > > > > > Moreover, I am afraid we could not set the pod name to the
> > > > > > > > > resourceId. I think you could set the "deployment.meta.name",
> > > > > > > > > since the pod name is generated by K8s in the pattern
> > > > > > > > > {deployment.meta.name}-{rc.uuid}-{uuid}. On the contrary, we
> > > > > > > > > will set the resourceId to the pod name.
> > > > > > > >
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Yang
> > > > > > > >
> > > > > > > > > On Sun, Mar 29, 2020 at 8:06 PM Konstantin Knauf <konstan...@ververica.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Yangze, Hi Till,
> > > > > > > > >
> > > > > > > > > > thank you for working on this topic. I believe it will make
> > > > > > > > > > debugging large Apache Flink deployments much more feasible.
> > > > > > > > > >
> > > > > > > > > > I was wondering whether it would make sense to allow the user
> > > > > > > > > > to specify the Resource ID in standalone setups? For example,
> > > > > > > > > > many users still implicitly use standalone clusters on
> > > > > > > > > > Kubernetes (the native support is still experimental) and in
> > > > > > > > > > these cases it would be interesting to also set the PodName as
> > > > > > > > > > the ResourceID. What do you think?
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > >
> > > > > > > > > > Konstantin
> > > > > > > > >
> > > > > > > > > > On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Yangze,
> > > > > > > > > >
> > > > > > > > > > > thanks for creating this FLIP. I think it is a very good
> > > > > > > > > > > improvement helping our users and ourselves understand
> > > > > > > > > > > better what's going on in Flink.
> > > > > > > > > > >
> > > > > > > > > > > Creating the ResourceIDs with host information/pod name is a
> > > > > > > > > > > good idea.
> > > > > > > > > > >
> > > > > > > > > > > Also deriving ExecutionGraph IDs from their superset ID is a
> > > > > > > > > > > good idea.
> > > > > > > > > > >
> > > > > > > > > > > The InstanceID is used for fencing purposes. I would not
> > > > > > > > > > > make it a composition of the ResourceID + a monotonically
> > > > > > > > > > > increasing number. The problem is that in case of a RM
> > > > > > > > > > > failure the InstanceIDs would start from 0 again and this
> > > > > > > > > > > could lead to collisions.
> > > > > > > > > > >
> > > > > > > > > > > Logging more information on how the different runtime IDs
> > > > > > > > > > > are correlated is also a good idea.
> > > > > > > > > > >
> > > > > > > > > > > Two other ideas for simplifying the ids are the following:
> > > > > > > > > > >
> > > > > > > > > > > * The SlotRequestID was introduced because the SlotPool was
> > > > > > > > > > > a separate RpcEndpoint a while ago. With this no longer
> > > > > > > > > > > being the case I think we could remove the SlotRequestID and
> > > > > > > > > > > replace it with the AllocationID.
> > > > > > > > > > > * Instead of creating new SlotRequestIDs for multi task
> > > > > > > > > > > slots one could derive them from the SlotRequestID used for
> > > > > > > > > > > requesting the underlying AllocatedSlot.
> > > > > > > > > > >
> > > > > > > > > > > Given that the slot sharing logic will most likely be
> > > > > > > > > > > reworked with the pipelined region scheduling, we might be
> > > > > > > > > > > able to resolve these two points as part of the pipelined
> > > > > > > > > > > region scheduling effort.
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Till
> > > > > > > > > >
> > > > > > > > > > > On Thu, Mar 26, 2020 at 10:51 AM Yangze Guo <karma...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi everyone,
> > > > > > > > > > >
> > > > > > > > > > > > We would like to start a discussion thread on "FLIP-118:
> > > > > > > > > > > > Improve Flink’s ID system"[1].
> > > > > > > > > > > >
> > > > > > > > > > > > This FLIP mainly discusses the following issues, aiming to
> > > > > > > > > > > > enhance the readability of IDs in logs and help users
> > > > > > > > > > > > debug in case of failures:
> > > > > > > > > > > >
> > > > > > > > > > > > - Enhance the readability of the string literals of IDs.
> > > > > > > > > > > > Most of them are hashcodes, e.g. ExecutionAttemptID, which
> > > > > > > > > > > > do not provide much meaningful information and are hard
> > > > > > > > > > > > for users to recognize and compare.
> > > > > > > > > > > > - Log the ID’s lineage information to make debugging more
> > > > > > > > > > > > convenient. Currently, the log does not always show the
> > > > > > > > > > > > lineage information between IDs. Finding out relationships
> > > > > > > > > > > > between entities identified by given IDs is a common
> > > > > > > > > > > > demand, e.g., the slot of which AllocationID is assigned
> > > > > > > > > > > > to satisfy the slot request of which SlotRequestID. In the
> > > > > > > > > > > > absence of such lineage information, it’s impossible to
> > > > > > > > > > > > track the end to end lifecycle of an Execution or a Task
> > > > > > > > > > > > now, which makes debugging difficult.
> > > > > > > > > > > >
> > > > > > > > > > > > Key changes proposed in the FLIP are as follows:
> > > > > > > > > > > >
> > > > > > > > > > > > - Add location information to distributed components
> > > > > > > > > > > > - Add topology information to graph components
> > > > > > > > > > > > - Log the ID’s lineage information
> > > > > > > > > > > > - Expose the identifier of distributing component to user
> > > > > > > > > > > >
> > > > > > > > > > > > Please find more details in the FLIP wiki document [1].
> > > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > >
> > > > > > > > > > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521
> > > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Yangze Guo
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > >
> > > > > > > > > Konstantin Knauf | Head of Product
> > > > > > > > >
> > > > > > > > > +49 160 91394525
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Follow us @VervericaData Ververica <https://www.ververica.com/
> > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > >
> > > > > > > > > Join Flink Forward <https://flink-forward.org/> - The Apache
> > > > Flink
> > > > > > > > > Conference
> > > > > > > > >
> > > > > > > > > Stream Processing | Event Driven | Real Time
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > >
> > > > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Ververica GmbH
> > > > > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > > > > > > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung
> > > > Jason,
> > > > > > Ji
> > > > > > > > > (Tony) Cheng
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> > >
