RE: Error building a self-contained Spark app
Ok, this is what I have:

object SQLHiveContextSingleton {
  @transient private var instance: HiveContext = _
  def getInstance(sparkContext: SparkContext): HiveContext = {
    synchronized {
      if (instance == null || sparkContext.isStopped) {
        instance = new HiveContext(sparkContext)
      }
      instance
    }
  }
}

and then in my application I have:

linesStream.foreachRDD(linesRdd => {
  if (!linesRdd.isEmpty()) {
    val jsonRdd = linesRdd.map(x => parser.parse(x))
    val sqlContext = SQLHiveContextSingleton.getInstance(linesRdd.sparkContext)
    import sqlContext.implicits._
    ...

Again, this is for a streaming app, and I am following the best practice from here:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations

Date: Fri, 4 Mar 2016 14:52:58 -0800
Subject: Re: Error building a self-contained Spark app
From: yuzhih...@gmail.com
To: mich.talebza...@gmail.com
CC: user@spark.apache.org

Can you show your code snippet? Here is an example:

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

On Fri, Mar 4, 2016 at 1:55 PM, Mich Talebzadeh wrote:

Hi Ted,

I am getting the following error after adding that import:

[error] /home/hduser/dba/bin/scala/Sequence/src/main/scala/Sequence.scala:5: not found: object sqlContext
[error] import sqlContext.implicits._
[error]        ^
[error] /home/hduser/dba/bin/scala/Sequence/src/main/scala/Sequence.scala:15: value toDF is not a member of Seq[(String, Int)]

On 4 March 2016 at 21:39, Ted Yu wrote:

Can you add the following into your code?

import sqlContext.implicits._

On Fri, Mar 4, 2016 at 1:14 PM, Mich Talebzadeh wrote:

Hi,

I have a simple Scala program as below:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext

object Sequence {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Sequence")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val a = Seq(("Mich",20), ("Christian", 18), ("James",13), ("Richard",16))
    // Sort option 1 using tempTable
    val b = a.toDF("Name","score").registerTempTable("tmp")
    sql("select Name,score from tmp order by score desc").show
    // Sort option 2 with FP
    a.toDF("Name","score").sort(desc("score")).show
  }
}

I build this using the sbt tool as below:

cat sequence.sbt
name := "Sequence"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.0"

But it fails compilation as below:

[info] Compilation completed in 12.366 s
[error] /home/hduser/dba/bin/scala/Sequence/src/main/scala/Sequence.scala:15: value toDF is not a member of Seq[(String, Int)]
[error] val b = a.toDF("Name","score").registerTempTable("tmp")
[error]           ^
[error] /home/hduser/dba/bin/scala/Sequence/src/main/scala/Sequence.scala:16: not found: value sql
[error] sql("select Name,score from tmp order by score desc").show
[error] ^
[error] /home/hduser/dba/bin/scala/Sequence/src/main/scala/Sequence.scala:18: value toDF is not a member of Seq[(String, Int)]
[error] a.toDF("Name","score").sort(desc("score")).show
[error]   ^
[error] three errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 95 s, completed Mar 4, 2016 9:06:40 PM

I think I am missing some dependencies here.
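For reference, a minimal corrected sketch of the program above, assuming Spark 1.5.x throughout: toDF on a Seq comes from import sqlContext.implicits._, sql(...) must be called on the context, desc comes from org.apache.spark.sql.functions, and the sbt file should not mix spark-sql 1.0.0 with spark-core 1.5.0.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.desc

object Sequence {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Sequence")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._  // provides toDF on Seq[(String, Int)]

    val a = Seq(("Mich", 20), ("Christian", 18), ("James", 13), ("Richard", 16))

    // Sort option 1 using a temp table; sql is called on the context
    a.toDF("Name", "score").registerTempTable("tmp")
    sqlContext.sql("select Name, score from tmp order by score desc").show

    // Sort option 2 with the DataFrame API
    a.toDF("Name", "score").sort(desc("score")).show
  }
}

and in sequence.sbt, with the versions aligned:

name := "Sequence"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.0"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.0"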
FW: How to get the singleton instance of SQLContext/HiveContext: val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
From: je...@hotmail.com
To: yuzhih...@gmail.com
Subject: RE: How to get the singleton instance of SQLContext/HiveContext: val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
Date: Fri, 4 Mar 2016 14:09:20 -0800

The code below is from the sources; is this what you are asking for?

class HiveContext private[hive](
    sc: SparkContext,
    cacheManager: CacheManager,
    listener: SQLListener,
    @transient private val execHive: HiveClientImpl,
    @transient private val metaHive: HiveClient,
    isRootContext: Boolean)
  extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging {

J

Date: Fri, 4 Mar 2016 13:53:38 -0800
Subject: Re: How to get the singleton instance of SQLContext/HiveContext: val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
From: yuzhih...@gmail.com
To: je...@hotmail.com
CC: user@spark.apache.org

bq. However, the method does not seem to be inherited by HiveContext.

Can you clarify the above observation? HiveContext extends SQLContext.

On Fri, Mar 4, 2016 at 1:23 PM, jelez <je...@hotmail.com> wrote:

What is the best approach to using getOrCreate for a streaming job with HiveContext? It seems that for SQLContext the recommended approach is to use getOrCreate:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations

val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

However, the method does not seem to be inherited by HiveContext. I currently create my own singleton class and use it like this:

val sqlContext = SQLHiveContextSingleton.getInstance(linesRdd.sparkContext)

However, I am not sure if this is reliable. What would be the best approach? Any examples?
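Although HiveContext extends SQLContext, SQLContext.getOrCreate is typed to return a SQLContext, and HiveContext has no getOrCreate of its own in Spark 1.x, so a hand-rolled singleton is the usual workaround. A minimal sketch of that pattern, matching the code shown at the top of this digest:

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

// JVM-wide HiveContext singleton, mirroring the SQLContext singleton
// pattern from the streaming programming guide.
object SQLHiveContextSingleton {
  @transient private var instance: HiveContext = _

  def getInstance(sparkContext: SparkContext): HiveContext = synchronized {
    // Re-create the context if it has never been built or if the
    // underlying SparkContext has been stopped (e.g. on driver restart).
    if (instance == null || sparkContext.isStopped) {
      instance = new HiveContext(sparkContext)
    }
    instance
  }
}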
Best way to merge files from streaming jobs on S3
My streaming job is creating files on S3. The problem is that those files end up very small if I just write them to S3 directly, which is why I use coalesce() to reduce the number of files and make them larger. However, coalesce shuffles data, and my job's processing time ends up higher than sparkBatchIntervalMilliseconds. I have observed that if I coalesce to a number of partitions equal to the number of cores in the cluster I get less shuffling, but that is unsubstantiated. Is there any dependency or rule between the number of executors, the number of cores, etc. that I can use to minimize shuffling and at the same time achieve the minimum number of output files per batch? What is the best practice?
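As an illustrative sketch of the knob in question (the cluster sizes below are assumptions, not recommendations): coalesce(n) with its default shuffle = false merges existing partitions without a full shuffle, whereas repartition(n) is coalesce(n, shuffle = true) and always shuffles, so targeting roughly the total number of executor cores is a common starting point:

// Illustrative values; tune to the actual cluster.
val numExecutors = 8
val coresPerExecutor = 4
val targetPartitions = numExecutors * coresPerExecutor

// coalesce with the default shuffle = false merges partitions locally
// instead of performing the full shuffle that repartition would.
linesRdd
  .coalesce(targetPartitions)
  .saveAsTextFile("s3n://my-bucket/output")  // hypothetical output path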
How to get the singleton instance of SQLContext/HiveContext: val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
What is the best approach to using getOrCreate for a streaming job with HiveContext? It seems that for SQLContext the recommended approach is to use getOrCreate:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations

val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

However, the method does not seem to be inherited by HiveContext. I currently create my own singleton class and use it like this:

val sqlContext = SQLHiveContextSingleton.getInstance(linesRdd.sparkContext)

However, I am not sure if this is reliable. What would be the best approach? Any examples?
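A sketch of how such a singleton is typically driven per batch, following the pattern in the streaming guide linked above (linesStream and parser are illustrative names taken from the first message of this digest):

linesStream.foreachRDD { linesRdd =>
  if (!linesRdd.isEmpty()) {
    // Fetch the one HiveContext per JVM instead of creating one per batch.
    val sqlContext = SQLHiveContextSingleton.getInstance(linesRdd.sparkContext)
    import sqlContext.implicits._

    val jsonRdd = linesRdd.map(x => parser.parse(x))
    // ... build a DataFrame from jsonRdd and run SQL on it ...
  }
}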
S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append
Working on a streaming job with DirectParquetOutputCommitter to S3, I need to use partitionBy and hence SaveMode.Append. Apparently, when using SaveMode.Append, Spark automatically falls back to the default Parquet output committer and ignores DirectParquetOutputCommitter. My problems are:

1. The copying to _temporary takes a lot of time.
2. I get job failures with: java.io.FileNotFoundException: File s3n://jelez/parquet-data/_temporary/0/task_201603040904_0544_m_07 does not exist.

I have set:

sparkConfig.set("spark.speculation", "false")
sc.hadoopConfiguration.set("mapreduce.map.speculative", "false")
sc.hadoopConfiguration.set("mapreduce.reduce.speculative", "false")

Any ideas? Opinions? Best practices?
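For context, a sketch of the write pattern being described; the DataFrame name and partition column are illustrative assumptions, and the output path is taken from the error above. Partitioned appends in Spark 1.x go through the default committer, which stages files under _temporary and then renames them, a step that is both slow and fragile on S3:

import org.apache.spark.sql.SaveMode

// Hypothetical per-batch write: partitionBy requires appending across
// batches, and SaveMode.Append triggers the fallback to the default
// Parquet committer described above.
df.write
  .mode(SaveMode.Append)
  .partitionBy("event_date")           // hypothetical partition column
  .parquet("s3n://jelez/parquet-data") // path from the error message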