How to increase the parallelism of a Spark Streaming application?
I have a Spark Streaming application which reads data from Kafka and saves the transformation result to HDFS. My Kafka topic originally has 8 partitions, and I repartition the data to 100 to increase the parallelism of the Spark job. Now I am wondering: if I increase the Kafka partition count to 100 instead of calling repartition(100), will performance improve? (I know the repartition step costs a lot of CPU.) And does setting the Kafka partition count to 100 have any negative effects? I only have one production environment, so it is not convenient for me to test this myself.

Thanks!

Regards,
Junfeng Chen
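For illustration, a minimal sketch of the two options being compared, assuming the spark-streaming-kafka-0-10 direct-stream API and an existing SparkContext sc; the broker address, topic name, group id, batch interval, and output path are placeholders, not taken from the thread:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    val ssc = new StreamingContext(sc, Seconds(10)) // placeholder batch interval

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092",        // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-streaming-app"             // placeholder
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("my-topic"), kafkaParams))

    stream.foreachRDD { (rdd, time) =>
      // The direct stream creates one RDD partition per Kafka partition,
      // so with an 8-partition topic rdd starts with 8 partitions.
      rdd.map(_.value)      // extract values first: ConsumerRecord is not serializable
        .repartition(100)   // option A: pay a full shuffle to spread work over 100 tasks
        .saveAsTextFile(s"hdfs:///output/batch-${time.milliseconds}") // placeholder path
    }

With a 100-partition topic (option B), rdd would already have 100 partitions and the repartition call, along with its shuffle, could be dropped entirely; the remaining cost of extra partitions falls mostly on the Kafka brokers (more file handles and per-partition overhead) rather than on Spark.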
SPARK-25959 - Difference in featureImportances results on computed vs saved models
Hi Spark Users,

I tried to implement GBT and found that the feature importances computed when the model was fit differ from those of the same model after it is saved to storage and loaded back. I also found that once the persisted model is loaded, saved again, and reloaded, the feature importances remain the same. I am not sure if this is a bug when storing and reading the model the first time, or if I am missing some parameter that needs to be set before saving the model (so that the model picks up some defaults, causing the feature importances to change).

*Below is the test code:*

    // Imports needed to run the snippet (spark is an existing SparkSession)
    import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
    import org.apache.spark.ml.feature.VectorAssembler
    import spark.implicits._

    val testDF = Seq(
      (1, 3, 2, 1, 1),
      (3, 2, 1, 2, 0),
      (2, 2, 1, 1, 0),
      (3, 4, 2, 2, 0),
      (2, 2, 1, 3, 1)
    ).toDF("a", "b", "c", "d", "e")

    val featureColumns = testDF.columns.filter(_ != "e")

    // Assemble the features into a vector
    val assembler = new VectorAssembler().setInputCols(featureColumns).setOutputCol("features")

    // Transform the data to get the feature data set
    val featureDF = assembler.transform(testDF)

    // Train a GBT model
    val gbt = new GBTClassifier()
      .setLabelCol("e")
      .setFeaturesCol("features")
      .setMaxDepth(2)
      .setMaxBins(5)
      .setMaxIter(10)
      .setSeed(10)
      .fit(featureDF)

    gbt.transform(featureDF).show(false)

    // Feature importances of the freshly trained model
    featureColumns.zip(gbt.featureImportances.toArray).sortBy(-_._2).take(20).foreach(println)
    /* Prints
    (d,0.5931875075767403)
    (a,0.3747184548362353)
    (b,0.03209403758702444)
    (c,0.0)
    */

    // Write out the model, then load it back
    gbt.write.overwrite().save("file:///tmp/test123")
    println("Reading model again")
    val gbtload = GBTClassificationModel.load("file:///tmp/test123")
    featureColumns.zip(gbtload.featureImportances.toArray).sortBy(-_._2).take(20).foreach(println)
    /* Prints
    (d,0.6455841215290767)
    (a,0.3316126797964181)
    (b,0.022803198674505094)
    (c,0.0)
    */

    // Save the loaded model again and reload it
    gbtload.write.overwrite().save("file:///tmp/test123_rewrite")
    val gbtload2 = GBTClassificationModel.load("file:///tmp/test123_rewrite")
    featureColumns.zip(gbtload2.featureImportances.toArray).sortBy(-_._2).take(20).foreach(println)
    /* Prints
    (d,0.6455841215290767)
    (a,0.3316126797964181)
    (b,0.022803198674505094)
    (c,0.0)
    */

Any help is appreciated in making sure the feature importances are maintained as-is when the model is first stored. Thanks!
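For reference, a one-line way to make the round-trip discrepancy machine-checkable, assuming the gbt and gbtload values from the code above are in scope:

    // Fails in the scenario reported above: the importance vectors
    // differ after the first save/load round trip.
    assert(gbt.featureImportances == gbtload.featureImportances,
      s"importances changed: ${gbt.featureImportances} vs ${gbtload.featureImportances}")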
Re: Shuffle write explosion
Hi Dillon,

Thank you for your reply. mapToPair uses a PairFunction to transform the records into a particular Parquet format. I have tried replacing the mapToPair() step with other operators like count() or collect(), but it didn't help, so I guess the shuffle write explosion is unrelated to mapToPair().

Best Regards,
Yichen

Dillon Dukek wrote on Tue, Nov 6, 2018 at 7:21 AM:

> What is your function in mapToPair doing?
>
> -Dillon
>
> On Mon, Nov 5, 2018 at 1:41 PM Taylor Cox wrote:
>
>> At first glance, I wonder if your tables are partitioned? There may not
>> be enough parallelism happening. You can also pass in the number of
>> partitions and/or a custom partitioner to help Spark "guess" how to
>> organize the shuffle.
>>
>> Have you seen any of these docs?
>> https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf
>> https://spark.apache.org/docs/latest/tuning.html
>>
>> Taylor
>>
>> *From:* Yichen Zhou
>> *Sent:* Sunday, November 4, 2018 11:42 PM
>> *To:* user@spark.apache.org
>> *Subject:* Shuffle write explosion
>>
>> Hi All,
>>
>> When running a Spark job, I have 100MB+ of input and get more than 700GB
>> of shuffle write, which is really weird, and the job finally ends with an
>> OOM error. Does anybody know why this happens?
>>
>> [image: Screen Shot 2018-11-05 at 15.20.35.png]
>>
>> My code is like:
>>
>> JavaPairRDD<Text, Text> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class);
>> inputRDD.repartition(partitionNum).mapToPair(...).saveAsNewAPIHadoopDataset(job.getConfiguration());
>>
>> Environment:
>> *CPU 32 cores; Memory 256G; Storage 7.5G; CentOS 7.5*
>> *java version "1.8.0_162"*
>> *Spark 2.1.2*
>>
>> Any help is greatly appreciated.
>>
>> Regards,
>> Yichen
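A short sketch of Taylor's suggestion to pass an explicit partition count or a custom partitioner, written in Scala; the key type, the 4-character prefix, and the partition count of 64 are illustrative assumptions, not taken from the thread:

    import org.apache.hadoop.io.Text
    import org.apache.spark.Partitioner

    // Copy Text to String right away: Hadoop reuses Writable instances, so
    // shuffling raw Text objects is a standard precaution flagged in the Spark docs.
    val pairs = sc.sequenceFile("hdfs:///input", classOf[Text], classOf[Text]) // placeholder path
      .map { case (k, v) => (k.toString, v.toString) }

    // Illustrative custom partitioner: group keys by a 4-character prefix so the
    // shuffle layout is explicit rather than left for Spark's default hash to guess.
    class PrefixPartitioner(override val numPartitions: Int) extends Partitioner {
      def getPartition(key: Any): Int =
        math.abs(key.toString.take(4).hashCode % numPartitions)
    }

    // partitionBy shuffles once into a known layout with an explicit partition count.
    val partitioned = pairs.partitionBy(new PrefixPartitioner(64))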
Re: Spark 2.4.0 artifact in Maven repository
Hi Matei,

Thanks for your answer, it's much clearer now. I was not aware of the time needed for release preparation.

Best regards,
Bartosz.

On Tue, Nov 6, 2018 at 9:05 AM Matei Zaharia wrote:

> Hi Bartosz,
>
> This is because the vote on 2.4 has passed (you can see the vote thread on
> the dev mailing list) and we are just working to get the release into
> various channels (Maven, PyPI, etc.), which can take some time. Expect to
> see an announcement soon once that's done.
>
> Matei
>
> > On Nov 4, 2018, at 7:14 AM, Bartosz Konieczny wrote:
> >
> > Hi,
> >
> > Today I wanted to set up a development environment for GraphX, and when I
> > visited the Maven central repository
> > (https://mvnrepository.com/artifact/org.apache.spark/spark-graphx) I saw
> > that it was already available in version 2.4.0. Does this mean that the new
> > version of Apache Spark has been released? It seems quite surprising to me,
> > because I didn't find any release announcement and the 2.4 artifact was
> > deployed on 29/10/2018. Maybe somebody here has an explanation?
> >
> > Best regards,
> > Bartosz Konieczny.
Re: Spark 2.4.0 artifact in Maven repository
Hi Bartosz,

This is because the vote on 2.4 has passed (you can see the vote thread on the dev mailing list) and we are just working to get the release into various channels (Maven, PyPI, etc.), which can take some time. Expect to see an announcement soon once that's done.

Matei

> On Nov 4, 2018, at 7:14 AM, Bartosz Konieczny wrote:
>
> Hi,
>
> Today I wanted to set up a development environment for GraphX, and when I
> visited the Maven central repository
> (https://mvnrepository.com/artifact/org.apache.spark/spark-graphx) I saw that
> it was already available in version 2.4.0. Does this mean that the new version
> of Apache Spark has been released? It seems quite surprising to me because I
> didn't find any release announcement and the 2.4 artifact was deployed on
> 29/10/2018. Maybe somebody here has an explanation?
>
> Best regards,
> Bartosz Konieczny.