Starting a new Spark codebase, Python or Scala / Java?
Hello all, I will be starting a new Spark codebase and I would like to get opinions on using Python over Scala. Historically, the Scala API has always been the strongest interface to Spark. Is this still true? Are there still many benefits and additional features in the Scala API that are not available in the Python API? Are there any performance concerns using the Python API that do not exist when using the Scala API? Anything else I should know about? I appreciate any insight you have on using the Scala API over the Python API. Brandon
Re: Using spark package XGBoost
The XGBoost integration with Spark is currently only supported for RDDs; there is a ticket for dataframe support and folks claim to be working on it. On Aug 14, 2016 8:15 PM, "Jacek Laskowski" wrote: > Hi, > > I've never worked with the library and speaking about sbt setup only. > > It appears that the project didn't release 2.11-compatible jars (only > 2.10) [1] so you need to build the project yourself and uber-jar it > (using sbt-assembly plugin). > > [1] https://spark-packages.org/package/rotationsymmetry/sparkxgboost > > Pozdrawiam, > Jacek Laskowski > > https://medium.com/@jaceklaskowski/ > Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark > Follow me at https://twitter.com/jaceklaskowski > > > On Sun, Aug 14, 2016 at 7:13 AM, janardhan shetty > wrote: > > Any leads on how to achieve this? > > > > On Aug 12, 2016 6:33 PM, "janardhan shetty" > wrote: > >> > >> I tried using the sparkxgboost package in my build.sbt file but it failed. > >> Spark 2.0 > >> Scala 2.11.8 > >> > >> Error: > >> [warn] > >> http://dl.bintray.com/spark-packages/maven/rotationsymmetry/sparkxgboost/0.2.1-s_2.10/sparkxgboost-0.2.1-s_2.10-javadoc.jar > >> [warn] :: > >> [warn] :: FAILED DOWNLOADS :: > >> [warn] :: ^ see resolution messages for details ^ :: > >> [warn] :: > >> [warn] :: rotationsymmetry#sparkxgboost;0.2.1-s_2.10!sparkxgboost.jar(src) > >> [warn] :: rotationsymmetry#sparkxgboost;0.2.1-s_2.10!sparkxgboost.jar(doc) > >> > >> build.sbt: > >> > >> scalaVersion := "2.11.8" > >> > >> libraryDependencies ++= { > >> val sparkVersion = "2.0.0-preview" > >> Seq( > >> "org.apache.spark" %% "spark-core" % sparkVersion % "provided", > >> "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", > >> "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided", > >> "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided" > >> ) > >> } > >> > >> resolvers += "Spark Packages Repo" at > >> "http://dl.bintray.com/spark-packages/maven" > >> > >> libraryDependencies += "rotationsymmetry" % "sparkxgboost" % > >> "0.2.1-s_2.10" > >> > >> assemblyMergeStrategy in assembly := { > >> case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard > >> case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first > >> case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first > >> case "application.conf" => MergeStrategy.concat > >> case "unwanted.txt" => MergeStrategy.discard > >> case x => val oldStrategy = (assemblyMergeStrategy in assembly).value > >> oldStrategy(x) > >> } > >> > >> On Fri, Aug 12, 2016 at 3:35 PM, janardhan shetty < janardhan...@gmail.com> > >> wrote: > >>> > >>> Is there a dataframe version of XGBoost in spark-ml? > >>> Has anyone used the sparkxgboost package? > >> > >> > > > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
Setting spark.sql.shuffle.partitions Dynamically
Hello, My platform runs hundreds of Spark jobs every day, each with its own data size ranging from 20MB to 20TB. This means that we need to set resources dynamically. One major pain point for doing this is spark.sql.shuffle.partitions, the number of partitions to use when shuffling data for joins or aggregations. It is arbitrarily hard coded to 200 by default. The only way to set this config is in the spark-submit command or in the SparkConf before the executor is created. This creates a lot of problems when I want to set it dynamically based on the in-memory size of a dataframe, which I only know halfway through the Spark job. So I would need to stop the context and recreate it in order to set this config. Is there any better way to set this? How does spark.sql.shuffle.partitions work differently than .repartition? Brandon
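For what it's worth, spark.sql.shuffle.partitions is a SQL conf rather than a core SparkConf entry, and it can be changed at runtime through the SQLContext without restarting the context. A sketch of that, assuming a Spark 1.x SQLContext; `bigDf`, `otherDf`, and the sizes are made up for illustration:

```scala
// Tune the shuffle partition count once the input size is known.
// sqlContext.setConf takes effect for queries planned after the call.
val sizeInBytes = 512L * 1024 * 1024            // assumed: measured input size
val targetPartitionBytes = 128L * 1024 * 1024   // assumed target per partition
val partitions = math.max(1L, sizeInBytes / targetPartitionBytes).toInt

sqlContext.setConf("spark.sql.shuffle.partitions", partitions.toString)
val joined = bigDf.join(otherDf, "key")  // this shuffle now uses `partitions`
```

On the second question: .repartition(n) is an explicit shuffle of one DataFrame to n partitions, while spark.sql.shuffle.partitions controls the partition count of shuffles the planner itself introduces (joins, aggregations).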
Optimal Amount of Tasks Per size of data in memory
What is the best heuristic for setting the number of partitions/tasks on an RDD based on the size of the RDD in memory? The Spark docs say that the number of partitions/tasks should be 2-3x the number of CPU cores, but this does not make sense for all data sizes. Sometimes, this number is way too much and slows down the job because of per-task overhead.
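There is no single right answer, but a common rule of thumb (an assumption, not an official Spark formula) is to target a fixed per-partition size (~128MB) and floor the result at 2x the core count, so small data still uses the cluster and large data doesn't overload single tasks:

```scala
// Rough heuristic: enough partitions that each holds ~128MB in memory,
// but never fewer than 2x the total cores available.
def suggestedPartitions(dataSizeBytes: Long, totalCores: Int,
                        targetBytesPerPartition: Long = 128L * 1024 * 1024): Int = {
  val bySize  = math.ceil(dataSizeBytes.toDouble / targetBytesPerPartition).toInt
  val byCores = 2 * totalCores
  math.max(bySize, byCores)
}

// e.g. suggestedPartitions(20L * 1024 * 1024, 40)  -> 80   (small data: cores dominate)
//      suggestedPartitions(1L << 40, 40)           -> 8192 (1TB: size dominates)
```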
Size of cached dataframe
Is there any public API to get the size of a dataframe in cache? It's visible in the Spark UI, but I don't see an API to access this information. Do I need to build it myself using a forked version of Spark?
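One hook worth knowing about: SparkContext.getRDDStorageInfo is a DeveloperApi (so not strictly stable, but no fork needed) that exposes roughly what the Storage tab shows. A sketch, assuming Spark 1.x; the path is made up:

```scala
// A cached DataFrame shows up as the RDD backing its in-memory relation.
val cached = sqlContext.read.parquet("/path/to/file").cache()
cached.count()  // materialize the cache first; caching is lazy

// getRDDStorageInfo backs the Storage tab in the UI.
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: ${info.memSize} bytes in memory, " +
          s"${info.numCachedPartitions}/${info.numPartitions} partitions cached")
}
```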
Difference between Dataframe and RDD Persisting
What is the difference between persisting a dataframe and an RDD? When I persist my RDD, the UI says it takes 50G or more of memory. When I persist my dataframe, the UI says it takes 9G or less of memory. Does the dataframe not persist the actual content? Is it better / faster to persist an RDD when doing a lot of filter, map, and collect operations?
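The size gap is expected: DataFrames cache in a compressed in-memory columnar format, while RDDs cache deserialized Java objects, which carry heavy per-object overhead. A quick way to see both numbers side by side; paths and the storage level are assumptions:

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("/path/to/data").persist(StorageLevel.MEMORY_ONLY)
rdd.count()   // materialize: stored as deserialized Java objects

val df = sqlContext.read.parquet("/path/to/data").persist(StorageLevel.MEMORY_ONLY)
df.count()    // materialize: stored as compressed columnar batches

// Compare the two entries in the UI's Storage tab; the DataFrame cache is
// typically several times smaller -- the content is all there, just encoded
// column-by-column with compression.
```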
Re: What does it mean when a executor has negative active tasks?
1.6 On Jun 18, 2016 10:02 AM, "Mich Talebzadeh" <mich.talebza...@gmail.com> wrote: > could be a bug as there are no failed jobs. what version of Spark is this? > > > HTH > > Dr Mich Talebzadeh > > > > LinkedIn: > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > > > > http://talebzadehmich.wordpress.com > > > > On 18 June 2016 at 17:50, Brandon White <bwwintheho...@gmail.com> wrote: > >> What does it mean when an executor has negative active tasks? >> > >
Spark ML - Is it safe to schedule two trainings job at the same time or will worker state be corrupted?
For example, say I want to train two linear regressions and two GBT (gradient-boosted tree) regressions. Using different threads, Spark allows you to submit jobs at the same time (see: http://spark.apache.org/docs/latest/job-scheduling.html). If I schedule two or more training jobs and they are running at the same time: 1) Is there any risk that static worker variables or worker state could become corrupted, leading to incorrect calculations? 2) Is Spark ML designed for running two or more training jobs at the same time? Is this something the architects considered during implementation? Thanks, Brandon
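The job-scheduling page linked above says jobs submitted from separate threads are safe, and spark.ml estimators don't share mutable worker state between fits, so the usual pattern is plain Futures. A sketch under those assumptions; `train` stands in for a prepared DataFrame with features/label columns:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import org.apache.spark.ml.regression.{GBTRegressor, LinearRegression}

// Each fit() is an independent chain of Spark jobs; with the FAIR scheduler
// (spark.scheduler.mode=FAIR) they share the cluster instead of queuing.
val lrFuture  = Future { new LinearRegression().setMaxIter(50).fit(train) }
val gbtFuture = Future { new GBTRegressor().setMaxIter(20).fit(train) }

val lrModel  = Await.result(lrFuture, Duration.Inf)
val gbtModel = Await.result(gbtFuture, Duration.Inf)
```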
Re: BlockManager crashing applications
I'm not quite sure how this is a memory problem. There are no OOM exceptions, and the job only breaks when actions are run in parallel, submitted to the scheduler by different threads. The issue is that the doGetRemote function does not retry when it is denied access to a cache block. On May 8, 2016 5:55 PM, "Ashish Dubey" <ashish@gmail.com> wrote: Brandon, how much memory are you giving to your executors - did you check if there were dead executors in your application logs.. Most likely you require higher memory for executors.. Ashish On Sun, May 8, 2016 at 1:01 PM, Brandon White <bwwintheho...@gmail.com> wrote: > Hello all, > > I am running a Spark application which schedules multiple Spark jobs. > Something like: > > val df = sqlContext.read.parquet("/path/to/file") > > filterExpressions.par.foreach { expression => > df.filter(expression).count() > } > > When the block manager fails to fetch a block, it throws an exception > which eventually kills the application: http://pastebin.com/2ggwv68P > > This code works when I run it on one thread with: > > filterExpressions.foreach { expression => > df.filter(expression).count() > } > > But I really need the parallel execution of the jobs. Is there any way > around this? It seems like a bug in the BlockManager's doGetRemote function. > I have tried the HTTP Block Manager as well. >
BlockManager crashing applications
Hello all, I am running a Spark application which schedules multiple Spark jobs. Something like: val df = sqlContext.read.parquet("/path/to/file") filterExpressions.par.foreach { expression => df.filter(expression).count() } When the block manager fails to fetch a block, it throws an exception which eventually kills the application: http://pastebin.com/2ggwv68P This code works when I run it on one thread with: filterExpressions.foreach { expression => df.filter(expression).count() } But I really need the parallel execution of the jobs. Is there any way around this? It seems like a bug in the BlockManager's doGetRemote function. I have tried the HTTP Block Manager as well.
QueryExecution to String breaks with OOM
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:2367) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415) at java.lang.StringBuilder.append(StringBuilder.java:132) at scala.StringContext.standardInterpolator(StringContext.scala:123) at scala.StringContext.s(StringContext.scala:90) at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:947) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57) at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933) at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137) at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304) I have a dataframe which I am running a ton of filters on. 
When I try to save it, my job runs out of memory. Any idea how I can fix this?
Is DataFrame randomSplit Deterministic?
If I have the same data, the same ratios, and same sample seed, will I get the same splits every time?
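In practice, yes: the seed feeds a per-partition sampler, so as long as the data and its partitioning are identical, the splits come out identical. A quick self-check sketch; note that re-reading the source with different partitioning can break the assumption:

```scala
// randomSplit(weights, seed): same input partitioning + same seed => same rows.
val Array(a1, b1) = df.randomSplit(Array(0.7, 0.3), seed = 42L)
val Array(a2, b2) = df.randomSplit(Array(0.7, 0.3), seed = 42L)

// If the determinism assumption holds, both set differences are empty:
println(a1.except(a2).count())  // should print 0
println(b1.except(b2).count())  // should print 0
```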
Re: Dataframe saves for a large set but throws OOM for a small dataset
randomSplit instead of randomSample On Apr 30, 2016 1:51 PM, "Brandon White" <bwwintheho...@gmail.com> wrote: > val df = globalDf > val filteredDfs = filterExpressions.map { expr => > val filteredDf = df.filter(expr) > val samples = filteredDf.randomSample(Array(0.7, 0.3)) > (samples(0), samples(1)) > } > > val largeDfs = filteredDfs.map(_._1) > val smallDfs = filteredDfs.map(_._2) > > val unionedLargeDfs = tailRecursiveUnionAll(largeDfs.tail, largeDfs.head) > val unionedSmallDfs = tailRecursiveUnionAll(smallDfs.tail, smallDfs.head) > > unionedLargeDfs.write.parquet(output) // works fine > unionedSmallDfs.write.parquet(output) // breaks with OOM stack trace in > first thread > > There is no skew here. I am using Spark 1.5.1 with 80 executors with 7g > memory. > On Apr 30, 2016 1:22 PM, "Ted Yu" <yuzhih...@gmail.com> wrote: > >> Can you provide a bit more information: >> >> Does the smaller dataset have skew ? >> >> Which release of Spark are you using ? >> >> How much memory did you specify ? >> >> Thanks >> >> On Sat, Apr 30, 2016 at 1:17 PM, Brandon White <bwwintheho...@gmail.com> >> wrote: >> >>> Hello, >>> >>> I am writing two datasets. One dataset is 2x larger than the other. Both >>> datasets are written to parquet the exact same way using >>> >>> df.write.mode("Overwrite").parquet(outputFolder) >>> >>> The smaller dataset OOMs while the larger dataset writes perfectly fine. >>> Here is the stack trace: Any ideas what is going on here and how I can fix >>> it? 
>>> >>> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space >>> at java.util.Arrays.copyOf(Arrays.java:2367) >>> at >>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130) >>> at >>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114) >>> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415) >>> at java.lang.StringBuilder.append(StringBuilder.java:132) >>> at scala.StringContext.standardInterpolator(StringContext.scala:123) >>> at scala.StringContext.s(StringContext.scala:90) >>> at >>> org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:947) >>> at >>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) >>> at >>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) >>> at >>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57) >>> at >>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57) >>> at >>> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69) >>> at >>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) >>> at >>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) >>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) >>> at >>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933) >>> at >>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933) >>> at >>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197) >>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146) >>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137) >>> at >>> 
org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304) >>> >> >>
Dataframe saves for a large set but throws OOM for a small dataset
Hello, I am writing two datasets. One dataset is 2x larger than the other. Both datasets are written to parquet the exact same way using df.write.mode("Overwrite").parquet(outputFolder) The smaller dataset OOMs while the larger dataset writes perfectly fine. Here is the stack trace: Any ideas what is going on here and how I can fix it? Exception in thread "main" java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:2367) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415) at java.lang.StringBuilder.append(StringBuilder.java:132) at scala.StringContext.standardInterpolator(StringContext.scala:123) at scala.StringContext.s(StringContext.scala:90) at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:947) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57) at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933) at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137) at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304)
DataFrame to DataSet without Predefined Class
I am reading parquet files into a dataframe. The schema varies depending on the data, so I have no way to write a predefined class. Is there any way to go from DataFrame to Dataset without a predefined case class? Can I build a class from my dataframe schema?
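If the schema truly isn't known until runtime there is no compile-time class to encode to, but when you can at least name the column types, tuple encoders avoid the case class. A sketch, assuming Spark 2.0; the column names are made up:

```scala
import org.apache.spark.sql.Dataset
import spark.implicits._  // brings in encoders for primitives and tuples

val df = spark.read.parquet("/path/to/files")

// Tuple encoder: no case class needed, as long as the column types are known.
val ds: Dataset[(String, Long)] =
  df.select($"name", $"count").as[(String, Long)]

// Fully dynamic schemas stay as Dataset[Row] -- which is exactly what a
// DataFrame is in 2.0 (type DataFrame = Dataset[Row]).
```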
How can I bucketize / group a DataFrame from parquet files?
I am creating a DataFrame from parquet files. The schema is based on the parquet files and I do not know it beforehand. What I want to do is group the entire DF into buckets based on a column: val df = sqlContext.read.parquet("/path/to/files") val groupedBuckets: DataFrame[String, Array[Rows]] = df.groupBy($"columnName") I know this does not work because the DataFrame's groupBy is only used for aggregate functions. I cannot convert my DataFrame to a Dataset because I do not have a case class for the Dataset schema. The only thing I can do is convert the df to an RDD[Row] and try to deal with the types. This is ugly and difficult. Is there any better way? Can I convert a DataFrame to a Dataset without a predefined case class? Brandon
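One way to get per-key buckets without leaving DataFrames is the collect_list aggregate (available from 1.6, with a HiveContext). A sketch; "columnName" and the path are the question's placeholders:

```scala
import org.apache.spark.sql.functions.{collect_list, struct}
import sqlContext.implicits._

val df = sqlContext.read.parquet("/path/to/files")

// One row per key, with all of that key's rows gathered into an array column.
// struct(...) packs the remaining columns so nothing is lost.
val buckets = df.groupBy($"columnName")
  .agg(collect_list(struct(df.columns.map(df(_)): _*)).as("rows"))

// buckets schema: [columnName, rows: array<struct<...original columns...>>]
```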
Re: subscribe
https://www.youtube.com/watch?v=umDr0mPuyQc On Sat, Aug 22, 2015 at 8:01 AM, Ted Yu yuzhih...@gmail.com wrote: See http://spark.apache.org/community.html Cheers On Sat, Aug 22, 2015 at 2:51 AM, Lars Hermes li...@hermes-it-consulting.de wrote: subscribe
Re: How to save a string to a text file ?
Convert it to an RDD, then save the RDD to a file: val str = "dank memes" sc.parallelize(List(str)).saveAsTextFile("str.txt") On Fri, Aug 14, 2015 at 7:50 PM, go canal goca...@yahoo.com.invalid wrote: Hello again, online resources have sample code for writing an RDD to a file, but I have a simple string; how do I save it to a text file? (my data is a DenseMatrix actually) appreciate any help! thanks, canal
Re: subscribe
https://www.youtube.com/watch?v=H07zYvkNYL8 On Mon, Aug 10, 2015 at 10:55 AM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at the first section of https://spark.apache.org/community Cheers On Mon, Aug 10, 2015 at 10:54 AM, Phil Kallos phil.kal...@gmail.com wrote: please
Spark SQL Hive - merge small files
Hello, I would love to have Hive merge the small files in my managed Hive tables after every query. Right now, I am setting the Hive configuration in my Spark job configuration, but Hive is not merging the files. Do I need to set the Hive fields in another place? How do you set Hive configurations in Spark? Here is what I'd like to set: hive.merge.mapfiles=true hive.merge.mapredfiles=true hive.merge.size.per.task=25600 hive.merge.smallfiles.avgsize=1600
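Hive conf keys generally need to go through the HiveContext's SQL conf (or --hiveconf at submit time), not plain SparkConf entries. That said, whether Spark's writer honors Hive's merge settings at all is a separate question, so a Spark-side coalesce before the insert is the more reliable fallback. A sketch; the partition count and table name are made up, and the values above are kept as quoted:

```scala
// Route Hive settings through the HiveContext's conf:
hiveContext.setConf("hive.merge.mapfiles", "true")
hiveContext.setConf("hive.merge.mapredfiles", "true")
hiveContext.setConf("hive.merge.size.per.task", "25600")
hiveContext.setConf("hive.merge.smallfiles.avgsize", "1600")

// Fallback: shrink the file count from the Spark side before inserting,
// so each insert produces a handful of files instead of one per task.
df.coalesce(16).write.insertInto("my_managed_table")
```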
Re: Spark SQL Hive - merge small files
So there is no good way to merge Spark files in a managed Hive table right now? On Wed, Aug 5, 2015 at 10:02 AM, Michael Armbrust mich...@databricks.com wrote: This feature isn't currently supported. On Wed, Aug 5, 2015 at 8:43 AM, Brandon White bwwintheho...@gmail.com wrote: Hello, I would love to have Hive merge the small files in my managed Hive tables after every query. Right now, I am setting the Hive configuration in my Spark job configuration, but Hive is not merging the files. Do I need to set the Hive fields in another place? How do you set Hive configurations in Spark? Here is what I'd like to set: hive.merge.mapfiles=true hive.merge.mapredfiles=true hive.merge.size.per.task=25600 hive.merge.smallfiles.avgsize=1600
Re: Schema evolution in tables
Sim, did you find anything? :) On Sun, Jul 26, 2015 at 9:31 AM, sim s...@swoop.com wrote: The schema merging http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging section of the Spark SQL documentation shows an example of schema evolution in a partitioned table. Is this functionality only available when creating a Spark SQL table? dataFrameWithEvolvedSchema.saveAsTable("my_table", SaveMode.Append) fails with java.lang.RuntimeException: Relation[ ... ] org.apache.spark.sql.parquet.ParquetRelation2@83a73a05 requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE statement generates the same number of columns as its schema. What is the Spark SQL idiom for appending data to a table while managing schema evolution? -- View this message in context: Schema evolution in tables http://apache-spark-user-list.1001560.n3.nabble.com/Schema-evolution-in-tables-tp23999.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Turn Off Compression for Textfiles
How do you turn off gz compression for saving as textfiles? Right now, I am reading .gz files and it is saving them as .gz. I would love to not compress them when I save: 1) DStream.saveAsTextFiles() // no compression 2) RDD.saveAsTextFile() // no compression Any ideas?
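saveAsTextFile only compresses when a codec is passed explicitly or the Hadoop job conf forces compression, so turning it off is usually a matter of not inheriting those settings. A sketch; the output paths are made up:

```scala
// 1) The no-codec overload writes plain text:
rdd.saveAsTextFile("/out/plain")  // uncompressed
// vs. the explicit-codec overload:
// rdd.saveAsTextFile("/out/gz", classOf[org.apache.hadoop.io.compress.GzipCodec])

// 2) If cluster defaults force compression, override the Hadoop conf
//    (old and new property names, to cover both Hadoop APIs):
sc.hadoopConfiguration.set("mapred.output.compress", "false")
sc.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "false")

// Note: reading .gz inputs is unrelated -- decompression on read does not
// imply compression on write.
```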
Combining Spark Files with saveAsTextFile
What is the best way to make saveAsTextFile save as only a single file?
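The usual answer is to shrink to one partition right before the save; with shuffle = true the upstream work still runs in parallel and only the write itself is serialized. A sketch; the path is made up:

```scala
// coalesce(1) => exactly one output part file (part-00000).
// shuffle = true keeps upstream stages parallel; only the final write
// funnels through a single task, so this only suits modest output sizes.
rdd.coalesce(1, shuffle = true).saveAsTextFile("/out/single")

// repartition(1) is shorthand for coalesce(1, shuffle = true).
```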
What happens when you create more DStreams than nodes in the cluster?
Since one input DStream creates one receiver, and one receiver uses one executor / node, what happens if you create more DStreams than nodes in the cluster? Say I have 30 DStreams on a 15-node cluster. Will ~10 streams get assigned to ~10 executors / nodes while the other ~20 streams are queued for resources, or will the other streams just fail and never run?
Re: Has anybody ever tried running Spark Streaming on 500 text streams?
Tathagata, Could the bottleneck possibly be the number of executor nodes in our cluster? Since we are creating 500 DStreams based off 500 textfile directories, do we need at least 500 executors / nodes to be receivers for each one of the streams? On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das t...@databricks.com wrote: @Ashwin: You could append the topic in the data. val kafkaStreams = topics.map { topic => KafkaUtils.createDirectStream(topic...).map { x => (x, topic) } } val unionedStream = context.union(kafkaStreams) @Brandon: I dont recommend it, but you could do something crazy like use the foreachRDD to farm out the jobs to a threadpool, but the final foreachRDD waits for all the jobs to complete. manyDStreams.foreach { dstream => dstream.foreachRDD { rdd => // Add runnable that runs the job on RDD to threadpool // This does not wait for the job to finish } } anyOfTheManyDStreams.foreachRDD { _ => // wait for all the current batch's jobs in the threadpool to complete. } This would run all the Spark jobs in the batch in parallel in the thread pool, but it would also make sure all the jobs finish before the batch is marked as completed. On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com wrote: Thank you Tathagata. My main use case for the 500 streams is to append new elements into their corresponding Spark SQL tables. Every stream is mapped to a table, so I'd like to use the streams to append the new rdds to the table. If I union all the streams, appending new elements becomes a nightmare. So there is no other way to parallelize something like the following? Will this still run in sequence or time out? //500 streams streams.foreach { stream => stream.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.saveAsTable(streamTuple._1, SaveMode.Append) } } On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com wrote: I dont think any one has really run 500 text streams. And parSequences do nothing out there, you are only parallelizing the setup code, which does not really compute anything. Also it sets up 500 foreachRDD operations that will get executed in each batch sequentially, so it does not make sense. The right way to parallelize this is to union all the streams. val streams = streamPaths.map { path => ssc.textFileStream(path) } val unionedStream = streamingContext.union(streams) unionedStream.foreachRDD { rdd => // do something } Then there is only one foreachRDD executed in every batch, which will process in parallel all the new files in each batch interval. TD On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com wrote: val ssc = new StreamingContext(sc, Minutes(10)) //500 textFile streams watching S3 directories val streams = streamPaths.par.map { path => ssc.textFileStream(path) } streams.par.foreach { stream => stream.foreachRDD { rdd => //do something } } ssc.start() Would something like this scale? What would be the limiting factor to performance? What is the best way to parallelize this? Any other ideas on design?
Re: unsubscribe
https://www.youtube.com/watch?v=JncgoPKklVE On Thu, Jul 30, 2015 at 1:30 PM, ziqiu...@accenture.com wrote:
Does Spark Streaming need to list all the files in a directory?
Is this a known bottleneck for Spark Streaming's textFileStream? Does it need to list all the current files in a directory before it gets the new files? Say I have 500k files in a directory; does it list them all in order to get the new files?
Re: unsubscribe
NO! On Tue, Jul 28, 2015 at 5:03 PM, Harshvardhan Chauhan ha...@gumgum.com wrote: -- *Harshvardhan Chauhan* | Software Engineer *GumGum* http://www.gumgum.com/ | *Ads that stick* 310-260-9666 | ha...@gumgum.com
Re: Has anybody ever tried running Spark Streaming on 500 text streams?
Thank you Tathagata. My main use case for the 500 streams is to append new elements into their corresponding Spark SQL tables. Every stream is mapped to a table, so I'd like to use the streams to append the new rdds to the table. If I union all the streams, appending new elements becomes a nightmare. So there is no other way to parallelize something like the following? Will this still run in sequence or time out? //500 streams streams.foreach { stream => stream.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.saveAsTable(streamTuple._1, SaveMode.Append) } } On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com wrote: I dont think any one has really run 500 text streams. And parSequences do nothing out there, you are only parallelizing the setup code, which does not really compute anything. Also it sets up 500 foreachRDD operations that will get executed in each batch sequentially, so it does not make sense. The right way to parallelize this is to union all the streams. val streams = streamPaths.map { path => ssc.textFileStream(path) } val unionedStream = streamingContext.union(streams) unionedStream.foreachRDD { rdd => // do something } Then there is only one foreachRDD executed in every batch, which will process in parallel all the new files in each batch interval. TD On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com wrote: val ssc = new StreamingContext(sc, Minutes(10)) //500 textFile streams watching S3 directories val streams = streamPaths.par.map { path => ssc.textFileStream(path) } streams.par.foreach { stream => stream.foreachRDD { rdd => //do something } } ssc.start() Would something like this scale? What would be the limiting factor to performance? What is the best way to parallelize this? Any other ideas on design?
Has anybody ever tried running Spark Streaming on 500 text streams?
val ssc = new StreamingContext(sc, Minutes(10)) //500 textFile streams watching S3 directories val streams = streamPaths.par.map { path => ssc.textFileStream(path) } streams.par.foreach { stream => stream.foreachRDD { rdd => //do something } } ssc.start() Would something like this scale? What would be the limiting factor to performance? What is the best way to parallelize this? Any other ideas on design?
Re: Programmatically launch several hundred Spark Streams in parallel
Thanks. Sorry, the last section was supposed to be: streams.par.foreach { nameAndStream => nameAndStream._2.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.insertInto(nameAndStream._1) } } ssc.start() On Fri, Jul 24, 2015 at 10:39 AM, Dean Wampler deanwamp...@gmail.com wrote: You don't need the par (parallel) versions of the Scala collections, actually. Recall that you are building a pipeline in the driver, but it doesn't start running cluster tasks until ssc.start() is called, at which point Spark will figure out the task parallelism. In fact, you might as well do the foreachRDD call within the initial map. No need for the streams collection, unless you need it for something else. Test it out to make sure I'm not wrong ;) However, I'm a little confused by the per-stream logic. It looks like you're using foreachRDD to dump each input stream into the same output location stream._1. True? If it's a directory, you'll get an error that it already exists for the *second* stream in streams. If you're just funneling all 500 inputs into the same output location, how about using DStream.union to combine all the input streams into one, then have one foreachRDD to write output? Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Fri, Jul 24, 2015 at 11:23 AM, Brandon White bwwintheho...@gmail.com wrote: Hello, So I have about 500 Spark Streams and I want to know the fastest and most reliable way to process each of them. Right now, I am creating and processing them in a list: val ssc = new StreamingContext(sc, Minutes(10)) val streams = paths.par.map { nameAndPath => (path._1, ssc.textFileStream(path._1)) } streams.par.foreach { nameAndStream => streamTuple.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.insertInto(stream._1) } } ssc.start() Is this the best way to do this? Are there any better, faster methods?
Spark SQL Table Caching
A few questions about caching a table in Spark SQL. 1) Is there any difference between caching the dataframe and the table? df.cache() vs sqlContext.cacheTable(tableName) 2) Do you need to warm up the cache before seeing the performance benefits? Is the cache LRU? Do you need to run some queries on the table before it is cached in memory? 3) Is caching the table much faster than .saveAsTable? I am only seeing a 10%-20% performance increase.
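On (2): both df.cache() and cacheTable are lazy, so the first action pays the scan-and-build cost and only later queries see the speedup. A sketch of warming and checking the cache; the table name "events" is made up:

```scala
// Caching is lazy: registering intent costs nothing until an action runs.
sqlContext.cacheTable("events")
sqlContext.isCached("events")  // true: marked for caching, not yet built

// Warm-up: a full scan materializes the in-memory columnar buffers.
sqlContext.sql("SELECT COUNT(*) FROM events").collect()

// Subsequent queries read the column buffers instead of the source files.
val perUser = sqlContext.sql("SELECT userId, COUNT(*) FROM events GROUP BY userId")
```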
DataFrame Union not passing optimizer assertion
Hello! So I am doing a union of two DataFrames with the same schema but a different number of rows. However, the query fails an optimizer assertion. I think it is this one here https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala but I am not sure. Any ideas why this assertion isn't passing?

java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:165)
    at org.apache.spark.sql.catalyst.optimizer.UnionPushdown$.buildRewrites(Optimizer.scala:72)
    at org.apache.spark.sql.catalyst.optimizer.UnionPushdown$$anonfun$apply$1.applyOrElse(Optimizer.scala:102)
    at org.apache.spark.sql.catalyst.optimizer.UnionPushdown$$anonfun$apply$1.applyOrElse(Optimizer.scala:92)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:208)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:238)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:193)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:178)
    at org.apache.spark.sql.catalyst.optimizer.UnionPushdown$.apply(Optimizer.scala:92)
    at org.apache.spark.sql.catalyst.optimizer.UnionPushdown$.apply(Optimizer.scala:66)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:1087)
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:1087)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:1092)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:1090)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:1096)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:1096)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
NullPointerException when saving as table with a timestamp column type
So I have a very simple DataFrame that looks like df: [name: string, place: string, time: timestamp]. I build the java.sql.Timestamp from a string and it works really well except when I call saveAsTable(tableName) on this df. Without the timestamp it saves fine, but with the timestamp it throws java.lang.NullPointerException

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1230)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1219)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1218)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1218)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:719)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:719)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:719)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1380)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Any ideas how I can get around this?
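An NPE at save time like this often comes from a null Timestamp slipping into a row when the source string fails to parse. A minimal defensive-parsing sketch (parseTimestamp is a hypothetical helper, not part of the original code):

```scala
import java.sql.Timestamp

// Hypothetical helper: parse a string into a java.sql.Timestamp, returning
// None instead of null when the input is missing or malformed. A null
// Timestamp inside a row is a common cause of NullPointerExceptions when
// the DataFrame is later written out.
def parseTimestamp(s: String): Option[Timestamp] =
  try Option(s).map(str => Timestamp.valueOf(str)) // expects "yyyy-mm-dd hh:mm:ss[.f...]"
  catch { case _: IllegalArgumentException => None }
```

Rows whose timestamp fails to parse can then be filtered out (or defaulted) before calling saveAsTable, instead of carrying a null into the writer.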
Running foreach on a list of RDDs in parallel
Hello, I have a list of RDDs: List(rdd1, rdd2, rdd3, rdd4). I would like to save these RDDs in parallel. Right now, it is running each operation sequentially. I tried using an RDD of RDDs, but that does not work.

list.foreach { rdd => rdd.saveAsTextFile("/tmp/cache/") }

Any ideas?
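One way to get the saves running concurrently is to launch each action from its own thread on the driver, since Spark schedules jobs submitted from separate threads independently. A minimal sketch using scala.concurrent.Future (saveAll is a hypothetical helper; the RDD action is stood in by a plain function so the sketch is self-contained):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Each save becomes a Future, so the driver submits all jobs concurrently
// instead of waiting for each one in turn. `saveOne` is a stand-in for an
// action like rdd.saveAsTextFile(...); any side-effecting action fits.
def saveAll[A](items: Seq[A])(saveOne: A => Unit): Unit = {
  val jobs = items.map(item => Future(saveOne(item)))
  Await.result(Future.sequence(jobs), Duration.Inf) // block until all finish
  ()
}
```

With real RDDs this would look like saveAll(Seq(rdd1, rdd2, rdd3, rdd4))(rdd => rdd.saveAsTextFile(...)); note that each RDD would need its own distinct output path, since saveAsTextFile refuses to write into an existing directory.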
How do you access a cached Spark SQL table from a JDBC connection?
Hello there, I have a JDBC connection set up to my Spark cluster, but I cannot see the tables that I cache in memory. The only tables I can see are those in my Hive instance. I use a HiveContext to register a table and cache it in memory. How can I enable my JDBC connection to query this in-memory table? Brandon
Re: Spark Streaming - Inserting into Tables
Hi Yin, Yes, there were no new rows. I fixed it by calling .remember on the streaming context. Obviously, this is not ideal.

On Sun, Jul 12, 2015 at 6:31 PM, Yin Huai yh...@databricks.com wrote:
Hi Brandon, Can you explain what you meant by "It simply does not work"? You did not see new data files? Thanks, Yin

On Fri, Jul 10, 2015 at 11:55 AM, Brandon White bwwintheho...@gmail.com wrote:
Why does this not work? Is insert into broken in 1.3.1? It does not throw any errors, fail, or throw exceptions. It simply does not work.

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/")
dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
parquetFile.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}
ssc.start()

Or this?

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory")
val day = sqlContext.jsonFile("s3://textFileDirectory")
day.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.registerTempTable("tmp_rideaccepted")
  sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
}
ssc.start()

Or this?

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/")
dayBefore.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}
ssc.start()
Spark Streaming - Inserting into Tables
Why does this not work? Is insert into broken in 1.3.1? It does not throw any errors, fail, or throw exceptions. It simply does not work.

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/")
dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
parquetFile.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}
ssc.start()

Or this?

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory")
val day = sqlContext.jsonFile("s3://textFileDirectory")
day.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.registerTempTable("tmp_rideaccepted")
  sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
}
ssc.start()

Or this?

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/")
dayBefore.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}
ssc.start()
What is faster for SQL table storage, On-Heap or off-heap?
Is the read / aggregate performance better when caching Spark SQL tables on-heap with sqlContext.cacheTable() or off-heap by saving them to Tachyon? Has anybody tested this? Any theories?
S3 vs HDFS
Are there any significant performance differences between reading text files from S3 and from HDFS?
Re: Parallelizing multiple RDD / DataFrame creation in Spark
The point of running them in parallel would be faster creation of the tables. Has anybody been able to efficiently parallelize something like this in Spark?

On Jul 8, 2015 12:29 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
What's the point of creating them in parallel? You can multi-thread it and run it in parallel though. Thanks Best Regards

On Wed, Jul 8, 2015 at 5:34 AM, Brandon White bwwintheho...@gmail.com wrote:
Say I have a Spark job that looks like the following:

def loadTable1() {
  val table1 = sqlContext.jsonFile("s3://textfiledirectory/")
  table1.cache().registerTempTable("table1")
}

def loadTable2() {
  val table2 = sqlContext.jsonFile("s3://testfiledirectory2/")
  table2.cache().registerTempTable("table2")
}

def loadAllTables() {
  loadTable1()
  loadTable2()
}

loadAllTables()

How do I parallelize this Spark job so that both tables are created at the same time or in parallel?
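Following the multi-threading suggestion, the two loads can be wrapped in Futures so both jsonFile pipelines are submitted to the driver concurrently. A sketch under the assumption that loadTable1 and loadTable2 are the functions from the post, passed in here as plain thunks so the sketch is self-contained:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Run two independent load functions concurrently and wait for both.
// With Spark, each Future submits its jobs from a separate thread, so the
// scheduler can run them at the same time (subject to cluster resources).
def loadAllTablesInParallel(loadTable1: () => Unit, loadTable2: () => Unit): Unit = {
  val f1 = Future(loadTable1())
  val f2 = Future(loadTable2())
  Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
  ()
}
```

SQLContext and registerTempTable are safe to call from multiple driver threads, but how much actually overlaps depends on executor capacity; with too few cores the two jobs still serialize.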
Re: Real-time data visualization with Zeppelin
Can you use a cron job to update it every X minutes?

On Wed, Jul 8, 2015 at 2:23 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:
Hi all – I’m just wondering if anyone has had success integrating Spark Streaming with Zeppelin and actually dynamically updating the data in near real-time. From my investigation, it seems that Zeppelin will only allow you to display a snapshot of data, not a continuously updating table. Has anyone figured out if there’s a way to loop a display command or how to provide a mechanism to continuously update visualizations? Thank you, Ilya Ganelin
Re: Spark query
Convert the column to a column of java Timestamps. Then you can do the following:

import java.sql.Timestamp
import java.util.Calendar

def date_trunc(timestamp: Timestamp, timeField: String) = {
  timeField match {
    case "hour" =>
      val cal = Calendar.getInstance()
      cal.setTimeInMillis(timestamp.getTime())
      cal.get(Calendar.HOUR_OF_DAY)
    case "day" =>
      val cal = Calendar.getInstance()
      cal.setTimeInMillis(timestamp.getTime())
      cal.get(Calendar.DAY_OF_MONTH)
  }
}

sqlContext.udf.register("date_trunc", date_trunc _)

On Wed, Jul 8, 2015 at 9:23 PM, Harish Butani rhbutani.sp...@gmail.com wrote:
try the spark-datetime package: https://github.com/SparklineData/spark-datetime Follow this example https://github.com/SparklineData/spark-datetime#a-basic-example to get the different attributes of a DateTime.

On Wed, Jul 8, 2015 at 9:11 PM, prosp4300 prosp4...@163.com wrote:
As mentioned in the Spark SQL programming guide, Spark SQL supports Hive UDFs. Please take a look at the built-in UDFs of Hive below; getting the day of year should be as simple as in an existing RDBMS: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions

At 2015-07-09 12:02:44, Ravisankar Mani rrav...@gmail.com wrote:
Hi everyone, I can't get 'day of year' when using a Spark query. Can you help with any way to achieve day of year? Regards, Ravi
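For the original day-of-year question, java.util.Calendar exposes it directly via Calendar.DAY_OF_YEAR. A minimal sketch that could be registered as a UDF the same way as date_trunc above (dayOfYear is an illustrative helper, not an existing Spark function):

```scala
import java.sql.Timestamp
import java.util.Calendar

// Extract the day of year (1..366) from a java.sql.Timestamp using
// java.util.Calendar. Uses the JVM's default time zone.
def dayOfYear(ts: Timestamp): Int = {
  val cal = Calendar.getInstance()
  cal.setTimeInMillis(ts.getTime)
  cal.get(Calendar.DAY_OF_YEAR)
}
```

Registering it would then follow the same pattern: sqlContext.udf.register("day_of_year", dayOfYear _).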
Parallelizing multiple RDD / DataFrame creation in Spark
Say I have a Spark job that looks like the following:

def loadTable1() {
  val table1 = sqlContext.jsonFile("s3://textfiledirectory/")
  table1.cache().registerTempTable("table1")
}

def loadTable2() {
  val table2 = sqlContext.jsonFile("s3://testfiledirectory2/")
  table2.cache().registerTempTable("table2")
}

def loadAllTables() {
  loadTable1()
  loadTable2()
}

loadAllTables()

How do I parallelize this Spark job so that both tables are created at the same time or in parallel?
Why can I not insert into TempTables in Spark SQL?
Why does this not work? Is insert into broken in 1.3.1?

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/")
dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
parquetFile.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}
ssc.start()

Or this?

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/")
dayBefore.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}
ssc.start()
Grouping elements in a RDD
How would you do a .grouped(10) on an RDD — is it possible? Here is an example for a Scala list:

scala> List(1,2,3,4).grouped(2).toList
res1: List[List[Int]] = List(List(1, 2), List(3, 4))

I would like to group n elements.
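There is no grouped on RDD itself, but the same chunking is available per partition via Iterator.grouped inside mapPartitions. A self-contained sketch of the chunking logic, with the RDD application shown as a comment; note that with mapPartitions the groups never span partition boundaries, so some groups may hold fewer than n elements:

```scala
// Chunk any iterator into lists of at most n elements, mirroring
// List.grouped(n). This is exactly what runs inside each partition.
def groupedChunks[A](it: Iterator[A], n: Int): Iterator[List[A]] =
  it.grouped(n).map(_.toList)

// Applied to an RDD (sketch):
//   val chunked = rdd.mapPartitions(part => part.grouped(10).map(_.toList))
// If groups must span partitions exactly, the RDD would first need to be
// coalesced to one partition (or re-indexed with zipWithIndex and grouped
// by index / n), both of which have a cost.
```

The mapPartitions form is cheap because it never shuffles, at the price of the ragged final group per partition.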