[ https://issues.apache.org/jira/browse/SPARK-20392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979049#comment-15979049 ]
Barry Becker edited comment on SPARK-20392 at 4/21/17 4:46 PM:
---------------------------------------------------------------
Yes [~kiszk], I was able to create a simple program that reproduces this problem. You will need the blockbuster.csv dataset and the parquet pipeline (both attached to this JIRA). The pipeline needs to be unzipped, and two lines in the program below need to be updated so that they point to those two files on your computer. When I ran locally, I used Spark "2.1.1-SNAPSHOT". Here is my test case:

{code}
/**
 * This test has much slower performance than expected.
 * The blockbuster dataset has only 312 rows but 421 columns - tiny by most standards.
 * This test takes about 1 min 10 seconds on my 4-core laptop,
 * but I have seen similarly slow performance on a more powerful server.
 */
test("apply persisted parquet model pipeline to blockbuster dataset to get prediction on \"DAYPOP\"") {
  // first load the blockbuster data into a DataFrame
  sqlContext.sql("set spark.sql.caseSensitive=true")
  val blockbusterDf = sqlContext.read
    .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
    .option("header", "true") // use first line as header
    .option("inferSchema", "true")
    .option("delimiter", ",")
    .option("mode", "FAILFAST")
    .option("parserLib", "commons")
    .option("quote", "\"")
    .option("nullValue", "?")
    .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss")
    .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
    .csv(sys.env("SPARK_DEV_HOME").replaceAll("\\\\", "/") + "/src/test/resources/data/blockbuster.csv") // update

  // simulate cleaning the data
  val cleanDf = cleanData(blockbusterDf)

  // load pipeline from disk (update path to point to unzipped pipeline)
  val pipeline: PipelineModel = PipelineModel.load(s"file:///$FILE_PREFIX/models/model_9754.class")
  println("pipeline = " + pipeline.stages.mkString(", "))

  // now apply the persisted parquet pipeline to it
  val startTime = System.currentTimeMillis()
  val newDf = pipeline.transform(cleanDf)
  newDf.show(5)
  println("time to apply the pipeline = " + (System.currentTimeMillis() - startTime)) // about 1 minute
}

/**
 * Normally I do a bunch of cleaning here, but blockbuster does not need it.
 * Basically just creates new *_CLEANED__ columns and casts ints to doubles.
 */
private def cleanData(data: DataFrame): DataFrame = {
  val CLEAN_SUFFIX = "_CLEANED__"
  val origCols = data.schema.fields
  val newCols: Seq[Column] = origCols.map { column =>
    val colName = column.name
    if (column.dataType == StringType) col(colName).as(colName + CLEAN_SUFFIX)
    else col(colName).cast(DoubleType).as(colName + CLEAN_SUFFIX)
  }
  val colsToSelect = newCols ++ origCols.map(c => col(c.name))
  data.select(colsToSelect: _*)
}
{code}

> Slow performance when calling fit on ML pipeline for dataset with many
> columns but few rows
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20392
>                 URL: https://issues.apache.org/jira/browse/SPARK-20392
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 2.1.0
>            Reporter: Barry Becker
>         Attachments: blockbuster.csv, giant_query_plan_for_fitting_pipeline.txt
>
> This started as a [question on stack overflow|http://stackoverflow.com/questions/43484006/why-is-it-slow-to-apply-a-spark-pipeline-to-dataset-with-many-columns-but-few-ro], but it seems like a bug.
> I am testing spark pipelines using a simple dataset (attached) with 312 (mostly numeric) columns, but only 421 rows. It is small, but it takes 3 minutes to apply my ML pipeline to it on a 24-core server with 60G of memory. This seems much too long for such a tiny dataset. Similar pipelines run quickly on datasets that have fewer columns and more rows. It's something about the number of columns that is causing the slow performance.
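The column-count sensitivity is consistent with each single-column stage wrapping the previous query plan in yet another projection, so plan depth grows linearly with the number of columns. A toy model of that nesting, in plain Python rather than Spark's actual plan classes (the `Project` node here is hypothetical, purely illustrative):

```python
class Project:
    """Stand-in for a single-column projection node wrapping a child plan."""
    def __init__(self, child, new_col):
        self.child = child
        self.new_col = new_col

def depth(plan):
    """Count how many nested projection nodes wrap the base scan."""
    d = 0
    while isinstance(plan, Project):
        plan, d = plan.child, d + 1
    return d

plan = "scan(blockbuster.csv)"      # base relation
for i in range(142):                 # one wrapper per transformer, matching the 142-stage pipeline
    plan = Project(plan, f"stage_{i}_output")

print(depth(plan))  # -> 142
```

With a few hundred rows the data itself is trivial; it is the depth of this wrapped structure (and the work done analyzing/codegen-ing it) that scales with column count.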
> Here is a list of the stages in my pipeline:
> {code}
> 000_strIdx_5708525b2b6c
> 001_strIdx_ec2296082913
> 002_bucketizer_3cbc8811877b
> 003_bucketizer_5a01d5d78436
> 004_bucketizer_bf290d11364d
> 005_bucketizer_c3296dfe94b2
> 006_bucketizer_7071ca50eb85
> 007_bucketizer_27738213c2a1
> 008_bucketizer_bd728fd89ba1
> 009_bucketizer_e1e716f51796
> 010_bucketizer_38be665993ba
> 011_bucketizer_5a0e41e5e94f
> 012_bucketizer_b5a3d5743aaa
> 013_bucketizer_4420f98ff7ff
> 014_bucketizer_777cc4fe6d12
> 015_bucketizer_f0f3a3e5530e
> 016_bucketizer_218ecca3b5c1
> 017_bucketizer_0b083439a192
> 018_bucketizer_4520203aec27
> 019_bucketizer_462c2c346079
> 020_bucketizer_47435822e04c
> 021_bucketizer_eb9dccb5e6e8
> 022_bucketizer_b5f63dd7451d
> 023_bucketizer_e0fd5041c841
> 024_bucketizer_ffb3b9737100
> 025_bucketizer_e06c0d29273c
> 026_bucketizer_36ee535a425f
> 027_bucketizer_ee3a330269f1
> 028_bucketizer_094b58ea01c0
> 029_bucketizer_e93ea86c08e2
> 030_bucketizer_4728a718bc4b
> 031_bucketizer_08f6189c7fcc
> 032_bucketizer_11feb74901e6
> 033_bucketizer_ab4add4966c7
> 034_bucketizer_4474f7f1b8ce
> 035_bucketizer_90cfa5918d71
> 036_bucketizer_1a9ff5e4eccb
> 037_bucketizer_38085415a4f4
> 038_bucketizer_9b5e5a8d12eb
> 039_bucketizer_082bb650ecc3
> 040_bucketizer_57e1e363c483
> 041_bucketizer_337583fbfd65
> 042_bucketizer_73e8f6673262
> 043_bucketizer_0f9394ed30b8
> 044_bucketizer_8530f3570019
> 045_bucketizer_c53614f1e507
> 046_bucketizer_8fd99e6ec27b
> 047_bucketizer_6a8610496d8a
> 048_bucketizer_888b0055c1ad
> 049_bucketizer_974e0a1433a6
> 050_bucketizer_e848c0937cb9
> 051_bucketizer_95611095a4ac
> 052_bucketizer_660a6031acd9
> 053_bucketizer_aaffe5a3140d
> 054_bucketizer_8dc569be285f
> 055_bucketizer_83d1bffa07bc
> 056_bucketizer_0c6180ba75e6
> 057_bucketizer_452f265a000d
> 058_bucketizer_38e02ddfb447
> 059_bucketizer_6fa4ad5d3ebd
> 060_bucketizer_91044ee766ce
> 061_bucketizer_9a9ef04a173d
> 062_bucketizer_3d98eb15f206
> 063_bucketizer_c4915bb4d4ed
> 064_bucketizer_8ca2b6550c38
> 065_bucketizer_417ee9b760bc
> 066_bucketizer_67f3556bebe8
> 067_bucketizer_0556deb652c6
> 068_bucketizer_067b4b3d234c
> 069_bucketizer_30ba55321538
> 070_bucketizer_ad826cc5d746
> 071_bucketizer_77676a898055
> 072_bucketizer_05c37a38ce30
> 073_bucketizer_6d9ae54163ed
> 074_bucketizer_8cd668b2855d
> 075_bucketizer_d50ea1732021
> 076_bucketizer_c68f467c9559
> 077_bucketizer_ee1dfc840db1
> 078_bucketizer_83ec06a32519
> 079_bucketizer_741d08c1b69e
> 080_bucketizer_b7402e4829c7
> 081_bucketizer_8adc590dc447
> 082_bucketizer_673be99bdace
> 083_bucketizer_77693b45f94c
> 084_bucketizer_53529c6b1ac4
> 085_bucketizer_6a3ca776a81e
> 086_bucketizer_6679d9588ac1
> 087_bucketizer_6c73af456f65
> 088_bucketizer_2291b2c5ab51
> 089_bucketizer_cb3d0fe669d8
> 090_bucketizer_e71f913c1512
> 091_bucketizer_156528f65ce7
> 092_bucketizer_f3ec5dae079b
> 093_bucketizer_809fab77eee1
> 094_bucketizer_6925831511e6
> 095_bucketizer_c5d853b95707
> 096_bucketizer_e677659ca253
> 097_bucketizer_396e35548c72
> 098_bucketizer_78a6410d7a84
> 099_bucketizer_e3ae6e54bca1
> 100_bucketizer_9fed5923fe8a
> 101_bucketizer_8925ba4c3ee2
> 102_bucketizer_95750b6942b8
> 103_bucketizer_6e8b50a1918b
> 104_bucketizer_36cfcc13d4ba
> 105_bucketizer_2716d0455512
> 106_bucketizer_9bcf2891652f
> 107_bucketizer_8c3d352915f7
> 108_bucketizer_0786c17d5ef9
> 109_bucketizer_f22df23ef56f
> 110_bucketizer_bad04578bd20
> 111_bucketizer_35cfbde7e28f
> 112_bucketizer_cf89177a528b
> 113_bucketizer_183a0d393ef0
> 114_bucketizer_467c78156a67
> 115_bucketizer_380345e651ab
> 116_bucketizer_0f39f6de1625
> 117_bucketizer_d8500b2c0c2f
> 118_bucketizer_dc5f1fd09ff1
> 119_bucketizer_eeaf9e6cdaef
> 120_bucketizer_5614cd4533d7
> 121_bucketizer_2f1230e2871e
> 122_bucketizer_f8bf9d47e57e
> 123_bucketizer_2df774393575
> 124_bucketizer_259320b7fc86
> 125_bucketizer_e334afc63030
> 126_bucketizer_f17d4d6b4d94
> 127_bucketizer_da7834230ecd
> 128_bucketizer_8dbb503f658e
> 129_bucketizer_e09e2eb2b181
> 130_bucketizer_faa04fa16f3c
> 131_bucketizer_d0bd348a5613
> 132_bucketizer_de6da796e294
> 133_bucketizer_0395526346ce
> 134_bucketizer_ea3b5eb6058f
> 135_bucketizer_ad83472038f7
> 136_bucketizer_4a17c440fd16
> 137_bucketizer_d468637d4b86
> 138_bucketizer_4fc473a72f1d
> 139_vecAssembler_bd87cd105650
> 140_nb_f134e0890a0d
> 141_sql_a8590b83c826
> {code}
> There are 2 string columns that are converted to ints with StringIndexerModel. Then there are bucketizers that bin all the numeric columns into 2 or 3 bins each. Is there a way to bin many columns at once with a single stage? I did not see a way. Next there is a VectorAssembler to combine all the columns into one for the NaiveBayes classifier. Lastly, there is a simple SQLTransformer to cast the prediction column to an int.
> Here is what the metadata for the two StringIndexerModels looks like:
> {code}
> {"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1492551461778,"sparkVersion":"2.1.1","uid":"strIdx_5708525b2b6c","paramMap":{"outputCol":"ADI_IDX__","handleInvalid":"skip","inputCol":"ADI_CLEANED__"}}
> {"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1492551462004,"sparkVersion":"2.1.1","uid":"strIdx_ec2296082913","paramMap":{"outputCol":"State_IDX__","inputCol":"State_CLEANED__","handleInvalid":"skip"}}
> {code}
> The bucketizers all look very similar.
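For readers unfamiliar with these stages: each bucketizer's splits array (visible in the metadata below) defines bin edges for one column, and handleInvalid="keep" routes NaN to one extra bucket. A rough sketch of that mapping in plain Python — illustrative only, not Spark's implementation:

```python
import bisect
import math

def bucketize(value, splits, handle_invalid="keep"):
    """Map a value to a bin index given sorted split points,
    mimicking (not reproducing) Bucketizer's semantics."""
    if math.isnan(value):
        if handle_invalid == "keep":
            return len(splits) - 1  # extra bucket reserved for invalid values
        raise ValueError("NaN encountered")
    # bisect_right gives the insertion point; subtract 1 for the bin index
    idx = bisect.bisect_right(splits, value) - 1
    # the last bin is inclusive of its upper edge
    return min(idx, len(splits) - 2)

# splits as in the attached metadata: ["-Inf", 7521.0, 12809.5, 20299.0, "Inf"]
splits = [float("-inf"), 7521.0, 12809.5, 20299.0, float("inf")]
print(bucketize(5000.0, splits))        # -> 0
print(bucketize(15000.0, splits))       # -> 2
print(bucketize(float("nan"), splits))  # -> 4 (the "keep" bucket)
```

The mapping per value is trivially cheap; the cost in this issue comes from the number of such single-column stages, not from the binning itself.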
> Here is what the metadata for a few of them looks like:
> {code}
> {"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462636,"sparkVersion":"2.1.1","uid":"bucketizer_bd728fd89ba1","paramMap":{"outputCol":"HH_02_BINNED__","inputCol":"HH_02_CLEANED__","handleInvalid":"keep","splits":["-Inf",7521.0,12809.5,20299.0,"Inf"]}}
> {"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462711,"sparkVersion":"2.1.1","uid":"bucketizer_e1e716f51796","paramMap":{"splits":["-Inf",6698.0,13690.5,"Inf"],"handleInvalid":"keep","outputCol":"HH_97_BINNED__","inputCol":"HH_97_CLEANED__"}}
> {"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462784,"sparkVersion":"2.1.1","uid":"bucketizer_38be665993ba","paramMap":{"splits":["-Inf",4664.0,7242.5,11770.0,14947.0,"Inf"],"outputCol":"HH_90_BINNED__","handleInvalid":"keep","inputCol":"HH_90_CLEANED__"}}
> {"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462858,"sparkVersion":"2.1.1","uid":"bucketizer_5a0e41e5e94f","paramMap":{"splits":["-Inf",6107.5,10728.5,"Inf"],"outputCol":"HH_80_BINNED__","inputCol":"HH_80_CLEANED__","handleInvalid":"keep"}}
> {"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462931,"sparkVersion":"2.1.1","uid":"bucketizer_b5a3d5743aaa","paramMap":{"outputCol":"HHPG9702_BINNED__","splits":["-Inf",8.895000457763672,"Inf"],"handleInvalid":"keep","inputCol":"HHPG9702_CLEANED__"}}
> {"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551463004,"sparkVersion":"2.1.1","uid":"bucketizer_4420f98ff7ff","paramMap":{"splits":["-Inf",54980.5,"Inf"],"outputCol":"MEDHI97_BINNED__","handleInvalid":"keep","inputCol":"MEDHI97_CLEANED__"}}
> {code}
> Here is the metadata for the NaiveBayes model:
> {code}
> {"class":"org.apache.spark.ml.classification.NaiveBayesModel","timestamp":1492551472568,"sparkVersion":"2.1.1","uid":"nb_f134e0890a0d","paramMap":{"modelType":"multinomial","probabilityCol":"_class_probability_column__","smoothing":1.0,"predictionCol":"_prediction_column_","rawPredictionCol":"rawPrediction","featuresCol":"_features_column__","labelCol":"DAYPOP_BINNED__"}}
> {code}
> and for the final SQLTransformer:
> {code}
> {"class":"org.apache.spark.ml.feature.SQLTransformer","timestamp":1492551472804,"sparkVersion":"2.1.1","uid":"sql_a8590b83c826","paramMap":{"statement":"SELECT *, CAST(_prediction_column_ AS INT) AS `_*_prediction_label_column_*__` FROM __THIS__"}}
> {code}
> Why does performance become extremely slow with more than a couple hundred columns (and only a few rows), while datasets with millions of rows (but fewer columns) perform fine? In addition to being slow when applying this pipeline, it is also slow to create it. The fit and evaluate steps each take a few minutes. Is there anything that can be done to make it faster?
> I get similar results using 2.1.1RC, 2.1.2 (tip) and 2.2.0 (tip). Spark 2.1.0 gives a Janino 64k limit error when trying to build this pipeline (see https://issues.apache.org/jira/browse/SPARK-16845).
> I stepped through in the debugger when pipeline.fit was called and noticed that the query plan is a huge nested structure. I don't know how to interpret this plan, but it is likely related to the performance problem. It is attached.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)