Hi Andrey,

Thanks for providing so detailed concerns and enlightenments for this proposal. 
We exchanged our views of three main issues on google doc last week and it 
seems more appropriate to further contact here. :)

1. Configuration level for shuffle (cluster/job/operator)

- how do we share shuffle manager resources among different job tasks within 
one task executor process? It could be some static objects shared by all 
shuffle manager objects of some type but it might be not scalable approach. 
Example could be multiplexed netty connections (as I understand, current netty 
stack can become just custom shuffle service).

The creation of ShuffleManager instance on task level is just like the process 
of creating StateBackend in StateBackendLoader. The ShuffleService and 
ShuffleManager are two independent components, and the interaction between them 
is only registration mechanism. In detail, if some ShuffleManager instances 
want to rely ShuffleService to transport data, then it can register related 
infos to ShuffleService during creation of ResultPartitionWriter. So the 
ShuffleManager instance do not need  contain any objects like netty related 
stacks. The flink runtime can provide one unified netty-based ShuffleService 
which can be started in both internal TaskManager or external containers. The 
internal ShuffleService not only takes the role of tranporting data directly 
for some ShuffleManager instances but also takes the role of RPC server for 
communicating with external ShuffleService, such as register result partition 
to external service, otherwise the external service might need an additional 
RPC service to contact with TaskManager.  Here we have the implicit meaning to 
make intenral shuffle as a basic service started in TaskManager like the 
components of IOManager and MemoryManager, even thought useless for some type 
jobs.


- In case of having it per job, we might need to provide compatibility check 
between shuffle service and cluster mode (e.g. yarn ext shuffle service for 
standalone mode cluster) if it is an issue.

- Having it per job feels like the same complexity as having it per operator, 
at the first glance, just changes its granularity and where objects reside.
- what is the problem to use cluster per job mode? Then shuffle manager per 
cluster and per job is the same but might simplify other issues at the 
beginning. Streaming and batch jobs with different shuffle requirements could 
be started in different clusters per job.

I totally agree with the above concerns for per job configuration. As you 
mentioned, it is a option to run different type jobs in different clusters. But 
in some special scenarios like hybrid cluster to run online and offline jobs in 
differemt times, it is betterto support job level configuration for fexibility. 
Certainly it may not be a strong requirements for most cases, then we can reach 
an agreement to make the cluster level as the easiest way first and adjut the 
level if needed in future.

2. ShuffleManager interface

I think you mentioned three sub issues in this part:

2.1 Introduction of additional ResultPartitionWriterFactory && 
InputGateReaderFactory

I am not against the introduction of these two factories. The original 
introduction of pluggable ShuffleManager interface is for creating different 
writer and reader sides. If the ShuffleManager interface is used for creating 
factories, and then the factories are used for creating writer and reader. I 
still think the essence is same, and only the form is different.  That is the 
ShuffleManager concept is seen on JobManager side, and the task only sees the 
corresponding factories from ShuffleManager. In other words, we add another 
factory layer to distinguish between JobManager and task. The form might seem a 
bit better to introduce corresponding factories, so I am willing to take this 
way for implementation.

2.2 Whether to retain getResultPartitionLocation method in ShuffleManager 
interface

If I understand correctly, you mean to put this location as an argument in 
InputGateReaderFacotry constructor? If to do so, I think it makes sense and we 
can avoid have this explicit method in interface. But we also need to adjust 
the existing related process like updatePartitionInfo for downstream side. In 
this case, the partition location is unknown during deploying downstream tasks. 
Based on upstream's consumable notification, the location update is triggered 
by JobManager to downstream side.

2.3 ShuffleService interface

My initial thought is not making it as an interface. Because for internal or 
external shuffle cases, they can reuse the same unified netty-based shuffle 
service if we wrap the related componenets into current shuffle service well. 
If we want to furtherextend other implementations of shuffle service, like 
http-based shuffle service, then we can define an interface for it, the way as 
current RpcService interface to get ride of only akka implementations. So it 
also makes sense on my side to keep this interface. As for 
ShuffleServiceRegistry class, I agree with you to have this TaskManager level 
service for managing and sharing for all the internal tasks.

In summary, I think we do not have essential conflicts for above issues, almost 
for the implementation aspects. And I agree with the above points, especially 
for above 2.2 you might need double check if I understand correctly. 
Wish your further feedbacks then I can adjust the docs based on it.  Also 
welcome any other person's feedbacks!


Best,
Zhijiang



------------------------------------------------------------------
发件人:Andrey Zagrebin <and...@data-artisans.com>
发送时间:2018年12月10日(星期一) 05:18
收件人:dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
抄 送:Nico Kruber <n...@data-artisans.com>; Piotr Nowojski 
<pi...@data-artisans.com>; Stephan Ewen <se...@apache.org>; Till Rohrmann 
<trohrm...@apache.org>
主 题:Re: [DISCUSS] Proposal of external shuffle service

Hi Zhijiang,


Thanks for sharing the document Zhijiang. 
I decided to compile my thoughts to consider here, not to overload document 
comments any more :)
I think I still have question about job level configuration for the shuffle 
service. You mentioned that we can keep several shuffle manager objects in one 
task executor for different jobs. This is valid. My concerns are:
- how do we share shuffle manager resources among different job tasks within 
one task executor process? It could be some static objects shared by all 
shuffle manager objects of some type but it might be not scalable approach. 
Example could be multiplexed netty connections (as I understand, current netty 
stack can become just custom shuffle service).
- In case of having it per job, we might need to provide compatibility check 
between shuffle service and cluster mode (e.g. yarn ext shuffle service for 
standalone mode cluster) if it is an issue.
- Having it per job feels like the same complexity as having it per operator, 
at the first glance, just changes its granularity and where objects reside.
- what is the problem to use cluster per job mode? Then shuffle manager per 
cluster and per job is the same but might simplify other issues at the 
beginning. Streaming and batch jobs with different shuffle requirements could 
be started in different clusters per job. 
As for ShuffleManager interface, I think I see your point with the 
ResultPartitionLocation. I agree that partition needs some addressing of 
underlying connection or resources in general. It can be thinked of as an 
argument of ShuffleManager factory methods.
My point is that task code might not need to be coupled to shuffle interface. 
This way we could keep task code more independent of records transfer layer. We 
can always change later how shuffle/network service is organised internally 
without any consequences for the general task code. If task code calls just 
factories provided by JM, it might not even matter for the task in future 
whether it is configured per cluster, job or operator. Internally, factory can 
hold location of concrete type if needed.
Code example could be:
Job Manager side:
interface ShuffleManager {
  ResultPartionWriterFactory createResultPartionWriterFactory(job/task/topology 
descriptors);
  // similar for input gate factory
}
class ShuffleManagerImpl implements ShuffleManager {
  private general config, services etc;
  ResultPartionWriterFactory createResultPartionWriterFactory(job/task/topology 
descriptors) {
    return new ResultPartionWriterFactoryImpl(location, job, oper id, other 
specific config etc);
  }
  // similar for input gate factory
}
...
// somewhere in higher level code put ResultPartionWriterFactory into descriptor
Task executor side receives the factory inside the descriptor and calls 
factory.create(ShuffleServiceRegistry). Example of factory:
class ResultPartionWriterFactoryImpl implements ResultPartionWriterFactory {
  // all fields are lightweight and serialisable, received from JM
  private location, shuffle service id, other specific config etc;

 ResultPartionWriter create(ShuffleServiceRegistry registry, maybe more generic 
args) {
    // get or create task local specific ShuffleServiceImpl by id in registry
    // ShuffleServiceImpl object can be shared between jobs
    // register with the ShuffleServiceImpl by location, id, config etc
  }
}
interface ShuffleService extends AutoClosable {
  getId();
}
ShuffleServiceImpl manages resources and decides internally whether to do it 
per task executor, task, job or operator. It can contain network stack, e,g, 
netty connections etc. In case of external service, it can hold partition 
manager, transport client etc. It is not enforced to have it per job by this 
contract or even to have it at all. ShuffleServiceImpl also does not need to 
depend on all TaskManagerServices, only create relevant inside, e.g. network.
class ShuffleServiceRegistry {
  <T extends ShuffleService> T getShuffleService(id);
 registerShuffleService(ShuffleService, id);
  deregisterShuffleService(id); // remove and close ShuffleService
  close(); // close all
}
ShuffleServiceRegistry is just a generic container of all available 
ShuffleService’s. It could be part of TaskManagerServices instead of 
NetworkEnvironment which could go into specific ShuffleServiceImpl.

I might still miss some details, I would appreciate any feedback.

Best,
Andrey

On 28 Nov 2018, at 08:59, zhijiang <wangzhijiang...@aliyun.com.INVALID> wrote:
Hi all,

I adjusted the umbrella jira [1] and corresponding google doc [2] to narrow 
down the scope of introducing pluggable shuffle manager architecture as the 
first step. 
Welcome further feedbacks and suggestions, then I would create specific 
subtasks for it to forward.

[1] https://issues.apache.org/jira/browse/FLINK-10653

[2] 
https://docs.google.com/document/d/1ssTu8QE8RnF31zal4JHM1VaVENow-PweUtXSRr68nGg/edit?usp=sharing
------------------------------------------------------------------
发件人:zhijiang <wangzhijiang...@aliyun.com.INVALID>
发送时间:2018年11月1日(星期四) 17:19
收件人:dev <dev@flink.apache.org>; Jin Sun <isun...@gmail.com>
抄 送:Nico Kruber <n...@data-artisans.com>; Piotr Nowojski 
<pi...@data-artisans.com>; Stephan Ewen <se...@apache.org>
主 题:回复:[DISCUSS] Proposal of external shuffle service

Thanks for the efficient response till!

Thanks sunjin for the good feedbacks, we will further confirm with the comments 
then! :)
------------------------------------------------------------------
发件人:Jin Sun <isun...@gmail.com>
发送时间:2018年11月1日(星期四) 06:42
收件人:dev <dev@flink.apache.org>
抄 送:Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>; Nico Kruber 
<n...@data-artisans.com>; Piotr Nowojski <pi...@data-artisans.com>; Stephan 
Ewen <se...@apache.org>
主 题:Re: [DISCUSS] Proposal of external shuffle service

Thanks Zhijiang for the proposal. I like the idea of external shuffle service, 
have left some comments on the document. 

On Oct 31, 2018, at 2:26 AM, Till Rohrmann <trohrm...@apache.org> wrote:

Thanks for the update Zhijiang! The community is currently quite busy with
the next Flink release. I hope that we can finish the release in two weeks.
After that people will become more responsive again.

Cheers,
Till

On Wed, Oct 31, 2018 at 7:49 AM zhijiang <wangzhijiang...@aliyun.com> wrote:

I already created the umbrella jira [1] for this improvement, and attched
the design doc [2] in this jira.

Welcome for further discussion about the details.

[1] https://issues.apache.org/jira/browse/FLINK-10653
[2]
https://docs.google.com/document/d/1Jb0Mf46ace-6cLRQxJzo6VNQQVxn3hwf9Zqmv5pcb34/edit?usp=sharing


<https://docs.google.com/document/d/1Jb0Mf46ace-6cLRQxJzo6VNQQVxn3hwf9Zqmv5pcb34/edit?usp=sharing>
Best,
Zhijiang

------------------------------------------------------------------
发件人:Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com.INVALID>
发送时间:2018年9月11日(星期二) 15:21
收件人:dev <dev@flink.apache.org>
抄 送:dev <dev@flink.apache.org>
主 题:回复:[DISCUSS] Proposal of external shuffle service

Many thanks Till!


I would create a JIRA for this feature and design a document attched with it.
I will let you know after ready! :)

Best,
Zhijiang


------------------------------------------------------------------
发件人:Till Rohrmann <trohrm...@apache.org>
发送时间:2018年9月7日(星期五) 22:01
收件人:Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>
抄 送:dev <dev@flink.apache.org>
主 题:Re: [DISCUSS] Proposal of external shuffle service

The rough plan sounds good Zhijiang. I think we should continue with what
you've proposed: Open a JIRA issue and creating a design document which
outlines the required changes a little bit more in detail. Once this is
done, we should link the design document in the JIRA issue and post it here
for further discussion.

Cheers,
Till

On Wed, Aug 29, 2018 at 6:04 PM Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

Glad to receive your positive feedbacks Till!

Actually our motivation is to support batch job well as you mentioned.

For output level, flink already has the Subpartition abstraction(writer),
and currently there are PipelinedSubpartition(memory output) and
SpillableSubpartition(one-sp-one-file output) implementations. We can
extend this abstraction to realize other persistent outputs (e.g.
sort-merge-file).

For transport level(shuffle service), the current SubpartitionView
abstraction(reader) seems as the brige linked with the output level, then

the view can understand and read the different output formats. The current
NetworkEnvironment seems take the role of internal shuffle service in
TaskManager and the transport server is realized by netty inside. This

component can also be started in other external containers like NodeManager
of yarn to take the role of external shuffle service. Further we can

abstract to extend the shuffle service for transporting outputs by http or

rdma instead of current netty.  This abstraction should provide the way for
output registration in order to read the results correctly, similar with
current SubpartitionView.

The above is still a rough idea. Next I plan to create a feature jira to
cover the related changes if possible. It would be better if getting help
from related committers to review the detail designs together.

Best,
Zhijiang

------------------------------------------------------------------
发件人:Till Rohrmann <trohrm...@apache.org>
发送时间:2018年8月29日(星期三) 17:36
收件人:dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com>
主 题:Re: [DISCUSS] Proposal of external shuffle service

Thanks for starting this design discussion Zhijiang!

I really like the idea to introduce a ShuffleService abstraction which

allows to have different implementations depending on the actual use case.

Especially for batch jobs I can clearly see the benefits of persisting the
results somewhere else.

Do you already know which interfaces we need to extend and where to
introduce new abstractions?

Cheers,
Till

On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999)
<wangzhijiang...@aliyun.com.invalid> wrote:
Hi all!


The shuffle service is responsible for transporting upstream produced data
to the downstream side. In flink, the NettyServer is used for network

transport service and this component is started in the TaskManager process.
That means the TaskManager can support internal shuffle service which
exists some concerns:
1. If a task finishes, the ResultPartition of this task still retains
registered in TaskManager, because the output buffers have to be
transported by internal shuffle service in TaskManager. That means the
TaskManager can not be released by ResourceManager until ResultPartition
released. It may waste container resources and can not support well for
dynamic resource scenarios.
2. If we want to expand another shuffle service implementation, the
current mechanism is not easy to handle, because the output level (result
partition) and transport level (shuffle service) are not divided clearly
and loss of abstraction to be extended.

For above considerations, we propose the external shuffle service which
can be deployed on any other external contaienrs, e.g. NodeManager

container in yarn. Then the TaskManager can be released ASAP ifneeded when
all the internal tasks finished. The persistent output files of these
finished tasks can be served to transport by external shuffle service in
the same machine.

Further we can abstract both of the output level and transport level to

support different implementations. e.g. We realized merging the data of all

the subpartitions into limited persistent local files for disk improvements
in some scenarios instead of one-subpartiton-one-file.

I know it may be a big work for doing this, and I just point out some
ideas, and wish getting any feedbacks from you!

Best,
Zhijiang










Reply via email to