Re: Problems after upgrading to spark 1.4.0

2015-07-14 Thread Luis Ángel Vicente Sánchez
I have just restarted the job and it doesn't seem that the shutdown hook is
executed. I have attached the log from the driver to this email. It seems
that the slaves are not accepting the tasks... but we haven't changed
anything on our mesos cluster; we have only upgraded one job to spark 1.4.
Is there any config option that has been added and is mandatory?

2015-07-13 22:12 GMT+01:00 Tathagata Das :

> Spark 1.4.0 added shutdown hooks in the driver to cleanly shut down the
> SparkContext in the driver, which would shut down the executors. I am not
> sure whether this is related or not, but somehow the executor's shutdown
> hook is being called.
> Can you check the driver logs to see if the driver's shutdown hook is
> accidentally being called?
>
>
> On Mon, Jul 13, 2015 at 9:23 AM, Luis Ángel Vicente Sánchez <
> langel.gro...@gmail.com> wrote:
>
>> I forgot to mention that this is a long-running job, actually a spark
>> streaming job, and it's using mesos coarse mode. I'm still using the
>> unreliable kafka receiver.
>>
>> 2015-07-13 17:15 GMT+01:00 Luis Ángel Vicente Sánchez <
>> langel.gro...@gmail.com>:
>>
>>> I have just upgraded one of my spark jobs from spark 1.2.1 to spark 1.4.0
>>> and after deploying it to mesos, it's not working anymore.
>>>
>>> The upgrade process was quite easy:
>>>
>>> - Create a new docker container for spark 1.4.0.
>>> - Upgrade spark job to use spark 1.4.0 as a dependency and create a new
>>> fatjar.
>>> - Create a docker container for the jobs, based on the previous spark 1.4.0
>>> container.
>>>
>>> After deploying it to marathon, the job only displays the driver under
>>> executors and no task progresses. I haven't made any changes to my config
>>> files (apart from updating spark.executor.uri to point to the right file on
>>> s3).
>>>
>>> If I go to mesos and check my job under frameworks, I can see a few
>>> failed stages; the content of stderr always looks like this:
>>>
>>> I0713 15:59:45.774368  1327 fetcher.cpp:214] Fetching URI 
>>> 'http://s3-eu-west-1.amazonaws.com/int-mesos-data/frameworks/spark/spark-1.4.0-bin-hadoop2.4.tgz'
>>> I0713 15:59:45.774483  1327 fetcher.cpp:125] Fetching URI 
>>> 'http://s3-eu-west-1.amazonaws.com/int-mesos-data/frameworks/spark/spark-1.4.0-bin-hadoop2.4.tgz'
>>>  with os::net
>>> I0713 15:59:45.774494  1327 fetcher.cpp:135] Downloading 
>>> 'http://s3-eu-west-1.amazonaws.com/int-mesos-data/frameworks/spark/spark-1.4.0-bin-hadoop2.4.tgz'
>>>  to 
>>> '/var/log/mcsvc/mesostmpdir/slaves/20150713-133618-421011372-5050-8867-S5/frameworks/20150713-152326-421011372-5050-12921-0002/executors/9/runs/9e44b2ea-c738-4e76-8103-3a85ce752b58/spark-1.4.0-bin-hadoop2.4.tgz'
>>> I0713 15:59:50.700959  1327 fetcher.cpp:78] Extracted resource 
>>> '/var/log/mcsvc/mesostmpdir/slaves/20150713-133618-421011372-5050-8867-S5/frameworks/20150713-152326-421011372-5050-12921-0002/executors/9/runs/9e44b2ea-c738-4e76-8103-3a85ce752b58/spark-1.4.0-bin-hadoop2.4.tgz'
>>>  into 
>>> '/var/log/mcsvc/mesostmpdir/slaves/20150713-133618-421011372-5050-8867-S5/frameworks/20150713-152326-421011372-5050-12921-0002/executors/9/runs/9e44b2ea-c738-4e76-8103-3a85ce752b58'
>>> I0713 15:59:50.973274  1333 exec.cpp:132] Version: 0.22.1
>>> I0713 15:59:50.998219  1339 exec.cpp:206] Executor registered on slave 
>>> 20150713-133618-421011372-5050-8867-S5
>>> Using Spark's default log4j profile: 
>>> org/apache/spark/log4j-defaults.properties
>>> 15/07/13 15:59:51 INFO CoarseGrainedExecutorBackend: Registered signal 
>>> handlers for [TERM, HUP, INT]
>>> 15/07/13 15:59:52 WARN NativeCodeLoader: Unable to load native-hadoop 
>>> library for your platform... using builtin-java classes where applicable
>>> 15/07/13 15:59:52 INFO SecurityManager: Changing view acls to: root
>>> 15/07/13 15:59:52 INFO SecurityManager: Changing modify acls to: root
>>> 15/07/13 15:59:52 INFO SecurityManager: SecurityManager: authentication 
>>> disabled; ui acls disabled; users with view permissions: Set(root); users 
>>> with modify permissions: Set(root)
>>> 15/07/13 15:59:52 INFO Slf4jLogger: Slf4jLogger started
>>> 15/07/13 15:59:52 INFO Remoting: Starting remoting
>>> 15/07/13 15:59:53 INFO Remoting: Remoting started; listening on addresses 
>>> :[akka.tcp://driverpropsfetc...@int-mesos-slave-ib4583253.mclabs.io:41854]
>>> 15/07/13 15:59:53 INFO Utils: Successfu

Re: Problems after upgrading to spark 1.4.0

2015-07-13 Thread Luis Ángel Vicente Sánchez
I forgot to mention that this is a long-running job, actually a spark
streaming job, and it's using mesos coarse mode. I'm still using the
unreliable kafka receiver.

2015-07-13 17:15 GMT+01:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> I have just upgraded one of my spark jobs from spark 1.2.1 to spark 1.4.0
> and after deploying it to mesos, it's not working anymore.
>
> The upgrade process was quite easy:
>
> - Create a new docker container for spark 1.4.0.
> - Upgrade spark job to use spark 1.4.0 as a dependency and create a new
> fatjar.
> - Create a docker container for the jobs, based on the previous spark 1.4.0
> container.
>
> After deploying it to marathon, the job only displays the driver under
> executors and no task progresses. I haven't made any changes to my config
> files (apart from updating spark.executor.uri to point to the right file on
> s3).
>
> If I go to mesos and check my job under frameworks, I can see a few
> failed stages; the content of stderr always looks like this:
>
> I0713 15:59:45.774368  1327 fetcher.cpp:214] Fetching URI 
> 'http://s3-eu-west-1.amazonaws.com/int-mesos-data/frameworks/spark/spark-1.4.0-bin-hadoop2.4.tgz'
> I0713 15:59:45.774483  1327 fetcher.cpp:125] Fetching URI 
> 'http://s3-eu-west-1.amazonaws.com/int-mesos-data/frameworks/spark/spark-1.4.0-bin-hadoop2.4.tgz'
>  with os::net
> I0713 15:59:45.774494  1327 fetcher.cpp:135] Downloading 
> 'http://s3-eu-west-1.amazonaws.com/int-mesos-data/frameworks/spark/spark-1.4.0-bin-hadoop2.4.tgz'
>  to 
> '/var/log/mcsvc/mesostmpdir/slaves/20150713-133618-421011372-5050-8867-S5/frameworks/20150713-152326-421011372-5050-12921-0002/executors/9/runs/9e44b2ea-c738-4e76-8103-3a85ce752b58/spark-1.4.0-bin-hadoop2.4.tgz'
> I0713 15:59:50.700959  1327 fetcher.cpp:78] Extracted resource 
> '/var/log/mcsvc/mesostmpdir/slaves/20150713-133618-421011372-5050-8867-S5/frameworks/20150713-152326-421011372-5050-12921-0002/executors/9/runs/9e44b2ea-c738-4e76-8103-3a85ce752b58/spark-1.4.0-bin-hadoop2.4.tgz'
>  into 
> '/var/log/mcsvc/mesostmpdir/slaves/20150713-133618-421011372-5050-8867-S5/frameworks/20150713-152326-421011372-5050-12921-0002/executors/9/runs/9e44b2ea-c738-4e76-8103-3a85ce752b58'
> I0713 15:59:50.973274  1333 exec.cpp:132] Version: 0.22.1
> I0713 15:59:50.998219  1339 exec.cpp:206] Executor registered on slave 
> 20150713-133618-421011372-5050-8867-S5
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> 15/07/13 15:59:51 INFO CoarseGrainedExecutorBackend: Registered signal 
> handlers for [TERM, HUP, INT]
> 15/07/13 15:59:52 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 15/07/13 15:59:52 INFO SecurityManager: Changing view acls to: root
> 15/07/13 15:59:52 INFO SecurityManager: Changing modify acls to: root
> 15/07/13 15:59:52 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users with view permissions: Set(root); users 
> with modify permissions: Set(root)
> 15/07/13 15:59:52 INFO Slf4jLogger: Slf4jLogger started
> 15/07/13 15:59:52 INFO Remoting: Starting remoting
> 15/07/13 15:59:53 INFO Remoting: Remoting started; listening on addresses 
> :[akka.tcp://driverpropsfetc...@int-mesos-slave-ib4583253.mclabs.io:41854]
> 15/07/13 15:59:53 INFO Utils: Successfully started service 
> 'driverPropsFetcher' on port 41854.
> 15/07/13 15:59:53 INFO SecurityManager: Changing view acls to: root
> 15/07/13 15:59:53 INFO SecurityManager: Changing modify acls to: root
> 15/07/13 15:59:53 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users with view permissions: Set(root); users 
> with modify permissions: Set(root)
> 15/07/13 15:59:53 INFO RemoteActorRefProvider$RemotingTerminator: Shutting 
> down remote daemon.
> 15/07/13 15:59:53 INFO RemoteActorRefProvider$RemotingTerminator: Remote 
> daemon shut down; proceeding with flushing remote transports.
> 15/07/13 15:59:53 INFO Slf4jLogger: Slf4jLogger started
> 15/07/13 15:59:53 INFO Remoting: Starting remoting
> 15/07/13 15:59:53 INFO RemoteActorRefProvider$RemotingTerminator: Remoting 
> shut down.
> 15/07/13 15:59:53 INFO Utils: Successfully started service 'sparkExecutor' on 
> port 60219.
> 15/07/13 15:59:53 INFO Remoting: Remoting started; listening on addresses 
> :[akka.tcp://sparkexecu...@int-mesos-slave-ib4583253.mclabs.io:60219]
> 15/07/13 15:59:53 INFO DiskBlockManager: Created local directory at 
> /var/log/mcsvc/sparktmpdir/spark-2ca9b3eb-ce70-44e5-9546-1a83f63dc439/blockmgr-4047306e-9dc8-48e4-bc25-300f4cf0be87
> 15/07/13 15:59:53 INFO MemoryStor

Problems after upgrading to spark 1.4.0

2015-07-13 Thread Luis Ángel Vicente Sánchez
I have just upgraded one of my spark jobs from spark 1.2.1 to spark 1.4.0
and after deploying it to mesos, it's not working anymore.

The upgrade process was quite easy:

- Create a new docker container for spark 1.4.0.
- Upgrade spark job to use spark 1.4.0 as a dependency and create a new
fatjar.
- Create a docker container for the jobs, based on the previous spark 1.4.0
container.

After deploying it to marathon, the job only displays the driver under
executors and no task progresses. I haven't made any changes to my config
files (apart from updating spark.executor.uri to point to the right file on
s3).
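
For reference, the Mesos setting in question is this; a sketch, with the URI
taken from the fetcher log below:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.uri",
    "http://s3-eu-west-1.amazonaws.com/int-mesos-data/frameworks/spark/spark-1.4.0-bin-hadoop2.4.tgz")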

If I go to mesos and check my job under frameworks, I can see a few
failed stages; the content of stderr always looks like this:

I0713 15:59:45.774368  1327 fetcher.cpp:214] Fetching URI
'http://s3-eu-west-1.amazonaws.com/int-mesos-data/frameworks/spark/spark-1.4.0-bin-hadoop2.4.tgz'
I0713 15:59:45.774483  1327 fetcher.cpp:125] Fetching URI
'http://s3-eu-west-1.amazonaws.com/int-mesos-data/frameworks/spark/spark-1.4.0-bin-hadoop2.4.tgz'
with os::net
I0713 15:59:45.774494  1327 fetcher.cpp:135] Downloading
'http://s3-eu-west-1.amazonaws.com/int-mesos-data/frameworks/spark/spark-1.4.0-bin-hadoop2.4.tgz'
to 
'/var/log/mcsvc/mesostmpdir/slaves/20150713-133618-421011372-5050-8867-S5/frameworks/20150713-152326-421011372-5050-12921-0002/executors/9/runs/9e44b2ea-c738-4e76-8103-3a85ce752b58/spark-1.4.0-bin-hadoop2.4.tgz'
I0713 15:59:50.700959  1327 fetcher.cpp:78] Extracted resource
'/var/log/mcsvc/mesostmpdir/slaves/20150713-133618-421011372-5050-8867-S5/frameworks/20150713-152326-421011372-5050-12921-0002/executors/9/runs/9e44b2ea-c738-4e76-8103-3a85ce752b58/spark-1.4.0-bin-hadoop2.4.tgz'
into 
'/var/log/mcsvc/mesostmpdir/slaves/20150713-133618-421011372-5050-8867-S5/frameworks/20150713-152326-421011372-5050-12921-0002/executors/9/runs/9e44b2ea-c738-4e76-8103-3a85ce752b58'
I0713 15:59:50.973274  1333 exec.cpp:132] Version: 0.22.1
I0713 15:59:50.998219  1339 exec.cpp:206] Executor registered on slave
20150713-133618-421011372-5050-8867-S5
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/07/13 15:59:51 INFO CoarseGrainedExecutorBackend: Registered signal
handlers for [TERM, HUP, INT]
15/07/13 15:59:52 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
15/07/13 15:59:52 INFO SecurityManager: Changing view acls to: root
15/07/13 15:59:52 INFO SecurityManager: Changing modify acls to: root
15/07/13 15:59:52 INFO SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(root); users with modify permissions: Set(root)
15/07/13 15:59:52 INFO Slf4jLogger: Slf4jLogger started
15/07/13 15:59:52 INFO Remoting: Starting remoting
15/07/13 15:59:53 INFO Remoting: Remoting started; listening on
addresses 
:[akka.tcp://driverpropsfetc...@int-mesos-slave-ib4583253.mclabs.io:41854]
15/07/13 15:59:53 INFO Utils: Successfully started service
'driverPropsFetcher' on port 41854.
15/07/13 15:59:53 INFO SecurityManager: Changing view acls to: root
15/07/13 15:59:53 INFO SecurityManager: Changing modify acls to: root
15/07/13 15:59:53 INFO SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(root); users with modify permissions: Set(root)
15/07/13 15:59:53 INFO RemoteActorRefProvider$RemotingTerminator:
Shutting down remote daemon.
15/07/13 15:59:53 INFO RemoteActorRefProvider$RemotingTerminator:
Remote daemon shut down; proceeding with flushing remote transports.
15/07/13 15:59:53 INFO Slf4jLogger: Slf4jLogger started
15/07/13 15:59:53 INFO Remoting: Starting remoting
15/07/13 15:59:53 INFO RemoteActorRefProvider$RemotingTerminator:
Remoting shut down.
15/07/13 15:59:53 INFO Utils: Successfully started service
'sparkExecutor' on port 60219.
15/07/13 15:59:53 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkexecu...@int-mesos-slave-ib4583253.mclabs.io:60219]
15/07/13 15:59:53 INFO DiskBlockManager: Created local directory at
/var/log/mcsvc/sparktmpdir/spark-2ca9b3eb-ce70-44e5-9546-1a83f63dc439/blockmgr-4047306e-9dc8-48e4-bc25-300f4cf0be87
15/07/13 15:59:53 INFO MemoryStore: MemoryStore started with capacity 267.5 MB
Exception in thread "main" java.io.FileNotFoundException:
/etc/mindcandy/metrics.properties (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at java.io.FileInputStream.<init>(FileInputStream.java:93)
at 
org.apache.spark.metrics.MetricsConfig$$anonfun$1.apply(MetricsConfig.scala:50)
at 
org.apache.spark.metrics.MetricsConfig$$anonfun$1.apply(MetricsConfig.scala:50)
at scala.Option.map(Option.scala:145)
at 
org.apache.spark.metrics.MetricsConfig.initialize(MetricsConfig.scala:50)
at org.
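
The trace above shows the executor dying because it cannot open the file that
spark.metrics.conf points at. A rough sketch of one way around that, shipping
the file with the job instead of expecting it at a fixed path on every slave;
the property names are standard Spark settings, but resolving the relative
path inside the Mesos executor sandbox is an assumption:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Ship the metrics config so it lands in each executor's working directory.
  .set("spark.files", "/etc/mindcandy/metrics.properties")
  // MetricsConfig (the frame at MetricsConfig.scala:50 above) opens this path;
  // a relative path would resolve inside the executor sandbox (assumption).
  .set("spark.metrics.conf", "metrics.properties")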

Re: Duplicated UnusedStubClass in assembly

2015-07-13 Thread Luis Ángel Vicente Sánchez
Hi! I had just raised this issue, and I already solved it by excluding that
transitive dependency (sketched below). Thanks for your help anyway :)
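
For reference, a minimal sbt sketch of that exclusion, using the module
coordinates from the deduplicate error quoted below:

libraryDependencies += ("org.apache.spark" %% "spark-streaming-kafka" % "1.4.0")
  .exclude("org.spark-project.spark", "unused")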

2015-07-13 14:43 GMT+01:00 Cody Koeninger :

> Yeah, I had brought that up a while back, but didn't get agreement on
> removing the stub.  Seems to be an intermittent problem.  You can just add
> an exclude:
>
>
> mergeStrategy in assembly := {
>   case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") =>
>     MergeStrategy.first
>   case x => (mergeStrategy in assembly).value(x)
> }
>
> On Mon, Jul 13, 2015 at 6:55 AM, Luis Ángel Vicente Sánchez <
> langel.gro...@gmail.com> wrote:
>
>> I have just upgraded to spark 1.4.0 and it seems that
>> spark-streaming-kafka has a dependency on org.spark-project.spark unused
>> 1.0.0 but it also embeds that jar in its artifact, causing a problem while
>> creating a fatjar.
>>
>> This is the error:
>>
>> [Step 1/1] (*:assembly) deduplicate: different file contents found in the
>>> following:
>>>
>>> /data/system/sbt_ivy/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.4.0.jar:org/apache/spark/unused/UnusedStubClass.class
>>>
>>> /data/system/sbt_ivy/cache/org.spark-project.spark/unused/jars/unused-1.0.0.jar:org/apache/spark/unused/UnusedStubClass.class
>>
>>
>


Duplicated UnusedStubClass in assembly

2015-07-13 Thread Luis Ángel Vicente Sánchez
I have just upgraded to spark 1.4.0 and it seems that spark-streaming-kafka
has a dependency on org.spark-project.spark unused 1.0.0 but it also embeds
that jar in its artifact, causing a problem while creating a fatjar.

This is the error:

[Step 1/1] (*:assembly) deduplicate: different file contents found in the
> following:
>
> /data/system/sbt_ivy/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.4.0.jar:org/apache/spark/unused/UnusedStubClass.class
>
> /data/system/sbt_ivy/cache/org.spark-project.spark/unused/jars/unused-1.0.0.jar:org/apache/spark/unused/UnusedStubClass.class


Re: Streaming problems running 24x7

2015-04-20 Thread Luis Ángel Vicente Sánchez
You have a window operation; I have seen that behaviour before with window
operations in spark streaming. My solution was to move away from window
operations by using probabilistic data structures (a sketch below); it might
not be an option for you.
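
A rough illustration of the idea, assuming Algebird's HyperLogLog and per-key
state via updateStateByKey; the stream shape, register sizing, and names are
illustrative, not from this thread:

import com.twitter.algebird.{HLL, HyperLogLogMonoid}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits (Spark 1.x)
import org.apache.spark.streaming.dstream.DStream

// Instead of buffering a window of raw events, fold each batch's user IDs
// into a per-key HyperLogLog; the state stays small and bounded.
val hll = new HyperLogLogMonoid(bits = 12) // roughly 1-2% error; sizing is illustrative

def approxDistinctUsers(events: DStream[(String, String)]): DStream[(String, Long)] =
  events
    .mapValues(userId => hll.create(userId.getBytes("UTF-8")))
    .updateStateByKey[HLL] { (batch: Seq[HLL], state: Option[HLL]) =>
      Some(batch.fold(state.getOrElse(hll.zero))(hll.plus))
    } // updateStateByKey needs ssc.checkpoint(...) to be set
    .mapValues(_.approximateSize.estimate)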

2015-04-20 10:29 GMT+01:00 Marius Soutier :

> The processing speed displayed in the UI doesn’t seem to take everything
> into account. I also had a low processing time but had to increase batch
> duration from 30 seconds to 1 minute because waiting batches kept
> increasing. Now it runs fine.
>
> On 17.04.2015, at 13:30, González Salgado, Miquel <
> miquel.gonza...@tecsidel.es> wrote:
>
> Hi,
>
> Thank you for your response,
> I think it is not because of the processing speed; in fact, the delay is
> under 1 second, while the batch interval is 10 seconds… The data volume is
> low (10 lines / second).
>
> Changing to local[8] worsened the problem (CPU increased more quickly).
>
> By the way, I have seen some results changing to this call of KafkaUtils:
>
> KafkaUtils.createDirectStream
>
> CPU usage is low and stable, but memory is slowly increasing… but at least
> the process lasts longer...
>
> Best regards,
> Miquel
>
>
> *From:* bit1...@163.com [mailto:bit1...@163.com ]
> *Sent:* Thursday, 16 April 2015 10:58
> *To:* González Salgado, Miquel; user
> *Subject:* Re: Streaming problems running 24x7
>
> From your description, it looks like the data processing speed is far behind
> the data receiving speed.
>
> Could you try to increase the number of cores when you submit the
> application, such as local[8]?
>
> --
> bit1...@163.com
>
>
> *From:* Miquel 
> *Date:* 2015-04-16 16:39
> *To:* user 
> *Subject:* Streaming problems running 24x7
> Hello,
> I'm having problems running a spark streaming job for more than a few hours
> (3 or 4). It begins working OK, but it degrades until failure. Some of the
> symptoms:
>
> - Consumed memory and CPU keep getting higher and higher, and finally an
> error is thrown (java.lang.Exception: Could not compute split, block
> input-0-1429168311800 not found) and data stops being calculated.
>
> - The delay showed in web UI keeps also increasing.
>
> - After some hours, disk space is being consumed. There are a lot of
> directories with names like
> "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c"
>
> The job is basically reading information from a kafka topic and calculating
> several topN tables for some key and value fields related to netflow data.
> Some of the parameters are:
> - batch interval: 10 seconds
> - window calculation: 1 minute
> - spark.cleaner.ttl: 5 minutes
>
> The execution is standalone on one machine (16GB RAM , 12 cores), and the
> options to run it is as follows:
> /opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops"
> --jars $JARS --class $APPCLASS --master local[2] $APPJAR
>
> Does someone have a clue about the problem? I don't know if it is a
> configuration problem or some error in the code that is causing memory
> leaks.
>
> Thank you in advance!
> Miquel
>
> PS: the code is basically this:
>
> object NetflowTopn {
>
>   var appPath = "."
>   var zkQuorum = ""
>   var group = ""
>   var topics = ""
>   var numThreads = 1
>
>   var batch_interval = 10
>   var n_window = 1
>   var n_slide = 1
>   var topnsize = 10
>
>   var hm = Map[String,Int]()
>   hm += ( "unix_secs" ->  0 )
>   hm += ( "unix_nsecs" -> 1 )
>   hm += ( "sysuptime" ->  2 )
>   hm += ( "exaddr" -> 3 )
>   hm += ( "dpkts" ->  4 )
>   hm += ( "doctets" ->5 )
>   hm += ( "first" ->  6 )
>   hm += ( "last" ->   7 )
>   hm += ( "engine_type" -> 8 )
>   hm += ( "engine_id" ->   9 )
>   hm += ( "srcaddr" ->10 )
>   hm += ( "dstaddr" ->11 )
>   hm += ( "nexthop" ->12 )
>   hm += ( "input" ->  13 )
>   hm += ( "output" -> 14 )
>   hm += ( "srcport" ->15 )
>   hm += ( "dstport" ->16 )
>   hm += ( "prot" ->   17 )
>   hm += ( "tos" ->18 )
>   hm += ( "tcp_flags" ->  19 )
>   hm += ( "src_mask" ->   20 )
>   hm += ( "dst_mask" ->   21 )
>   hm += ( "src_as" -> 22 )
>   hm += ( "dst_as" -> 23 )
>
>   def getKey (lcamps: Array[String], camp: String): String  = {
> if (camp == "total") return "total"
> else return lcamps(hm(camp))
>   }
>
>   def getVal (lcamps: Array[String], camp: String): Long  = {
> if (camp == "flows") return 1L
> else return lcamps(hm(camp)).toLong
>   }
>
>   def getKeyVal (line: String, keycamps: List[String], valcamp: String ) =
> {
> val arr = line.split(",")
> (keycamps.map(getKey(arr, _)).mkString(",")   ,   getVal(arr,valcamp) )
>   }
>
>   def writeOutput (data: Array[(Long, String)], keycamps_str: String,
> csvheader: String, valcamp: String, prefix: String) = {
>
>val ts = System.currentTimeMillis
>val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_"
> +
> valcamp + ".csv"
>val f1f = new File(f1);
>   

foreachRDD execution

2015-03-25 Thread Luis Ángel Vicente Sánchez
I have a simple and probably dumb question about foreachRDD.

We are using spark streaming + cassandra to compute concurrent users every
5min. Our batch size is 10secs and our block interval is 2.5secs.

At the end of the workflow we are using foreachRDD to join the data in the RDD
with existing data in Cassandra, update the counters and then save it back
to Cassandra.

To the best of my understanding, in this scenario, spark streaming produces
one RDD every 10secs and foreachRDD executes them sequentially, that is,
foreachRDD would never run in parallel.

Am I right?

Regards,

Luis
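
For concreteness, the pattern described above looks roughly like this; a
sketch using the spark-cassandra-connector API, with hypothetical keyspace,
table, and key types:

import com.datastax.spark.connector._
import org.apache.spark.SparkContext._ // pair-RDD implicits (Spark 1.x)
import org.apache.spark.streaming.dstream.DStream

def sink(counts: DStream[(String, Long)]): Unit =
  counts.foreachRDD { rdd =>
    // Read the counters currently stored in Cassandra.
    val existing = rdd.sparkContext
      .cassandraTable[(String, Long)]("ks", "concurrent_users")
    // Merge this batch's deltas into the stored values...
    val merged = rdd.leftOuterJoin(existing).map {
      case (key, (delta, old)) => (key, old.getOrElse(0L) + delta)
    }
    // ...and write the updated counters back.
    merged.saveToCassandra("ks", "concurrent_users")
  }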


Re: NullPointer when access rdd.sparkContext (Spark 1.1.1)

2015-01-21 Thread Luis Ángel Vicente Sánchez
Yes, I have just found that. By replacing,

rdd.map(_._1).distinct().foreach {
  case (game, category) => persist(game, category, minTime,
maxTime, rdd)
}

with,

rdd.map(_._1).distinct().collect().foreach {
  case (game, category) => persist(game, category, minTime,
maxTime, rdd)
}

everything works as expected: collect() brings the distinct keys back to the
driver, so persist (and its use of rdd.sparkContext) runs in driver code
instead of inside a distributed closure.

2015-01-21 14:18 GMT+00:00 Sean Owen :

> It looks like you are trying to use the RDD in a distributed operation,
> which won't work. The context will be null.
> On Jan 21, 2015 1:50 PM, "Luis Ángel Vicente Sánchez" <
> langel.gro...@gmail.com> wrote:
>
>> The SparkContext is lost when I call the persist function from the sink
>> function; just before the function call, everything works as intended, so
>> I guess it's the FunctionN class serialisation that is causing the problem.
>> I will try to embed the functionality in the sink method to verify that.
>>
>> 2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez <
>> langel.gro...@gmail.com>:
>>
>>> The following functions,
>>>
>>> def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform),
>>> HLL))]): Unit = {
>>> data.foreachRDD { rdd =>
>>>   rdd.cache()
>>>   val (minTime, maxTime): (Long, Long) =
>>> rdd.map {
>>>   case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
>>> }.fold((Long.MaxValue, Long.MinValue)) {
>>>   case ((min, max), (num, _)) => (math.min(min, num),
>>> math.max(max, num))
>>> }
>>>   if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
>>> rdd.map(_._1).distinct().foreach {
>>>   case (game, category) => persist(game, category, minTime,
>>> maxTime, rdd)
>>> }
>>>   }
>>>   rdd.unpersist(blocking = false)
>>> }
>>>   }
>>>
>>>   def persist(game: GameID, category: Category, min: Long, max: Long,
>>> data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit =
>>> {
>>> val family: String = s"${parameters.table.family}_$
>>> {game.repr}_${category.repr}"
>>> val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
>>>   data.sparkContext.cassandraTable[(Long, Long, String,
>>> Array[Byte])](parameters.table.keyspace, family)
>>> val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
>>>   cas
>>> .where(""""time" >= ?""", new Date(min))
>>> .where(""""time" <= ?""", new Date(max))
>>> .map {
>>>   case (date, time, platform, array) => ((TimeSeriesKey(date,
>>> time), Platform(platform)), HyperLogLog.fromBytes(array))
>>> }
>>> data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map
>>> {
>>>   case ((key, platform), (value, maybe)) =>
>>> (key.date, key.time, platform.repr, 
>>> HyperLogLog.toBytes(maybe.fold(value)(array
>>> => value + array)))
>>> }.saveToCassandra(parameters.table.keyspace, family)
>>>   }
>>>
>>> are causing this exception at runtime:
>>>
>>> 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0
>>> (TID 126)
>>> java.lang.NullPointerException
>>> at com.datastax.spark.connector.SparkContextFunctions.
>>> cassandraTable$default$3(SparkContextFunctions.scala:47)
>>> at com.mindcandy.services.mako.concurrentusers.
>>> ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
>>> at com.mindcandy.services.mako.concurrentusers.
>>> ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
>>> ActiveUsersJobImpl.scala:41)
>>> at com.mindcandy.services.mako.concurrentusers.
>>> ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
>>> ActiveUsersJobImpl.scala:40)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at scala.collection.AbstractIterator.foreach(
>>> Iterator.scala:1157)
>>> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.
>>> scala:759)
>>> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.
>>> scala:759)
>>> at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
>>> SparkContext.scala:1143)
>>> at org.apache.spark.SparkContext$$anonfun$

Re: NullPointer when access rdd.sparkContext (Spark 1.1.1)

2015-01-21 Thread Luis Ángel Vicente Sánchez
The SparkContext is lost when I call the persist function from the sink
function; just before the function call, everything works as intended, so
I guess it's the FunctionN class serialisation that is causing the problem.
I will try to embed the functionality in the sink method to verify that.

2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> The following functions,
>
> def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform),
> HLL))]): Unit = {
> data.foreachRDD { rdd =>
>   rdd.cache()
>   val (minTime, maxTime): (Long, Long) =
> rdd.map {
>   case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
> }.fold((Long.MaxValue, Long.MinValue)) {
>   case ((min, max), (num, _)) => (math.min(min, num),
> math.max(max, num))
> }
>   if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
> rdd.map(_._1).distinct().foreach {
>   case (game, category) => persist(game, category, minTime,
> maxTime, rdd)
> }
>   }
>   rdd.unpersist(blocking = false)
> }
>   }
>
>   def persist(game: GameID, category: Category, min: Long, max: Long,
> data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit =
> {
> val family: String = s"${parameters.table.family}_$
> {game.repr}_${category.repr}"
> val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
>   data.sparkContext.cassandraTable[(Long, Long, String,
> Array[Byte])](parameters.table.keyspace, family)
> val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
>   cas
> .where(""""time" >= ?""", new Date(min))
> .where(""""time" <= ?""", new Date(max))
> .map {
>   case (date, time, platform, array) => ((TimeSeriesKey(date,
> time), Platform(platform)), HyperLogLog.fromBytes(array))
> }
> data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map
> {
>   case ((key, platform), (value, maybe)) =>
> (key.date, key.time, platform.repr, 
> HyperLogLog.toBytes(maybe.fold(value)(array
> => value + array)))
> }.saveToCassandra(parameters.table.keyspace, family)
>   }
>
> are causing this exception at runtime:
>
> 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID
> 126)
> java.lang.NullPointerException
> at com.datastax.spark.connector.SparkContextFunctions.
> cassandraTable$default$3(SparkContextFunctions.scala:47)
> at com.mindcandy.services.mako.concurrentusers.
> ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
> at com.mindcandy.services.mako.concurrentusers.
> ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
> ActiveUsersJobImpl.scala:41)
> at com.mindcandy.services.mako.concurrentusers.
> ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
> ActiveUsersJobImpl.scala:40)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.
> scala:759)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.
> scala:759)
> at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
> SparkContext.scala:1143)
> at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
> SparkContext.scala:1143)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
> scala:62)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:178)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> I'm using spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of
> SparkContextFunctions.scala is the implicit CassandraConnector that uses
> the underlying spark context to retrieve the SparkConf.
>
> After a few hours debugging the code, the source of the problem is that,
>
> data.sparkContext
>
> is returning null. It seems that the RDD is serialised and the
> SparkContext is lost. Is this the expected behaviour? Is it a known bug?
>
> I have run out of ideas on how to make this work, so I'm open to
> suggestions.
>
> Kind regards,
>
> Luis
>


NullPointer when access rdd.sparkContext (Spark 1.1.1)

2015-01-21 Thread Luis Ángel Vicente Sánchez
The following functions,

def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform),
HLL))]): Unit = {
data.foreachRDD { rdd =>
  rdd.cache()
  val (minTime, maxTime): (Long, Long) =
rdd.map {
  case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
}.fold((Long.MaxValue, Long.MinValue)) {
  case ((min, max), (num, _)) => (math.min(min, num), math.max(max,
num))
}
  if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
rdd.map(_._1).distinct().foreach {
  case (game, category) => persist(game, category, minTime,
maxTime, rdd)
}
  }
  rdd.unpersist(blocking = false)
}
  }

  def persist(game: GameID, category: Category, min: Long, max: Long, data:
RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
val family: String = s"${parameters.table.family}_$
{game.repr}_${category.repr}"
val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
  data.sparkContext.cassandraTable[(Long, Long, String,
Array[Byte])](parameters.table.keyspace, family)
val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
  cas
.where(time" >= ?""", new Date(min))
.where(time" <= ?""", new Date(max))
.map {
  case (date, time, platform, array) => ((TimeSeriesKey(date,
time), Platform(platform)), HyperLogLog.fromBytes(array))
}
data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map
{
  case ((key, platform), (value, maybe)) =>
(key.date, key.time, platform.repr,
HyperLogLog.toBytes(maybe.fold(value)(array
=> value + array)))
}.saveToCassandra(parameters.table.keyspace, family)
  }

are causing this exception at runtime:

15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID
126)
java.lang.NullPointerException
at com.datastax.spark.connector.SparkContextFunctions.
cassandraTable$default$3(SparkContextFunctions.scala:47)
at com.mindcandy.services.mako.concurrentusers.
ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
at com.mindcandy.services.mako.concurrentusers.
ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
ActiveUsersJobImpl.scala:41)
at com.mindcandy.services.mako.concurrentusers.
ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
ActiveUsersJobImpl.scala:40)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
SparkContext.scala:1143)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
SparkContext.scala:1143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(
Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I'm using spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of
SparkContextFunctions.scala is the implicit CassandraConnector that uses
the underlying spark context to retrieve the SparkConf.

After a few hours debugging the code, the source of the problem is that,

data.sparkContext

is returning null. It seems that the RDD is serialised and the SparkContext
is lost. Is this the expected behaviour? Is it a known bug?

I have run out of ideas on how to make this work, so I'm open to
suggestions.

Kind regards,

Luis


Re: Kafka Receiver not recreated after executor died

2014-12-16 Thread Luis Ángel Vicente Sánchez
It seems to be slightly related to this:

https://issues.apache.org/jira/browse/SPARK-1340

But in this case, it's not the Task that is failing but the entire executor
where the Kafka Receiver resides.

2014-12-16 16:53 GMT+00:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:
>
> Dear spark community,
>
>
> We were testing a spark failure scenario where the executor that is
> running a Kafka Receiver dies.
>
> We are running our streaming jobs on top of mesos and we killed the mesos
> slave that was running the executor; a new executor was created on another
> mesos-slave but according to the Driver UI, the Kafka receiver location is
> still on the dead slave.
>
> It seems that if the executor where the receiver was running dies, spark
> on top of mesos is not able to create a new receiver on a different
> executor to continue working.
>
> Is that a known issue? Is there a way we could ensure that the receiver
> would be recreated?
>
> Kind regards,
>
> Luis Vicente
>


Kafka Receiver not recreated after executor died

2014-12-16 Thread Luis Ángel Vicente Sánchez
Dear spark community,


We were testing a spark failure scenario where the executor that is running
a Kafka Receiver dies.

We are running our streaming jobs on top of mesos and we killed the mesos
slave that was running the executor; a new executor was created on another
mesos-slave but according to the Driver UI, the Kafka receiver location is
still on the dead slave.

It seems that if the executor where the receiver was running dies, spark on
top of mesos is not able to create a new receiver on a different executor
to continue working.

Is that a known issue? Is there a way we could ensure that the receiver
would be recreated?

Kind regards,

Luis Vicente


Re: Low Level Kafka Consumer for Spark

2014-12-03 Thread Luis Ángel Vicente Sánchez
My main complaint about the WAL mechanism in the new reliable kafka receiver
is that you have to enable checkpointing, and for some reason, even if
spark.cleaner.ttl is set to a reasonable value, only the metadata is
cleaned periodically. In my tests, using a folder in my filesystem as the
checkpoint folder, the receivedMetaData folder remains almost constant in
size but the receivedData folder is always increasing; the spark.cleaner.ttl
was configured to 300 seconds.
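
For context, the setup being described is roughly this; a sketch where the
write-ahead-log property name is the Spark 1.2-era one, and the master, batch
interval, and checkpoint path are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("reliable-kafka-test")
  .setMaster("local[4]")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // reliable receiver path
  .set("spark.cleaner.ttl", "300")                              // periodic metadata cleaning
val ssc = new StreamingContext(conf, Seconds(10))
// Checkpointing is mandatory once the WAL is enabled; the receivedData and
// receivedMetaData folders discussed above live under this directory.
ssc.checkpoint("file:///tmp/spark-checkpoints")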

2014-12-03 10:13 GMT+00:00 Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com>:

> Hi,
>
> Yes, as Jerry mentioned, SPARK-3129 (
> https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature,
> which solves the driver failure problem. The way 3129 is designed, it
> solves the driver failure problem agnostic of the source of the stream
> (like Kafka or Flume etc.). But with just 3129 you cannot achieve a complete
> solution for data loss; you also need a reliable receiver, which solves the
> data loss issue on receiver failure.
>
> The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer),
> for which this email thread was started, has solved that problem with the
> Kafka Low Level API.
>
> And SPARK-4062, as Jerry mentioned, also recently solved the same problem
> using the Kafka High Level API.
>
> On the Kafka High Level Consumer API approach, I would like to mention
> that Kafka 0.8 has some issues, as mentioned in this wiki (
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design),
> where consumer re-balance sometimes fails, and that is one of the key
> reasons Kafka is re-writing the consumer API in Kafka 0.9.
>
> I know a few folks have already faced these re-balancing issues while using
> the Kafka High Level API, and if you ask my opinion, we at Pearson are
> still using the Low Level Consumer, as it seems to be more robust and
> performant; we have been using it for a few months without any issue...
> and also I may be a little biased :)
>
> Regards,
> Dibyendu
>
>
>
> On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai 
> wrote:
>
>> Hi Rod,
>>
>> The purpose of introducing the WAL mechanism in Spark Streaming as a general
>> solution is to let all the receivers benefit from this mechanism.
>>
>> Though, as you said, since external sources like Kafka have their own
>> checkpoint mechanism, instead of storing data in the WAL we could store only
>> metadata in the WAL and recover from the last committed offsets. But this
>> requires a sophisticated design of the Kafka receiver with the low-level API
>> involved, and we would also need to take care of rebalancing and fault
>> tolerance ourselves. So right now, instead of implementing a whole new
>> receiver, we chose to implement a simple one; though the performance is not
>> so good, it's much easier to understand and maintain.
>>
>> The design purpose and implementation of the reliable Kafka receiver can be
>> found in (https://issues.apache.org/jira/browse/SPARK-4062). And in the
>> future, improving the reliable Kafka receiver as you mentioned is on our
>> schedule.
>>
>> Thanks
>> Jerry
>>
>>
>> -Original Message-
>> From: RodrigoB [mailto:rodrigo.boav...@aspect.com]
>> Sent: Wednesday, December 3, 2014 5:44 AM
>> To: u...@spark.incubator.apache.org
>> Subject: Re: Low Level Kafka Consumer for Spark
>>
>> Dibyendu,
>>
>> Just to make sure I will not be misunderstood - my concerns refer to
>> Spark's upcoming solution, not yours. I wanted to gather the perspective
>> of someone who implemented recovery with Kafka a different way.
>>
>> Tnks,
>> Rod
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
>> commands, e-mail: user-h...@spark.apache.org
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: RDD data checkpoint cleaning

2014-11-28 Thread Luis Ángel Vicente Sánchez
Is there any news about this issue? I was using a local folder in linux
for checkpointing, "file:///opt/sparkfolders/checkpoints". I think that
being able to use the ReliableKafkaReceiver in a 24x7 system without having
to worry about the disk getting full is a reasonable expectation.

Regards,

Luis

2014-11-21 15:17 GMT+00:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> I have seen the same behaviour while testing the latest spark 1.2.0
> snapshot.
>
> I'm trying the ReliableKafkaReceiver and it works quite well but the
> checkpoints folder is always increasing in size. The receivedMetaData
> folder remains almost constant in size but the receivedData folder is
> always increasing in size even if I set spark.cleaner.ttl to 300 seconds.
>
> Regards,
>
> Luis
>
> 2014-09-23 22:47 GMT+01:00 RodrigoB :
>
>> Just a follow-up.
>>
>> Just to make sure about the RDDs not being cleaned up, I just replayed the
>> app both on the windows remote laptop and then on the linux machine and at
>> the same time was observing the RDD folders in HDFS.
>>
>> Confirming the observed behavior: running on the laptop I could see the
>> RDDs
>> continuously increasing. When I ran on linux, only two RDD folders were
>> there and continuously being recycled.
>>
>> Metadata checkpoints were being cleaned on both scenarios.
>>
>> tnks,
>> Rod
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-tp14847p14939.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark 1.1.1 released but not available on maven repositories

2014-11-28 Thread Luis Ángel Vicente Sánchez
Is there any news about this issue? I have checked maven central again and
the artefacts are still not there.

Regards,

Luis

2014-11-27 10:42 GMT+00:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> I have just read on the website that spark 1.1.1 has been released but
> when I upgraded my project to use 1.1.1 I discovered that the artefacts are
> not on maven yet.
>
> [info] Resolving org.apache.spark#spark-streaming-kafka_2.10;1.1.1 ...
>>
>> [warn] module not found:
>>> org.apache.spark#spark-streaming-kafka_2.10;1.1.1
>>
>> [warn]  local: tried
>>
>> [warn]
>>> /Users/luis.vicente/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.10/1.1.1/ivys/ivy.xml
>>
>> [warn]  public: tried
>>
>> [warn]
>>> https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.10/1.1.1/spark-streaming-kafka_2.10-1.1.1.pom
>>
>> [warn]  sonatype snapshots: tried
>>
>> [warn]
>>> https://oss.sonatype.org/content/repositories/snapshots/org/apache/spark/spark-streaming-kafka_2.10/1.1.1/spark-streaming-kafka_2.10-1.1.1.pom
>>
>> [info] Resolving org.apache.spark#spark-core_2.10;1.1.1 ...
>>
>> [warn] module not found: org.apache.spark#spark-core_2.10;1.1.1
>>
>> [warn]  local: tried
>>
>> [warn]
>>> /Users/luis.vicente/.ivy2/local/org.apache.spark/spark-core_2.10/1.1.1/ivys/ivy.xml
>>
>> [warn]  public: tried
>>
>> [warn]
>>> https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.1.1/spark-core_2.10-1.1.1.pom
>>
>> [warn]  sonatype snapshots: tried
>>
>> [warn]
>>> https://oss.sonatype.org/content/repositories/snapshots/org/apache/spark/spark-core_2.10/1.1.1/spark-core_2.10-1.1.1.pom
>>
>> [info] Resolving org.apache.spark#spark-streaming_2.10;1.1.1 ...
>>
>> [warn] module not found: org.apache.spark#spark-streaming_2.10;1.1.1
>>
>> [warn]  local: tried
>>
>> [warn]
>>> /Users/luis.vicente/.ivy2/local/org.apache.spark/spark-streaming_2.10/1.1.1/ivys/ivy.xml
>>
>> [warn]  public: tried
>>
>> [warn]
>>> https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.10/1.1.1/spark-streaming_2.10-1.1.1.pom
>>
>> [warn]  sonatype snapshots: tried
>>
>> [warn]
>>> https://oss.sonatype.org/content/repositories/snapshots/org/apache/spark/spark-streaming_2.10/1.1.1/spark-streaming_2.10-1.1.1.pom
>>>
>>


Spark 1.1.1 released but not available on maven repositories

2014-11-27 Thread Luis Ángel Vicente Sánchez
I have just read on the website that spark 1.1.1 has been released but when
I upgraded my project to use 1.1.1 I discovered that the artefacts are not
on maven yet.

[info] Resolving org.apache.spark#spark-streaming-kafka_2.10;1.1.1 ...
>
> [warn] module not found: org.apache.spark#spark-streaming-kafka_2.10;1.1.1
>
> [warn]  local: tried
>
> [warn]
>> /Users/luis.vicente/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.10/1.1.1/ivys/ivy.xml
>
> [warn]  public: tried
>
> [warn]
>> https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.10/1.1.1/spark-streaming-kafka_2.10-1.1.1.pom
>
> [warn]  sonatype snapshots: tried
>
> [warn]
>> https://oss.sonatype.org/content/repositories/snapshots/org/apache/spark/spark-streaming-kafka_2.10/1.1.1/spark-streaming-kafka_2.10-1.1.1.pom
>
> [info] Resolving org.apache.spark#spark-core_2.10;1.1.1 ...
>
> [warn] module not found: org.apache.spark#spark-core_2.10;1.1.1
>
> [warn]  local: tried
>
> [warn]
>> /Users/luis.vicente/.ivy2/local/org.apache.spark/spark-core_2.10/1.1.1/ivys/ivy.xml
>
> [warn]  public: tried
>
> [warn]
>> https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.1.1/spark-core_2.10-1.1.1.pom
>
> [warn]  sonatype snapshots: tried
>
> [warn]
>> https://oss.sonatype.org/content/repositories/snapshots/org/apache/spark/spark-core_2.10/1.1.1/spark-core_2.10-1.1.1.pom
>
> [info] Resolving org.apache.spark#spark-streaming_2.10;1.1.1 ...
>
> [warn] module not found: org.apache.spark#spark-streaming_2.10;1.1.1
>
> [warn]  local: tried
>
> [warn]
>> /Users/luis.vicente/.ivy2/local/org.apache.spark/spark-streaming_2.10/1.1.1/ivys/ivy.xml
>
> [warn]  public: tried
>
> [warn]
>> https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.10/1.1.1/spark-streaming_2.10-1.1.1.pom
>
> [warn]  sonatype snapshots: tried
>
> [warn]
>> https://oss.sonatype.org/content/repositories/snapshots/org/apache/spark/spark-streaming_2.10/1.1.1/spark-streaming_2.10-1.1.1.pom
>>
>


Re: RDD data checkpoint cleaning

2014-11-21 Thread Luis Ángel Vicente Sánchez
I have seen the same behaviour while testing the latest spark 1.2.0
snapshot.

I'm trying the ReliableKafkaReceiver and it works quite well but the
checkpoints folder is always increasing in size. The receivedMetaData
folder remains almost constant in size but the receivedData folder is
always increasing in size even if I set spark.cleaner.ttl to 300 seconds.

Regards,

Luis

2014-09-23 22:47 GMT+01:00 RodrigoB :

> Just a follow-up.
>
> Just to make sure about the RDDs not being cleaned up, I just replayed the
> app both on the windows remote laptop and then on the linux machine and at
> the same time was observing the RDD folders in HDFS.
>
> Confirming the observed behavior: running on the laptop I could see the
> RDDs
> continuously increasing. When I ran on linux, only two RDD folders were
> there and continuously being recycled.
>
> Metadata checkpoints were being cleaned on both scenarios.
>
> tnks,
> Rod
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-tp14847p14939.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming: CoarseGrainedExecutorBackend: Slave registration failed: Duplicate executor ID

2014-09-16 Thread Luis Ángel Vicente Sánchez
I dug a bit more and the executor ID is a number, so it seems there is no
possible workaround.

Looking at the code of the CoarseGrainedSchedulerBackend.scala:

https://github.com/apache/spark/blob/6324eb7b5b0ae005cb2e913e36b1508bd6f1b9b8/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L86

It seems there is only one DriverActor and, as the RegisterExecutor message
only contains the executorID, there is a collision.

I wonder if what I'm doing is wrong... basically, from the same scala
application I...

1. create one actor per job.
2. send a message to each actor with configuration details to create a
SparkContext/StreamingContext.
3. send a message to each actor to start the job and streaming context (a
rough sketch follows).
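
A minimal sketch of that setup, with hypothetical actor and message names;
the point is that both StreamingContexts are created inside one application,
which matches the single-DriverActor collision described above:

import akka.actor.Actor
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class Configure(conf: SparkConf) // hypothetical protocol
case object Start

class JobActor extends Actor {
  private var ssc: StreamingContext = _

  def receive = {
    case Configure(conf) =>
      // Each actor builds its own streaming context from its own SparkConf,
      // but all of them live in the same application.
      ssc = new StreamingContext(conf, Seconds(10))
    case Start =>
      ssc.start()
  }
}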

2014-09-16 13:29 GMT+01:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> When I said scheduler I meant executor backend.
>
> 2014-09-16 13:26 GMT+01:00 Luis Ángel Vicente Sánchez <
> langel.gro...@gmail.com>:
>
>> It seems that, as I have a single scala application, the scheduler is the
>> same and there is a collision between executors of both spark contexts. Is
>> there a way to change how the executor ID is generated (maybe a uuid
>> instead of a sequential number..?)
>>
>> 2014-09-16 13:07 GMT+01:00 Luis Ángel Vicente Sánchez <
>> langel.gro...@gmail.com>:
>>
>>> I have a standalone spark cluster, and from within the same scala
>>> application I'm creating 2 different spark contexts to run two different
>>> spark streaming jobs, as the SparkConf is different for each of them.
>>>
>>> I'm getting this error that... I don't really understand:
>>>
>>> 14/09/16 11:51:35 ERROR OneForOneStrategy: spark.httpBroadcast.uri
>>>> java.util.NoSuchElementException: spark.httpBroadcast.uri
>>>>at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149)
>>>>at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149)
>>>>at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>>>>at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>>>>at org.apache.spark.SparkConf.get(SparkConf.scala:149)
>>>>at 
>>>> org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:130)
>>>>at 
>>>> org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
>>>>at 
>>>> org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
>>>>at 
>>>> org.apache.spark.broadcast.BroadcastManager.<init>(BroadcastManager.scala:35)
>>>>at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
>>>>at org.apache.spark.executor.Executor.<init>(Executor.scala:85)
>>>>at 
>>>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:59)
>>>>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>at 
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>at 
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>at 
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>at 
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> 14/09/16 11:51:35 ERROR CoarseGrainedExecutorBackend: Slave registration 
>>>> failed: Duplicate executor ID: 10
>>>>
>>>>
>>>  Both apps have different names, so I don't get how the executor ID is not
>>> unique :-S
>>>
>>> This happens on startup and most of the time only one job dies, but
>>> sometimes both of them die.
>>>
>>> Regards,
>>>
>>> Luis
>>>
>>
>>
>


Re: Spark Streaming: CoarseGrainedExecutorBackend: Slave registration failed: Duplicate executor ID

2014-09-16 Thread Luis Ángel Vicente Sánchez
When I said scheduler I meant executor backend.

2014-09-16 13:26 GMT+01:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> It seems that, as I have a single scala application, the scheduler is the
> same and there is a collision between executors of both spark contexts. Is
> there a way to change how the executor ID is generated (maybe a uuid
> instead of a sequential number..?)
>
> 2014-09-16 13:07 GMT+01:00 Luis Ángel Vicente Sánchez <
> langel.gro...@gmail.com>:
>
>> I have a standalone spark cluster, and from within the same scala
>> application I'm creating 2 different spark contexts to run two different
>> spark streaming jobs, as the SparkConf is different for each of them.
>>
>> I'm getting this error that... I don't really understand:
>>
>> 14/09/16 11:51:35 ERROR OneForOneStrategy: spark.httpBroadcast.uri
>>> java.util.NoSuchElementException: spark.httpBroadcast.uri
>>> at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149)
>>> at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149)
>>> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>>> at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>>> at org.apache.spark.SparkConf.get(SparkConf.scala:149)
>>> at 
>>> org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:130)
>>> at 
>>> org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
>>> at 
>>> org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
>>> at 
>>> org.apache.spark.broadcast.BroadcastManager.<init>(BroadcastManager.scala:35)
>>> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
>>> at org.apache.spark.executor.Executor.<init>(Executor.scala:85)
>>> at 
>>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:59)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>> at 
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> 14/09/16 11:51:35 ERROR CoarseGrainedExecutorBackend: Slave registration 
>>> failed: Duplicate executor ID: 10
>>>
>>>
>>  Both apps have different names, so I don't get how the executor ID is not
>> unique :-S
>>
>> This happens on startup and most of the time only one job dies, but
>> sometimes both of them die.
>>
>> Regards,
>>
>> Luis
>>
>
>


Re: Spark Streaming: CoarseGrainedExecutorBackend: Slave registration failed: Duplicate executor ID

2014-09-16 Thread Luis Ángel Vicente Sánchez
It seems that, as I have a single scala application, the scheduler is the
same and there is a collision between executors of both spark contexts. Is
there a way to change how the executor ID is generated (maybe a uuid
instead of a sequential number..?)

2014-09-16 13:07 GMT+01:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> I have a standalone spark cluster, and from within the same scala
> application I'm creating 2 different spark contexts to run two different
> spark streaming jobs, as the SparkConf is different for each of them.
>
> I'm getting this error that... I don't really understand:
>
> 14/09/16 11:51:35 ERROR OneForOneStrategy: spark.httpBroadcast.uri
>> java.util.NoSuchElementException: spark.httpBroadcast.uri
>>  at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149)
>>  at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149)
>>  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>>  at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>>  at org.apache.spark.SparkConf.get(SparkConf.scala:149)
>>  at 
>> org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:130)
>>  at 
>> org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
>>  at 
>> org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
>>  at 
>> org.apache.spark.broadcast.BroadcastManager.<init>(BroadcastManager.scala:35)
>>  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
>>  at org.apache.spark.executor.Executor.<init>(Executor.scala:85)
>>  at 
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:59)
>>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>  at 
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>  at 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>  at 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>  at 
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 14/09/16 11:51:35 ERROR CoarseGrainedExecutorBackend: Slave registration 
>> failed: Duplicate executor ID: 10
>>
>>
>  Both apps have different names, so I don't get how the executor ID is not
> unique :-S
>
> This happens on startup and most of the time only one job dies, but
> sometimes both of them die.
>
> Regards,
>
> Luis
>


Spark Streaming: CoarseGrainedExecutorBackend: Slave registration failed: Duplicate executor ID

2014-09-16 Thread Luis Ángel Vicente Sánchez
I have a standalone spark cluster and from within the same scala
application I'm creating 2 different spark contexts to run two different
spark streaming jobs, as the SparkConf is different for each of them.

I'm getting this error that... I don't really understand:

14/09/16 11:51:35 ERROR OneForOneStrategy: spark.httpBroadcast.uri
> java.util.NoSuchElementException: spark.httpBroadcast.uri
>   at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149)
>   at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149)
>   at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>   at scala.collection.AbstractMap.getOrElse(Map.scala:58)
>   at org.apache.spark.SparkConf.get(SparkConf.scala:149)
>   at 
> org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:130)
>   at 
> org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
>   at 
> org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
>   at 
> org.apache.spark.broadcast.BroadcastManager.<init>(BroadcastManager.scala:35)
>   at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
>   at org.apache.spark.executor.Executor.<init>(Executor.scala:85)
>   at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:59)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 14/09/16 11:51:35 ERROR CoarseGrainedExecutorBackend: Slave registration 
> failed: Duplicate executor ID: 10
>
>
 Both apps have different names, so I don't get how the executor ID is not
unique :-S

This happens on startup and most of the time only one job dies, but
sometimes both of them die.

Regards,

Luis


Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Luis Ángel Vicente Sánchez
I somehow missed that parameter when I was reviewing the documentation;
that should do the trick! Thank you!
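For reference, a minimal sketch of how the knobs discussed below could be
set on a SparkConf (the values are illustrative only, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-job")
  // time-based cleaner for old metadata and streaming input data (seconds)
  .set("spark.cleaner.ttl", "3600")
  // reference-based cleaning: unpersist RDDs that fall out of the window
  .set("spark.streaming.unpersist", "true")
  // cap the receiver ingestion rate (records per second per receiver)
  .set("spark.streaming.receiver.maxRate", "10000")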

2014-09-10 2:10 GMT+01:00 Shao, Saisai :

>  Hi Luis,
>
>
>
> The parameters “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
> used to remove stale streaming data that has timed out. The difference is
> that “spark.cleaner.ttl” is a time-based cleaner that cleans not only
> streaming input data but also Spark’s stale metadata, while
> “spark.streaming.unpersist” is a reference-based cleaning mechanism:
> streaming data is removed once it falls out of the slide duration.
>
>
>
> Both of these parameters can reduce the memory footprint of Spark
> Streaming. But if data floods into Spark Streaming at startup, as in your
> situation with Kafka, these two parameters cannot fully mitigate the
> problem. You actually need to control the input rate so data is not
> injected so fast; you can try “spark.streaming.receiver.maxRate” to
> control the injection rate.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
> *Sent:* Wednesday, September 10, 2014 5:21 AM
> *To:* user@spark.apache.org
> *Subject:* spark.cleaner.ttl and spark.streaming.unpersist
>
>
>
> The executors of my spark streaming application are being killed due to
> memory issues. The memory consumption is quite high on startup because it
> is the first run and there are quite a few events on the kafka queues that
> are consumed at a rate of 100K events per sec.
>
> I wonder if it's recommended to use spark.cleaner.ttl and
> spark.streaming.unpersist together to mitigate that problem. And I also
> wonder if new RDDs are being batched while an RDD is being processed.
>
> Regards,
>
> Luis
>


spark.cleaner.ttl and spark.streaming.unpersist

2014-09-09 Thread Luis Ángel Vicente Sánchez
The executors of my spark streaming application are being killed due to
memory issues. The memory consumption is quite high on startup because it is
the first run and there are quite a few events on the kafka queues that are
consumed at a rate of 100K events per sec.

I wonder if it's recommended to use spark.cleaner.ttl and
spark.streaming.unpersist together to mitigate that problem. And I also
wonder if new RDDs are being batched while an RDD is being processed.

Regards,

Luis


Re: Spark streaming: size of DStream

2014-09-09 Thread Luis Ángel Vicente Sánchez
If you take into account what streaming means in spark, your goal doesn't
really make sense; you have to assume that your streams are infinite and
you will have to process them until the end of days. Operations on a
DStream define what you want to do with each element of each RDD, but spark
streaming is smart enough not to apply the transformations if an RDD is empty.

The only time you probably want to know the size of the RDD is when you
are going to perform a side effect, like storing something in a database,
using foreachRDD, e.g.:

val flumeStream = ...

val transformedStream =
  flumeStream
    .map(/* some transformation */)
    .flatMap(/* some other transformation */)
    .distinct()

transformedStream.foreachRDD { rdd =>
  if (rdd.count() != 0) {
    // perform some side effect that shouldn't be done if a transformed
    // batch is empty
  }
}
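And applied to the snippet quoted below, a hedged sketch of the same idea
(assuming the goal is simply to skip the transformations and side effects
for empty batches): the check has to happen per batch, inside foreachRDD,
not through a driver-side global counter that is only evaluated once when
the program is wired up.

currentBatch.foreachRDD { rdd =>
  if (rdd.count() > 0) {            // re-evaluated for every batch
    val result = rdd.map(x => x)    // placeholder for map(...).someOtherTransformation
    // perform the side effect on `result` here
  }
}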

2014-09-09 9:20 GMT+01:00 julyfire :

> i'm sorry I have some error in my code, update here:
>
> var count = -1L // a global variable in the main object
>
> val currentBatch = some_DStream
> val countDStream = currentBatch.map(o=>{
>   count = 0L  // reset the count variable in each batch
>   o
> })
> countDStream.foreachRDD(rdd=> count += rdd.count())
>
> if (count > 0) {
>   currentBatch.map(...).someOtherTransformation
> }
>
> two problems:
> 1. the variable count just goes on accumulating and is never reset in each batch
> 2. if (count > 0) is only evaluated once at the beginning of the program, so
> the next statement will never run
>
> Can you all give me some suggestions? thanks
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: using multiple dstreams together (spark streaming)

2014-07-16 Thread Luis Ángel Vicente Sánchez
hum... maybe consuming all streams at the same time with an actor that
would act as a new DStream source... but this is just a random idea... I
don't really know if that would be a good idea or even possible.
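Another angle that might work (a hedged sketch, not verified on a cluster;
names, durations and the threshold are illustrative): windows of different
lengths can be derived from the same DStream as long as they share the
underlying batch interval, and keyed windowed streams can then be joined
batch by batch.

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

val base: DStream[String] = ...  // the shared input stream, 1s batch interval assumed

// counts over a 1 second and a 2 second window, both sliding every second
val oneSec = base.countByWindow(Seconds(1), Seconds(1))
val twoSec = base.countByWindow(Seconds(2), Seconds(1))

// key both streams on a constant so they can be joined batch by batch
val joined = oneSec.map(("count", _)).join(twoSec.map(("count", _)))

joined.foreachRDD { rdd =>
  rdd.collect().foreach { case (_, (a, b)) =>
    if (math.abs(a - b) > 100) {
      // react to the delta between the 1s and 2s windows
    }
  }
}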


2014-07-16 18:30 GMT+01:00 Walrus theCat :

> Yeah -- I tried the .union operation and it didn't work for that reason.
> Surely there has to be a way to do this, as I imagine this is a commonly
> desired goal in streaming applications?
>
>
> On Wed, Jul 16, 2014 at 10:10 AM, Luis Ángel Vicente Sánchez <
> langel.gro...@gmail.com> wrote:
>
>> I'm joining several kafka dstreams using the join operation, but you have
>> the limitation that the batch duration has to be the same, i.e. a 1 second
>> window for all dstreams... so it would not work for you.
>>
>>
>> 2014-07-16 18:08 GMT+01:00 Walrus theCat :
>>
>> Hi,
>>>
>>> My application has multiple dstreams on the same inputstream:
>>>
>>> dstream1 // 1 second window
>>> dstream2 // 2 second window
>>> dstream3 // 5 minute window
>>>
>>>
>>> I want to write logic that deals with all three windows (e.g. when the 1
>>> second window differs from the 2 second window by some delta ...)
>>>
>>> I've found some examples online (there's not much out there!), and I can
>>> only see people transforming a single dstream.  In conventional spark, we'd
>>> do this sort of thing with a cartesian on RDDs.
>>>
>>> How can I deal with multiple Dstreams at once?
>>>
>>> Thanks
>>>
>>
>>
>


Re: using multiple dstreams together (spark streaming)

2014-07-16 Thread Luis Ángel Vicente Sánchez
I'm joining several kafka dstreams using the join operation, but you have
the limitation that the batch duration has to be the same, i.e. a 1 second
window for all dstreams... so it would not work for you.


2014-07-16 18:08 GMT+01:00 Walrus theCat :

> Hi,
>
> My application has multiple dstreams on the same inputstream:
>
> dstream1 // 1 second window
> dstream2 // 2 second window
> dstream3 // 5 minute window
>
>
> I want to write logic that deals with all three windows (e.g. when the 1
> second window differs from the 2 second window by some delta ...)
>
> I've found some examples online (there's not much out there!), and I can
> only see people transforming a single dstream.  In conventional spark, we'd
> do this sort of thing with a cartesian on RDDs.
>
> How can I deal with multiple Dstreams at once?
>
> Thanks
>


Re: Cassandra driver Spark question

2014-07-09 Thread Luis Ángel Vicente Sánchez
Yes, I'm using it to count concurrent users from a kafka stream of events
without problems. I'm currently testing it using the local mode, but any
serialization problem would have already appeared, so I don't expect any
serialization issue when I deploy it to my cluster.


2014-07-09 15:39 GMT+01:00 RodrigoB :

> Hi Luis,
>
> Yes it's actually an ouput of the previous RDD.
> Have you ever used the Cassandra Spark Driver on the driver app? I believe
> these limitations go around that - it's designed to save RDDs from the
> nodes.
>
> tnks,
> Rod
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-driver-Spark-question-tp9177p9187.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Cassandra driver Spark question

2014-07-09 Thread Luis Ángel Vicente Sánchez
Is MyType serializable? Everything inside the foreachRDD closure has to be
serializable.
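If it isn't, making MyType a case class usually fixes that. Beyond that, a
hedged sketch of what I would try instead of collecting into a driver-side
Seq and re-parallelizing (the keyspace and table names are hypothetical):

import com.datastax.spark.connector._

myDStream.foreachRDD { rdd =>
  rdd.map(_._2)                                  // keep only the RDD values
     .saveToCassandra("my_keyspace", "my_table") // write each batch directly
}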


2014-07-09 14:24 GMT+01:00 RodrigoB :

> Hi all,
>
> I am currently trying to save to Cassandra after some Spark Streaming
> computation.
>
> I call a myDStream.foreachRDD so that I can collect each RDD in the driver
> app runtime and inside I do something like this:
> myDStream.foreachRDD(rdd => {
>
> var someCol = Seq[MyType]()
>
> foreach(kv =>{
>   someCol :+ rdd._2 //I only want the RDD value and not the key
>  }
> val collectionRDD = sc.parallelize(someCol) // THIS IS WHY IT FAILS TRYING TO RUN THE WORKER
> collectionRDD.saveToCassandra(...)
> }
>
> I get the NotSerializableException while trying to run the Node (also tried
> someCol as a shared variable).
> I believe this happens because the myDStream doesn't exist yet when the
> code is pushed to the Node, so the parallelize doesn't have any structure
> to relate to it. Inside this foreachRDD I should only do RDD calls which are
> only related to other RDDs. I guess this was just a desperate attempt
>
> So I have a question:
> Using the Cassandra Spark driver - can we only write to Cassandra from an
> RDD? In my case I only want to write once, in a single batch on the driver
> app, after all the computation is finished.
>
> tnks in advance.
>
> Rod
>
>
>
>
>
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-driver-Spark-question-tp9177.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Possible bug in Spark Streaming :: TextFileStream

2014-07-07 Thread Luis Ángel Vicente Sánchez
I have a basic spark streaming job that is watching a folder, processing
any new file and updating a column family in cassandra using the new
cassandra-spark-driver.

I think there is a problem with SparkStreamingContext.textFileStream... if
I start my job in local mode with no files in the folder that is watched
and then I copy a bunch of files, sometimes spark is continually processing
those files again and again.

I have noticed that it usually happens when spark doesn't detect all new
files in one go... i.e. I copied 6 files and spark detected 3 of them as
new and processed them; then it detected the other 3 as new and processed
them. After it finished processing all 6 files, it detected the first
3 files as new again and processed them... then the other 3... and again...
and again... and again.

Should I raise a JIRA issue?

Regards,

Luis


Re: spark streaming rate limiting from kafka

2014-07-01 Thread Luis Ángel Vicente Sánchez
Maybe reducing the batch duration would help :\
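To make that concrete, a minimal sketch (the interval values are
illustrative): a smaller batch interval means each batch pulls a smaller
slice of the Kafka backlog at a time, at the cost of more frequent
scheduling.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("kafka-streaming-job")
val ssc = new StreamingContext(conf, Seconds(1))  // e.g. down from Seconds(10)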


2014-07-01 17:57 GMT+01:00 Chen Song :

> In my use case, if I need to stop spark streaming for a while, data would
> accumulate a lot on kafka topic-partitions. After I restart spark streaming
> job, the worker's heap will go out of memory on the fetch of the 1st batch.
>
> I am wondering if
>
> * Is there a way to throttle reading from kafka in spark streaming jobs?
> * Is there a way to control how far Kafka Dstream can read on
> topic-partition (via offset for example). By setting this to a small
> number, it will force DStream to read less data initially.
> * Is there a way to limit the consumption rate at Kafka side? (This one is
> not actually for spark streaming and doesn't seem to be question in this
> group. But I am raising it anyway here.)
>
> I have looked at the code example below, but it doesn't seem to be supported.
>
> KafkaUtils.createStream ...
> Thanks, All
> --
> Chen Song
>
>


Re: spark streaming, kafka, SPARK_CLASSPATH

2014-06-17 Thread Luis Ángel Vicente Sánchez
I have been able to submit a job successfully, but I had to configure my
spark job this way:

  val sparkConf: SparkConf =
new SparkConf()
  .setAppName("TwitterPopularTags")
  .setMaster("spark://int-spark-master:7077")
  .setSparkHome("/opt/spark")
  .setJars(Seq("/tmp/spark-test-0.1-SNAPSHOT.jar"))

Now I'm getting this error on my worker:

14/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
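A hedged guess at what that warning usually means (not confirmed for this
cluster): the job is asking for more cores or memory than any worker
offers. One thing to try is capping the request explicitly; the values
below are illustrative.

import org.apache.spark.SparkConf

val cappedConf: SparkConf =
  new SparkConf()
    .set("spark.cores.max", "2")          // don't ask for more cores than a worker has
    .set("spark.executor.memory", "512m") // keep executor memory within worker limits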



2014-06-17 15:38 GMT+01:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> After playing a bit, I have been able to create a fatjar this way:
>
> lazy val rootDependencies = Seq(
>   "org.apache.spark" %% "spark-core"  % "1.0.0" % "provided",
>   "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
>   "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
> exclude("org.apache.spark", "spark-core_2.10") exclude("org.apache.spark",
> "spark-streaming_2.10")
> )
>
> Excluding those transitive dependencies, we can create a fatjar ~400Kb
> instead of 40Mb.
>
> My problem is not running the streaming job locally but submitting it
> to a standalone cluster using spark-submit; every time I ran the following
> command, my workers died:
>
> ~/development/tools/spark/1.0.0/bin/spark-submit \
> --class "org.apache.spark.examples.streaming.TwitterPopularTags" \
> --master "spark://int-spark-master:7077" \
> --deploy-mode "cluster" \
> file:///tmp/spark-test-0.1-SNAPSHOT.jar
>
> I have copied my fatjar to my master /tmp folder.
>
>
> 2014-06-17 10:30 GMT+01:00 Michael Cutler :
>
> Admittedly getting Spark Streaming / Kafka working for the first time can
>> be a bit tricky with the web of dependencies that get pulled in.  I've
>> taken the KafkaWordCount example from the Spark project and set up a simple
>> standalone SBT project that shows you how to get it working and using
>> spark-submit.
>>
>> *https://github.com/cotdp/spark-example-kafka
>> <https://github.com/cotdp/spark-example-kafka>*
>>
>> The key trick is in the use of sbt-assembly instead of relying on any of
>> the "add jars" functionality.  You mark "spark-core" and "spark-streaming"
>> as provided, because they are part of the core spark-assembly already
>> running your cluster.  However "spark-streaming-kafka" is not, so you need
>> to package it in your 'fat JAR' while excluding all the mess that causes
>> the build to break.
>>
>> build.sbt
>> <https://github.com/cotdp/spark-example-kafka/blob/master/build.sbt>:
>>
>> import AssemblyKeys._
>>
>> assemblySettings
>>
>> name := "spark-example-kafka"
>>
>> version := "1.0"
>>
>> scalaVersion := "2.10.4"
>>
>>
>> jarName in assembly := "spark-example-kafka_2.10-1.0.jar"
>>
>>
>> assemblyOption in assembly ~= { _.copy(includeScala = false) }
>>
>>
>> libraryDependencies ++= Seq(
>>
>>   "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
>>
>>   "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
>>
>>   ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0").
>>
>> exclude("commons-beanutils", "commons-beanutils").
>>
>> exclude("commons-collections", "commons-collections").
>>
>> exclude("com.esotericsoftware.minlog", "minlog")
>>
>> )
>>
>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>
>>   {
>> case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
>>
>> case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
>>
>> case x if x.startsWith("plugin.properties") => MergeStrategy.last
>>
>> case x => old(x)
>>
>>   }
>> }
>>
>>
>> You can see the "exclude()" has to go around the spark-streaming-kafka 
>> dependency,
>> and I've used a MergeStrategy to solve the "deduplicate: different file
>> contents found in the following" errors.
>>
>> Build the JAR with sbt assembly and use the scripts in bin/ to run the
>> examples.

Re: Worker dies while submitting a job

2014-06-17 Thread Luis Ángel Vicente Sánchez
I have been able to submit a job successfully, but I had to configure my
spark job this way:

  val sparkConf: SparkConf =
new SparkConf()
  .setAppName("TwitterPopularTags")
  .setMaster("spark://int-spark-master:7077")
  .setSparkHome("/opt/spark")
  .setJars(Seq("/tmp/spark-test-0.1-SNAPSHOT.jar"))

Now I'm getting this error on my worker:

14/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory




2014-06-17 17:36 GMT+01:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> Ok... I was checking the wrong version of that file yesterday. My worker
> is sending a DriverStateChanged(_, DriverState.FAILED, _) but there is no
> case branch for that state and the worker is crashing. I still don't know
> why I'm getting a FAILED state but I'm sure that should kill the actor due
> to a scala.MatchError.
>
> Usually in scala it is a best practice to use a sealed trait and case
> classes/objects in a match statement instead of an enumeration (the
> compiler will complain about missing cases); I think that should be
> refactored to catch this kind of error at compile time.
>
> Now I need to find why that state changed message is sent... I will
> continue updating this thread until I find the problem :D
>
>
> 2014-06-16 18:25 GMT+01:00 Luis Ángel Vicente Sánchez <
> langel.gro...@gmail.com>:
>
> I'm playing with a modified version of the TwitterPopularTags example and
>> when I tried to submit the job to my cluster, workers keep dying with this
>> message:
>>
>> 14/06/16 17:11:16 INFO DriverRunner: Launch Command: "java" "-cp"
>> "/opt/spark-1.0.0-bin-hadoop1/work/driver-20140616171115-0014/spark-test-0.1-SNAPSHOT.jar:::/opt/spark-1.0.0-bin-hadoop1/conf:/opt/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar"
>> "-XX:MaxPermSize=128m" "-Xms512M" "-Xmx512M"
>> "org.apache.spark.deploy.worker.DriverWrapper"
>> "akka.tcp://sparkWorker@int-spark-worker:51676/user/Worker"
>> "org.apache.spark.examples.streaming.TwitterPopularTags"
>> 14/06/16 17:11:17 ERROR OneForOneStrategy: FAILED (of class
>> scala.Enumeration$Val)
>> scala.MatchError: FAILED (of class scala.Enumeration$Val)
>> at
>> org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:317)
>>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>  at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>  at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>  at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 14/06/16 17:11:17 INFO Worker: Starting Spark worker
>> int-spark-app-ie005d6a3.mclabs.io:51676 with 2 cores, 6.5 GB RAM
>> 14/06/16 17:11:17 INFO Worker: Spark home: /opt/spark-1.0.0-bin-hadoop1
>> 14/06/16 17:11:17 INFO WorkerWebUI: Started WorkerWebUI at
>> http://int-spark-app-ie005d6a3.mclabs.io:8081
>> 14/06/16 17:11:17 INFO Worker: Connecting to master
>> spark://int-spark-app-ie005d6a3.mclabs.io:7077...
>> 14/06/16 17:11:17 ERROR Worker: Worker registration failed: Attempted to
>> re-register worker at same address: akka.tcp://
>> sparkwor...@int-spark-app-ie005d6a3.mclabs.io:51676
>>
>> This happens when the worker receive a DriverStateChanged(driverId,
>> state, exception) message.
>>
>> To deploy the job I copied the jar file to the temporary folder of master
>> node and execute the following command:
>>
>> ./spark-submit \
>> --class org.apache.spark.examples.streaming.TwitterPopularTags \
>> --master spark://int-spark-master:7077 \
>> --deploy-mode cluster \
>> file:///tmp/spark-test-0.1-SNAPSHOT.jar
>>
>> I don't really know what the problem could be as there is a 'case _' that
>> should avoid that problem :S
>>
>
>


Re: Worker dies while submitting a job

2014-06-17 Thread Luis Ángel Vicente Sánchez
Ok... I was checking the wrong version of that file yesterday. My worker is
sending a DriverStateChanged(_, DriverState.FAILED, _) but there is no case
branch for that state and the worker is crashing. I still don't know why
I'm getting a FAILED state but I'm sure that should kill the actor due to a
scala.MatchError.

Usually in scala it is a best practice to use a sealed trait and case
classes/objects in a match statement instead of an enumeration (the
compiler will complain about missing cases); I think that should be
refactored to catch this kind of error at compile time.

Now I need to find why that state changed message is sent... I will
continue updating this thread until I find the problem :D
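A minimal sketch of that exhaustiveness point (the state names here are
illustrative, not Spark's actual DriverState values):

sealed trait State
case object Running extends State
case object Failed  extends State

def handle(state: State): String = state match {
  case Running => "running"
  case Failed  => "failed"
  // dropping either case above makes the compiler warn
  // "match may not be exhaustive"; a match on scala.Enumeration values
  // compiles silently and blows up at runtime with a MatchError instead
}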


2014-06-16 18:25 GMT+01:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:

> I'm playing with a modified version of the TwitterPopularTags example and
> when I tried to submit the job to my cluster, workers keep dying with this
> message:
>
> 14/06/16 17:11:16 INFO DriverRunner: Launch Command: "java" "-cp"
> "/opt/spark-1.0.0-bin-hadoop1/work/driver-20140616171115-0014/spark-test-0.1-SNAPSHOT.jar:::/opt/spark-1.0.0-bin-hadoop1/conf:/opt/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar"
> "-XX:MaxPermSize=128m" "-Xms512M" "-Xmx512M"
> "org.apache.spark.deploy.worker.DriverWrapper"
> "akka.tcp://sparkWorker@int-spark-worker:51676/user/Worker"
> "org.apache.spark.examples.streaming.TwitterPopularTags"
> 14/06/16 17:11:17 ERROR OneForOneStrategy: FAILED (of class
> scala.Enumeration$Val)
> scala.MatchError: FAILED (of class scala.Enumeration$Val)
> at
> org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:317)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>  at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 14/06/16 17:11:17 INFO Worker: Starting Spark worker
> int-spark-app-ie005d6a3.mclabs.io:51676 with 2 cores, 6.5 GB RAM
> 14/06/16 17:11:17 INFO Worker: Spark home: /opt/spark-1.0.0-bin-hadoop1
> 14/06/16 17:11:17 INFO WorkerWebUI: Started WorkerWebUI at
> http://int-spark-app-ie005d6a3.mclabs.io:8081
> 14/06/16 17:11:17 INFO Worker: Connecting to master
> spark://int-spark-app-ie005d6a3.mclabs.io:7077...
> 14/06/16 17:11:17 ERROR Worker: Worker registration failed: Attempted to
> re-register worker at same address: akka.tcp://
> sparkwor...@int-spark-app-ie005d6a3.mclabs.io:51676
>
> This happens when the worker receive a DriverStateChanged(driverId, state,
> exception) message.
>
> To deploy the job I copied the jar file to the temporary folder of master
> node and execute the following command:
>
> ./spark-submit \
> --class org.apache.spark.examples.streaming.TwitterPopularTags \
> --master spark://int-spark-master:7077 \
> --deploy-mode cluster \
> file:///tmp/spark-test-0.1-SNAPSHOT.jar
>
> I don't really know what the problem could be as there is a 'case _' that
> should avoid that problem :S
>


Re: spark streaming, kafka, SPARK_CLASSPATH

2014-06-17 Thread Luis Ángel Vicente Sánchez
After playing a bit, I have been able to create a fatjar this way:

lazy val rootDependencies = Seq(
  "org.apache.spark" %% "spark-core"  % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
exclude("org.apache.spark", "spark-core_2.10") exclude("org.apache.spark",
"spark-streaming_2.10")
)

Excluding those transitive dependencies, we can create a fatjar ~400Kb
instead of 40Mb.

My problem is not running the streaming job locally but submitting it
to a standalone cluster using spark-submit; every time I ran the following
command, my workers died:

~/development/tools/spark/1.0.0/bin/spark-submit \
--class "org.apache.spark.examples.streaming.TwitterPopularTags" \
--master "spark://int-spark-master:7077" \
--deploy-mode "cluster" \
file:///tmp/spark-test-0.1-SNAPSHOT.jar

I have copied my fatjar to my master /tmp folder.


2014-06-17 10:30 GMT+01:00 Michael Cutler :

> Admittedly getting Spark Streaming / Kafka working for the first time can
> be a bit tricky with the web of dependencies that get pulled in.  I've
> taken the KafkaWordCount example from the Spark project and set up a simple
> standalone SBT project that shows you how to get it working and using
> spark-submit.
>
> *https://github.com/cotdp/spark-example-kafka
> <https://github.com/cotdp/spark-example-kafka>*
>
> The key trick is in the use of sbt-assembly instead of relying on any of
> the "add jars" functionality.  You mark "spark-core" and "spark-streaming"
> as provided, because they are part of the core spark-assembly already
> running your cluster.  However "spark-streaming-kafka" is not, so you need
> to package it in your 'fat JAR' while excluding all the mess that causes
> the build to break.
>
> build.sbt
> <https://github.com/cotdp/spark-example-kafka/blob/master/build.sbt>:
>
> import AssemblyKeys._
>
> assemblySettings
>
> name := "spark-example-kafka"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> jarName in assembly := "spark-example-kafka_2.10-1.0.jar"
>
> assemblyOption in assembly ~= { _.copy(includeScala = false) }
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
>   "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
>   ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0").
> exclude("commons-beanutils", "commons-beanutils").
> exclude("commons-collections", "commons-collections").
> exclude("com.esotericsoftware.minlog", "minlog")
> )
>
> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>   {
> case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
> case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
> case x if x.startsWith("plugin.properties") => MergeStrategy.last
> case x => old(x)
>   }
> }
>
>
> You can see the "exclude()" has to go around the spark-streaming-kafka 
> dependency,
> and I've used a MergeStrategy to solve the "deduplicate: different file
> contents found in the following" errors.
>
> Build the JAR with sbt assembly and use the scripts in bin/ to run the
> examples.
>
> I'm using this same approach to run my Spark Streaming jobs with
> spark-submit and have them managed using Mesos/Marathon
> <http://mesosphere.io/> to handle failures and restarts with long running
> processes.
>
> Good luck!
>
> MC
>
>
>
>
>
>  *Michael Cutler*
> Founder, CTO
>
>
> * Mobile: +44 789 990 7847 Email:   mich...@tumra.com 
> Web: tumra.com
> <http://tumra.com/?utm_source=signature&utm_medium=email> *
> *Visit us at our offices in Chiswick Park <http://goo.gl/maps/abBxq>*
> *Registered in England & Wales, 07916412. VAT No. 130595328*
>
>
> This email and any files transmitted with it are confidential and may also
> be privileged. It is intended only for the person to whom it is addressed.
> If you have received this email in error, please inform the sender 
> immediately.
> If you are not the intended recipient you must not use, disclose, copy,
> print, distribute or rely on this email.
>
>
> On 17 June 2014 02:51, Gino Bustelo

Re: spark streaming, kafka, SPARK_CLASSPATH

2014-06-16 Thread Luis Ángel Vicente Sánchez
Did you manage to make it work? I'm facing similar problems and this is a
serious blocker issue. spark-submit seems kind of broken to me if you can't
use it for spark streaming.

Regards,

Luis


2014-06-11 1:48 GMT+01:00 lannyripple :

> I am using Spark 1.0.0 compiled with Hadoop 1.2.1.
>
> I have a toy spark-streaming-kafka program.  It reads from a kafka queue
> and
> does
>
> stream
>   .map {case (k, v) => (v, 1)}
>   .reduceByKey(_ + _)
>   .print()
>
> using a 1 second interval on the stream.
>
> The docs say to make Spark and Hadoop jars 'provided' but this breaks for
> spark-streaming.  Including spark-streaming (and spark-streaming-kafka) as
> 'compile' to sweep them into our assembly gives collisions on javax.*
> classes.  To work around this I modified
> $SPARK_HOME/bin/compute-classpath.sh to include spark-streaming,
> spark-streaming-kafka, and zkclient.  (Note that kafka is included as
> 'compile' in my project and picked up in the assembly.)
>
> I have set up conf/spark-env.sh as needed.  I have copied my assembly to
> /tmp/myjar.jar on all spark hosts and to my hdfs /tmp/jars directory.  I am
> running spark-submit from my spark master.  I am guided by the information
> here https://spark.apache.org/docs/latest/submitting-applications.html
>
> Well at this point I was going to detail all the ways spark-submit fails to
> follow its own documentation.  If I do not invoke sparkContext.setJars()
> then it just fails to find the driver class.  This is using various
> combinations of absolute path, file:, hdfs: (Warning: Skip remote jar)??,
> and local: prefixes on the application-jar and --jars arguments.
>
> If I invoke sparkContext.setJars() and include my assembly jar I get
> further.  At this point I get a failure from
> kafka.consumer.ConsumerConnector not being found.  I suspect this is
> because spark-streaming-kafka needs the Kafka dependency but my assembly
> jar is too late in the classpath.
>
> At this point I try setting spark.files.userClassPathFirst to 'true' but
> this causes more things to blow up.
>
> I finally found something that works.  Namely setting environment variable
> SPARK_CLASSPATH=/tmp/myjar.jar.  But silly me, this is deprecated and I'm
> helpfully informed to
>
>   Please instead use:
>- ./spark-submit with --driver-class-path to augment the driver
> classpath
>- spark.executor.extraClassPath to augment the executor classpath
>
> which when put into a file and introduced with --properties-file does not
> work.  (Also tried spark.files.userClassPathFirst here.)  These fail with
> the kafka.consumer.ConsumerConnector error.
>
> At a guess what's going on is that using SPARK_CLASSPATH I have my assembly
> jar in the classpath at SparkSubmit invocation
>
>   Spark Command: java -cp
>
> /tmp/myjar.jar::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.0.0-hadoop1.2.1.jar:/opt/spark/lib/spark-streaming_2.10-1.0.0.jar:/opt/spark/lib/spark-streaming-kafka_2.10-1.0.0.jar:/opt/spark/lib/zkclient-0.4.jar
> -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m
> org.apache.spark.deploy.SparkSubmit --class me.KafkaStreamingWC
> /tmp/myjar.jar
>
> but using --properties-file then the assembly is not available for
> SparkSubmit.
>
> I think the root cause is either spark-submit not handling the
> spark-streaming libraries so they can be 'provided' or the inclusion of
> org.eclipse.jetty.orbit in the streaming libraries which causes
>
>   [error] (*:assembly) deduplicate: different file contents found in the
> following:
>   [error]
>
> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.transaction/orbits/javax.transaction-1.1.1.v201105210645.jar:META-INF/ECLIPSEF.RSA
>   [error]
>
> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.servlet/orbits/javax.servlet-3.0.0.v201112011016.jar:META-INF/ECLIPSEF.RSA
>   [error]
>
> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.mail.glassfish/orbits/javax.mail.glassfish-1.4.1.v201005082020.jar:META-INF/ECLIPSEF.RSA
>   [error]
>
> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.activation/orbits/javax.activation-1.1.0.v201105071233.jar:META-INF/ECLIPSEF.RSA
>
> I've tried applying mergeStrategy in assembly for my assembly.sbt but then I
> get
>
>   Invalid signature file digest for Manifest main attributes
>
> If anyone knows the magic to get this working a reply would be greatly
> appreciated.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-SPARK-CLASSPATH-tp7356.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Worker dies while submitting a job

2014-06-16 Thread Luis Ángel Vicente Sánchez
I'm playing with a modified version of the TwitterPopularTags example and
when I tried to submit the job to my cluster, workers keep dying with this
message:

14/06/16 17:11:16 INFO DriverRunner: Launch Command: "java" "-cp"
"/opt/spark-1.0.0-bin-hadoop1/work/driver-20140616171115-0014/spark-test-0.1-SNAPSHOT.jar:::/opt/spark-1.0.0-bin-hadoop1/conf:/opt/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar"
"-XX:MaxPermSize=128m" "-Xms512M" "-Xmx512M"
"org.apache.spark.deploy.worker.DriverWrapper"
"akka.tcp://sparkWorker@int-spark-worker:51676/user/Worker"
"org.apache.spark.examples.streaming.TwitterPopularTags"
14/06/16 17:11:17 ERROR OneForOneStrategy: FAILED (of class
scala.Enumeration$Val)
scala.MatchError: FAILED (of class scala.Enumeration$Val)
at
org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:317)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/06/16 17:11:17 INFO Worker: Starting Spark worker
int-spark-app-ie005d6a3.mclabs.io:51676 with 2 cores, 6.5 GB RAM
14/06/16 17:11:17 INFO Worker: Spark home: /opt/spark-1.0.0-bin-hadoop1
14/06/16 17:11:17 INFO WorkerWebUI: Started WorkerWebUI at
http://int-spark-app-ie005d6a3.mclabs.io:8081
14/06/16 17:11:17 INFO Worker: Connecting to master
spark://int-spark-app-ie005d6a3.mclabs.io:7077...
14/06/16 17:11:17 ERROR Worker: Worker registration failed: Attempted to
re-register worker at same address: akka.tcp://
sparkwor...@int-spark-app-ie005d6a3.mclabs.io:51676

This happens when the worker receive a DriverStateChanged(driverId, state,
exception) message.

To deploy the job I copied the jar file to the temporary folder of master
node and execute the following command:

./spark-submit \
--class org.apache.spark.examples.streaming.TwitterPopularTags \
--master spark://int-spark-master:7077 \
--deploy-mode cluster \
file:///tmp/spark-test-0.1-SNAPSHOT.jar

I don't really know what the problem could be as there is a 'case _' that
should avoid that problem :S


Re: Anyone using value classes in RDDs?

2014-04-20 Thread Luis Ángel Vicente Sánchez
Type aliases aren't safe, as you could use any string as a name or ID.
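A quick sketch of the difference (hypothetical names, typed straight into
the mail client rather than a REPL, so treat it as illustrative):

type IdAlias = String
def lookupByAlias(id: IdAlias): Unit = ()
lookupByAlias("Alice")       // compiles fine even though "Alice" is a name

case class ID(id: String) extends AnyVal
case class Name(name: String) extends AnyVal
def lookup(id: ID): Unit = ()
lookup(ID("user-42"))        // ok
// lookup(Name("Alice"))     // does not compile: type mismatch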
On 20 Apr 2014 14:18, "Surendranauth Hiraman" 
wrote:

> If the purpose is only aliasing, rather than adding additional methods and
> avoiding runtime allocation, what about type aliases?
>
> type ID = String
> type Name = String
>
>
> On Sat, Apr 19, 2014 at 9:26 PM, kamatsuoka  wrote:
>
>> No, you can wrap other types in value classes as well.  You can try it in
>> the REPL:
>>
>> scala> case class ID(val id: String) extends AnyVal
>> defined class ID
>> scala> val i = ID("foo")
>> i: ID = ID(foo)
>>
>>
>> On Fri, Apr 18, 2014 at 4:14 PM, Koert Kuipers [via Apache Spark User
>>> List] <[hidden email]> wrote:
>>
>>> isn't valueclasses for primitives (AnyVal) only? that doesn't apply to
>>> string, which is an object (AnyRef)
>>>
>>>
>>> On Fri, Apr 18, 2014 at 2:51 PM, kamatsuoka <[hidden email]> wrote:
>>>
 I'm wondering if anyone has tried using value classes in RDDs?  My use
 case
 is that I have a number of RDDs containing strings, e.g.

 val r1: RDD[(String, (String, Int))] = ...
 val r2: RDD[(String, (String, Int))] = ...

 and it might be clearer if I wrote

 case class ID(val id: String) extends AnyVal
 case class Name(val id: String) extends AnyVal
 val r1: RDD[(ID, (Name, Int))] = ...
 val r2: RDD[(Name, (ID, Int))] = ...

 This seems like a pretty typical use case for value classes, but I
 haven't
 noticed anyone talking about it.  Although, I think you'd have to read
 through all of the Spark code paths to know whether allocation is
 required
 (http://docs.scala-lang.org/overviews/core/value-classes.html), so some
 comparative performance testing would be called for.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Anyone-using-value-classes-in-RDDs-tp4464.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

>>>
>>>
>>>
>>> --
>>>  If you reply to this email, your message will be added to the
>>> discussion below:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Anyone-using-value-classes-in-RDDs-tp4464p4475.html
>>>
>>
>>
>>
>> --
>> Kenji
>>
>> --
>> View this message in context: Re: Anyone using value classes in 
>> RDDs?
>> Sent from the Apache Spark User List mailing list 
>> archive at Nabble.com.
>>
>
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@velos.io
> W: www.velos.io
>
>