On Wed, Dec 19, 2018 at 7:34 AM Ilya Matiach <il...@microsoft.com.invalid>
wrote:
>
> [Note: I sent this earlier but it looks like the email was blocked
because I had another email group on the CC line]
>
> Hi Spark Dev,
>
> I would like to use the new barrier execution mode introduced in Spark
2.4 with LightGBM in the Spark package mmlspark, but I ran into some issues
and have a couple of questions.
>
> Currently, the LightGBM distributed learner figures out the number of
cores on the cluster and then does a coalesce and a mapPartitions. Inside
the mapPartitions we do a NetworkInit (where the address:port of every
worker must be passed to the constructor) and pass the data in memory to
the native layer of the distributed LightGBM learner.
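(For concreteness, here is my rough PySpark-flavored reading of that
pattern; network_init and train_partition below are placeholder names of my
own for the calls into the native LightGBM layer, which in mmlspark actually
live in Scala.)

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(1000)   # stand-in for the training DataFrame
    num_workers = 4          # mmlspark derives this from the cluster's core count
    worker_addresses = ["host1:12400", "host2:12400"]  # gathered up front today

    def network_init(addresses):
        pass                 # placeholder for the native NetworkInit(address:port list)

    def train_partition(rows):
        return len(rows)     # placeholder for handing the partition to the native learner

    def train(rows):
        network_init(worker_addresses)     # every task needs every worker's address
        yield train_partition(list(rows))  # data is passed in memory to native LightGBM

    model_parts = df.coalesce(num_workers).rdd.mapPartitions(train).collect()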
>
>
>
> With barrier execution mode, I think the code would become much more
robust.  However, there are several issues that I am running into when
trying to move my code over to the new barrier execution mode scheduler:
>
> Does not support dynamic allocation – however, I think it would be
convenient if it restarted the job when the number of workers decreases
and let the developer decide whether to restart the job when the number of
workers increases

How does mmlspark handle dynamic allocation today? Do you have a watch
thread on the driver that restarts the job when more workers become
available? And when the number of workers decreases, can training continue
without involving the driver?

> Does not work with the DataFrame or Dataset API, but I think it would be
much more convenient if it did

DataFrame/Dataset do not have APIs that let users scan through an entire
partition. The closest is Pandas UDF, which scans the data one batch at a
time. I'm thinking about the following:

Suppose we change Pandas UDF to take an iterator of record batches (instead
of a single batch), and the contract says the iterator walks through the
entire partition. Then you would only need to do NetworkInit once.
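As a purely hypothetical sketch of that contract (this is not an existing
Spark 2.4 API, just what the proposal could look like, reusing the
placeholder names from the sketch above):

    import pandas as pd

    # Hypothetical iterator-of-batches Pandas UDF -- not an existing Spark 2.4 API.
    # `batches` would walk over every record batch of the partition, so one-time
    # setup such as NetworkInit runs once per partition instead of once per batch.
    def train_udf(batches):
        network_init(worker_addresses)            # once per partition (placeholder)
        for batch in batches:                     # batch: a pandas.DataFrame
            feed_to_native_learner(batch)         # placeholder for the native call
        yield pd.DataFrame({"status": ["done"]})  # placeholder result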

> How does barrier execution mode deal with #partitions > #tasks?  If the
number of partitions is larger than the number of “tasks” or workers, can
barrier execution mode automatically coalesce the dataset to have #
partitions == # tasks?

It will hang and print warning messages. We didn't assume that user code
can correctly handle a dynamically changing number of workers.
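In other words, the caller has to match the partition count to the number
of available barrier slots up front. A minimal sketch, reusing df from the
sketch above and assuming the slot count is known:

    num_slots = 4   # e.g. total executor cores / spark.task.cpus

    # A barrier stage needs all of its tasks running at the same time,
    # so coalesce to the known slot count before entering barrier mode.
    rdd = df.coalesce(num_slots).rdd
    result = rdd.barrier().mapPartitions(lambda rows: rows).collect()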

> It would be convenient to be able to get network information about all
other workers in the cluster that are in the same barrier execution, e.g.
the host address and some task number or identifier of each worker

See getTaskInfos() in the BarrierTaskContext API docs:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.BarrierTaskContext

We also provide a barrier() method there to assist with simple coordination
among workers.
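For instance, something along these lines should let each task discover its
peers and synchronize before handing control to the native layer (reusing
the placeholders and num_slots from the earlier sketches; I'm assuming the
native layer can derive whatever ports it needs from these host addresses):

    from pyspark import BarrierTaskContext

    def barrier_train(rows):
        ctx = BarrierTaskContext.get()
        # Addresses of the executors running every task in this barrier stage.
        addresses = [info.address for info in ctx.getTaskInfos()]
        network_init(addresses)   # placeholder for the native NetworkInit
        ctx.barrier()             # wait until every task has finished its setup
        yield (ctx.partitionId(), train_partition(list(rows)))

    model_parts = (df.coalesce(num_slots)
                     .rdd.barrier()
                     .mapPartitions(barrier_train)
                     .collect())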

>
>
>
> I would love to hear more about this new feature – I also had trouble
finding documentation (JIRA:
https://issues.apache.org/jira/browse/SPARK-24374, high-level design:
https://www.slideshare.net/hadoop/the-zoo-expands?qid=b2efbd75-97af-4f71-9add-abf84970eaef&v=&b=&from_search=1).
Are there any good examples of Spark packages that have moved to the new
barrier execution mode in Spark 2.4?

Databricks (where I work) implemented HorovodRunner
<https://databricks.com/blog/2018/11/19/introducing-horovodrunner-for-distributed-deep-learning-training.html>,
which fully utilizes barrier execution mode. There is also a
work-in-progress open-source integration of Horovod and PySpark from the
Horovod author. Distributed deep learning training was the main use case
considered in the design.

Shall we have an offline meeting or open a JIRA to discuss the details of
integrating mmlspark with barrier execution mode?

>
>
>
> Thank you, Ilya
