Local spark jars not being detected
Hi, I'm using the IntelliJ IDE for my Spark project. I've compiled Spark 1.3.0 for Scala 2.11.4, and here's one of the compiled jars installed in my m2 folder: ~/.m2/repository/org/apache/spark/spark-core_2.11/1.3.0/spark-core_2.11-1.3.0.jar But when I add this dependency in my pom file for the project:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_$(scala.version)</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

I'm getting "Dependency org.apache.spark:spark-core_$(scala.version):1.3.0 not found". Why is this happening and what's the workaround?
Re: About Jobs UI in yarn-client mode
On 19 Jun 2015, at 16:48, Sea 261810...@qq.com wrote: Hi, all: I run Spark on YARN and I want to see the Jobs UI at http://ip:4040/, but it redirects to http://${yarn.ip}/proxy/application_1428110196022_924324/, which cannot be found. Why? Can anyone help? Whenever you point your browser directly at the web UI of a YARN app, it redirects you to the YARN resource manager, which acts as a proxy (the way you see it as you go through the YARN resource manager web UI). It looks like the yarn-site.xml configuration used when you launched the application isn't right: the URL to the RM proxy is being built from another property there, yarn.ip, which isn't defined. Have a look at the yarn.resourcemanager.address value:

<property>
  <name>yarn.resourcemanager.address</name>
  <value>myresourcemanager.example.org:8050</value>
</property>
Verifying number of workers in Spark Streaming
How can I tell that, in stream processing over a cluster of 8 machines, all the machines/worker nodes are being used (my cluster has 8 slaves)? -- Thanks Regards, Anshu Shukla
Re: Web UI vs History Server Bugs
On 17 Jun 2015, at 19:10, jcai jonathon@yale.edu wrote: Hi, I am running this in Spark stand-alone mode. I find that when I examine the web UI, a couple of bugs arise: 1. There is a discrepancy between the number denoting the duration of the application when I run the history server and the number given by the web UI (default address is master:8080). I checked more specific details, including task and stage durations (when clicking on the application), and these appear to be the same for both avenues. 2. Sometimes the web UI on master:8080 is unable to display more specific information for an application that has finished (when clicking on the application), even when there is a log file in the appropriate directory. But when the history server is opened, it is able to read this file and output the information. There's a JIRA open on the history server caching incomplete work: if you click on the link to a job while it's in progress, you don't get any updates later. Does this sound like what you are seeing?
Re: Local spark jars not being detected
Not sure, but try removing the provided scope, or create a lib directory in the project home and bring that jar over there. On 20 Jun 2015 18:08, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Hi, I'm using the IntelliJ IDE for my Spark project. I've compiled Spark 1.3.0 for Scala 2.11.4, and here's one of the compiled jars installed in my m2 folder: ~/.m2/repository/org/apache/spark/spark-core_2.11/1.3.0/spark-core_2.11-1.3.0.jar But when I add this dependency in my pom file for the project:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_$(scala.version)</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

I'm getting "Dependency org.apache.spark:spark-core_$(scala.version):1.3.0 not found". Why is this happening and what's the workaround?
Spark SQL JDBC Source data skew
Hi, In the Spark SQL JDBC data source there is an option to specify the upper/lower bound and the number of partitions. How does Spark handle data distribution if we do not give the upper/lower bounds or the number of partitions? Will all data from the external data source end up skewed onto one executor? In many situations we do not know the upper/lower bound of the underlying dataset until the query is executed, so it is not possible to pass upper/lower bound values. Thanks Sathish
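For reference, here is a minimal sketch of the two JDBC read paths being discussed, based on the 1.4-era DataFrameReader API as I understand it (table, column and connection details below are made up):

import java.util.Properties
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val props = new Properties()
props.setProperty("user", "dbuser")       // hypothetical credentials
props.setProperty("password", "secret")

// Without partitioning information, the whole table is read through a single
// partition, so one task (effectively one executor core) does all the work.
val unpartitioned = sqlContext.read.jdbc("jdbc:postgresql://dbhost/mydb", "orders", props)

// With a numeric partition column plus bounds, Spark splits the [lowerBound, upperBound]
// range into numPartitions stride-based queries that run in parallel.
val partitioned = sqlContext.read.jdbc(
  "jdbc:postgresql://dbhost/mydb", "orders",
  "order_id",   // partitionColumn (numeric)
  0L,           // lowerBound
  1000000L,     // upperBound
  8,            // numPartitions
  props)

When the bounds are not known up front, one common workaround is to run a cheap min/max query on the partition column first and feed those values into the partitioned read.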
Re: Velox Model Server
Is velox NOT open source? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: Hi, The demo of end-to-end ML pipeline including the model server component at Spark Summit was really cool. I was wondering if the Model Server component is based upon Velox or it uses a completely different architecture. https://github.com/amplab/velox-modelserver We are looking for an open source version of model server to build upon. Thanks. Deb -- - Charles
RE: Code review - Spark SQL command-line client for Cassandra
It is a simple Play-based web application. It exposes a URI for submitting a SQL query. It then executes that query using the CassandraSQLContext provided by the Spark Cassandra Connector. Since it is web-based, I added an authentication and authorization layer to make sure that only users with the right authorization can use it. I am happy to open-source that code if there is interest. Just need to carve out some time to clean it up and remove all the other services that this web application provides. Mohammed From: shahid ashraf [mailto:sha...@trialx.com] Sent: Saturday, June 20, 2015 6:52 AM To: Mohammed Guller Cc: Matthew Johnson; user@spark.apache.org Subject: RE: Code review - Spark SQL command-line client for Cassandra Hi Mohammad Can you provide more info about the service you developed? On Jun 20, 2015 7:59 AM, Mohammed Guller moham...@glassbeam.com wrote: Hi Matthew, It looks fine to me. I have built a similar service that allows a user to submit a query from a browser and returns the result in JSON format. Another alternative is to leave a Spark shell or one of the notebooks (Spark Notebook, Zeppelin, etc.) session open and run queries from there. This model works only if people give you the queries to execute. Mohammed From: Matthew Johnson [mailto:matt.john...@algomi.com] Sent: Friday, June 19, 2015 2:20 AM To: user@spark.apache.org Subject: Code review - Spark SQL command-line client for Cassandra Hi all, I have been struggling with Cassandra's lack of adhoc query support (I know this is an anti-pattern of Cassandra, but sometimes management come over and ask me to run stuff and it's impossible to explain that it will take me a while when it would take about 10 seconds in MySQL), so I have put together the following code snippet that bundles DataStax's Cassandra Spark connector and allows you to submit Spark SQL to it, outputting the results in a text file. Does anyone spot any obvious flaws in this plan?? (I have a lot more error handling etc. in my code, but removed it here for brevity)

private void run(String sqlQuery) {
    SparkContext scc = new SparkContext(conf);
    CassandraSQLContext csql = new CassandraSQLContext(scc);
    DataFrame sql = csql.sql(sqlQuery);
    String folderName = "/tmp/output_" + System.currentTimeMillis();
    LOG.info("Attempting to save SQL results in folder: " + folderName);
    sql.rdd().saveAsTextFile(folderName);
    LOG.info("SQL results saved");
}

public static void main(String[] args) {
    String sparkMasterUrl = args[0];
    String sparkHost = args[1];
    String sqlQuery = args[2];
    SparkConf conf = new SparkConf();
    conf.setAppName("Java Spark SQL");
    conf.setMaster(sparkMasterUrl);
    conf.set("spark.cassandra.connection.host", sparkHost);
    JavaSparkSQL app = new JavaSparkSQL(conf);
    app.run(sqlQuery, printToConsole);
}

I can then submit this to Spark with 'spark-submit': ./spark-submit --class com.algomi.spark.JavaSparkSQL --master spark://sales3:7077 spark-on-cassandra-0.0.1-SNAPSHOT-jar-with-dependencies.jar spark://sales3:7077 sales3 "select * from mykeyspace.operationlog" It seems to work pretty well, so I'm pretty happy, but wondering why this isn't common practice (at least I haven't been able to find much about it on Google) – is there something terrible that I'm missing? Thanks! Matthew
Spark 1.4 History Server - HDP 2.2
Can anyone help? I am getting the below error when I try to start the History Server. I do not see the org.apache.spark.deploy.yarn.history package inside the assembly jar, and I am not sure how to get it. java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider Thanks, Ashish
Re: createDirectStream and Stats
Are you sure you were using all 100 executors even with the receiver model? Because in receiver mode, the number of partitions is dependent on the batch duration and block interval. It may not necessarily map directly to the number of executors in your app unless you've adjusted the block interval and batch duration. From: Tim Smith secs...@gmail.com Sent: Friday, June 19, 2015 10:36 PM To: user@spark.apache.org I did try without repartition, initially, but that was even more horrible because instead of the allocated 100 executors, only 30 (which is the number of kafka partitions) would have to do the work. The MyFunc is a CPU bound task so adding more memory per executor wouldn't help, and I saw that each of the 30 executors was only using one thread/core on each Spark box. I could go and play with threading in MyFunc but I don't want to mess with threading with all the parallelism already involved, and I don't think in-app threading outside of what the framework does is really desirable. With repartition, there is shuffle involved, but at least the computation load spreads across all 100 executors instead of just 30. On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.org wrote: If that's the case, you're still only using as many read executors as there are kafka partitions. I'd remove the repartition. If you weren't doing any shuffles in the old job, and are doing a shuffle in the new job, it's not really comparable. On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.com wrote: On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote: Also, can you find from the Spark UI the break up of the stages in each batch's jobs, and find which stage is taking more time after a while? Sure, will try to debug/troubleshoot. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change its behaviour? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote: When you say your old version was k = createStream ... were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor... Yes, sorry, the earlier/stable version was more like: kInStreams = (1 to n).map { _ => KafkaUtils.createStream(...) } // n being the number of kafka partitions, 1 receiver per partition val k = ssc.union(kInStreams) val dataout = k.map(x => myFunc(x._2, someParams)) dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) })) Thanks, Tim If that's the case I'd try direct stream without the repartitioning. On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote: Essentially, I went from: k = createStream ... val dataout = k.map(x => myFunc(x._2, someParams)) dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) })) To: kIn = createDirectStream ... k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors val dataout = k.map(x => myFunc(x._2, someParams)) dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) })) With the new API, the app starts up and works fine for a while but I guess starts to deteriorate after a while. With the existing API createStream, the app does deteriorate but over a much longer period, hours vs days.
On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation you are using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x? That explains. This issue has been fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem
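For reference, a bare sketch of Cody's suggestion (direct stream with no repartition), reusing the names from the thread (myFunc, someParams, myOutputFunc) and made-up broker/topic values; with the direct API each batch RDD has one partition per Kafka partition, so parallelism beyond that needs either more Kafka partitions or an explicit shuffle:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical broker list and topic name.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("mytopic")

// Direct stream: no receivers, one RDD partition per Kafka partition.
val kIn = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// No repartition: myFunc runs with as many tasks as there are Kafka partitions.
val dataout = kIn.map(x => myFunc(x._2, someParams))
dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))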
Fwd: Verifying number of workers in Spark Streaming
Any suggestions please! How can I tell that, in stream processing over a cluster of 8 machines, all the machines/worker nodes are being used (my cluster has 8 slaves)? I am submitting the job from the master itself, over an EC2 cluster created by the EC2 scripts available with Spark, but I am not able to figure out whether my job is using all the workers or not. -- Thanks Regards, Anshu Shukla SERC-IISC
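Not an authoritative answer, but a small sketch of one way to check from the driver which executors have registered, using sc.getExecutorMemoryStatus; the hosts listed should cover your 8 slaves if they have all joined the application:

// Runs on the driver: block managers (driver + executors) registered with this
// application, keyed by "host:port".
val endpoints = sc.getExecutorMemoryStatus.keys.toSeq
println("Registered endpoints (including the driver): " + endpoints.mkString(", "))
println("Count: " + endpoints.size)

The Executors tab of the application web UI (port 4040 by default) shows the same hosts together with per-executor task counts, which is the easier way to confirm that work is actually being spread across all of them.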
Re: Local spark jars not being detected
Yes, finally solved. It was there in front of my eyes the whole time. Thanks a lot Pete.
Re: Velox Model Server
Hi Debasish, The Oryx project (https://github.com/cloudera/oryx), which is Apache 2 licensed, contains a model server that can serve models built with MLlib. -Sandy On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com wrote: Is velox NOT open source? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: Hi, The demo of end-to-end ML pipeline including the model server component at Spark Summit was really cool. I was wondering if the Model Server component is based upon Velox or it uses a completely different architecture. https://github.com/amplab/velox-modelserver We are looking for an open source version of model server to build upon. Thanks. Deb -- - Charles
How to get the ALS reconstruction error
Hello; I am fitting ALS models and would like to get an initial idea of the number of factors. I want to use the reconstruction error on the training data as a measure. Does the API expose the reconstruction error? Thanks Ayman
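As far as I know the MatrixFactorizationModel returned by ALS does not expose the training reconstruction error directly, so here is a rough sketch, following the standard MLlib ALS example pattern, of computing the mean squared error on the training ratings yourself (rank, iterations and lambda values are just placeholders):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// `ratings` is assumed to be the RDD[Rating] used for training.
val model = ALS.train(ratings, 10 /* rank */, 10 /* iterations */, 0.01 /* lambda */)

// Predict the known (user, product) pairs and join with the observed ratings.
val userProducts = ratings.map { case Rating(user, product, _) => (user, product) }
val predictions = model.predict(userProducts)
  .map { case Rating(user, product, pred) => ((user, product), pred) }
val observedAndPredicted = ratings
  .map { case Rating(user, product, rating) => ((user, product), rating) }
  .join(predictions)

// Mean squared reconstruction error on the training data.
val mse = observedAndPredicted.values
  .map { case (observed, predicted) => math.pow(observed - predicted, 2) }
  .mean()
println(s"Training MSE: $mse")

Repeating this for a few values of rank gives a rough picture of how the reconstruction error changes with the number of factors.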
RE: Code review - Spark SQL command-line client for Cassandra
Hi Mohammad Can you provide more info about the service you developed? On Jun 20, 2015 7:59 AM, Mohammed Guller moham...@glassbeam.com wrote: Hi Matthew, It looks fine to me. I have built a similar service that allows a user to submit a query from a browser and returns the result in JSON format. Another alternative is to leave a Spark shell or one of the notebooks (Spark Notebook, Zeppelin, etc.) session open and run queries from there. This model works only if people give you the queries to execute. Mohammed From: Matthew Johnson [mailto:matt.john...@algomi.com] Sent: Friday, June 19, 2015 2:20 AM To: user@spark.apache.org Subject: Code review - Spark SQL command-line client for Cassandra Hi all, I have been struggling with Cassandra's lack of adhoc query support (I know this is an anti-pattern of Cassandra, but sometimes management come over and ask me to run stuff and it's impossible to explain that it will take me a while when it would take about 10 seconds in MySQL), so I have put together the following code snippet that bundles DataStax's Cassandra Spark connector and allows you to submit Spark SQL to it, outputting the results in a text file. Does anyone spot any obvious flaws in this plan?? (I have a lot more error handling etc. in my code, but removed it here for brevity)

private void run(String sqlQuery) {
    SparkContext scc = new SparkContext(conf);
    CassandraSQLContext csql = new CassandraSQLContext(scc);
    DataFrame sql = csql.sql(sqlQuery);
    String folderName = "/tmp/output_" + System.currentTimeMillis();
    LOG.info("Attempting to save SQL results in folder: " + folderName);
    sql.rdd().saveAsTextFile(folderName);
    LOG.info("SQL results saved");
}

public static void main(String[] args) {
    String sparkMasterUrl = args[0];
    String sparkHost = args[1];
    String sqlQuery = args[2];
    SparkConf conf = new SparkConf();
    conf.setAppName("Java Spark SQL");
    conf.setMaster(sparkMasterUrl);
    conf.set("spark.cassandra.connection.host", sparkHost);
    JavaSparkSQL app = new JavaSparkSQL(conf);
    app.run(sqlQuery, printToConsole);
}

I can then submit this to Spark with 'spark-submit': ./spark-submit --class com.algomi.spark.JavaSparkSQL --master spark://sales3:7077 spark-on-cassandra-0.0.1-SNAPSHOT-jar-with-dependencies.jar spark://sales3:7077 sales3 "select * from mykeyspace.operationlog" It seems to work pretty well, so I'm pretty happy, but wondering why this isn't common practice (at least I haven't been able to find much about it on Google) – is there something terrible that I'm missing? Thanks! Matthew
RE: [Spark 1.3.1 on YARN on EMR] Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Hi Roberto, I'm not an EMR person, but it looks like option -h is deploying the necessary datanucleus JARs for you. The requirements for HiveContext are the hive-site.xml and the datanucleus JARs. As long as these 2 are there, and Spark is compiled with -Phive, it should work. spark-shell runs in yarn-client mode. Not sure whether your other application is running under the same mode or a different one. Try specifying yarn-client mode and see if you get the same result as spark-shell. From: roberto.coluc...@gmail.com Date: Wed, 10 Jun 2015 14:32:04 +0200 Subject: [Spark 1.3.1 on YARN on EMR] Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient To: user@spark.apache.org Hi! I'm struggling with an issue with Spark 1.3.1 running on YARN, running on an AWS EMR cluster. Such cluster is based on AMI 3.7.0 (hence Amazon Linux 2015.03, Hive 0.13 already installed and configured on the cluster, Hadoop 2.4, etc...). I make use of the AWS emr-bootstrap-action install-spark (https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark) with the option/version -v1.3.1e so to get the latest Spark for EMR installed and available. I also have a simple Spark Streaming driver in my project. Such driver is part of a larger Maven project: in the pom.xml I'm currently using

[...]
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<java.version>1.7</java.version>
<spark.version>1.3.1</spark.version>
<hadoop.version>2.4.1</hadoop.version>
[...]
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

In fact, at compile and build time everything works just fine if, in my driver, I have:

val sparkConf = new SparkConf()
  .setAppName(appName)
  .set("spark.local.dir", "/tmp/" + appName)
  .set("spark.streaming.unpersist", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[java.net.URI], classOf[String]))
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, config.batchDuration)
import org.apache.spark.streaming.StreamingContext._
ssc.checkpoint(sparkConf.get("spark.local.dir") + checkpointRelativeDir)
// some input reading actions
// some input transformation actions
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
sqlContext.sql("an-HiveQL-query")
ssc.start()
ssc.awaitTerminationOrTimeout(config.timeout)

What happens is that, right after having been launched, the driver fails with the exception: 15/06/10 11:38:18 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:239) at org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:235) at
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:251) at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:250) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:95) at myDriver.scala: line of the sqlContext.sql(query) Caused by some stuff Caused by: javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found. NestedThrowables: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory ... Caused by: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory Thinking about a wrong Hive installation/configuration or libs/classpath definition, I SSHed into the cluster and launched a spark-shell. Excluding the app configuration and StreamingContext usage/definition, I then carried out all the actions listed in the driver implementation, in particular all the Hive-related ones and they all went through smoothly! I also tried to use the optional -h argument (https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/README.md#arguments-optional) in the
Re: Local spark jars not being detected
It looks like you are using parens instead of curly braces on scala.version. On Jun 20, 2015, at 8:38 AM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Hi, I'm using the IntelliJ IDE for my Spark project. I've compiled Spark 1.3.0 for Scala 2.11.4, and here's one of the compiled jars installed in my m2 folder: ~/.m2/repository/org/apache/spark/spark-core_2.11/1.3.0/spark-core_2.11-1.3.0.jar But when I add this dependency in my pom file for the project:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_$(scala.version)</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

I'm getting "Dependency org.apache.spark:spark-core_$(scala.version):1.3.0 not found". Why is this happening and what's the workaround?
Velox Model Server
Hi, The demo of end-to-end ML pipeline including the model server component at Spark Summit was really cool. I was wondering if the Model Server component is based upon Velox or it uses a completely different architecture. https://github.com/amplab/velox-modelserver We are looking for an open source version of model server to build upon. Thanks. Deb
Re: [Spark 1.3.1 on YARN on EMR] Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
We worked it out. There were multiple items (like the location of the remote metastore and db user auth) to make HiveContext happy in yarn-cluster mode. For reference: https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/examples/using-hivecontext-yarn-cluster.md -Christopher Bozeman On Jun 20, 2015, at 7:24 AM, Andrew Lee alee...@hotmail.com wrote: Hi Roberto, I'm not an EMR person, but it looks like option -h is deploying the necessary datanucleus JARs for you. The requirements for HiveContext are the hive-site.xml and the datanucleus JARs. As long as these 2 are there, and Spark is compiled with -Phive, it should work. spark-shell runs in yarn-client mode. Not sure whether your other application is running under the same mode or a different one. Try specifying yarn-client mode and see if you get the same result as spark-shell. From: roberto.coluc...@gmail.com Date: Wed, 10 Jun 2015 14:32:04 +0200 Subject: [Spark 1.3.1 on YARN on EMR] Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient To: user@spark.apache.org Hi! I'm struggling with an issue with Spark 1.3.1 running on YARN, running on an AWS EMR cluster. Such cluster is based on AMI 3.7.0 (hence Amazon Linux 2015.03, Hive 0.13 already installed and configured on the cluster, Hadoop 2.4, etc...). I make use of the AWS emr-bootstrap-action install-spark (https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark) with the option/version -v1.3.1e so to get the latest Spark for EMR installed and available. I also have a simple Spark Streaming driver in my project. Such driver is part of a larger Maven project: in the pom.xml I'm currently using

[...]
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<java.version>1.7</java.version>
<spark.version>1.3.1</spark.version>
<hadoop.version>2.4.1</hadoop.version>
[...]
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

In fact, at compile and build time everything works just fine if, in my driver, I have:

val sparkConf = new SparkConf()
  .setAppName(appName)
  .set("spark.local.dir", "/tmp/" + appName)
  .set("spark.streaming.unpersist", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[java.net.URI], classOf[String]))
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, config.batchDuration)
import org.apache.spark.streaming.StreamingContext._
ssc.checkpoint(sparkConf.get("spark.local.dir") + checkpointRelativeDir)
// some input reading actions
// some input transformation actions
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
sqlContext.sql("an-HiveQL-query")
ssc.start()
ssc.awaitTerminationOrTimeout(config.timeout)

What happens is that, right after having been launched, the driver fails with the exception: 15/06/10 11:38:18 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.RuntimeException: Unable to instantiate
org.apache.hadoop.hive.metastore.HiveMetaStoreClient java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:239) at org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:235) at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:251) at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:250) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:95) at myDriver.scala: line of the sqlContext.sql(query) Caused by some stuff Caused by: javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found. NestedThrowables: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory ... Caused by: java.lang.ClassNotFoundException:
Re: Velox Model Server
Mind if I ask what 1.3/1.4 ML features that you are looking for? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: After getting used to Scala, writing Java is too much work :-) I am looking for scala based project that's using netty at its core (spray is one example). prediction.io is an option but that also looks quite complicated and not using all the ML features that got added in 1.3/1.4 Velox built on top of ML / Keystone ML pipeline API and that's useful but it is still using javax servlets which is not netty based. On Sat, Jun 20, 2015 at 10:25 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Oops, that link was for Oryx 1. Here's the repo for Oryx 2: https://github.com/OryxProject/oryx On Sat, Jun 20, 2015 at 10:20 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Debasish, The Oryx project (https://github.com/cloudera/oryx), which is Apache 2 licensed, contains a model server that can serve models built with MLlib. -Sandy On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com wrote: Is velox NOT open source? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: Hi, The demo of end-to-end ML pipeline including the model server component at Spark Summit was really cool. I was wondering if the Model Server component is based upon Velox or it uses a completely different architecture. https://github.com/amplab/velox-modelserver We are looking for an open source version of model server to build upon. Thanks. Deb -- - Charles -- Donald Szeto PredictionIO
Re: Velox Model Server
Integration of model server with ML pipeline API. On Sat, Jun 20, 2015 at 12:25 PM, Donald Szeto don...@prediction.io wrote: Mind if I ask what 1.3/1.4 ML features that you are looking for? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: After getting used to Scala, writing Java is too much work :-) I am looking for scala based project that's using netty at its core (spray is one example). prediction.io is an option but that also looks quite complicated and not using all the ML features that got added in 1.3/1.4 Velox built on top of ML / Keystone ML pipeline API and that's useful but it is still using javax servlets which is not netty based. On Sat, Jun 20, 2015 at 10:25 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Oops, that link was for Oryx 1. Here's the repo for Oryx 2: https://github.com/OryxProject/oryx On Sat, Jun 20, 2015 at 10:20 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Debasish, The Oryx project (https://github.com/cloudera/oryx), which is Apache 2 licensed, contains a model server that can serve models built with MLlib. -Sandy On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com wrote: Is velox NOT open source? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: Hi, The demo of end-to-end ML pipeline including the model server component at Spark Summit was really cool. I was wondering if the Model Server component is based upon Velox or it uses a completely different architecture. https://github.com/amplab/velox-modelserver We are looking for an open source version of model server to build upon. Thanks. Deb -- - Charles -- Donald Szeto PredictionIO
Re: About Jobs UI in yarn-client mode
I got the same problem when I upgraded from 1.3.1 to 1.4. The same conf was used; 1.3 works, but the 1.4 UI does not. So I added the properties

<property>
  <name>yarn.resourcemanager.webapp.address</name>
  <value>:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname</name>
  <value></value>
</property>

to yarn-site.xml and the problem was solved. Spark 1.4 + Yarn 2.7 + Java 8 On Fri, Jun 19, 2015 at 8:48 AM, Sea 261810...@qq.com wrote: Hi, all: I run Spark on YARN and I want to see the Jobs UI at http://ip:4040/, but it redirects to http://${yarn.ip}/proxy/application_1428110196022_924324/, which cannot be found. Why? Can anyone help?
Re: [Spark 1.3.1 on YARN on EMR] Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
I confirm, Christopher was very kind in helping me out here. The solution presented in the linked doc worked perfectly. IMO it should be linked in the official Spark documentation. Thanks again, Roberto On 20 Jun 2015, at 19:25, Bozeman, Christopher bozem...@amazon.com wrote: We worked it out. There were multiple items (like the location of the remote metastore and db user auth) to make HiveContext happy in yarn-cluster mode. For reference: https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/examples/using-hivecontext-yarn-cluster.md -Christopher Bozeman On Jun 20, 2015, at 7:24 AM, Andrew Lee alee...@hotmail.com wrote: Hi Roberto, I'm not an EMR person, but it looks like option -h is deploying the necessary datanucleus JARs for you. The requirements for HiveContext are the hive-site.xml and the datanucleus JARs. As long as these 2 are there, and Spark is compiled with -Phive, it should work. spark-shell runs in yarn-client mode. Not sure whether your other application is running under the same mode or a different one. Try specifying yarn-client mode and see if you get the same result as spark-shell. From: roberto.coluc...@gmail.com Date: Wed, 10 Jun 2015 14:32:04 +0200 Subject: [Spark 1.3.1 on YARN on EMR] Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient To: user@spark.apache.org Hi! I'm struggling with an issue with Spark 1.3.1 running on YARN, running on an AWS EMR cluster. Such cluster is based on AMI 3.7.0 (hence Amazon Linux 2015.03, Hive 0.13 already installed and configured on the cluster, Hadoop 2.4, etc...). I make use of the AWS emr-bootstrap-action install-spark (https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark) with the option/version -v1.3.1e so to get the latest Spark for EMR installed and available. I also have a simple Spark Streaming driver in my project. Such driver is part of a larger Maven project: in the pom.xml I'm currently using [...]
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<java.version>1.7</java.version>
<spark.version>1.3.1</spark.version>
<hadoop.version>2.4.1</hadoop.version>
[...]
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

In fact, at compile and build time everything works just fine if, in my driver, I have:

val sparkConf = new SparkConf()
  .setAppName(appName)
  .set("spark.local.dir", "/tmp/" + appName)
  .set("spark.streaming.unpersist", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[java.net.URI], classOf[String]))
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, config.batchDuration)
import org.apache.spark.streaming.StreamingContext._
ssc.checkpoint(sparkConf.get("spark.local.dir") + checkpointRelativeDir)
// some input reading actions
// some input transformation actions
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
sqlContext.sql("an-HiveQL-query")
ssc.start()
ssc.awaitTerminationOrTimeout(config.timeout)

What happens is that, right after having been launched, the driver fails with the exception: 15/06/10 11:38:18 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:239) at org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:235) at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:251) at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:250) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:95) at
Re: Velox Model Server
Oops, that link was for Oryx 1. Here's the repo for Oryx 2: https://github.com/OryxProject/oryx On Sat, Jun 20, 2015 at 10:20 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Debasish, The Oryx project (https://github.com/cloudera/oryx), which is Apache 2 licensed, contains a model server that can serve models built with MLlib. -Sandy On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com wrote: Is velox NOT open source? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: Hi, The demo of end-to-end ML pipeline including the model server component at Spark Summit was really cool. I was wondering if the Model Server component is based upon Velox or it uses a completely different architecture. https://github.com/amplab/velox-modelserver We are looking for an open source version of model server to build upon. Thanks. Deb -- - Charles
Re: Velox Model Server
After getting used to Scala, writing Java is too much work :-) I am looking for scala based project that's using netty at its core (spray is one example). prediction.io is an option but that also looks quite complicated and not using all the ML features that got added in 1.3/1.4 Velox built on top of ML / Keystone ML pipeline API and that's useful but it is still using javax servlets which is not netty based. On Sat, Jun 20, 2015 at 10:25 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Oops, that link was for Oryx 1. Here's the repo for Oryx 2: https://github.com/OryxProject/oryx On Sat, Jun 20, 2015 at 10:20 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Debasish, The Oryx project (https://github.com/cloudera/oryx), which is Apache 2 licensed, contains a model server that can serve models built with MLlib. -Sandy On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com wrote: Is velox NOT open source? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: Hi, The demo of end-to-end ML pipeline including the model server component at Spark Summit was really cool. I was wondering if the Model Server component is based upon Velox or it uses a completely different architecture. https://github.com/amplab/velox-modelserver We are looking for an open source version of model server to build upon. Thanks. Deb -- - Charles
Re: Submitting Spark Applications using Spark Submit
Hey Andrew, I tried the following approach: I modified my Spark build on my local machine. I did downloaded the Spark 1.4.0 src code and then made a change to ResultTask.scala( I made a simple change to see if it work. I added a print statement). Now, I built spark using mvn -Dhadoop.version=1.0.4 -Phadoop-1 -DskipTests -Dscala-2.10 clean package Now, the new assembly jar was built. I started my EC2 Cluster using this command: ./ec2/spark-ec2 -k key -i ../aggr/key.pem --instance-type=m3.medium --zone=us-east-1b -s 9 launch spark-cluster I initially launched my application jar and it worked fine. After that I scp’d the new assembly jar to the spark lib directory of all my ec2 nodes. When I ran the jar again I got the following error: 5/06/21 00:42:51 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077/user/Master... 15/06/21 00:42:52 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: ec2-XXX.compute-1.amazonaws.com/10.165.103.16:7077 15/06/21 00:42:52 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077 15/06/21 00:43:11 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077/user/Master... 15/06/21 00:43:11 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077 15/06/21 00:43:11 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: ec2-XXX.compute-1.amazonaws.com/10.165.103.16:7077 15/06/21 00:43:31 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077/user/Master... 15/06/21 00:43:31 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkmas...@xxx.compute-1.amazonaws.com:7077 15/06/21 00:43:31 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: XXX.compute-1.amazonaws.com/10.165.103.16:7077 15/06/21 00:43:51 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up. 15/06/21 00:43:51 WARN SparkDeploySchedulerBackend: Application ID is not initialized yet. 
15/06/21 00:43:51 INFO SparkUI: Stopped Spark web UI at http://XXX.compute-1.amazonaws.com:4040 15/06/21 00:43:51 INFO DAGScheduler: Stopping DAGScheduler 15/06/21 00:43:51 INFO SparkDeploySchedulerBackend: Shutting down all executors 15/06/21 00:43:51 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 15/06/21 00:43:51 ERROR OneForOneStrategy: java.lang.NullPointerException at org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at
Re: Grouping elements in a RDD
If you use rdd.mapPartitions(), you'll be able to get hold of the iterator for each partition. Then you should be able to do iterator.grouped(size) on each of the partitions. I think it may mean you end up with one group at the end of each partition that has fewer than size elements. If that's okay for you then that should work. On Sat, Jun 20, 2015 at 7:48 PM, Brandon White bwwintheho...@gmail.com wrote: How would you do a .grouped(10) on an RDD, is it possible? Here is an example for a Scala list: scala> List(1,2,3,4).grouped(2).toList res1: List[List[Int]] = List(List(1, 2), List(3, 4)) Would like to group n elements.
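A minimal sketch of that suggestion, assuming it is acceptable for groups to be formed per partition (so the last group in each partition may be smaller than n):

// Group the elements of each partition into fixed-size chunks. Groups never span
// partition boundaries, so the final group of a partition may hold fewer elements.
val groupSize = 10
val rdd = sc.parallelize(1 to 100, 4)
val grouped = rdd.mapPartitions(iter => iter.grouped(groupSize))  // RDD[Seq[Int]]
grouped.collect().foreach(chunk => println(chunk.mkString(",")))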
Re: Serial batching with Spark Streaming
No it does not. By default, only after all the retries etc. related to batch X are done will batch X+1 be started. Yes, one RDD per batch per DStream. However, the RDD could be a union of multiple RDDs (e.g. RDDs generated by a windowed DStream, or a unioned DStream). TD On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia mici...@gmail.com wrote: Thanks Tathagata! I will use foreachRDD/foreachPartition() instead of transform() then. Does the default scheduler initiate the execution of batch X+1 after batch X even if tasks for batch X need to be retried due to failures? If not, please could you suggest workarounds and point me to the code? One more thing was not 100% clear to me from the documentation: Is there exactly 1 RDD published per batch interval in a DStream? On 19 June 2015 at 16:58, Tathagata Das t...@databricks.com wrote: I see what the problem is. You are adding sleep in the transform operation. The transform function is called at the time of preparing the Spark jobs for a batch. It should not be running any time-consuming operation like an RDD action or a sleep. Since this operation needs to run every batch interval, doing a blocking long-running operation messes with the need to run every batch interval. I will try to make this clearer in the guide. I had not seen anyone do something like this before and therefore it did not occur to me that this could happen. As long as you don't do time-consuming blocking operations in the transform function, the batches will be generated, scheduled and executed in serial order by default. On Fri, Jun 19, 2015 at 11:33 AM, Michal Čizmazia mici...@gmail.com wrote: Binh, thank you very much for your comment and code. Please could you outline an example use of your stream? I am a newbie to Spark. Thanks again! On 18 June 2015 at 14:29, Binh Nguyen Van binhn...@gmail.com wrote: I haven't tried with 1.4 but I tried with 1.3 a while ago and I could not get the serialized behavior by using the default scheduler when there is failure and retry, so I created a customized stream like this:

class EachSeqRDD[T: ClassTag] (
    parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def slideDuration: Duration = parent.slideDuration

  override def dependencies: List[DStream[_]] = List(parent)

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override private[streaming] def generateJob(time: Time): Option[Job] = {
    val pendingJobs = ssc.scheduler.getPendingTimes().size
    logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
    // do not generate new RDD if there is pending job
    if (pendingJobs == 0) {
      parent.getOrCompute(time) match {
        case Some(rdd) => {
          val jobFunc = () => {
            ssc.sparkContext.setCallSite(creationSite)
            eachSeqFunc(rdd, time)
          }
          Some(new Job(time, jobFunc))
        }
        case None => None
      }
    } else {
      None
    }
  }
}

object DStreamEx {
  implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
    def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
      // because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to SparkContext.clean
      new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
    }
  }
}

-Binh On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia mici...@gmail.com wrote: Tathagata, thanks for your response. You are right! Everything seems to work as expected.
Please could you help me understand why the time for processing of all jobs for a batch is always less than 4 seconds? Please see my playground code below. The last modified time of the input (lines) RDD dump files seems to match the Thread.sleep delays (20s or 5s) in the transform operation or the batching interval (10s): 20s, 5s, 10s. However, neither the batch processing time in the Streaming tab nor the last modified time of the output (words) RDD dump files reflect the Thread.sleep delays. 07:20 3240 001_lines_... 07:21 117 001_words_... 07:41 37224 002_lines_... 07:43 252 002_words_... 08:00 37728 003_lines_... 08:02 504 003_words_... 08:20 38952 004_lines_... 08:22 756 004_words_... 08:40 38664 005_lines_... 08:42 999 005_words_... 08:45 38160 006_lines_... 08:47 1134 006_words_... 08:50 9720 007_lines_... 08:51 1260 007_words_... 08:55 9864 008_lines_... 08:56 1260 008_words_... 09:00 10656 009_lines_... 09:01 1395 009_words_... 09:05 11664 010_lines_... 09:06 1395 010_words_... 09:11 10935 011_lines_... 09:11 1521
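To illustrate TD's point about keeping blocking or time-consuming work out of transform(), here is a rough sketch (input source and output path are made up) of doing the per-batch work inside foreachRDD, which runs as the batch's output job rather than while the jobs are being prepared:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)  // hypothetical input stream

// transform() is invoked while the jobs for a batch are being prepared, once per
// batch interval, so it must not sleep or run actions. Heavy per-batch work
// belongs in foreachRDD, which executes with the batch itself.
lines.foreachRDD { (rdd, time) =>
  val words = rdd.flatMap(_.split(" "))                       // example processing
  words.saveAsTextFile("/tmp/words_" + time.milliseconds)     // hypothetical output path
}

ssc.start()
ssc.awaitTermination()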
Load slf4j from the job assembly instead of from the Spark jar
Hi everyone, I'm trying to use the logstash-logback-encoder https://github.com/logstash/logstash-logback-encoder in my Spark jobs but I'm having some problems with the Spark classloader. The logstash-logback-encoder uses a special version of the slf4j BasicMarker https://github.com/qos-ch/slf4j/blob/master/slf4j-api/src/main/java/org/slf4j/helpers/BasicMarker.java, called LogstashBasicMarker https://github.com/logstash/logstash-logback-encoder/blob/master/src/main/java/org/slf4j/helpers/LogstashBasicMarker.java, that exposes the slf4j marker's constructor, which has package visibility, as public. This works well inside a Java application but not inside a Spark job: when these markers are created inside a Spark job, a security exception is thrown because Spark loads its own version of slf4j instead of the one from the assembly of the job. One solution could be to set spark.driver.userClassPathFirst=true, but this option is experimental and affects all the libraries loaded. I would like to force Spark to load only the slf4j library from the assembly of my job. Is this possible and, if it is, is this safe? Another thing: does anybody know why this happens? I get that the problem is caused by where slf4j is loaded from, but I'm not sure why the Marker constructor is not visible in a Spark job. You can find details about the problem at https://github.com/logstash/logstash-logback-encoder/issues/104. Thanks, Mario
Re: Serial batching with Spark Streaming
Thank you very much for confirmation. On 20 June 2015 at 17:21, Tathagata Das t...@databricks.com wrote: No it does not. By default, only after all the retries etc. related to batch X are done will batch X+1 be started. Yes, one RDD per batch per DStream. However, the RDD could be a union of multiple RDDs (e.g. RDDs generated by a windowed DStream, or a unioned DStream). TD On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia mici...@gmail.com wrote: Thanks Tathagata! I will use foreachRDD/foreachPartition() instead of transform() then. Does the default scheduler initiate the execution of batch X+1 after batch X even if tasks for batch X need to be retried due to failures? If not, please could you suggest workarounds and point me to the code? One more thing was not 100% clear to me from the documentation: Is there exactly 1 RDD published per batch interval in a DStream? On 19 June 2015 at 16:58, Tathagata Das t...@databricks.com wrote: I see what the problem is. You are adding sleep in the transform operation. The transform function is called at the time of preparing the Spark jobs for a batch. It should not be running any time-consuming operation like an RDD action or a sleep. Since this operation needs to run every batch interval, doing a blocking long-running operation messes with the need to run every batch interval. I will try to make this clearer in the guide. I had not seen anyone do something like this before and therefore it did not occur to me that this could happen. As long as you don't do time-consuming blocking operations in the transform function, the batches will be generated, scheduled and executed in serial order by default. On Fri, Jun 19, 2015 at 11:33 AM, Michal Čizmazia mici...@gmail.com wrote: Binh, thank you very much for your comment and code. Please could you outline an example use of your stream? I am a newbie to Spark. Thanks again! On 18 June 2015 at 14:29, Binh Nguyen Van binhn...@gmail.com wrote: I haven't tried with 1.4 but I tried with 1.3 a while ago and I could not get the serialized behavior by using the default scheduler when there is failure and retry, so I created a customized stream like this:

class EachSeqRDD[T: ClassTag] (
    parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def slideDuration: Duration = parent.slideDuration

  override def dependencies: List[DStream[_]] = List(parent)

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override private[streaming] def generateJob(time: Time): Option[Job] = {
    val pendingJobs = ssc.scheduler.getPendingTimes().size
    logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
    // do not generate new RDD if there is pending job
    if (pendingJobs == 0) {
      parent.getOrCompute(time) match {
        case Some(rdd) => {
          val jobFunc = () => {
            ssc.sparkContext.setCallSite(creationSite)
            eachSeqFunc(rdd, time)
          }
          Some(new Job(time, jobFunc))
        }
        case None => None
      }
    } else {
      None
    }
  }
}

object DStreamEx {
  implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
    def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
      // because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to SparkContext.clean
      new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
    }
  }
}

-Binh On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia mici...@gmail.com wrote: Tathagata, thanks for your response.
You are right! Everything seems to work as expected. Please could you help me understand why the time for processing of all jobs for a batch is always less than 4 seconds? Please see my playground code below. The last modified time of the input (lines) RDD dump files seems to match the Thread.sleep delays (20s or 5s) in the transform operation or the batching interval (10s): 20s, 5s, 10s. However, neither the batch processing time in the Streaming tab nor the last modified time of the output (words) RDD dump files reflect the Thread.sleep delays. 07:20 3240 001_lines_... 07:21 117 001_words_... 07:41 37224 002_lines_... 07:43 252 002_words_... 08:00 37728 003_lines_... 08:02 504 003_words_... 08:20 38952 004_lines_... 08:22 756 004_words_... 08:40 38664 005_lines_... 08:42 999 005_words_... 08:45 38160 006_lines_... 08:47 1134 006_words_... 08:50 9720 007_lines_... 08:51 1260 007_words_... 08:55 9864 008_lines_... 08:56 1260 008_words_... 09:00 10656 009_lines_... 09:01 1395 009_words_...
Grouping elements in a RDD
How would you do a .grouped(10) on an RDD, is it possible? Here is an example for a Scala list: scala> List(1,2,3,4).grouped(2).toList res1: List[List[Int]] = List(List(1, 2), List(3, 4)) Would like to group n elements.
How could output the StreamingLinearRegressionWithSGD prediction result?
Hey, I am testing StreamingLinearRegressionWithSGD following the tutorial. It works, but I could not output the prediction results. I tried saveAsTextFile, but it only wrote _SUCCESS to the folder. I am trying to check the prediction results and use BinaryClassificationMetrics to get areaUnderROC. Any example for this? Thank you!
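Not sure of the original setup, but here is a small sketch based on the streaming linear regression example in the MLlib docs, showing two ways to get at the predictions (printing a sample and saving each batch to text files); the paths, feature count and input streams are made up:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}

// Training and test streams of LabeledPoint parsed from text files dropped
// into the (hypothetical) directories below.
val trainingData = ssc.textFileStream("/tmp/train").map(LabeledPoint.parse)
val testData = ssc.textFileStream("/tmp/test").map(LabeledPoint.parse)

val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))

model.trainOn(trainingData)

// (label, prediction) pairs for every test record in each batch.
val predictions = model.predictOnValues(testData.map(lp => (lp.label, lp.features)))
predictions.print()                              // show a sample on the driver
predictions.saveAsTextFiles("/tmp/predictions")  // one output directory per batch

An output directory that contains only _SUCCESS usually means the corresponding batch produced no records, which is worth checking before looking elsewhere.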
Task Serialization Error on DataFrame.foreachPartition
Hi, I am loading data from a Hive table into HBase after doing some manipulation. I am getting the error 'Task not serializable'. My code is as below.

public class HiveToHbaseLoader implements Serializable {
    public static void main(String[] args) throws Exception {
        String hbaseTableName = args[0];
        String hiveQuery = args[1];

        SparkConf conf = new SparkConf().setAppName("Hive to Hbase Loader")
                .setMaster();
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(sc.sc());
        hiveContext.setConf("hive.metastore.uris", ?);

        DataFrame dataFrame = hiveContext.sql(hiveQuery);
        dataFrame.foreachPartition(
            new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {
                // Logic to load rows from Hive into HBase.
            });
    }
}

Getting the error below.

Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1623) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:805) at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:875) at com.philips.bda.HiveToHbaseLoader.main(HiveToHbaseLoader.java:46) Caused by: java.io.NotSerializableException: com.philips.bda.HiveToHbaseLoader$1 Serialization stack: - object not serializable (class: com.philips.bda.HiveToHbaseLoader$1, value: function1) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) ... 5 more -- Regards, Nishant
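For what it's worth, the stack trace points at the anonymous AbstractFunction1 subclass (HiveToHbaseLoader$1) not being serializable, since it does not implement java.io.Serializable. Below is a rough Scala sketch of the same foreachPartition pattern, where the closure is serializable as long as it only captures serializable state; createHbaseWriter is a hypothetical helper that builds the non-serializable HBase client inside the partition rather than on the driver:

import org.apache.spark.sql.Row

val dataFrame = hiveContext.sql(hiveQuery)

dataFrame.foreachPartition { rows: Iterator[Row] =>
  // Create non-serializable resources (connections, clients) inside the
  // partition function instead of capturing them from the driver.
  val writer = createHbaseWriter()   // hypothetical helper
  rows.foreach(row => writer.write(row))
  writer.close()
}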