[jira] [Comment Edited] (SPARK-20392) Slow performance when calling fit on ML pipeline for dataset with many columns but few rows
[ https://issues.apache.org/jira/browse/SPARK-20392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984173#comment-15984173 ] Liang-Chi Hsieh edited comment on SPARK-20392 at 4/26/17 6:44 AM: -- [~barrybecker4] Currently I think the performance degradation is caused by the cost of the exchange between the DataFrame/Dataset abstraction and logical plans. Some operations on DataFrames convert between DataFrames and logical plans. That cost is negligible in typical SQL usage, but it is not rare to chain dozens of pipeline stages in ML. As the query plan grows incrementally while those stages run, the cost of the exchange grows too. In particular, the Analyzer walks the whole large query plan even though most of it is already analyzed. By eliminating part of that cost, the time to run the example code locally is reduced from about 1 min to about 30 secs. Most of the time applying the pipeline locally is spent calling {{transform}} on the 137 {{Bucketizer}}s. Before the change, each {{Bucketizer}} {{transform}} call costs about 0.4 sec, so the total across all {{Bucketizer}}s is about 50 secs. After the change, each call costs only about 0.1 sec.
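The cost growth described above can be illustrated with a toy model (pure Python, hypothetical node counts, not Spark code): if every stage wraps the plan built by the previous stages and the Analyzer re-walks the whole plan each time, total analysis work grows quadratically in the number of stages even though each stage only adds a constant number of nodes.

```python
def total_analysis_cost(num_stages: int, nodes_per_stage: int = 1) -> int:
    """Sum of plan sizes visited by the analyzer across all stages (toy model)."""
    cost = 0
    plan_size = 0
    for _ in range(num_stages):
        plan_size += nodes_per_stage  # the new stage appends a few nodes
        cost += plan_size             # the analyzer re-walks the whole plan
    return cost

# For 137 stages this is 137*138/2 = 9453 node visits, versus only 137
# if each stage analyzed just its own newly added nodes.
print(total_analysis_cost(137))
```

This is why a fix that skips the already-analyzed portion of the plan can roughly halve the wall-clock time even though each individual {{transform}} is cheap.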
> Slow performance when calling fit on ML pipeline for dataset with many > columns but few rows > --- > > Key: SPARK-20392 > URL: https://issues.apache.org/jira/browse/SPARK-20392 > Project: Spark > Issue Type: Bug > Components: ML > Affects Versions: 2.1.0 > Reporter: Barry Becker > Attachments: blockbuster.csv, blockbuster_fewCols.csv, > giant_query_plan_for_fitting_pipeline.txt, model_9754.zip, model_9756.zip > > > This started as a [question on stack > overflow|http://stackoverflow.com/questions/43484006/why-is-it-slow-to-apply-a-spark-pipeline-to-dataset-with-many-columns-but-few-ro], > but it seems like a bug. > I am testing Spark pipelines using a simple dataset (attached) with 312 > (mostly numeric) columns, but only 421 rows. It is small, but it takes 3 > minutes to apply my ML pipeline to it on a 24-core server with 60G of memory. > This seems much too long for such a tiny dataset. Similar pipelines run > quickly on datasets that have fewer columns and more rows. It's something > about the number of columns that is causing the slow performance.
> Here is a list of the stages in my pipeline:
> {code}
> 000_strIdx_5708525b2b6c
> 001_strIdx_ec2296082913
> 002_bucketizer_3cbc8811877b
> 003_bucketizer_5a01d5d78436
> 004_bucketizer_bf290d11364d
> 005_bucketizer_c3296dfe94b2
> 006_bucketizer_7071ca50eb85
> 007_bucketizer_27738213c2a1
> 008_bucketizer_bd728fd89ba1
> 009_bucketizer_e1e716f51796
> 010_bucketizer_38be665993ba
> 011_bucketizer_5a0e41e5e94f
> 012_bucketizer_b5a3d5743aaa
> 013_bucketizer_4420f98ff7ff
> 014_bucketizer_777cc4fe6d12
> 015_bucketizer_f0f3a3e5530e
> 016_bucketizer_218ecca3b5c1
> 017_bucketizer_0b083439a192
> 018_bucketizer_4520203aec27
> 019_bucketizer_462c2c346079
> 020_bucketizer_47435822e04c
> 021_bucketizer_eb9dccb5e6e8
> 022_bucketizer_b5f63dd7451d
> 023_bucketizer_e0fd5041c841
> 024_bucketizer_ffb3b9737100
> 025_bucketizer_e06c0d29273c
> 026_bucketizer_36ee535a425f
> 027_bucketizer_ee3a330269f1
> 028_bucketizer_094b58ea01c0
> 029_bucketizer_e93ea86c08e2
> 030_bucketizer_4728a718bc4b
> 031_bucketizer_08f6189c7fcc
> 032_bucketizer_11feb74901e6
> 033_bucketizer_ab4add4966c7
> 034_bucketizer_4474f7f1b8ce
> 035_bucketizer_90cfa5918d71
> 036_bucketizer_1a9ff5e4eccb
> 037_bucketizer_38085415a4f4
> 038_bucketizer_9b5e5a8d12eb
> 039_bucketizer_082bb650ecc3
> 040_bucketizer_57e1e363c483
> 041_bucketizer_337583fbfd65
> 042_bucketizer_73e8f6673262
> 043_bucketizer_0f9394ed30b8
> 044_bucketizer_8530f3570019
> 045_bucketizer_c53614f1e507
> 046_bucketizer_8fd99e6ec27b
> 047_bucketizer_6a8610496d8a
> 048_bucketizer_888b0055c1ad
> 049_bucketizer_974e0a1433a6
> 050_bucketizer_e848c0937cb9
> 051_bucketizer_95611095a4ac
> 052_bucketizer_660a6031acd9
> 053_bucketizer_aaffe5a3140d
> 054_bucketizer_8dc569be285f
> 055_bucketizer_83d1bffa07bc
>
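Each bucketizer stage above maps a numeric column into bucket indices given a list of split points. A minimal pure-Python sketch of that per-row work (an illustration, not Spark's actual Bucketizer implementation) shows it is a cheap binary search, which supports the comment above that plan-handling overhead, not the transform arithmetic, dominates the runtime:

```python
import bisect

def bucketize(values, splits):
    """Map each value to the index of the bucket [splits[i], splits[i+1]) containing it."""
    # bisect_right - 1 gives the bucket index for values within [splits[0], splits[-1])
    return [bisect.bisect_right(splits, v) - 1 for v in values]

buckets = bucketize([0.5, 1.5, 3.2], splits=[0.0, 1.0, 2.0, 4.0])
# buckets == [0, 1, 2]
```

Each value costs O(log n) in the number of splits, so even 137 such stages over 421 rows should be nearly instantaneous in isolation.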
[jira] [Comment Edited] (SPARK-20392) Slow performance when calling fit on ML pipeline for dataset with many columns but few rows
[ https://issues.apache.org/jira/browse/SPARK-20392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979049#comment-15979049 ] Barry Becker edited comment on SPARK-20392 at 4/21/17 4:49 PM: --- Yes [~kiszk], I was able to create a simple program that will allow you to reproduce this problem. You will need the blockbuster.csv dataset and the parquet pipeline (both attached to this jira). The pipeline will need to be unzipped, and two lines in the program below will need to be updated so that they point to those two files on your computer. When I ran it locally, I used Spark "2.1.1-SNAPSHOT". Here is my test case:
{code}
/**
 * This test has much slower performance than expected.
 * The blockbuster dataset has only 312 rows but has 421 columns - tiny by most standards.
 * This test takes about 1 min 10 seconds on my 4 core laptop,
 * but I have seen similar slow performance on a more powerful server.
 */
test("apply persisted parquet model pipeline to blockbuster dataset to get prediction on \"DAYPOP\"") {
  // first load the blockbuster data into a DataFrame
  sqlContext.sql("set spark.sql.caseSensitive=true")
  val blockbusterDf = sqlContext.read
    .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
    .option("header", "true") // use first line as header
    .option("inferSchema", "true")
    .option("delimiter", ",")
    .option("mode", "FAILFAST")
    .option("parserLib", "commons")
    .option("quote", "\"")
    .option("nullValue", "?")
    .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss")
    .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
    .csv(sys.env("SPARK_DEV_HOME").replaceAll("\\\\", "/") + "/src/test/resources/data/blockbuster.csv") // update

  // simulate cleaning the data
  val cleanDf = cleanData(blockbusterDf)

  // load pipeline from disk (update path to point to unzipped pipeline)
  val pipeline: PipelineModel = PipelineModel.load(s"file:///$FILE_PREFIX/models/model_9754.class")
  println("pipeline = " + pipeline.stages.mkString(", "))

  // now apply the persisted parquet pipeline to it
  val startTime = System.currentTimeMillis()
  val newDf = pipeline.transform(cleanDf)
  newDf.show(5)
  println("time to apply the pipeline = " + (System.currentTimeMillis() - startTime)) // about 1 minute
}

/**
 * Normally I do a bunch of cleaning here, but blockbuster does not need it.
 * Basically just creates new *_CLEANED columns and makes ints into doubles.
 */
private def cleanData(data: DataFrame): DataFrame = {
  val CLEAN_SUFFIX = "_CLEANED__"
  val origCols = data.schema.fields
  val newCols: Seq[Column] = origCols.map(column => {
    val colName = column.name
    if (column.dataType == StringType) col(colName).as(colName + CLEAN_SUFFIX)
    else col(colName).cast(DoubleType).as(colName + CLEAN_SUFFIX)
  })
  val colsToSelect = newCols ++ origCols.map(c => col(c.name))
  data.select(colsToSelect: _*)
}
{code}
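The cleanData helper in the test above simply duplicates every column under a _CLEANED__ suffix, casting non-string values to doubles. A rough pure-Python equivalent (plain dicts standing in for a DataFrame; the sample column names are hypothetical) makes the intended transformation concrete:

```python
CLEAN_SUFFIX = "_CLEANED__"

def clean_data(rows):
    """Add a <col>_CLEANED__ copy of every column; non-string values are cast to float."""
    cleaned = []
    for row in rows:
        new_row = dict(row)  # keep all original columns
        for name, value in row.items():
            if isinstance(value, str):
                new_row[name + CLEAN_SUFFIX] = value
            else:
                new_row[name + CLEAN_SUFFIX] = float(value)
        cleaned.append(new_row)
    return cleaned

out = clean_data([{"title": "Jaws", "DAYPOP": 3}])
# out[0]["DAYPOP_CLEANED__"] == 3.0 and out[0]["title_CLEANED__"] == "Jaws"
```

The point is that the cleaning itself is trivial; in the Spark version it nonetheless doubles the column count, which enlarges the query plan the Analyzer must repeatedly walk.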