Hi Spark Users,

I am trying to use the Akka Camel library together with Spark Streaming, and
it does not work when I deploy my Spark application to the Spark cluster.
It does work when I run the application locally, so this looks like an issue
with how Spark loads the reference.conf file from the Akka Camel jar when
the application is deployed to the cluster.

I have made a simple application to demonstrate the problem.  It has a
class which uses Akka Camel to create an actor-based receiver for Spark
Streaming:

import scala.reflect.ClassTag

import akka.camel.Consumer
import org.apache.spark.streaming.receiver.ActorHelper

class NettyReceiver[T: ClassTag](port: Int) extends Consumer with ActorHelper {
  // Camel endpoint URI: consume TCP messages arriving on the given port
  def endpointUri = "netty:tcp://localhost:" + port
  def receive = {
    case data: T => store(data)  // hand each message over to Spark Streaming
  }
}

And the application creates a DStream using the previous class:

import akka.actor.Props
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object SparkCamelSBT extends App {
  val sparkConf = new SparkConf().setAppName("RawTRCTransformer")
  val ssc = new StreamingContext(sparkConf, Milliseconds(5000))

  case class Data(data: String)

  val dStream = ssc.actorStream[Data](Props(new NettyReceiver[Data](4548)), "TRCNettyReceiver")

  dStream.print()

  ssc.start()
  ssc.awaitTermination()
}
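
For completeness, a matching Camel producer actor can push test data into
the endpoint, along these lines (a sketch following the Akka Camel Producer
and Oneway example; whether the netty endpoint's default codec carries Data
across the wire as-is is an assumption):

import akka.actor.{Actor, ActorSystem, Props}
import akka.camel.{Oneway, Producer}

// Fire-and-forget producer: whatever it receives is written to the
// receiver's netty endpoint.
class TestSender extends Actor with Producer with Oneway {
  def endpointUri = "netty:tcp://localhost:4548"
}

object TestSenderApp extends App {
  val system = ActorSystem("test")
  val sender = system.actorOf(Props[TestSender])
  sender ! SparkCamelSBT.Data("hello")
}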

In local mode this works.  When deployed to the Spark cluster, the worker
that tries to use Akka Camel logs the same "No configuration setting found
for key 'akka.camel'" error shown in the forwarded thread below.

---------- Forwarded message ----------
From: Patrick McGloin <mcgloin.patr...@gmail.com>
Date: 24 October 2014 15:09
Subject: Re: [akka-user] Akka Camel plus Spark Streaming
To: akka-u...@googlegroups.com


Hi Patrik,

Thanks for your response.  Based on what you said, I tried a couple more
things:

- I tried copying the Akka Camel section of the reference.conf into my
application's configuration, in case it would be read from there.
- I tried calling addJar on the SparkContext to load the
akka-camel_2.10-2.3.6.jar file (see the sketch below).
- I tried adding akka-camel_2.10-2.3.6.jar to the SPARK_CLASSPATH and
restarting the Spark master and its workers.

With the third item, my thinking was that since Spark has already started
Akka by the time a job is submitted, the Akka Camel jar needs to be on the
classpath from the start rather than supplied when the job is started on
the cluster.
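
For the second item, the call itself was a one-liner, roughly like this (a
sketch; the jar path is from my environment):

// ssc is the StreamingContext created in the application
ssc.sparkContext.addJar("/opt/app/akka-camel_2.10-2.3.6.jar")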

None of these worked, however.
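
For reference, the fat jar is built with sbt-assembly.  As far as I know
its default merge strategy already concatenates reference.conf files (which
would explain why the camel section shows up in bigProject.jar below), but
spelled out explicitly it would look roughly like this, assuming the
sbt-assembly 0.11.x plugin syntax:

import sbtassembly.Plugin._
import AssemblyKeys._

// Concatenate every library's reference.conf into one file in the fat jar,
// so that each module's defaults (including akka.camel) survive assembly.
mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
  {
    case "reference.conf" => MergeStrategy.concat
    case x                => old(x)
  }
}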

Thanks for your help.  I will try to boil this down to a very simple
example on my laptop and reproduce it.  If it's still a problem in its most
basic form, I'll ask the Spark group if they know how it should work.

Best regards,
Patrick



On 24 October 2014 13:36, Patrik Nordwall <patrik.nordw...@gmail.com> wrote:

> If you are not using OSGi you don't need to worry about my comment
> related to the akka-osgi jar.
>
> If you have several jar files in your classpath the reference.conf files
> are merged automatically. If you package your app as a fat jar you must
> make sure that all reference.conf files are merged into one big
> reference.conf file when you assemble the fat jar.
>
> The error you see indicates that the camel section is not included in the
> reference.conf file(s) that you have in the classpath when you run.
>
> I don't know if Spark is doing anything special that would break the
> loading of the reference.conf files.
>
> Regards,
> Patrik
>
> On Fri, Oct 24, 2014 at 11:59 AM, Patrick McGloin <
> mcgloin.patr...@gmail.com> wrote:
>
>> Hi Akka users,
>>
>> I am trying to use Akka Camel together with Spark Streaming and I am
>> getting this error message:
>>
>> 14/10/23 09:31:30 ERROR OneForOneStrategy: No configuration setting found
>> for key 'akka.camel'
>> akka.actor.ActorInitializationException: exception during creation
>>
>> I have followed the pattern for creating an Actor based receiver:
>>
>> http://spark.apache.org/docs/latest/streaming-custom-receivers.html
>>
>> My Actor looks like this:
>>
>> class NettyReceiver[T: ClassTag](port: Int) extends Consumer with ActorHelper {
>>   def endpointUri = "netty:tcp://xyz:" + port
>>   def receive = {
>>     case data: T => store(data)
>>   }
>> }
>>
>> And I create a DStream like this:
>>
>> val dstream = ssc.actorStream[MessageType](
>>   Props(new NettyReceiver[MessageType](4548)), "msgNettyReceiver")
>>
>> All good so far.  I use sbt assembly and sbt package to create the jar
>> files for the project and for the driver application, and I run it on the
>> server using this command:
>>
>> sudo ./spark-submit --class SparkStreamingCamelApp --master
>> spark://xyz:7077 --jars /opt/app/bigProject.jar --total-executor-cores
>> 3 /opt/app/smallApplication.jar
>>
>> The streaming application itself starts and runs, but in the Spark worker
>> log I see these errors:
>>
>> akka.actor.ActorInitializationException: exception during creation
>> Caused by: java.lang.reflect.InvocationTargetException
>> Caused by: akka.actor.InvalidActorNameException: actor name
>> [camel-supervisor] is not unique!
>> 14/10/23 09:31:30 ERROR OneForOneStrategy: No configuration setting found
>> for key 'akka.camel'
>> akka.actor.ActorInitializationException: exception during creation
>>
>> I have researched the issue and found that Patrik Nordwall said this
>> issue "indicates that the reference.conf for akka-camel is not loaded":
>>
>> http://grokbase.com/t/gg/akka-user/13bp25kd7f/akka-camel-osgi
>>
>> If I run the following command on the assembled bigProject.jar, the
>> reference.conf is there:
>>
>> [user@xyz tmp]$ jar tvf bigProject.jar | grep reference.conf
>>  81115 Thu Oct 23 15:29:10 BST 2014 reference.conf
>>
>> If I do the same check on the driver application's smallApplication.jar,
>> the reference.conf file is not there.  Is this the issue?  I think not:
>> both jars are passed to the Spark workers and are in the work directory.
>>
>> If I check the contents of reference.conf using the following command:
>>
>> unzip -p bigProject.jar reference.conf
>>
>> I find the following Akka Camel section:
>>
>> akka {
>>   camel {
>>     # Whether JMX should be enabled or disabled for the Camel Context
>>     jmx = off
>>     # enable/disable streaming cache on the Camel Context
>>     streamingCache = on
>>     consumer {
>>       # Configured setting which determines whether one-way communications
>>       # between an endpoint and this consumer actor
>>       # should be auto-acknowledged or application-acknowledged.
>>       # This flag has only effect when exchange is in-only.
>>       auto-ack = on
>>
>>       # When endpoint is out-capable (can produce responses)
>> reply-timeout is the
>>       # maximum time the endpoint can take to send the response before
>> the message
>>       # exchange fails. This setting is used for out-capable, in-only,
>>       # manually acknowledged communication.
>>       reply-timeout = 1m
>>
>>       # The duration of time to await activation of an endpoint.
>>       activation-timeout = 10s
>>     }
>>
>>     #Scheme to FQCN mappings for CamelMessage body conversions
>>     conversions {
>>       "file" = "java.io.InputStream"
>>     }
>>   }
>> }
>>
>> (The file is much bigger, with other sections, of course).
>>
>> So the file is there but I still get the "No configuration setting found
>> for key 'akka.camel'" error.
>>
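>> As a sanity check on what the worker JVM actually resolves at runtime,
>> something like this inside the receiver's constructor would show whether
>> the key is visible (a sketch using the Typesafe Config API):
>>
>> import com.typesafe.config.ConfigFactory
>>
>> // true if the merged default config visible to this classloader
>> // contains the akka.camel section
>> println("akka.camel present: " + ConfigFactory.load().hasPath("akka.camel"))
>>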
>> I am using Scala 2.10.4 and Akka 2.2.3, as I believe these are the
>> versions that Spark 1.1 uses.
>>
>> Patrik Nordwall also says "akka-osgi_2.10-2.1.4.jar should replace
>> akka-actor_2.10-2.1.4.jar in an osgi environment".
>>
>> I changed my build.sbt so that akka-actor is "provided", like so:
>>
>>   "com.typesafe.akka" % "akka-camel_2.10" % "2.2.3",
>>   "com.typesafe.akka" % "akka-osgi_2.10" % "2.2.3",
>>   "com.typesafe.akka" % "akka-cluster_2.10" % "2.2.3",
>>   "com.typesafe.akka" % "akka-actor_2.10" % "2.2.3" % "provided",
>>   "org.apache.camel" % "camel-netty" % "2.12.3"
>>
>> I have checked, and akka-actor is not in the assembled jar.  But more
>> than likely Spark itself will be loading the akka-actor jar, right?
>>
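>> One way to confirm which jar the worker actually loads Akka from would be
>> a line like this in the receiver (a sketch using standard JVM reflection):
>>
>> // prints the jar file that akka.actor.ActorSystem was loaded from
>> println(classOf[akka.actor.ActorSystem].getProtectionDomain.getCodeSource.getLocation)
>>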
>> Any ideas how to get Spark Streaming and Akka Camel working together?  Am
>> I missing something obvious?  Any help greatly appreciated!
>>
>> Best regards,
>> Patrick
>>
>
>
>
> --
>
> Patrik Nordwall
> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
> Twitter: @patriknw
