Hi guys, I finally understand that I cannot use sbt-pack to run the spark-streaming job programmatically as a Unix command; I have to use YARN or Mesos to run the jobs.
I have some doubts. If I run the Spark Streaming job in YARN client mode, I get this exception:

[cloudera@quickstart ~]$ spark-submit --class example.spark.AmazonKafkaConnectorWithMongo --master yarn --deploy-mode client --driver-memory 4g --executor-memory 2g --executor-cores 3 /home/cloudera/awesome-recommendation-engine/target/scala-2.10/my-recommendation-spark-engine_2.10-1.0-SNAPSHOT.jar 192.168.1.35:9092 amazonRatingsTopic
java.lang.ClassNotFoundException: example.spark.AmazonKafkaConnectorWithMongo
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:175)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

But if I use cluster mode, the job is accepted:

[cloudera@quickstart ~]$ spark-submit --class example.spark.AmazonKafkaConnectorWithMongo --master yarn --deploy-mode cluster --driver-memory 4g --executor-memory 2g --executor-cores 2 /home/cloudera/awesome-recommendation-engine/target/scala-2.10/my-recommendation-spark-engine_2.10-1.0-SNAPSHOT.jar 192.168.1.35:9092 amazonRatingsTopic
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/06/06 11:16:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/06 11:16:46 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/06/06 11:16:46 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers
16/06/06 11:16:46 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
16/06/06 11:16:46 INFO yarn.Client: Will allocate AM container, with 4505 MB memory including 409 MB overhead
16/06/06 11:16:46 INFO yarn.Client: Setting up container launch context for our AM
16/06/06 11:16:46 INFO yarn.Client: Setting up the launch environment for our AM container
16/06/06 11:16:46 INFO yarn.Client: Preparing resources for our AM container
16/06/06 11:16:47 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
16/06/06 11:16:47 INFO yarn.Client: Uploading resource file:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar -> hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1465201086091_0006/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar
16/06/06 11:16:47 INFO yarn.Client: Uploading resource file:/home/cloudera/awesome-recommendation-engine/target/scala-2.10/my-recommendation-spark-engine_2.10-1.0-SNAPSHOT.jar -> hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1465201086091_0006/my-recommendation-spark-engine_2.10-1.0-SNAPSHOT.jar
16/06/06 11:16:47 INFO yarn.Client: Uploading resource file:/tmp/spark-8e5fe800-bed2-4173-bb11-d47b3ab3b621/__spark_conf__5840282197389631291.zip -> hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1465201086091_0006/__spark_conf__5840282197389631291.zip
16/06/06 11:16:47 INFO spark.SecurityManager: Changing view acls to: cloudera
16/06/06 11:16:47 INFO spark.SecurityManager: Changing modify acls to: cloudera
16/06/06 11:16:47 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
16/06/06 11:16:47 INFO yarn.Client: Submitting application 6 to ResourceManager
16/06/06 11:16:48 INFO impl.YarnClientImpl: Submitted application application_1465201086091_0006
16/06/06 11:16:49 INFO yarn.Client: Application report for application_1465201086091_0006 (state: ACCEPTED)
16/06/06 11:16:49 INFO yarn.Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: root.cloudera
     start time: 1465204607993
     final status: UNDEFINED
     tracking URL: http://quickstart.cloudera:8088/proxy/application_1465201086091_0006/
     user: cloudera
16/06/06 11:16:50 INFO yarn.Client: Application report for application_1465201086091_0006 (state: ACCEPTED)
16/06/06 11:16:51 INFO yarn.Client: Application report for application_1465201086091_0006 (state: ACCEPTED)
...
16/06/06 11:17:19 INFO yarn.Client: Application report for application_1465201086091_0006 (state: ACCEPTED)
...

If I push a product to the Kafka topic (amazonRatingsTopic), whose broker runs on my host machine (192.168.1.35:9092), I do not see anything in the logs. In http://quickstart.cloudera:8888/jobbrowser/ I can see that the job is accepted, and when I click on the application_id I see this:

The application might not be running yet or there is no Node Manager or Container available. This page will be automatically refreshed.

even while I push data into the Kafka topic. Another thing I have noticed is that the spark-worker dies a few minutes after the job is accepted, and I have to restart it manually with sudo service spark-worker restart.
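The cluster-mode log above says "Will allocate AM container, with 4505 MB memory including 409 MB overhead", which is the 4g driver heap (4096 MB) plus 409 MB. The "8192 MB per container" check only compares against YARN's per-container maximum, not against what the single NodeManager actually has free, so it does not rule out a memory shortage. Below is a minimal sketch of the arithmetic, assuming Spark 1.6's default overhead rule (10% of the heap, at least 384 MB). The number of executors and the scheduler's rounding increment are assumptions, so check spark-defaults.conf and yarn-site.xml on the VM. `YarnContainerMath` is a hypothetical helper, not part of Spark.

```scala
// Sketch of Spark 1.6's default YARN memory-overhead rule plus YARN request rounding.
// Assumptions: overhead = max(10% of the heap, 384 MB); the scheduler rounds each
// container request up to a multiple of `incrementMb` (check yarn-site.xml on the VM).
object YarnContainerMath {
  val OverheadFactor = 0.10
  val OverheadMinMb = 384

  /** Heap plus default overhead: the figure printed in "Will allocate AM container". */
  def requestMb(heapMb: Int): Int =
    heapMb + math.max((OverheadFactor * heapMb).toInt, OverheadMinMb)

  /** Round a request up to the next multiple of the scheduler's allocation increment. */
  def normalizedMb(mb: Int, incrementMb: Int): Int =
    ((mb + incrementMb - 1) / incrementMb) * incrementMb

  /** Memory needed once the AM (the driver, in cluster mode) and all executors run. */
  def totalMb(driverHeapMb: Int, executorHeapMb: Int, executors: Int, incrementMb: Int): Int =
    normalizedMb(requestMb(driverHeapMb), incrementMb) +
      executors * normalizedMb(requestMb(executorHeapMb), incrementMb)
}

// --driver-memory 4g and --executor-memory 2g, assuming 2 executors and a 1024 MB increment
println(YarnContainerMath.requestMb(4096))              // 4505, as in the log
println(YarnContainerMath.totalMb(4096, 2048, 2, 1024)) // 11264
```

YARN may be unable to place even the ApplicationMaster if the NodeManager's memory (yarn.nodemanager.resource.memory-mb) is below the normalized AM size, or if that memory is already used by other processes on the 8 GB VM. In that case the application would stay in ACCEPTED, which is consistent with the job browser message. Resubmitting with a smaller --driver-memory is a cheap way to test this hypothesis.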
If I run the jps command, I see this:

[cloudera@quickstart ~]$ jps
11904 SparkSubmit
12890 Jps
7271 sbt-launch.jar
[cloudera@quickstart ~]$

I know that sbt-launch is the sbt command running in another terminal, but shouldn't the NameNode and DataNode processes appear as well?

Thank you very much for reading this far.

Alonso Isidoro Roman
about.me/alonso.isidoro.roman

2016-06-04 18:23 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:

> Hi,
>
> Spark works in local, standalone and yarn-client mode. Start as master =
> local. That is the simplest model. You do NOT need to start
> $SPARK_HOME/sbin/start-master.sh and $SPARK_HOME/sbin/start-slaves.sh
>
> Also you do not need to specify all that in spark-submit. In the Scala
> code you can do
>
> val sparkConf = new SparkConf().
>   setAppName("CEP_streaming_with_JDBC").
>   set("spark.driver.allowMultipleContexts", "true").
>   set("spark.hadoop.validateOutputSpecs", "false")
>
> And specify all that in spark-submit itself with minimum resources
>
> ${SPARK_HOME}/bin/spark-submit \
>   --packages com.databricks:spark-csv_2.11:1.3.0 \
>   --driver-memory 2G \
>   --num-executors 1 \
>   --executor-memory 2G \
>   --master local \
>   --executor-cores 2 \
>   --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
>   --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
>   --class "${FILE_NAME}" \
>   --conf "spark.ui.port=4040" \
>   ${JAR_FILE}
>
> The Spark UI port is 4040 (the default). Just track the progress of
> the job there. You can specify your own port by replacing 4040 with an
> unused port value.
>
> Try it anyway.
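Splitting settings between code and spark-submit matters because the Spark configuration documentation gives a fixed precedence. Properties set directly on SparkConf win, then spark-submit flags, then spark-defaults.conf. So a hardcoded .setMaster("spark://192.168.30.137:7077"), like the one in the original code quoted further down, would be expected to override --master yarn. The sketch below is a plain-Scala model of that merge order, not Spark's implementation. The object name and the map representation are hypothetical, and "yarn" stands in for however Spark 1.6 records a YARN master internally.

```scala
// Plain-Scala model (not Spark's code) of the documented precedence order:
// spark-defaults.conf < spark-submit flags < properties set on SparkConf in code.
object SparkPropertyPrecedence {
  def effective(
      defaultsConf: Map[String, String],
      submitFlags: Map[String, String],
      setInCode: Map[String, String]): Map[String, String] =
    // `++` keeps the right-hand value on key collisions, so merge lowest priority first.
    defaultsConf ++ submitFlags ++ setInCode
}

// Illustrative values: spark-submit asks for YARN, but the code hardcodes a standalone master.
val resolved = SparkPropertyPrecedence.effective(
  defaultsConf = Map("spark.master" -> "local[*]"),
  submitFlags = Map("spark.master" -> "yarn", "spark.executor.memory" -> "2g"),
  setInCode = Map("spark.master" -> "spark://192.168.30.137:7077", "spark.cores.max" -> "2"))

println(resolved("spark.master")) // spark://192.168.30.137:7077
```

Leaving setMaster out of the code, or using sparkConf.setIfMissing("spark.master", "local[*]") so that it only applies a fallback, lets the same jar run under --master local, yarn or a standalone master without recompiling.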
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> On 3 June 2016 at 11:39, Alonso <alons...@gmail.com> wrote:
>
>> Hi, I am developing a project that needs to use Kafka, spark-streaming
>> and spark-mllib; this is the GitHub project:
>> <https://github.com/alonsoir/awesome-recommendation-engine/tree/develop>
>>
>> I am using a VMware cdh-5.7.0 image with 4 cores and 8 GB of RAM. The
>> file that I want to use is only 16 MB, but I am running into problems
>> related to resources, because the process outputs this message:
>>
>> .set("spark.driver.allowMultipleContexts", "true")
>>
>> 16/06/03 11:58:09 WARN TaskSchedulerImpl: Initial job has not accepted
>> any resources; check your cluster UI to ensure that workers are registered
>> and have sufficient resources
>>
>> When I go to the spark-master page, I can see this:
>>
>> Spark Master at spark://192.168.30.137:7077
>>
>>   URL: spark://192.168.30.137:7077
>>   REST URL: spark://192.168.30.137:6066 (cluster mode)
>>   Alive Workers: 0
>>   Cores in use: 0 Total, 0 Used
>>   Memory in use: 0.0 B Total, 0.0 B Used
>>   Applications: 2 Running, 0 Completed
>>   Drivers: 0 Running, 0 Completed
>>   Status: ALIVE
>>
>> Workers
>> Worker Id  Address  State  Cores  Memory
>>
>> Running Applications
>> Application ID           Name                  Cores  Memory per Node  Submitted Time       User      State    Duration
>> app-20160603115752-0001  AmazonKafkaConnector  0      1024.0 MB        2016/06/03 11:57:52  cloudera  WAITING  2.0 min
>> app-20160603115751-0000  AmazonKafkaConnector  0      1024.0 MB        2016/06/03 11:57:51  cloudera  WAITING  2.0 min
>>
>> And this is the
>> spark-worker output:
>>
>> Spark Worker at 192.168.30.137:7078
>>
>>   ID: worker-20160603115937-192.168.30.137-7078
>>   Master URL:
>>   Cores: 4 (0 Used)
>>   Memory: 6.7 GB (0.0 B Used)
>>
>> Running Executors (0)
>> ExecutorID  Cores  State  Memory  Job Details  Logs
>>
>> It is weird, isn't it? The master URL is not set, and there are no
>> executors, cores, and so on...
>>
>> If I run ps xa | grep spark, this is the output:
>>
>> [cloudera@quickstart bin]$ ps xa | grep spark
>> 6330 ?  Sl  0:11 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /usr/lib/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/* -Dspark.deploy.defaultCores=4 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master
>>
>> 6674 ?  Sl  0:12 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /etc/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/* -Dspark.history.fs.logDirectory=hdfs:///user/spark/applicationHistory -Dspark.history.ui.port=18088 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.history.HistoryServer
>>
>> 8153 pts/1  Sl+  0:14 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /home/cloudera/awesome-recommendation-engine/target/pack/lib/* -Dprog.home=/home/cloudera/awesome-recommendation-engine/target/pack -Dprog.version=1.0-SNAPSHOT example.spark.AmazonKafkaConnector 192.168.1.35:9092 amazonRatingsTopic
>>
>> 8413 ?  Sl  0:04 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /usr/lib/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/* -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker spark://quickstart.cloudera:7077
>>
>> 8619 pts/3  S+  0:00 grep spark
>>
>> The master is set up with four cores and 1 GB, and the worker has no
>> dedicated cores and is using 1 GB; that is weird, isn't it? I have
>> configured the VMware image with 4 cores (out of eight) and 8 GB (out of 16).
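The -Xms1g -Xmx1g flags in this ps output are the heap sizes of the Master and Worker daemon JVMs themselves. They are not the resources the worker offers, which its page reports as 4 cores and 6.7 GB. The more telling details are these: the worker was started with spark://quickstart.cloudera:7077, the master UI and the application use spark://192.168.30.137:7077, and the worker page shows an empty Master URL. That empty field suggests the worker never registered. One hypothesis worth checking is whether both names refer to the same endpoint on the VM. Below is a minimal sketch using only java.net. `MasterUrlCheck` is hypothetical, and it only compares resolved addresses and ports. The standalone master may still expect the exact host string, so using one identical URL everywhere is the safer setup.

```scala
import java.net.{InetAddress, URI}

// Hypothetical helper: do two spark:// master URLs use the same port and a host that
// resolves to a common address? Run it on the VM so names resolve as the daemons see them.
object MasterUrlCheck {
  final case class Endpoint(host: String, port: Int)

  def parse(masterUrl: String): Endpoint = {
    val uri = new URI(masterUrl)
    require(uri.getScheme == "spark" && uri.getHost != null && uri.getPort > 0,
      s"not a spark://host:port URL: $masterUrl")
    Endpoint(uri.getHost, uri.getPort)
  }

  /** All IP addresses the JVM resolver returns for `host` (IP literals are not looked up). */
  def addresses(host: String): Set[String] =
    InetAddress.getAllByName(host).map(_.getHostAddress).toSet

  def sameMaster(a: String, b: String): Boolean = {
    val (ea, eb) = (parse(a), parse(b))
    ea.port == eb.port && (addresses(ea.host) intersect addresses(eb.host)).nonEmpty
  }
}

// On the VM, compare the worker's URL (from ps) with the one passed to setMaster:
// MasterUrlCheck.sameMaster("spark://quickstart.cloudera:7077", "spark://192.168.30.137:7077")
println(MasterUrlCheck.parse("spark://quickstart.cloudera:7077")) // Endpoint(quickstart.cloudera,7077)
```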
>>
>> This is what my build.sbt looks like:
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.kafka" % "kafka_2.10" % "0.8.1"
>>     exclude("javax.jms", "jms")
>>     exclude("com.sun.jdmk", "jmxtools")
>>     exclude("com.sun.jmx", "jmxri"),
>>   //not working play module!! check
>>   //jdbc,
>>   //anorm,
>>   //cache,
>>   // HTTP client
>>   "net.databinder.dispatch" %% "dispatch-core" % "0.11.1",
>>   // HTML parser
>>   "org.jodd" % "jodd-lagarto" % "3.5.2",
>>   "com.typesafe" % "config" % "1.2.1",
>>   "com.typesafe.play" % "play-json_2.10" % "2.4.0-M2",
>>   "org.scalatest" % "scalatest_2.10" % "2.2.1" % "test",
>>   "org.twitter4j" % "twitter4j-core" % "4.0.2",
>>   "org.twitter4j" % "twitter4j-stream" % "4.0.2",
>>   "org.codehaus.jackson" % "jackson-core-asl" % "1.6.1",
>>   "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test",
>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0-cdh5.7.0",
>>   "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.0",
>>   "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.0",
>>   "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.0",
>>   "org.apache.spark" % "spark-mllib_2.10" % "1.6.0-cdh5.7.0",
>>   "com.google.code.gson" % "gson" % "2.6.2",
>>   "commons-cli" % "commons-cli" % "1.3.1",
>>   "com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1",
>>   // Akka
>>   "com.typesafe.akka" %% "akka-actor" % akkaVersion,
>>   "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
>>   // MongoDB
>>   "org.reactivemongo" %% "reactivemongo" % "0.10.0"
>> )
>>
>> packAutoSettings
>>
>> As you can see, I am using the exact versions of the Spark modules that
>> the pseudo-cluster runs, and I want to use sbt-pack to create a Unix
>> command. This is how I am declaring the Spark context programmatically:
>>
>> val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
>>   //.setMaster("local[4]")
>>   .setMaster("spark://192.168.30.137:7077")
>>   .set("spark.cores.max", "2")
>>
>> ...
>>
>> val ratingFile = "hdfs://192.168.30.137:8020/user/cloudera/ratings.csv"
>>
>> println("Using this ratingFile: " + ratingFile)
>> // first create an RDD out of the rating file
>> val rawTrainingRatings = sc.textFile(ratingFile).map { line =>
>>   val Array(userId, productId, scoreStr) = line.split(",")
>>   AmazonRating(userId, productId, scoreStr.toDouble)
>> }
>>
>> // only keep users that have rated between MinRecommendationsPerUser
>> // and MaxRecommendationsPerUser products
>>
>> // THIS IS THE LINE THAT PROVOKES THE WARN TaskSchedulerImpl!
>> val trainingRatings = rawTrainingRatings.groupBy(_.userId)
>>   .filter(r => MinRecommendationsPerUser <= r._2.size && r._2.size < MaxRecommendationsPerUser)
>>   .flatMap(_._2)
>>   .repartition(NumPartitions)
>>   .cache()
>>
>> println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of ${rawTrainingRatings.count()}")
>>
>> My question is: do you see anything wrong with the code? Is there
>> anything terribly wrong that I have to change? And what can I do to
>> get this up and running with my resources?
>>
>> What annoys me most is that the above code works perfectly in the Spark
>> console of the virtual image, but when I try to run it programmatically,
>> as a Unix command created with sbt-pack, it does not work.
>>
>> If the dedicated resources are too few to develop this project, what else
>> can I do? I mean, do I need to rent a small cluster on AWS or another
>> provider? If so, what would you recommend?
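Two observations on the code above. First, groupBy, filter, flatMap, repartition and cache are lazy transformations. No tasks are scheduled, so the "Initial job has not accepted any resources" warning cannot fire, until the first action runs, which here is trainingRatings.count() inside the println. Second, `val Array(userId, productId, scoreStr) = line.split(",")` throws a MatchError for any line without exactly three fields, such as a CSV header, and `toDouble` throws on a non-numeric score. The sketch below is a plain-Scala model of the same parse and filter on an in-memory Seq, so it can be tested without a cluster. The field name `rating` and the object `RatingPrep` are hypothetical, since the original only shows the constructor call.

```scala
import scala.util.Try

// Hypothetical stand-in for the project's AmazonRating; only `userId` is confirmed by the code.
final case class AmazonRating(userId: String, productId: String, rating: Double)

object RatingPrep {
  // Returns None for lines without exactly three fields or with a non-numeric score,
  // instead of throwing MatchError / NumberFormatException inside a task.
  def parse(line: String): Option[AmazonRating] = line.split(",") match {
    case Array(userId, productId, scoreStr) =>
      Try(scoreStr.trim.toDouble).toOption.map(AmazonRating(userId, productId, _))
    case _ => None
  }

  // Keep users whose rating count satisfies min <= count < max, as in the original filter.
  def keepUsersInRange(ratings: Seq[AmazonRating], min: Int, max: Int): Seq[AmazonRating] =
    ratings.groupBy(_.userId).values
      .filter(rs => min <= rs.size && rs.size < max)
      .flatten
      .toSeq
}
```

In the Spark job the same parser could be applied as `sc.textFile(ratingFile).flatMap(line => RatingPrep.parse(line).toList)`. Malformed lines are then dropped instead of failing the stage.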
>>
>> Thank you very much for reading this far.
>>
>> Regards,
>>
>> Alonso
>>
>> ------------------------------
>> View this message in context: About a problem running a spark job in a
>> cdh-5.7.0 vmware image.
>> <http://apache-spark-user-list.1001560.n3.nabble.com/About-a-problem-running-a-spark-job-in-a-cdh-5-7-0-vmware-image-tp27082.html>
>> Sent from the Apache Spark User List mailing list archive
>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.