[ 
https://issues.apache.org/jira/browse/SPARK-22658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Feng updated SPARK-22658:
------------------------------
    Description: 
SPIP: TensorFlowOnSpark as a Scalable Deep Learning Lib of Apache Spark

Authors: Lee Yang (Yahoo/Oath), Andrew Feng (Yahoo/Oath)
Background and Motivation
Deep learning has evolved significantly in recent years, and is often 
considered a desirable approach for gaining insight from massive amounts of 
data. TensorFlow is currently the most popular deep learning library, and has 
been adopted by many organizations for a variety of use cases. After 
TensorFlow’s initial publication, Google released an enhanced TensorFlow with 
distributed deep learning capabilities in April 2016. 

In February 2017, TensorFlowOnSpark (TFoS) was released for distributed 
TensorFlow training and inference on Apache Spark clusters. TFoS is designed to:
* Easily migrate existing TensorFlow programs with minimal code changes;
* Support all TensorFlow functionalities: synchronous/asynchronous training, 
model/data parallelism, inference and TensorBoard;
* Easily integrate with existing data processing pipelines (ex. Spark SQL) 
and machine learning algorithms (ex. MLlib);
* Be easily deployed on cloud or on-premise: CPU & GPU, Ethernet and Infiniband.

At Yahoo/Oath, TFoS has become the most popular deep learning framework for 
many types of mission-critical use cases, many of which use tens of CPU or GPU 
servers. Outside Yahoo, TFoS has generated interest from LinkedIn, Paytm Labs, 
Hops Hadoop, Cloudera, MapR and Google, and has become a popular choice for 
distributed TensorFlow applications on Spark clusters. 

We propose to merge TFoS into Apache Spark as a scalable deep learning library 
to:
* Make deep learning easy for the Apache Spark community: familiar pipeline 
API for training and inference; TensorFlow training/inference on existing 
Spark clusters.
* Further simplify the data scientist experience: ensure compatibility between 
Apache Spark and TFoS; reduce installation steps.
* Help Apache Spark evolve on deep learning: establish a design pattern for 
additional frameworks (ex. Caffe, BigDL, CNTK); support structured streaming 
for DL training/inference.
Target Personas
* Data scientists
* Data engineers
* Library developers
Goals
* Spark ML-style API for distributed TensorFlow training and inference
* Support all types of TensorFlow applications (ex. asynchronous learning, 
model parallelism) and functionalities (ex. TensorBoard)
* Support scalable inference and transfer learning from any trained TensorFlow 
model with ZERO custom code
* Support all Spark schedulers, including standalone, YARN, and Mesos
* Support TensorFlow 1.0 and later
* Initially Python API only; Scala and Java APIs could be added later for 
inference
Non-Goals
* Deep learning frameworks beyond TensorFlow
* Non-distributed TensorFlow applications on Apache Spark (ex. single node, or 
parallel execution for hyper-parameter search)
Proposed API Changes
Pipeline API: TFEstimator
model = TFEstimator(train_fn, tf_args) \
    .setInputMapping({"image": "placeholder_X",
                      "label": "placeholder_Y"}) \
    .setModelDir("my_model_checkpoints") \
    .setSteps(10000) \
    .setEpochs(10) \
    .fit(training_data_frame)
TFEstimator is a Spark ML estimator which launches a TensorFlowOnSpark cluster 
for distributed training. Its constructor TFEstimator(train_fn, tf_args, 
export_fn) accepts the following arguments:
* train_fn ... TensorFlow "main" function for training.
* tf_args ... Dictionary of arguments specific to the TensorFlow "main" function.
* export_fn ... TensorFlow function for exporting a saved_model.
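
For concreteness, an export_fn might look roughly like the sketch below. This 
assumes the TensorFlow 1.x SavedModelBuilder API; the function signature, the 
model layers, and the reuse of the checkpoint directory are illustrative 
assumptions rather than part of the proposed contract.

import tensorflow as tf

def export_fn(export_dir):
    # Hypothetical sketch: rebuild the inference graph, restore the latest
    # training checkpoint, and export a saved_model with one signature.
    with tf.Graph().as_default():
        x = tf.placeholder(tf.float32, [None, 784], name="placeholder_X")
        logits = tf.layers.dense(x, 10)  # must mirror the training graph
        prediction = tf.argmax(logits, 1, name="prediction")
        saver = tf.train.Saver()
        with tf.Session() as sess:
            saver.restore(sess,
                          tf.train.latest_checkpoint("my_model_checkpoints"))
            builder = tf.saved_model.builder.SavedModelBuilder(export_dir)
            signature = tf.saved_model.signature_def_utils.predict_signature_def(
                inputs={"image": x}, outputs={"prediction": prediction})
            builder.add_meta_graph_and_variables(
                sess, [tf.saved_model.tag_constants.SERVING],
                signature_def_map={"predict": signature})
            builder.save()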

TFEstimator has a collection of parameters, including:
* InputMapping … Mapping of input DataFrame columns to input tensors
* ModelDir … Path to save/load model checkpoints
* ExportDir … Directory to export a saved_model
* BatchSize … Number of records per batch (default: 100)
* ClusterSize … Number of nodes in the cluster (default: 1)
* NumPS … Number of PS nodes in the cluster (default: 0)
* Readers … Number of reader/enqueue threads (default: 1)
* Tensorboard … Whether to launch TensorBoard (default: false)
* Steps … Maximum number of steps to train (default: 1000)
* Epochs … Number of epochs to train (default: 1)
* Protocol … Network protocol for TensorFlow (grpc|rdma) (default: grpc)
* InputMode … Input data feeding mode (TENSORFLOW, SPARK) (default: SPARK)

TFEstimator.fit(dataset) trains a TensorFlow model on the given training 
dataset, a Spark DataFrame whose columns are mapped to TensorFlow tensors as 
specified by the InputMapping parameter. TFEstimator.fit() returns a TFModel 
instance representing the trained model, backed on disk by a TensorFlow 
checkpoint or saved_model.
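
For instance, a four-node training cluster with one parameter server might be 
configured as follows. This is a sketch: the setClusterSize, setNumPS, and 
setBatchSize setters are assumed to follow the same Spark ML setter convention 
as those shown above.

# Hypothetical configuration: 4 executors forming a TensorFlow cluster
# (1 PS + 3 workers), fed directly from the DataFrame (InputMode SPARK).
model = TFEstimator(train_fn, tf_args) \
    .setInputMapping({"image": "placeholder_X", "label": "placeholder_Y"}) \
    .setModelDir("my_model_checkpoints") \
    .setClusterSize(4) \
    .setNumPS(1) \
    .setBatchSize(128) \
    .setEpochs(5) \
    .fit(training_data_frame)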
TensorFlow Training Application: train_fn(tf_args, ctx)
The first argument of TFEstimator, train_fn, allows custom TensorFlow 
applications to be easily plugged into the Spark environment. A custom 
TensorFlow application is expressed as a Python function, train_fn(tf_args, 
ctx), with 2 arguments:
* tf_args ... Dictionary of arguments specific to the TensorFlow "main" function.
* ctx … a TFContext object representing the context of the TensorFlow 
application's execution.
 
A TFContext object provides the following methods and attributes:
* next_batch(size) ... Get a batch of records from the input RDD
* tf_path(path) … Get a TensorFlow-compatible path
* cluster … The associated TensorFlow cluster
* job_name … Name of this TensorFlow job: "ps" or "worker"
* task_index … Index of this node within its job_name
* server … The associated TensorFlow server
* worker_num … The unique ID assigned to this TensorFlow worker
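
To make the division of labor concrete, a minimal train_fn for the SPARK input 
mode might look like the sketch below. It assumes the TensorFlow 1.x 
distributed API plus the TFContext attributes listed above; the single-layer 
model, the (images, labels) shape of ctx.next_batch() results, and the tf_args 
keys are illustrative assumptions.

import tensorflow as tf

def train_fn(tf_args, ctx):
    # Parameter servers just serve variables until the job ends.
    if ctx.job_name == "ps":
        ctx.server.join()
        return

    # Workers build a replicated graph; variables are placed on PS nodes.
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % ctx.task_index,
            cluster=ctx.cluster)):
        x = tf.placeholder(tf.float32, [None, 784], name="placeholder_X")
        y = tf.placeholder(tf.float32, [None, 10], name="placeholder_Y")
        logits = tf.layers.dense(x, 10)
        loss = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(labels=y, logits=logits))
        global_step = tf.train.get_or_create_global_step()
        train_op = tf.train.AdagradOptimizer(0.01).minimize(
            loss, global_step=global_step)

    # In SPARK input mode, ctx.next_batch() pulls records fed from the
    # partitions of the input DataFrame.
    with tf.train.MonitoredTrainingSession(
            master=ctx.server.target,
            is_chief=(ctx.task_index == 0),
            checkpoint_dir=tf_args.get("model_dir")) as sess:
        while not sess.should_stop():
            images, labels = ctx.next_batch(tf_args.get("batch_size", 100))
            sess.run(train_op, feed_dict={x: images, y: labels})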
     

Pipeline API: TFModel
model = TFModel() \
    .setModelDir("my_model_checkpoints") \
    .setInputMapping({"image": "placeholder_X"}) \
    .setOutputMapping({"prediction": "col_out"})
preds = model.transform(df)

TFModel is a Spark ML Model object, created via the no-argument constructor 
TFModel(). TFModel has a collection of parameters, including:
* InputMapping … Mapping of input DataFrame columns to input tensors
* OutputMapping … Mapping of output tensors to output DataFrame columns
* ModelDir … Path to load model checkpoints
* ExportDir … Directory of an exported saved_model
* SignatureDefKey … Identifier for a specific saved_model signature
* TagSet … Comma-delimited list of tags identifying a saved_model metagraph
* BatchSize … Number of records per batch (default: 100)

TFModel.transform(dataset) applies the associated TensorFlow model to the input 
DataFrame for distributed inference. We support distributed TensorFlow 
inference in 3 styles (a configuration sketch for the second style follows 
this list):
* Using a TensorFlow checkpoint only … The model is loaded from the checkpoint 
specified by the ModelDir parameter. The input dataset is a Spark DataFrame 
whose columns are mapped to TensorFlow tensors as specified by the 
InputMapping parameter. TFModel.transform() returns an output DataFrame 
containing the output tensors specified by the OutputMapping parameter.
* Using a TensorFlow saved_model's predefined signatures … The model is loaded 
from an exported saved_model as specified by the ExportDir parameter. Input 
columns are mapped to TensorFlow tensors as specified by the InputMapping and 
SignatureDefKey parameters, and TFModel.transform() returns an output 
DataFrame containing the output tensors specified by the OutputMapping and 
SignatureDefKey parameters. In this case, InputMapping and OutputMapping are 
defined in terms of the TensorFlow aliases declared in the signature.
* Using a TensorFlow saved_model while ignoring its predefined signatures … 
The model is loaded from an exported saved_model as specified by the ExportDir 
parameter. Input columns are mapped to TensorFlow tensors as specified by the 
InputMapping parameter, and TFModel.transform() returns an output DataFrame 
containing the output tensors specified by the OutputMapping parameter.
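
As an illustration of the second style, an exported saved_model could be 
served as follows. The export directory, tag set, and signature key values are 
illustrative; "serve" is the value of tf.saved_model.tag_constants.SERVING.

# Hypothetical inference from an exported saved_model: tensors are
# resolved through the signature's input/output aliases.
model = TFModel() \
    .setExportDir("my_export_dir") \
    .setTagSet("serve") \
    .setSignatureDefKey("predict") \
    .setInputMapping({"image": "image"}) \
    .setOutputMapping({"prediction": "col_out"})
preds = model.transform(df)
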
Sample Applications

                                     | Distributed Training  | Training logic          | Distributed Inference w/ predefined signatures | Distributed Inference w/o predefined signatures
MNIST w/ SPARK data feeding          | MNIST TFEstimator     | map_func(tf_args, ctx)  | MNIST TFModel                                  | MNIST TFModel
MNIST w/ TENSORFLOW data feeding     | MNIST TFEstimator     | map_func(tf_args, ctx)  | MNIST TFModel                                  | MNIST TFModel
INCEPTION w/ TENSORFLOW data feeding | INCEPTION TFEstimator | main_func(tf_args, ctx) | INCEPTION TFModel                              | -


Design Sketch

[Figure: TensorFlowOnSpark architecture (not reproduced in this plain-text 
description)]

The figure illustrates the basic architectural design of TensorFlowOnSpark. We 
launch TensorFlow training and inference on Spark executors, which dynamically 
form a TensorFlow cluster with server-to-server connectivity. Some of these 
executors act as TensorFlow parameter servers, and the others as TensorFlow 
workers. The server-to-server connections enable TFoS to scale easily by 
adding machines. As the figure shows, TensorFlowOnSpark does not involve the 
Spark driver in tensor communication, and thus achieves scalability similar to 
that of standalone TensorFlow clusters. 
Optional Rejected Designs
An alternative design would have Spark executors communicate only with the 
Spark driver, not with each other. We rejected that architecture, although it 
is popular for Spark applications, for the following reasons:
* It would not allow us to support distributed asynchronous learning.
* It would not enable model parallelism for large models.
* The Spark driver would become a scalability bottleneck.
Acknowledgements
We’d like to thank Tim Hunter (Databricks), Sue Ann Hong (Databricks), and 
Matei Zaharia (Databricks) for their input on the design of TFEstimator and 
TFModel.


  was:
In February 2017, TensorFlowOnSpark (TFoS) was released for distributed 
TensorFlow training and inference on Apache Spark clusters. TFoS is designed to:
   * Easily migrate all existing TensorFlow programs with minimum code change;
   * Support all TensorFlow functionalities: synchronous/asynchronous training, 
model/data parallelism, inference and TensorBoard;
   * Easily integrate with your existing data processing pipelines (ex. Spark 
SQL) and machine learning algorithms (ex. MLlib);
   * Be easily deployed on cloud or on-premise: CPU & GPU, Ethernet and 
Infiniband.

We propose to merge TFoS into Apache Spark as a scalable deep learning library 
to:
* Make deep learning easy for Apache Spark community:  Familiar pipeline API 
for training and inference; Enable TensorFlow training/inference on existing 
Spark clusters.
* Further simplify data scientist experience: Ensure compatibility b/w Apache 
Spark and TFoS; Reduce steps for installation.
* Help Apache Spark evolution on deep learning: Establish a design pattern for 
additional frameworks (ex. Caffe, CNTK); Structured streaming for DL 
training/inference.



> SPIP: TensorFlowOnSpark as a Scalable Deep Learning Lib of Apache Spark
> ------------------------------------------------------------------------
>
>                 Key: SPARK-22658
>                 URL: https://issues.apache.org/jira/browse/SPARK-22658
>             Project: Spark
>          Issue Type: New Feature
>          Components: ML
>    Affects Versions: 2.2.0
>            Reporter: Andy Feng
>   Original Estimate: 336h
>  Remaining Estimate: 336h


