(don't know why your email ends with ".invalid")

On Wed, Dec 19, 2018 at 9:13 AM Xiangrui Meng <men...@gmail.com> wrote:

>
>
> On Wed, Dec 19, 2018 at 7:34 AM Ilya Matiach <il...@microsoft.com.invalid>
> wrote:
> >
> > [Note: I sent this earlier but it looks like the email was blocked
> because I had another email group on the CC line]
> >
> > Hi Spark Dev,
> >
> > I would like to use the new barrier execution mode introduced in spark
> 2.4 with LightGBM in the spark package mmlspark but I ran into some issues
> and I had a couple questions.
> >
> > Currently, the LightGBM distributed learner tries to figure out the
> number of cores on the cluster and then does a coalesce and a
> mapPartitions, and inside the mapPartitions we do a NetworkInit (where the
> address:port of all workers needs to be passed in the constructor) and pass
> the data in-memory to the native layer of the distributed lightgbm learner.
> >
> >
> >
> > With barrier execution mode, I think the code would become much more
> robust.  However, there are several issues that I am running into when
> trying to move my code over to the new barrier execution mode scheduler:
> >
> > Does not support dynamic allocation – however, I think it would be
> convenient if it restarted the job when the number of workers has decreased
> and allowed the dev to decide whether to restart the job if the number of
> workers increased
>
> How does mmlspark handle dynamic allocation? Do you have a watch thread on
> the driver to restart the job if there are more workers? And when the
> number of workers decrease, can training continue without driver involved?
>
> > Does not work with DataFrame or Dataset API, but I think it would be
> much more convenient if it did
>
> DataFrame/Dataset do not have APIs to let users scan through the entire
> partition. The closest is Pandas UDF, which scans data per batch. I'm
> thinking about the following:
>
> If we change Pandas UDF to take an iterator of record batches (instead of
> a single batch), and per contract we say this iterator will iterate through
> the entire partition. So you only need to do NetworkInit once.
>
> > How does barrier execution mode deal with #partitions > #tasks?  If the
> number of partitions is larger than the number of “tasks” or workers, can
> barrier execution mode automatically coalesce the dataset to have #
> partitions == # tasks?
>
> It will hang there and print warning messages. We didn't assume user code
> can correctly handle dynamic worker sizes.
>
> > It would be convenient to be able to get network information about all
> other workers in the cluster that are in the same barrier execution, eg the
> host address and some task # or identifier of all workers
>
> See getTaskInfos() at
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.BarrierTaskContext
> .
>
> We also provide a barrier() method there to assist simple coordination
> among workers.
>
> >
> >
> >
> > I would love to hear more about this new feature – also I had trouble
> finding documentation (JIRA:
> https://issues.apache.org/jira/browse/SPARK-24374, High level design:
> https://www.slideshare.net/hadoop/the-zoo-expands?qid=b2efbd75-97af-4f71-9add-abf84970eaef&v=&b=&from_search=1),
> are there any good examples of spark packages that have moved to use the
> new barrier execution mode in spark 2.4?
>
> Databricks (which I'm an employee of) implemented HorovodRunner
> <https://databricks.com/blog/2018/11/19/introducing-horovodrunner-for-distributed-deep-learning-training.html>,
> which fully utilizes barrier execution mode. There is also a
> work-in-process open-source integration of Horovod/PySpark from Horovod
> author. Doing distributed deep learning training was the main use case
> considered in the design.
>
> Shall we have an offline meeting or open a JIRA to discuss more details
> about integrating mmlspark w/ barrier execution mode?
>
> >
> >
> >
> > Thank you, Ilya
>

Reply via email to