[jira] [Comment Edited] (SPARK-20392) Slow performance when calling fit on ML pipeline for dataset with many columns but few rows

2017-04-26 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984173#comment-15984173
 ] 

Liang-Chi Hsieh edited comment on SPARK-20392 at 4/26/17 6:44 AM:
--

[~barrybecker4] Currently I think the performance degradation is caused by the 
cost of converting between the DataFrame/Dataset abstraction and logical 
plans. Some operations on DataFrames convert between DataFrames and logical 
plans. That cost is negligible in typical SQL usage, but it is not rare to 
chain dozens of pipeline stages in ML. As the query plan grows incrementally 
while running those stages, the cost spent on the conversion grows too. In 
particular, the Analyzer walks the whole large query plan even though most of 
it has already been analyzed.

By eliminating part of that cost, the time to run the example code locally is 
reduced from about 1 min to about 30 secs.

In particular, the time spent applying the pipeline locally goes mostly to 
calling {{transform}} on the 137 {{Bucketizer}}s. Before the change, each call 
to a {{Bucketizer}}'s {{transform}} costs about 0.4 sec, so the total time 
spent in all the {{Bucketizer}}s' {{transform}} calls is about 50 secs. After 
the change, each call costs only about 0.1 sec.
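The growth described above can be sketched with a toy model (pure Scala, not 
Spark's actual Analyzer or LogicalPlan API — the names here are illustrative 
assumptions): if every stage wraps the previous logical plan in one more node 
and the analyzer re-walks the whole tree after each stage, total analysis work 
over n stages grows quadratically in n.

```scala
// Toy model of why per-stage cost grows: every pipeline stage wraps the
// previous logical plan in one more node, and an analyzer that re-walks the
// entire tree does work proportional to the plan's depth on every stage.
// (Names are illustrative, not Spark's real Analyzer/LogicalPlan API.)
sealed trait Plan { def depth: Int }
case object Source extends Plan { val depth = 1 }
case class Transform(child: Plan) extends Plan { val depth = child.depth + 1 }

// Cost of one full analysis pass: one unit of work per node visited.
def analyze(p: Plan): Int = p match {
  case Source       => 1
  case Transform(c) => 1 + analyze(c)
}

var plan: Plan = Source
var totalWork = 0
for (_ <- 1 to 137) {          // 137 stages, like the Bucketizers in this issue
  plan = Transform(plan)
  totalWork += analyze(plan)   // the whole plan is re-walked after each stage
}
println(s"depth = ${plan.depth}, total analyzer work = $totalWork")
```

With 137 stages the final plan is only 138 nodes deep, but the re-walks add up 
to about 9.6k node visits — which is why each successive {{transform}} call 
gets slower even though each stage itself is trivial.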







> Slow performance when calling fit on ML pipeline for dataset with many 
> columns but few rows
> ---
>
> Key: SPARK-20392
> URL: https://issues.apache.org/jira/browse/SPARK-20392
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.1.0
>Reporter: Barry Becker
> Attachments: blockbuster.csv, blockbuster_fewCols.csv, 
> giant_query_plan_for_fitting_pipeline.txt, model_9754.zip, model_9756.zip
>
>
> This started as a [question on stack 
> overflow|http://stackoverflow.com/questions/43484006/why-is-it-slow-to-apply-a-spark-pipeline-to-dataset-with-many-columns-but-few-ro],
>  but it seems like a bug.
> I am testing Spark pipelines using a simple dataset (attached) with 312 
> (mostly numeric) columns, but only 421 rows. It is small, but it takes 3 
> minutes to apply my ML pipeline to it on a 24-core server with 60G of memory. 
> This seems much too long for such a tiny dataset. Similar pipelines run 
> quickly on datasets that have fewer columns and more rows. It's something 
> about the number of columns that is causing the slow performance.
> Here is a list of the stages in my pipeline:
> {code}
> 000_strIdx_5708525b2b6c
> 001_strIdx_ec2296082913
> 002_bucketizer_3cbc8811877b
> 003_bucketizer_5a01d5d78436
> 004_bucketizer_bf290d11364d
> 005_bucketizer_c3296dfe94b2
> 006_bucketizer_7071ca50eb85
> 007_bucketizer_27738213c2a1
> 008_bucketizer_bd728fd89ba1
> 009_bucketizer_e1e716f51796
> 010_bucketizer_38be665993ba
> 011_bucketizer_5a0e41e5e94f
> 012_bucketizer_b5a3d5743aaa
> 013_bucketizer_4420f98ff7ff
> 014_bucketizer_777cc4fe6d12
> 015_bucketizer_f0f3a3e5530e
> 016_bucketizer_218ecca3b5c1
> 017_bucketizer_0b083439a192
> 018_bucketizer_4520203aec27
> 019_bucketizer_462c2c346079
> 020_bucketizer_47435822e04c
> 021_bucketizer_eb9dccb5e6e8
> 022_bucketizer_b5f63dd7451d
> 023_bucketizer_e0fd5041c841
> 024_bucketizer_ffb3b9737100
> 025_bucketizer_e06c0d29273c
> 026_bucketizer_36ee535a425f
> 027_bucketizer_ee3a330269f1
> 028_bucketizer_094b58ea01c0
> 029_bucketizer_e93ea86c08e2
> 030_bucketizer_4728a718bc4b
> 031_bucketizer_08f6189c7fcc
> 032_bucketizer_11feb74901e6
> 033_bucketizer_ab4add4966c7
> 034_bucketizer_4474f7f1b8ce
> 035_bucketizer_90cfa5918d71
> 036_bucketizer_1a9ff5e4eccb
> 037_bucketizer_38085415a4f4
> 038_bucketizer_9b5e5a8d12eb
> 039_bucketizer_082bb650ecc3
> 040_bucketizer_57e1e363c483
> 041_bucketizer_337583fbfd65
> 042_bucketizer_73e8f6673262
> 043_bucketizer_0f9394ed30b8
> 



[jira] [Comment Edited] (SPARK-20392) Slow performance when calling fit on ML pipeline for dataset with many columns but few rows

2017-04-21 Thread Barry Becker (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979049#comment-15979049
 ] 

Barry Becker edited comment on SPARK-20392 at 4/21/17 4:49 PM:
---

Yes [~kiszk], I was able to create a simple program that will allow you to 
reproduce this problem.
You will need the blockbuster.csv dataset and the parquet pipeline (both 
attached to this jira).
The pipeline will need to be unzipped, and there are 2 lines in the program 
below that will need to be updated so that they point to those 2 files on 
your computer. When I ran locally, I used spark "2.1.1-SNAPSHOT".

Here is my test case:
{code}
  /**
   * This test has much slower performance than expected.
   * The blockbuster dataset has only 312 rows but 421 columns - tiny by 
most standards.
   * This test takes about 1 min 10 seconds on my 4-core laptop,
   * but I have seen similarly slow performance on a more powerful server.
   */
  test("apply persisted parquet model pipeline to blockbuster dataset to get 
prediction on \"DAYPOP\"") {

// first load the blockbuster data into dataframe
sqlContext.sql("set spark.sql.caseSensitive=true")
val blockbusterDf = sqlContext.read
  .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
  .option("header", "true") // Use first line as header
  .option("inferSchema", "true")
  .option("delimiter", ",")
  .option("mode", "FAILFAST")
  .option("parserLib", "commons")
  .option("quote", "\"")
  .option("nullValue", "?")
  .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .csv(sys.env("SPARK_DEV_HOME").replaceAll("\\\\", "/") + // assumed regex: converts Windows \ to /
"/src/test/resources/data/blockbuster.csv") // update

// simulate cleaning the data
val cleanDf = cleanData(blockbusterDf)

// load pipeline from disk (update path to point to unzipped pipeline)
val pipeline: PipelineModel = 
PipelineModel.load(s"file:///$FILE_PREFIX/models/model_9754.class")
println("pipeline = " + pipeline.stages.mkString(", "))

// now apply the persisted parquet pipeline to it
val startTime = System.currentTimeMillis()
val newDf = pipeline.transform(cleanDf)
newDf.show(5)
println("time to apply the pipeline = " + (System.currentTimeMillis() - 
startTime))  // about 1 minute
  }

  /** Normally I do a bunch of cleaning here, but blockbuster does not need it.
* Basically just creates new *_CLEANED columns and makes ints into doubles.
*/
  private def cleanData(data: DataFrame): DataFrame = {
val CLEAN_SUFFIX = "_CLEANED__"
val origCols = data.schema.fields

val newCols: Seq[Column] =
  origCols.map(column => {
  val colName = column.name
  if (column.dataType == StringType)
col(colName).as(colName + CLEAN_SUFFIX)
  else col(colName).cast(DoubleType).as(colName + CLEAN_SUFFIX)
})

val colsToSelect = newCols ++ origCols.map(c => col(c.name))
data.select(colsToSelect:_*)
  }
{code}
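Liang-Chi's comment above attributes the slowness of this repro to repeated 
re-analysis of a steadily growing plan. One commonly suggested mitigation for 
long transform chains (an assumption on my part — the thread itself does not 
prescribe it) is to truncate the lineage every few stages, e.g. with 
{{Dataset.checkpoint}}. This pure-Scala toy cost model (not Spark) shows why 
periodic truncation turns quadratic total analysis work into roughly linear:

```scala
// Toy cost model (pure Scala, not Spark): with no truncation the analyzer
// re-walks a plan whose depth grows by one per stage, so total work over n
// stages is n(n+1)/2. Truncating the lineage every k stages (the effect a
// checkpoint would have) bounds every walk by k.
def analyzerWork(nStages: Int, truncateEvery: Int): Int = {
  var depth = 0
  var total = 0
  for (i <- 1 to nStages) {
    depth += 1                            // this stage adds one plan node
    total += depth                        // one full walk of the current plan
    if (i % truncateEvery == 0) depth = 0 // "checkpoint": lineage is cut
  }
  total
}

val naive     = analyzerWork(137, Int.MaxValue) // never truncate
val truncated = analyzerWork(137, 20)           // truncate every 20 stages
println(s"naive = $naive, truncated = $truncated")
```

For the 137 stages in this repro, the untruncated model does 9453 units of 
analysis work versus 1413 when the plan is cut every 20 stages — a rough 
illustration of the trade-off, since a real checkpoint also pays to 
materialize the intermediate data.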


