[ https://issues.apache.org/jira/browse/SPARK-22658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andy Feng updated SPARK-22658:
------------------------------
Description:

SPIP: TensorFlowOnSpark as a Scalable Deep Learning Lib of Apache Spark

Authors: Lee Yang (Yahoo/Oath), Andrew Feng (Yahoo/Oath)

Background and Motivation

Deep learning has evolved significantly in recent years, and is often considered a desirable mechanism for gaining insight from massive amounts of data. TensorFlow is currently the most popular deep learning library, and has been adopted by many organizations for a variety of use cases. After TensorFlow’s initial publication, Google released an enhanced TensorFlow with distributed deep learning capabilities in April 2016.

In February 2017, TensorFlowOnSpark (TFoS) was released for distributed TensorFlow training and inference on Apache Spark clusters. TFoS is designed to:
* Easily migrate existing TensorFlow programs with minimal code change;
* Support all TensorFlow functionality: synchronous/asynchronous training, model/data parallelism, inference and TensorBoard;
* Easily integrate with your existing data processing pipelines (ex. Spark SQL) and machine learning algorithms (ex. MLlib);
* Be easily deployed on cloud or on-premise: CPU & GPU, Ethernet and Infiniband.

At Yahoo/Oath, TFoS has become the most popular deep learning framework for many types of mission-critical use cases, many of which use tens of CPU or GPU servers. Outside Yahoo, TFoS has generated interest from LinkedIn, Paytm Labs, Hops Hadoop, Cloudera, MapR and Google, and has become a popular choice for distributed TensorFlow applications on Spark clusters.
We propose to merge TFoS into Apache Spark as a scalable deep learning library to:
* Make deep learning easy for the Apache Spark community: familiar pipeline API for training and inference; enable TensorFlow training/inference on existing Spark clusters.
* Further simplify the data scientist experience: ensure compatibility between Apache Spark and TFoS; reduce installation steps.
* Help Apache Spark evolve on deep learning: establish a design pattern for additional frameworks (ex. Caffe, BigDL, CNTK); structured streaming for DL training/inference.

Target Personas
* Data scientists
* Data engineers
* Library developers

Goals
* Spark ML style API for distributed TensorFlow training and inference
* Support all types of TensorFlow applications (ex. asynchronous learning, model parallelism) and functionalities (ex. TensorBoard)
* Support all TensorFlow trained models for scalable inference and transfer learning with ZERO custom code
* Support all Spark schedulers, including standalone, YARN, and Mesos
* Support TensorFlow 1.0 and later
* Initially Python API only; Scala and Java APIs could be added for inference later

Non-Goals
* Deep learning frameworks beyond TensorFlow
* Non-distributed TensorFlow applications on Apache Spark (ex. single node, or parallel execution for hyper-parameter search)

Proposed API Changes

Pipeline API: TFEstimator

    model = TFEstimator(train_fn, tf_args) \
                .setInputMapping({"image": "placeholder_X",
                                  "label": "placeholder_Y"}) \
                .setModelDir("my_model_checkpoints") \
                .setSteps(10000) \
                .setEpochs(10) \
                .fit(training_data_frame)

TFEstimator is a Spark ML estimator which launches a TensorFlowOnSpark cluster for distributed training. Its constructor TFEstimator(train_fn, tf_args, export_fn) accepts the following arguments:
* train_fn ... TensorFlow "main" function for training.
* tf_args ... Dictionary of arguments specific to the TensorFlow "main" function.
* export_fn ... TensorFlow function for exporting a saved_model.
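The chained-setter style above follows Spark ML's Params convention: each setter records a parameter and returns the object itself. As a purely illustrative, framework-free sketch (ToyTFEstimator is a hypothetical stand-in, not the proposed implementation), the pattern looks like this:

```python
class ToyTFEstimator:
    """Toy stand-in for the proposed TFEstimator; illustrates setter chaining only."""

    def __init__(self, train_fn, tf_args):
        self.train_fn = train_fn   # TensorFlow "main" function for training
        self.tf_args = tf_args     # dict of arguments passed to train_fn
        # Defaults taken from the parameter table in this proposal.
        self.params = {"BatchSize": 100, "ClusterSize": 1, "Steps": 1000, "Epochs": 1}

    # Each setter stores a parameter and returns self, enabling chained calls.
    def setInputMapping(self, mapping):
        self.params["InputMapping"] = mapping
        return self

    def setModelDir(self, path):
        self.params["ModelDir"] = path
        return self

    def setSteps(self, n):
        self.params["Steps"] = n
        return self

    def setEpochs(self, n):
        self.params["Epochs"] = n
        return self


# Mirrors the proposed usage; all parameters accumulate on one object.
est = (ToyTFEstimator(train_fn=lambda args, ctx: None, tf_args={})
       .setInputMapping({"image": "placeholder_X", "label": "placeholder_Y"})
       .setModelDir("my_model_checkpoints")
       .setSteps(10000)
       .setEpochs(10))
print(est.params["Steps"])
```

In the real proposal, fit(dataset) would then read these accumulated parameters to launch the TensorFlowOnSpark cluster; the sketch stops short of that.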
TFEstimator has a collection of parameters, including:
* InputMapping … Mapping of input DataFrame columns to input tensors
* ModelDir … Path to save/load model checkpoints
* ExportDir … Directory to export a saved_model
* BatchSize … Number of records per batch (default: 100)
* ClusterSize … Number of nodes in the cluster (default: 1)
* NumPS … Number of PS nodes in the cluster (default: 0)
* Readers … Number of reader/enqueue threads (default: 1)
* Tensorboard … Boolean flag indicating whether to launch TensorBoard (default: false)
* Steps … Maximum number of steps to train (default: 1000)
* Epochs … Number of epochs to train (default: 1)
* Protocol … Network protocol for TensorFlow (grpc|rdma) (default: grpc)
* InputMode … Input data feeding mode (TENSORFLOW, SPARK) (default: SPARK)

TFEstimator.fit(dataset) trains a TensorFlow model on the given training dataset. The training dataset is a Spark DataFrame whose columns are mapped to TensorFlow tensors as specified by the InputMapping parameter. TFEstimator.fit() returns a TFModel instance representing the trained model, backed on disk by a TensorFlow checkpoint or saved_model.

TensorFlow Training Application: train_fn(tf_args, ctx)

The first argument of TFEstimator, train_fn, allows custom TensorFlow applications to be easily plugged into the Spark environment. A custom TensorFlow application is presented as a Python function, train_fn(tf_args, ctx), with 2 arguments:
* tf_args ... Dictionary of arguments specific to the TensorFlow "main" function.
* ctx … A TFContext object, which represents the context for TensorFlow application execution.

A TFContext object has methods and attributes including:
* next_batch(size) ... Get a batch from the input RDD
* tf_path(path) … Get a TensorFlow-compatible path
* cluster … The associated TensorFlow cluster
* job_name … Name of this TensorFlow job: "ps" or "worker"
* task_index … Index of this node within a particular job_name
* server … The associated parameter server
* worker_num … The unique ID assigned to this TensorFlow worker

Pipeline API: TFModel

    model = TFModel() \
                .setModelDir("my_model_checkpoints") \
                .setInputMapping({"image": "placeholder_X"}) \
                .setOutputMapping({"prediction": "col_out"})
    preds = model.transform(df)

TFModel is a Spark ML Model object, created with a simple constructor TFModel(). TFModel has a collection of parameters, including:
* InputMapping … Mapping of input DataFrame columns to input tensors
* OutputMapping … Mapping of output tensors to output DataFrame columns
* ModelDir … Path to load model checkpoints
* ExportDir … Directory of an exported saved_model
* SignatureDefKey … Identifier for a specific saved_model signature
* TagSet … Comma-delimited list of tags identifying a saved_model metagraph
* BatchSize … Number of records per batch (default: 100)

TFModel.transform(dataset) applies the associated TensorFlow model to the input DataFrame for TensorFlow inference. We support distributed TensorFlow inference in 3 styles:
* Using a TensorFlow checkpoint only … The model is loaded from the checkpoint specified by the ModelDir parameter. The input dataset is a Spark DataFrame whose columns are mapped to TensorFlow tensors as specified by the InputMapping parameter. TFModel.transform() returns an output DataFrame containing the output tensors specified by the OutputMapping parameter.
* Using a TensorFlow saved_model’s predefined signatures … The model is loaded from an exported saved_model as specified by the ExportDir parameter. The input dataset is a Spark DataFrame whose columns are mapped to TensorFlow tensors as specified by the InputMapping and SignatureDefKey parameters.
TFModel.transform() returns an output DataFrame containing the output tensors specified by the OutputMapping and SignatureDefKey parameters. In this case, InputMapping and OutputMapping are defined in terms of the TensorFlow aliases specified in a signature.
* Using a TensorFlow saved_model, but ignoring the predefined signatures … The model is loaded from an exported saved_model as specified by the ExportDir parameter. The input dataset is a Spark DataFrame whose columns are mapped to TensorFlow tensors as specified by the InputMapping parameter. TFModel.transform() returns an output DataFrame containing the output tensors specified by the OutputMapping parameter.

Sample Applications (distributed training / training logic / distributed inference w/ predefined signatures / distributed inference w/o predefined signatures):
* MNIST w/ SPARK data feeding: MNIST TFEstimator / map_func(tf_args, ctx) / MNIST TFModel / MNIST TFModel
* MNIST w/ TENSORFLOW data feeding: MNIST TFEstimator / map_func(tf_args, ctx) / MNIST TFModel / MNIST TFModel
* INCEPTION w/ TENSORFLOW data feeding: INCEPTION TFEstimator / main_func(tf_args, ctx) / INCEPTION TFModel

Design Sketch

The above figure illustrates the basic architectural design of TensorFlowOnSpark. We launch TensorFlow training and inference on Spark executors. These executors dynamically form a TensorFlow cluster with server-to-server connectivity: some executors act as TensorFlow parameter servers, and others as TensorFlow workers. The server-to-server connectivity enables TFoS to scale easily by adding machines. As illustrated in the figure, TensorFlowOnSpark does not involve Spark drivers in tensor communication, and thus achieves scalability similar to stand-alone TensorFlow clusters.

Optional Rejected Designs

An alternative design is to have Spark executors communicate only with Spark drivers, not with each other. We rejected that popular architecture for Spark applications for the following reasons:
* It would not allow us to support distributed asynchronous learning.
* It would not enable model parallelism for large models.
* The Spark driver would become a scalability bottleneck.

Acknowledgements

We would like to thank Tim Hunter (Databricks), Sue Ann Hong (Databricks), and Matei Zaharia (Databricks) for their input on the design of TFEstimator and TFModel.
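The InputMapping parameter appears in both the training and inference paths above: DataFrame columns are matched to named input tensors, batch by batch. As a Spark-free illustration of that semantics (the function name batches_from_rows and the plain-dict rows are hypothetical; the actual feeding happens inside the TFoS cluster via next_batch), one can sketch the column-to-tensor batching like this:

```python
def batches_from_rows(rows, input_mapping, batch_size):
    """Group rows into per-tensor batches, as the InputMapping parameter implies.

    rows          -- iterable of dicts, one per DataFrame row
    input_mapping -- {column_name: tensor_name}, e.g. {"image": "placeholder_X"}
    batch_size    -- number of records per batch (proposal default: 100)
    """
    batch = {tensor: [] for tensor in input_mapping.values()}
    count = 0
    for row in rows:
        # Route each mapped column value to its target tensor's batch list.
        for column, tensor in input_mapping.items():
            batch[tensor].append(row[column])
        count += 1
        if count == batch_size:
            yield batch
            batch = {tensor: [] for tensor in input_mapping.values()}
            count = 0
    if count:  # final partial batch
        yield batch


# Five toy "rows" batched two at a time under the proposal's example mapping.
rows = [{"image": i, "label": i % 2} for i in range(5)]
mapping = {"image": "placeholder_X", "label": "placeholder_Y"}
feeds = list(batches_from_rows(rows, mapping, batch_size=2))
print(len(feeds))                 # three batches: 2 + 2 + 1 records
print(feeds[0]["placeholder_X"])  # first batch of "image" values
```

Unmapped columns are simply never read, which matches the proposal's contract that only columns named in InputMapping are fed to the TensorFlow graph.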
> SPIP: TensorFlowOnSpark as a Scalable Deep Learning Lib of Apache Spark
> ------------------------------------------------------------------------
>
>                  Key: SPARK-22658
>                  URL: https://issues.apache.org/jira/browse/SPARK-22658
>              Project: Spark
>           Issue Type: New Feature
>           Components: ML
>     Affects Versions: 2.2.0
>             Reporter: Andy Feng
>    Original Estimate: 336h
>   Remaining Estimate: 336h