Luis' experience validates what I'm seeing. You have to still set the 
properties in the SparkConf for the context to work. For example, master URL 
and jars are specified again in the app. 

Gino B.

> On Jun 17, 2014, at 12:05 PM, Luis Ángel Vicente Sánchez 
> <langel.gro...@gmail.com> wrote:
> 
> I have been able to submit a job successfully but I had to config my spark 
> job this way:
> 
>   val sparkConf: SparkConf =
>     new SparkConf()
>       .setAppName("TwitterPopularTags")
>       .setMaster("spark://int-spark-master:7077")
>       .setSparkHome("/opt/spark")
>       .setJars(Seq("/tmp/spark-test-0.1-SNAPSHOT.jar"))
> 
> Now I'm getting this error on my worker:
> 
> 4/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any 
> resources; check your cluster UI to ensure that workers are registered and 
> have sufficient memory
> 
> 
> 
> 2014-06-17 15:38 GMT+01:00 Luis Ángel Vicente Sánchez 
> <langel.gro...@gmail.com>:
>> After playing a bit, I have been able to create a fatjar this way:
>> 
>> lazy val rootDependencies = Seq(
>>   "org.apache.spark" %% "spark-core"              % "1.0.0" % "provided",
>>   "org.apache.spark" %% "spark-streaming"         % "1.0.0" % "provided",
>>   "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0" 
>> exclude("org.apache.spark", "spark-core_2.10") exclude("org.apache.spark", 
>> "spark-streaming_2.10")
>> )
>> 
>> Excluding those transitive dependencies, we can create a fatjar ~400Kb 
>> instead of 40Mb.
>> 
>> My problem is not to run the streaming job locally but trying to submit it 
>> to standalone cluster using spark-submit, everytime I ran the following 
>> command, my workers died:
>> 
>> ~/development/tools/spark/1.0.0/bin/spark-submit \
>> --class "org.apache.spark.examples.streaming.TwitterPopularTags" \
>> --master "spark://int-spark-master:7077" \
>> --deploy-mode "cluster" \
>> file:///tmp/spark-test-0.1-SNAPSHOT.jar
>> 
>> I have copied my fatjar to my master /tmp folder.
>> 
>> 
>> 2014-06-17 10:30 GMT+01:00 Michael Cutler <mich...@tumra.com>:
>> 
>>> Admittedly getting Spark Streaming / Kafka working for the first time can 
>>> be a bit tricky with the web of dependencies that get pulled in.  I've 
>>> taken the KafkaWorkCount example from the Spark project and set up a simple 
>>> standalone SBT project that shows you how to get it working and using 
>>> spark-submit.
>>> 
>>> https://github.com/cotdp/spark-example-kafka
>>> 
>>> The key trick is in the use of sbt-assembly instead of relying on any of 
>>> the "add jars" functionality.  You mark "spark-core" and "spark-streaming" 
>>> as provided, because they are part of the core spark-assembly already 
>>> running your cluster.  However "spark-streaming-kafka" is not, so you need 
>>> to package it in your 'fat JAR' while excluding all the mess that causes 
>>> the build to break.
>>> 
>>> build.sbt:
>>> 
>>> import AssemblyKeys._
>>> 
>>> 
>>> assemblySettings
>>> 
>>> 
>>> name := "spark-example-kafka"
>>> 
>>> 
>>> version := "1.0"
>>> 
>>> 
>>> scalaVersion := "2.10.4"
>>> 
>>> 
>>> 
>>> 
>>> jarName in assembly := "spark-example-kafka_2.10-1.0.jar"
>>> 
>>> 
>>> 
>>> 
>>> assemblyOption in assembly ~= { _.copy(includeScala = false) }
>>> 
>>> 
>>> 
>>> 
>>> libraryDependencies ++= Seq(
>>> 
>>> 
>>> 
>>>   "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
>>> 
>>> 
>>> 
>>>   "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
>>> 
>>> 
>>> 
>>>   ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0").
>>> 
>>> 
>>> 
>>>     exclude("commons-beanutils", "commons-beanutils").
>>> 
>>> 
>>> 
>>>     exclude("commons-collections", "commons-collections").
>>> 
>>> 
>>> 
>>>     exclude("com.esotericsoftware.minlog", "minlog")
>>> 
>>> 
>>> 
>>> )
>>> 
>>> 
>>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>> 
>>> 
>>> 
>>>   {
>>>     case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
>>> 
>>> 
>>> 
>>>     case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
>>> 
>>> 
>>> 
>>>     case x if x.startsWith("plugin.properties") => MergeStrategy.last
>>> 
>>> 
>>> 
>>>     case x => old(x)
>>> 
>>> 
>>> 
>>>   }
>>> }
>>> 
>>> 
>>> You can see the "exclude()" has to go around the spark-streaming-kafka 
>>> dependency, and I've used a MergeStrategy to solve the "deduplicate: 
>>> different file contents found in the following" errors.
>>> 
>>> Build the JAR with sbt assembly and use the scripts in bin/ to run the 
>>> examples.
>>> 
>>> I'm using this same approach to run my Spark Streaming jobs with 
>>> spark-submit and have them managed using Mesos/Marathon to handle failures 
>>> and restarts with long running processes.
>>> 
>>> Good luck!
>>> 
>>> MC
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Michael Cutler
>>> Founder, CTO
>>> 
>>>     
>>> Mobile: +44 789 990 7847
>>> Email:   mich...@tumra.com
>>> Web:     tumra.com
>>> Visit us at our offices in Chiswick Park
>>> Registered in England & Wales, 07916412. VAT No. 130595328
>>> 
>>> 
>>> This email and any files transmitted with it are confidential and may also 
>>> be privileged. It is intended only for the person to whom it is addressed. 
>>> If you have received this email in error, please inform the sender 
>>> immediately. If you are not the intended recipient you must not use, 
>>> disclose, copy, print, distribute or rely on this email.
>>> 
>>> 
>>>> On 17 June 2014 02:51, Gino Bustelo <lbust...@gmail.com> wrote:
>>>> +1 for this issue. Documentation for spark-submit are misleading. Among 
>>>> many issues, the jar support is bad. HTTP urls do not work. This is 
>>>> because spark is using hadoop's FileSystem class. You have to specify the 
>>>> jars twice to get things to work. Once for the DriverWrapper to laid your 
>>>> classes and a 2nd time in the Context to distribute to workers. 
>>>> 
>>>> I would like to see some contrib response to this issue. 
>>>> 
>>>> Gino B.
>>>> 
>>>>> On Jun 16, 2014, at 1:49 PM, Luis Ángel Vicente Sánchez 
>>>>> <langel.gro...@gmail.com> wrote:
>>>>> 
>>>>> Did you manage to make it work? I'm facing similar problems and this a 
>>>>> serious blocker issue. spark-submit seems kind of broken to me if you can 
>>>>> use it for spark-streaming.
>>>>> 
>>>>> Regards,
>>>>> 
>>>>> Luis
>>>>> 
>>>>> 
>>>>> 2014-06-11 1:48 GMT+01:00 lannyripple <lanny.rip...@gmail.com>:
>>>>>> I am using Spark 1.0.0 compiled with Hadoop 1.2.1.
>>>>>> 
>>>>>> I have a toy spark-streaming-kafka program.  It reads from a kafka queue 
>>>>>> and
>>>>>> does
>>>>>> 
>>>>>>     stream
>>>>>>       .map {case (k, v) => (v, 1)}
>>>>>>       .reduceByKey(_ + _)
>>>>>>       .print()
>>>>>> 
>>>>>> using a 1 second interval on the stream.
>>>>>> 
>>>>>> The docs say to make Spark and Hadoop jars 'provided' but this breaks for
>>>>>> spark-streaming.  Including spark-streaming (and spark-streaming-kafka) 
>>>>>> as
>>>>>> 'compile' to sweep them into our assembly gives collisions on javax.*
>>>>>> classes.  To work around this I modified
>>>>>> $SPARK_HOME/bin/compute-classpath.sh to include spark-streaming,
>>>>>> spark-streaming-kafka, and zkclient.  (Note that kafka is included as
>>>>>> 'compile' in my project and picked up in the assembly.)
>>>>>> 
>>>>>> I have set up conf/spark-env.sh as needed.  I have copied my assembly to
>>>>>> /tmp/myjar.jar on all spark hosts and to my hdfs /tmp/jars directory.  I 
>>>>>> am
>>>>>> running spark-submit from my spark master.  I am guided by the 
>>>>>> information
>>>>>> here https://spark.apache.org/docs/latest/submitting-applications.html
>>>>>> 
>>>>>> Well at this point I was going to detail all the ways spark-submit fails 
>>>>>> to
>>>>>> follow it's own documentation.  If I do not invoke sparkContext.setJars()
>>>>>> then it just fails to find the driver class.  This is using various
>>>>>> combinations of absolute path, file:, hdfs: (Warning: Skip remote jar)??,
>>>>>> and local: prefixes on the application-jar and --jars arguments.
>>>>>> 
>>>>>> If I invoke sparkContext.setJars() and include my assembly jar I get
>>>>>> further.  At this point I get a failure from
>>>>>> kafka.consumer.ConsumerConnector not being found.  I suspect this is 
>>>>>> because
>>>>>> spark-streaming-kafka needs the Kafka dependency it but my assembly jar 
>>>>>> is
>>>>>> too late in the classpath.
>>>>>> 
>>>>>> At this point I try setting spark.files.userClassPathfirst to 'true' but
>>>>>> this causes more things to blow up.
>>>>>> 
>>>>>> I finally found something that works.  Namely setting environment 
>>>>>> variable
>>>>>> SPARK_CLASSPATH=/tmp/myjar.jar  But silly me, this is deprecated and I'm
>>>>>> helpfully informed to
>>>>>> 
>>>>>>   Please instead use:
>>>>>>    - ./spark-submit with --driver-class-path to augment the driver 
>>>>>> classpath
>>>>>>    - spark.executor.extraClassPath to augment the executor classpath
>>>>>> 
>>>>>> which when put into a file and introduced with --properties-file does not
>>>>>> work.  (Also tried spark.files.userClassPathFirst here.)  These fail with
>>>>>> the kafka.consumer.ConsumerConnector error.
>>>>>> 
>>>>>> At a guess what's going on is that using SPARK_CLASSPATH I have my 
>>>>>> assembly
>>>>>> jar in the classpath at SparkSubmit invocation
>>>>>> 
>>>>>>   Spark Command: java -cp
>>>>>> /tmp/myjar.jar::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.0.0-hadoop1.2.1.jar:/opt/spark/lib/spark-streaming_2.10-1.0.0.jar:/opt/spark/lib/spark-streaming-kafka_2.10-1.0.0.jar:/opt/spark/lib/zkclient-0.4.jar
>>>>>> -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m
>>>>>> org.apache.spark.deploy.SparkSubmit --class me.KafkaStreamingWC
>>>>>> /tmp/myjar.jar
>>>>>> 
>>>>>> but using --properties-file then the assembly is not available for
>>>>>> SparkSubmit.
>>>>>> 
>>>>>> I think the root cause is either spark-submit not handling the
>>>>>> spark-streaming libraries so they can be 'provided' or the inclusion of
>>>>>> org.elicpse.jetty.orbit in the streaming libraries which cause
>>>>>> 
>>>>>>   [error] (*:assembly) deduplicate: different file contents found in the
>>>>>> following:
>>>>>>   [error]
>>>>>> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.transaction/orbits/javax.transaction-1.1.1.v201105210645.jar:META-INF/ECLIPSEF.RSA
>>>>>>   [error]
>>>>>> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.servlet/orbits/javax.servlet-3.0.0.v201112011016.jar:META-INF/ECLIPSEF.RSA
>>>>>>   [error]
>>>>>> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.mail.glassfish/orbits/javax.mail.glassfish-1.4.1.v201005082020.jar:META-INF/ECLIPSEF.RSA
>>>>>>   [error]
>>>>>> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.activation/orbits/javax.activation-1.1.0.v201105071233.jar:META-INF/ECLIPSEF.RSA
>>>>>> 
>>>>>> I've tried applying mergeStategy in assembly for my assembly.sbt but 
>>>>>> then I
>>>>>> get
>>>>>> 
>>>>>>   Invalid signature file digest for Manifest main attributes
>>>>>> 
>>>>>> If anyone knows the magic to get this working a reply would be greatly
>>>>>> appreciated.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> View this message in context: 
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-SPARK-CLASSPATH-tp7356.html
>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 

Reply via email to