(don't know why your email ends with ".invalid")

On Wed, Dec 19, 2018 at 9:13 AM Xiangrui Meng <men...@gmail.com> wrote:
> On Wed, Dec 19, 2018 at 7:34 AM Ilya Matiach <il...@microsoft.com.invalid> wrote:
>
> > [Note: I sent this earlier, but it looks like the email was blocked
> > because I had another email group on the CC line.]
> >
> > Hi Spark Dev,
> >
> > I would like to use the new barrier execution mode introduced in Spark
> > 2.4 with LightGBM in the spark package mmlspark, but I ran into some
> > issues and I had a couple of questions.
> >
> > Currently, the LightGBM distributed learner tries to figure out the
> > number of cores on the cluster and then does a coalesce and a
> > mapPartitions. Inside the mapPartitions we do a NetworkInit (where the
> > address:port of all workers needs to be passed in the constructor) and
> > pass the data in-memory to the native layer of the distributed LightGBM
> > learner.
> >
> > With barrier execution mode, I think the code would become much more
> > robust. However, there are several issues that I am running into when
> > trying to move my code over to the new barrier execution mode scheduler:
> >
> > It does not support dynamic allocation; however, I think it would be
> > convenient if it restarted the job when the number of workers has
> > decreased, and allowed the dev to decide whether to restart the job if
> > the number of workers increased.
>
> How does mmlspark handle dynamic allocation? Do you have a watch thread on
> the driver to restart the job if there are more workers? And when the
> number of workers decreases, can training continue without the driver
> involved?
>
> > It does not work with the DataFrame or Dataset API, but I think it would
> > be much more convenient if it did.
>
> DataFrame/Dataset do not have APIs that let users scan through the entire
> partition. The closest is the Pandas UDF, which scans data per batch. I'm
> thinking about the following: we change the Pandas UDF to take an iterator
> of record batches (instead of a single batch), and per contract we say
> this iterator will iterate through the entire partition.
> So you only need to do the NetworkInit once.
>
> > How does barrier execution mode deal with #partitions > #tasks? If the
> > number of partitions is larger than the number of "tasks" or workers,
> > can barrier execution mode automatically coalesce the dataset to have
> > #partitions == #tasks?
>
> It will hang there and print warning messages. We didn't assume user code
> can correctly handle dynamic worker sizes.
>
> > It would be convenient to be able to get network information about all
> > the other workers in the cluster that are in the same barrier execution,
> > e.g. the host address and some task number or identifier of all workers.
>
> See getTaskInfos() at
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.BarrierTaskContext
> . We also provide a barrier() method there to assist simple coordination
> among workers.
>
> > I would love to hear more about this new feature. I also had trouble
> > finding documentation (JIRA:
> > https://issues.apache.org/jira/browse/SPARK-24374, high-level design:
> > https://www.slideshare.net/hadoop/the-zoo-expands?qid=b2efbd75-97af-4f71-9add-abf84970eaef&v=&b=&from_search=1).
> > Are there any good examples of Spark packages that have moved to the new
> > barrier execution mode in Spark 2.4?
>
> Databricks (which I'm an employee of) implemented HorovodRunner
> <https://databricks.com/blog/2018/11/19/introducing-horovodrunner-for-distributed-deep-learning-training.html>,
> which fully utilizes barrier execution mode. There is also a
> work-in-progress open-source integration of Horovod/PySpark from the
> Horovod author. Distributed deep learning training was the main use case
> considered in the design.
>
> Shall we have an offline meeting or open a JIRA to discuss more details
> about integrating mmlspark with barrier execution mode?
>
> > Thank you,
> > Ilya
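To make the "iterator of record batches" proposal above concrete, here is a minimal plain-Python sketch of the contract Meng describes: the per-partition function receives an iterator that spans the entire partition, so expensive one-time setup (LightGBM's NetworkInit in Ilya's case) runs exactly once before all batches are streamed to the learner. The function name and the tuple it yields are illustrative assumptions, not a real Spark or mmlspark API.

```python
def train_partition(batches):
    """Consume an iterator of record batches for one whole partition.

    Sketch of the proposed contract: the iterator covers the entire
    partition, so one-time setup (e.g. LightGBM's NetworkInit) happens
    once, not once per batch. All names here are illustrative.
    """
    network_inits = 0  # counts how many times one-time setup ran
    rows_seen = 0
    for batch in batches:
        if network_inits == 0:
            # one-time, per-partition setup (NetworkInit in LightGBM's case)
            network_inits += 1
        rows_seen += len(batch)
    # yield a single summary record for the partition
    yield (network_inits, rows_seen)
```

Under the current one-batch-at-a-time Pandas UDF, the setup cost would instead be paid on every batch; the iterator contract is what lets it move outside the loop.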
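The getTaskInfos()/barrier() answer above can also be sketched. The Spark 2.4 BarrierTaskContext calls are shown as comments (they need a running cluster); `build_machine_list` is a hypothetical helper, assumed here only to illustrate turning per-task addresses into the comma-separated machine list a NetworkInit-style constructor might expect. It is not mmlspark code.

```python
def build_machine_list(addresses):
    """Join per-task "host:port" strings into one comma-separated machine
    list, the shape of input a NetworkInit-style call could take.
    Hypothetical helper for illustration only."""
    return ",".join(addresses)

# Inside a barrier task, the addresses could come from BarrierTaskContext
# (PySpark 2.4 API; shown as comments because it requires a live cluster):
#
#   from pyspark import BarrierTaskContext
#
#   def train(iterator):
#       ctx = BarrierTaskContext.get()
#       # one BarrierTaskInfo per task in the barrier stage
#       addresses = [info.address for info in ctx.getTaskInfos()]
#       ctx.barrier()  # synchronize all tasks before NetworkInit
#       machines = build_machine_list(addresses)
#       ...
```

This is the coordination pattern Meng points at: every task in the barrier stage can see every peer's address, which is exactly the information Ilya's NetworkInit constructor needs.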