RE: ElasticSearch for Spark times out

2015-04-22 Thread Adrian Mocanu
Hi
The gist of it is this:
I have data indexed into ES. Each index stores monthly data and the query will 
get data for some date range (across several ES indexes or within 1 if the date 
range is within 1 month). Then I merge these RDDs into an uberRdd, perform some 
operations on it, and print the result with foreach.

The simplified code is below.

{
  // esRdds: List[RDD] contains mentions count per post
  val esRdds = (startDate.getYear to endDate.getYear).flatMap { year =>
    val sMonth = if (year == startDate.getYear) startDate.getMonthOfYear else 1
    val eMonth = if (year == endDate.getYear) endDate.getMonthOfYear else 12
    (sMonth to eMonth).map { i =>
      sc.esRDD(s"$year-${i.formatted("%02d")}_nlpindex/nlp",
        ESQueries.generateQueryString(Some(startDate), Some(endDate), mentionsToFind, siteNames))
        .map { case (str, map) => unwrapAndCountMentionsPerPost(map) }
    }
  }

var uberRdd = esRdds(0)
for (rdd <- esRdds) {
  uberRdd = uberRdd ++ rdd
}

   uberRdd.map { ... }.join(...).foreach(x => println(x))
  }

From: Jeetendra Gangele [mailto:gangele...@gmail.com]
Sent: April 22, 2015 2:52 PM
To: Adrian Mocanu
Cc: u...@spark.incubator.apache.org
Subject: Re: ElasticSearch for Spark times out

will you be able to paste the code?

On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com wrote:
Hi

I use the ElasticSearch package for Spark and very often it times out reading 
data from ES into an RDD.
How can I keep the connection alive (why doesn't it? Bug?)

Here's the exception I get:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out
at 
org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
~[scala-library.jar:na]
at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
~[scala-library.jar:na]
at 
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at org.apache.spark.scheduler.Task.run(Task.scala:54) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
[na:1.7.0_75]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
[na:1.7.0_75]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method) 
~[na:1.7.0_75]
at java.net.SocketInputStream.read(SocketInputStream.java:152) 
~[na:1.7.0_75]
at java.net.SocketInputStream.read

ElasticSearch for Spark times out

2015-04-22 Thread Adrian Mocanu
Hi

I use the ElasticSearch package for Spark and very often it times out reading 
data from ES into an RDD.
How can I keep the connection alive (why doesn't it? Bug?)

Here's the exception I get:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out
at 
org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
~[scala-library.jar:na]
at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
~[scala-library.jar:na]
at 
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at org.apache.spark.scheduler.Task.run(Task.scala:54) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
[na:1.7.0_75]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
[na:1.7.0_75]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method) 
~[na:1.7.0_75]
at java.net.SocketInputStream.read(SocketInputStream.java:152) 
~[na:1.7.0_75]
at java.net.SocketInputStream.read(SocketInputStream.java:122) 
~[na:1.7.0_75]
at 
java.io.BufferedInputStream.read1(BufferedInputStream.java:273) ~[na:1.7.0_75]
at 
java.io.BufferedInputStream.read(BufferedInputStream.java:334) ~[na:1.7.0_75]
at 
org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
 ~[commons-httpclient-3.1.jar:na]
at 
org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
 ~[commons-httpclient-3.1.jar:na]
at java.io.FilterInputStream.read(FilterInputStream.java:133) 
~[na:1.7.0_75]
at 
org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
 ~[commons-httpclient-3.1.jar:na]
at 
org.elasticsearch.hadoop.rest.DelegatingInputStream.read(DelegatingInputStream.java:57)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.codehaus.jackson.impl.Utf8StreamParser.loadMore(Utf8StreamParser.java:172) 
~[jackson-core-asl-1.9.11.jar:1.9.11]
at 
org.codehaus.jackson.impl.Utf8StreamParser.parseEscapedFieldName(Utf8StreamParser.java:1502)
 ~[jackson-core-asl-1.9.11.jar:1.9.11]
at 
org.codehaus.jackson.impl.Utf8StreamParser.slowParseFieldName(Utf8StreamParser.java:1404)
 ~[jackson-core-asl-1.9.11.jar:1.9.11]
at 

RE: ElasticSearch for Spark times out

2015-04-22 Thread Adrian Mocanu
Hi

Thanks for the help. My ES is up.
Out of curiosity, do you know what the timeout value is? There are probably 
other things happening to cause the timeout; I don't think my ES is that slow 
but it's possible that ES is taking too long to find the data. What I see 
happening is that it uses scroll to get the data from ES; about 150 items at a 
time. Usual delay when I perform the same query from a browser plugin ranges 
from 1-5sec.
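In case it helps anyone hitting the same thing, a minimal sketch of raising the connector's read timeout and scroll size through the Spark configuration; es.http.timeout and es.scroll.size are elasticsearch-hadoop settings, and the values, node address, index and query below are only placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val conf = new SparkConf()
  .setAppName("es-read-timeout-test")
  .set("es.nodes", "localhost:9200")   // placeholder ES node
  .set("es.http.timeout", "5m")        // give slow scroll responses more time
  .set("es.scroll.size", "500")        // fetch more hits per scroll round trip
val sc = new SparkContext(conf)

// same kind of query as above, now picking up the tuned settings from the conf
val rdd = sc.esRDD("2015-04_nlpindex/nlp", "?q=*")
println(rdd.count())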

Thanks

From: Jeetendra Gangele [mailto:gangele...@gmail.com]
Sent: April 22, 2015 3:09 PM
To: Adrian Mocanu
Cc: u...@spark.incubator.apache.org
Subject: Re: ElasticSearch for Spark times out

Basically a read timeout means that no data arrived within the specified receive 
timeout period.
A few things I would suggest:
1. Is your ES cluster up and running?
2. If 1 is yes, then reduce the size of the index to a few KB and test again.

On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com wrote:
Hi

I use the ElasticSearch package for Spark and very often it times out reading 
data from ES into an RDD.
How can I keep the connection alive (why doesn't it? Bug?)

Here's the exception I get:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out
at 
org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
~[scala-library.jar:na]
at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
~[scala-library.jar:na]
at 
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at org.apache.spark.scheduler.Task.run(Task.scala:54) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
[na:1.7.0_75]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
[na:1.7.0_75]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method) 
~[na:1.7.0_75]
at java.net.SocketInputStream.read(SocketInputStream.java:152) 
~[na:1.7.0_75]
at java.net.SocketInputStream.read(SocketInputStream.java:122) 
~[na:1.7.0_75]
at 
java.io.BufferedInputStream.read1(BufferedInputStream.java:273) ~[na:1.7.0_75]
at 
java.io.BufferedInputStream.read(BufferedInputStream.java:334) ~[na:1.7.0_75]
at 
org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
 ~[commons-httpclient-3.1.jar:na

EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out

2015-03-26 Thread Adrian Mocanu
Hi
I need help fixing a timeout exception thrown from ElasticSearch Hadoop. The 
ES cluster is up all the time.
I use ElasticSearch Hadoop to read data from ES into RDDs. I get a collection 
of these RDDs, which I traverse (with foreachRDD) and create more RDDs from 
each RDD in the collection. The resulting RDDs I put in a Queue from which I 
create a DStream.
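For reference, a stripped-down sketch of that wiring; ssc.queueStream is the Spark Streaming API for building a DStream from a queue of RDDs, while the index names, query and the per-document mapping here are only placeholders:

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark._

val ssc = new StreamingContext(sc, Seconds(5))
val rddQueue = new mutable.Queue[RDD[Int]]()

// read each monthly index into an RDD and queue it up (placeholder indices/query)
for (index <- Seq("2015-01_nlpindex/nlp", "2015-02_nlpindex/nlp")) {
  // map each (docId, fields) hit to something small, e.g. a per-document field count
  rddQueue += sc.esRDD(index, "?q=*").map { case (_, fields) => fields.size }
}

// a DStream backed by the queued RDDs; by default one queued RDD is consumed per batch
val esStream = ssc.queueStream(rddQueue)
esStream.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))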
After about 10 minutes of running, the program's debug output hangs for a bit 
then throws:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out

This is the output:
 [data from elastic search like the next line (in green)]
13:55:26.620 [Executor task launch worker-0] DEBUG httpclient.wire.content -  
toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota 
aa,toyota stout,toyota camry,toyota vista,toyota,toyota 
classic,toyota sprinter,toyota crown,toyota tundra,toyota 
prius,toyota aa,toyota stout,toyota camry,toyota 
vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota 
tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota 
vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota 
tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota 
vista],timestamp:[1373976139000],links.entity.rank:[0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9]}}]}}
13:55:26.620 [Executor task launch worker-0] DEBUG 
o.a.c.httpclient.HttpMethodBase - Resorting to protocol version default close 
connection policy
13:55:26.620 [Executor task launch worker-0] DEBUG 
o.a.c.httpclient.HttpMethodBase - Should NOT close connection, using HTTP/1.1
13:55:26.620 [Executor task launch worker-0] DEBUG 
o.a.c.httpclient.HttpConnection - Releasing connection back to connection 
manager.
13:55:26.631 [Executor task launch worker-0] ERROR 
org.apache.spark.executor.Executor - Exception in task 1.0 in stage 4.0 (TID 10)
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out
at 
org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:245) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:203) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:277) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:200) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:156)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
~[scala-library.jar:na]
at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
~[scala-library.jar:na]
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
 ~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at org.apache.spark.scheduler.Task.run(Task.scala:54) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
~[spark-core_2.10-1.1.0.jar:1.1.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
[na:1.7.0_75]
  

Spark log shows only this line repeated: RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time X

2015-03-26 Thread Adrian Mocanu
Here's my log output from a streaming job.
What is this?


09:54:27.504 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067504
09:54:27.505 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067505
09:54:27.506 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067506
09:54:27.508 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067507
09:54:27.508 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067508
09:54:27.509 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067509
09:54:27.510 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067510
09:54:27.511 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067511
09:54:27.512 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067512
09:54:27.513 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067513
09:54:27.514 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067514
09:54:27.515 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067515
09:54:27.516 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067516
09:54:27.517 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067517
09:54:27.518 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067518
09:54:27.519 [RecurringTimer - JobGenerator] DEBUG 
o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 
1427378067519
09:54:27.520 [Recurri ...


RE: EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out

2015-03-26 Thread Adrian Mocanu
I also get stack overflow every now and then without having any recursive calls:

java.lang.StackOverflowError: null
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1479) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
~[na:1.7.0_75]
at 
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) 
~[na:1.7.0_75]
at 
scala.collection.immutable.$colon$colon.writeObject(List.scala:379) 
~[scala-library.jar:na]
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) 
~[na:na]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[na:1.7.0_75]
this is a huge stack trace... but it keeps repeating

What could this be from?

From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
Sent: March 26, 2015 2:10 PM
To: u...@spark.incubator.apache.org; user@spark.apache.org
Subject: EsHadoopSerializationException: java.net.SocketTimeoutException: Read 
timed out

Hi
I need help fixing a timeout exception thrown from ElasticSearch Hadoop. The 
ES cluster is up all the time.
I use ElasticSearch Hadoop to read data from ES into RDDs. I get a collection 
of these RDDs, which I traverse (with foreachRDD) and create more RDDs from 
each RDD in the collection. The resulting RDDs I put in a Queue from which I 
create a DStream.
After about 10 minutes of running, the program's debug output hangs for a bit 
then throws:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out

This is the output:
 [data from elastic search like the next line (in green)]
13:55:26.620 [Executor task launch worker-0] DEBUG httpclient.wire.content -  
toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota 
aa,toyota stout,toyota camry,toyota vista,toyota,toyota 
classic,toyota sprinter,toyota crown,toyota tundra,toyota 
prius,toyota aa,toyota stout,toyota camry,toyota 
vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota 
tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota 
vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota 
tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota 
vista],timestamp:[1373976139000],links.entity.rank:[0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9]}}]}}
13:55:26.620 [Executor task launch worker-0] DEBUG 
o.a.c.httpclient.HttpMethodBase - Resorting to protocol version default close 
connection policy
13:55:26.620 [Executor task launch worker-0] DEBUG 
o.a.c.httpclient.HttpMethodBase - Should NOT close connection, using HTTP/1.1
13:55:26.620 [Executor task launch worker-0] DEBUG 
o.a.c.httpclient.HttpConnection - Releasing connection back to connection 
manager.
13:55:26.631 [Executor task launch worker-0] ERROR 
org.apache.spark.executor.Executor - Exception in task 1.0 in stage 4.0 (TID 10)
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
java.net.SocketTimeoutException: Read timed out
at 
org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:245) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:203) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:277) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:200) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:156)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) 
~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
at 
org.elasticsearch.hadoop.rest.RestRepository.scroll

writing DStream RDDs to the same file

2015-03-25 Thread Adrian Mocanu
Hi
Is there a way to write all RDDs in a DStream to the same file?
I tried this and got an empty file. I think it's because the file is not closed, i.e. 
ESMinibatchFunctions.writer.close() executes before the stream is created.

Here's my code:
  myStream.foreachRDD(rdd => {
    rdd.foreach(x => {
      ESMinibatchFunctions.writer.append(rdd.collect()(0).toString() + " the data ")})
    //localRdd = localRdd.union(rdd)
    //localArray = localArray ++ rdd.collect()
  })

  ESMinibatchFunctions.writer.close()

object ESMinibatchFunctions {
  val writer = new PrintWriter("c:/delme/exxx.txt")
}
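In case it helps, a sketch of one way to rework this so nothing is written after the writer is closed: open and close the writer per batch inside foreachRDD (the collect() is only reasonable for small batches, and the file path is the same placeholder as above):

import java.io.{FileWriter, PrintWriter}

myStream.foreachRDD { rdd =>
  val batch = rdd.collect()          // brings the whole batch to the driver; small batches only
  if (batch.nonEmpty) {
    // open in append mode and close per batch, so the file is never closed before data arrives
    val writer = new PrintWriter(new FileWriter("c:/delme/exxx.txt", true))
    try batch.foreach(x => writer.println(x)) finally writer.close()
  }
}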


updateStateByKey - Seq[V] order

2015-03-24 Thread Adrian Mocanu
Hi
Does updateStateByKey pass elements to updateFunc (in Seq[V]) in the order in which 
they appear in the RDD?
My guess is no, which means updateFunc needs to be commutative. Am I correct?
I've asked this question before but there were no takers.

Here are the Scala docs for updateStateByKey:

  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   * @param updateFunc State update function. If `this` function returns None, then
   *   corresponding state key-value pair will be eliminated.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)] = {
    updateStateByKey(updateFunc, defaultPartitioner())
  }
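For what it's worth, a small sketch of an update function written so that the answer doesn't matter: sum is commutative and associative, so the order of Seq[V] is irrelevant (the Int value type and the "pairs" stream are only illustrative):

// running total per key; the result is the same for any ordering of `values`
val updateCount = (values: Seq[Int], state: Option[Int]) => {
  val newTotal = state.getOrElse(0) + values.sum
  Some(newTotal)
}

// usage, assuming pairs is a DStream[(String, Int)]:
// val totals = pairs.updateStateByKey(updateCount)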


partitionBy not working w HashPartitioner

2015-03-16 Thread Adrian Mocanu
Here's my use case:

I read an array into an RDD and I use a hash partitioner to partition the RDD.
This is the array type: Array[(String, Iterable[(Long, Int)])]

topK:Array[(String, Iterable[(Long, Int)])] = ...
import org.apache.spark.HashPartitioner
val hashPartitioner=new HashPartitioner(10)
val 
resultRdd=sc.parallelize(topK).partitionBy(hashPartitioner).sortByKey().saveAsTextFile(fileName)

I also tried
val resultRdd=sc.parallelize(topK, 10).sortByKey().saveAsTextFile(fileName)

The results:
I do get 10 partitions. However, the first partition always contains data for 
the first 2 keys in the RDD, then each following partition contains data for 1 
key in the RDD (as expected), then the last file is empty since the first file 
contained 2 keys.

The Question:
How to make Spark write 1 file per key? Is this behaviour I'm currently seeing 
a bug?
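One thing worth noting, as a sketch rather than a definitive answer: HashPartitioner places a key by key.hashCode modulo numPartitions, so two keys can legitimately hash into the same partition while another partition stays empty. Getting exactly one file per key generally needs a partitioner that maps each distinct key to its own partition, for example (class name is made up for illustration):

import org.apache.spark.Partitioner

// assumes the distinct keys are known up front (they are, in the topK array)
class OneKeyPerPartition(keys: Seq[String]) extends Partitioner {
  private val indexOf = keys.zipWithIndex.toMap
  override def numPartitions: Int = keys.size
  override def getPartition(key: Any): Int = indexOf(key.asInstanceOf[String])
}

// val partitioner = new OneKeyPerPartition(topK.map(_._1))
// sc.parallelize(topK).partitionBy(partitioner).saveAsTextFile(fileName)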


-Adrian




how to print RDD by key into file with groupByKey

2015-03-13 Thread Adrian Mocanu
Hi
I have an RDD: RDD[(String, scala.Iterable[(Long, Int)])] which I want to print 
into a file, a file for each key string.
I tried to trigger a repartition of the RDD by doing a group by on it. The 
grouping gives RDD[(String, scala.Iterable[Iterable[(Long, Int)]])] so I 
flattened that:
  Rdd.groupByKey().mapValues(x => x.flatten)

However, when I print with saveAsTextFile I get only 2 files

I was under the impression that groupBy repartitions the data by key and 
saveAsTextFile makes a file per partition.
What am I doing wrong here?
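For what it's worth, groupByKey only guarantees that all values for a key land in the same partition, not that each partition holds exactly one key. A sketch of another way to get one file per key, via the old Hadoop API's MultipleTextOutputFormat (the output path and the value formatting are placeholders):

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// routes each record to an output file named after its key
class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}

// rdd: RDD[(String, Iterable[(Long, Int)])]
// rdd.mapValues(_.mkString(","))
//    .saveAsHadoopFile("/tmp/byKey", classOf[String], classOf[String], classOf[KeyBasedOutput])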


Thanks
Adrian


does updateStateByKey return Seq() ordered?

2015-02-10 Thread Adrian Mocanu
I was looking at updateStateByKey documentation,
It passes in a values Seq which contains values that have the same key.

I would like to know if there is any ordering to these values. My feeling is 
that there is no ordering, but maybe it does preserve RDD ordering.

Example: RDD[("a",2), ("a",3), ("a",1)]
Can values be unordered like Seq(("a",3), ("a",1), ("a",2))?

-Adrian



Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions

2015-01-22 Thread Adrian Mocanu
Hi
I get this exception when I run a Spark test case on my local machine:

An exception or error caused a run to abort: 
org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions;
java.lang.NoSuchMethodError: 
org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions;

In my test case I have these Spark-related imports:
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.TestSuiteBase
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

-Adrian



RE: Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions

2015-01-22 Thread Adrian Mocanu

I use spark 1.1.0-SNAPSHOT and the test I'm running is in local mode. My test 
case uses org.apache.spark.streaming.TestSuiteBase

val spark = "org.apache.spark" %% "spark-core" % "1.1.0-SNAPSHOT" % "provided" excludeAll(
val sparkStreaming = "org.apache.spark" % "spark-streaming_2.10" % "1.1.0-SNAPSHOT" % "provided" excludeAll(
val sparkCassandra = "com.tuplejump" % "calliope_2.10" % "0.9.0-C2-EA" 
  exclude("org.apache.cassandra", "cassandra-all") 
  exclude("org.apache.cassandra", "cassandra-thrift")
val casAll = "org.apache.cassandra" % "cassandra-all" % "2.0.3" intransitive()
val casThrift = "org.apache.cassandra" % "cassandra-thrift" % "2.0.3" intransitive()
val sparkStreamingFromKafka = "org.apache.spark" % "spark-streaming-kafka_2.10" % "0.9.1" excludeAll(


-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: January-22-15 11:39 AM
To: Adrian Mocanu
Cc: u...@spark.incubator.apache.org
Subject: Re: Exception: NoSuchMethodError: 
org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions

NoSuchMethodError almost always means that you have compiled some code against 
one version of a library but are running against another. I wonder if you are 
including different versions of Spark in your project, or running against a 
cluster on an older version?
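As a hedged illustration of keeping the versions aligned in sbt (the version string is only an example; the point is that every Spark module should use the same one):

// pin all Spark modules to a single version so compile-time and runtime classes match
val sparkVersion = "1.1.0-SNAPSHOT"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
)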

On Thu, Jan 22, 2015 at 3:57 PM, Adrian Mocanu amoc...@verticalscope.com 
wrote:
 Hi

 I get this exception when I run a Spark test case on my local machine:



 An exception or error caused a run to abort:
 org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions;

 java.lang.NoSuchMethodError:
 org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions;

 In my test case I have these Spark-related imports:

 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.TestSuiteBase
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions



 -Adrian






RE: Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions

2015-01-22 Thread Adrian Mocanu
I use spark 1.1.0-SNAPSHOT

val spark=org.apache.spark %% spark-core % 1.1.0-SNAPSHOT % provided 
excludeAll(
-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: January-22-15 11:39 AM
To: Adrian Mocanu
Cc: u...@spark.incubator.apache.org
Subject: Re: Exception: NoSuchMethodError: 
org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions

NoSuchMethodError almost always means that you have compiled some code against 
one version of a library but are running against another. I wonder if you are 
including different versions of Spark in your project, or running against a 
cluster on an older version?

On Thu, Jan 22, 2015 at 3:57 PM, Adrian Mocanu amoc...@verticalscope.com 
wrote:
 Hi

 I get this exception when I run a Spark test case on my local machine:



 An exception or error caused a run to abort:
 org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions;

 java.lang.NoSuchMethodError:
 org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions;

 In my test case I have these Spark-related imports:

 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.TestSuiteBase
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions



 -Adrian




reconciling best effort real time with delayed aggregation

2015-01-14 Thread Adrian Mocanu
Hi
I have a question regarding design trade offs and best practices. I'm working 
on a real time analytics system. For simplicity, I have data with timestamps 
(the key) and counts (the value). I use DStreams for the real time aspect. 
Tuples with the same timestamp can be across various RDDs and I just aggregate 
each RDD by timestamp and increment counters in Cassandra; this gives correct 
aggregation counts based on data timestamp.
At the same time as tuple aggregations are saved into Cassandra I also show the 
aggregations on a chart and also pass the data through some more complicated 
math formulas (they output DStreams) which involve using updateStateByKey. 
These other operations are similar to moving average in the way that if data 
comes late you'd have to recalculate all moving averages starting from the date 
of the last delayed tuple; such is the nature of these calculations. The 
calculations are not saved in db but recalculated every time data is loaded 
from db.

Now, in real time I do things on a best effort basis, but the database will 
always have correct aggregations (even if tuples come in late for some early 
timestamp, Cassandra will easily increment a counter with the amount from this 
late tuple).
In real time, when a tuple with the same timestamp belongs to several RDDs, I don't 
aggregate by tuple timestamp (because that would mean reapplying the math formulas 
from the timestamp of the last tuple, and that is too much overhead); instead I 
aggregate by RDD time, which is the system time when the RDD is created. This is 
good enough for real time.

So now you can see that the db (the truth provider) is different from real time 
streaming results (best effort).

My questions:
1. From your experience, is this design I just described appropriate?
2. I'm curious how others have solved the problem of reconciling differences in 
their real time processing with batch mode. I think I read on the mailing list 
(several months ago) that someone redoes the aggregation step an hour after 
data is received (i.e. the aggregation DStream job is always an hour behind so that 
late tuples have time to propagate to the db).
3. In case the source of data fails and it is restarted later, my design will 
give duplicates unless the tuples from the database are deleted for the 
timestamps that the data I am re-streaming contains. Is there a better way to 
avoid duplicates when running the same job twice or as part of a bigger job 
(idempotency)?


Thanks
-Adrian



RE: RowMatrix.multiply() ?

2015-01-09 Thread Adrian Mocanu
I’m resurrecting this thread because I’m interested in doing transpose on a 
RowMatrix.
There is this other thread too: 
http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-multiplication-in-spark-td12562.html
It presents https://issues.apache.org/jira/browse/SPARK-3434, which is still 
in progress at this time.
Is this the correct Jira issue for the transpose operation? ETA?

Thanks a lot!
-A

From: Reza Zadeh [mailto:r...@databricks.com]
Sent: October-15-14 1:48 PM
To: ll
Cc: u...@spark.incubator.apache.org
Subject: Re: RowMatrix.multiply() ?

Hi,
We are currently working on distributed matrix operations. Two RowMatrices 
cannot currently be multiplied together, nor can they be added. The 
functionality will be added soon.

You can of course achieve this yourself by using IndexedRowMatrix and doing one 
join per operation you requested.
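For completeness, a rough sketch of the join-based idea using plain RDD operations (this is not an MLlib API; it treats each matrix as coordinate entries (i, k, value) and multiplies by joining on the shared dimension):

import org.apache.spark.SparkContext._   // pair-RDD implicits for Spark 1.x
import org.apache.spark.rdd.RDD

// A given as (i, k, value), B as (k, j, value); returns C = A * B as ((i, j), value)
def multiplyCoordinate(a: RDD[(Long, Long, Double)],
                       b: RDD[(Long, Long, Double)]): RDD[((Long, Long), Double)] = {
  val aByK = a.map { case (i, k, v) => (k, (i, v)) }
  val bByK = b.map { case (k, j, v) => (k, (j, v)) }
  aByK.join(bByK)                                     // pair up entries sharing index k
    .map { case (_, ((i, av), (j, bv))) => ((i, j), av * bv) }
    .reduceByKey(_ + _)                               // sum the partial products
}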

Best,
Reza

On Wed, Oct 15, 2014 at 8:50 AM, ll duy.huynh@gmail.com wrote:
hi.. it looks like RowMatrix.multiply() takes a local Matrix as a parameter
and returns the result as a distributed RowMatrix.

how do you perform this series of multiplications if A, B, C, and D are all
RowMatrix?

((A x B) x C) x D)

thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RowMatrix-multiply-tp16509.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




using RDD result in another RDD

2014-11-12 Thread Adrian Mocanu
Hi
I'd like to use the result of one RDD1 in another RDD2. Normally I would use 
something like a barrier to make the 2nd RDD wait until the computation of the 
1st RDD is done, then include the result from RDD1 in the closure for RDD2.
Currently I create another RDD, RDD3, out of the result of RDD1 then do 
Cartesian product on RDD2 and RDD3. NB: This operation is slow and expands 
partitions from 270 to 1200

This is a simplified example but I think it should help:
What I want to do (pseudocode):
   val a: Int = RDD1.reduce(..)
   RDD2.map(x => x * a)

What I use right now (pseudocode):
  val a: Int = RDD1.reduce(..)
  RDD3 = makeRDD(a)
  RDD2.cartesianProduct(RDD3)

How to structure this type of operation to not need the barrier to block 
computing RDD2 until RDD1 is done?
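A small sketch of the usual pattern, for reference: reduce is an action, so it already blocks the driver until RDD1 is fully computed; the result is then a plain value that can be captured in RDD2's closure (or broadcast if it is large), with no cartesian product needed:

// reduce runs RDD1 to completion and returns a plain Int to the driver
val a: Int = RDD1.reduce(_ + _)

// broadcasting is optional for a single Int, but scales to larger results
val aBroadcast = sc.broadcast(a)
val result = RDD2.map(x => x * aBroadcast.value)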

-Adrian



RE: overloaded method value updateStateByKey ... cannot be applied to ... when Key is a Tuple2

2014-11-12 Thread Adrian Mocanu
My understanding is that the reason you have an Option is so you could filter 
out tuples when None is returned. This way your state data won't grow forever.

-Original Message-
From: spr [mailto:s...@yarcdata.com] 
Sent: November-12-14 2:25 PM
To: u...@spark.incubator.apache.org
Subject: Re: overloaded method value updateStateByKey ... cannot be applied to 
... when Key is a Tuple2

After comparing with previous code, I got it to work by making the return a Some 
instead of a Tuple2. Perhaps some day I will understand this.


spr wrote
 --code
 
 val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
   val currentCount = if (values.isEmpty) 0 else values.map( x => x._1).sum
   val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map( x => x._2).min
 
   val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))
 
   //  (currentCount + previousCount, Seq(minTime, newMinTime).min)   <== old
   Some(currentCount + previousCount, Seq(minTime, newMinTime).min)   // <== new
 }





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/overloaded-method-value-updateStateByKey-cannot-be-applied-to-when-Key-is-a-Tuple2-tp18644p18750.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: overloaded method value updateStateByKey ... cannot be applied to ... when Key is a Tuple2

2014-11-12 Thread Adrian Mocanu
You are correct; the filtering I’m talking about is done implicitly. You don’t 
have to do it yourself. Spark will do it for you and remove those entries from 
the state collection.
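A sketch of what that looks like in practice (returning None from the update function is what drops a key's entry from the state; the particular rule used here is only an example):

// running count per key; a key that saw no new values and whose count is still zero is dropped
val update = (values: Seq[Int], state: Option[Int]) => {
  val newCount = state.getOrElse(0) + values.sum
  if (values.isEmpty && newCount == 0) None   // returning None removes this key from the state
  else Some(newCount)
}

// usage, assuming pairs is a DStream[(String, Int)]:
// val counts = pairs.updateStateByKey(update)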

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: November-12-14 3:50 PM
To: Adrian Mocanu
Cc: spr; u...@spark.incubator.apache.org
Subject: Re: overloaded method value updateStateByKey ... cannot be applied to 
... when Key is a Tuple2

Adrian, do you know if this is documented somewhere? I was also under the 
impression that setting a key's value to None would cause the key to be 
discarded (without any explicit filtering on the user's part) but can not find 
any official documentation to that effect

On Wed, Nov 12, 2014 at 2:43 PM, Adrian Mocanu amoc...@verticalscope.com wrote:
My understanding is that the reason you have an Option is so you could filter 
out tuples when None is returned. This way your state data won't grow forever.

-Original Message-
From: spr [mailto:s...@yarcdata.com]
Sent: November-12-14 2:25 PM
To: u...@spark.incubator.apache.org
Subject: Re: overloaded method value updateStateByKey ... cannot be applied to 
... when Key is a Tuple2

After comparing with previous code, I got it to work by making the return a Some 
instead of a Tuple2. Perhaps some day I will understand this.


spr wrote
 --code
 
 val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
   val currentCount = if (values.isEmpty) 0 else values.map( x => x._1).sum
   val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map( x => x._2).min
 
   val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))
 
   //  (currentCount + previousCount, Seq(minTime, newMinTime).min)   <== old
   Some(currentCount + previousCount, Seq(minTime, newMinTime).min)   // <== new
 }





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/overloaded-method-value-updateStateByKey-cannot-be-applied-to-when-Key-is-a-Tuple2-tp18644p18750.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




[spark upgrade] Error communicating with MapOutputTracker when running test cases in latest spark

2014-09-10 Thread Adrian Mocanu
I use
https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
to help me with testing.

In Spark 0.9.1 my tests depending on TestSuiteBase worked fine. As soon as I 
switched to the latest (1.0.1) all tests fail. My sbt import is: 
"org.apache.spark" %% "spark-core" % "1.1.0-SNAPSHOT" % "provided"

One exception I get is:
Error communicating with MapOutputTracker
org.apache.spark.SparkException: Error communicating with MapOutputTracker 

How can I fix this?

Found a thread on this error but not very helpful: 
http://mail-archives.apache.org/mod_mbox/spark-dev/201405.mbox/%3ctencent_6b37d69c54f76819509a5...@qq.com%3E

-Adrian



RE: ElasticSearch enrich

2014-06-27 Thread Adrian Mocanu
b0c1, could you post your code? I am interested in your solution.

Thanks
Adrian

From: boci [mailto:boci.b...@gmail.com]
Sent: June-26-14 6:17 PM
To: user@spark.apache.org
Subject: Re: ElasticSearch enrich

Wow, thanks for your fast answer, it helps a lot...

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com

On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.ca wrote:
Hi b0c1,

I have an example of how to do this in the repo for my talk as well, the 
specific example is at 
https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
 . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and then 
call  saveAsHadoopDataset on the RDD that gets passed into the function we 
provide to foreachRDD.

e.g.

stream.foreachRDD { (data, time) =>
  val jobConf = ...
  data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}

Hope that helps :)

Cheers,

Holden :)

On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.com wrote:
Thanks. Without the local option I can connect to ES remotely; now I only have 
one problem. How can I use elasticsearch-hadoop with Spark Streaming? I mean 
DStream doesn't have a saveAsHadoopFiles method, and my second problem is that 
the output index depends on the input data.

Thanks

--
Skype: boci13, Hangout: boci.b...@gmail.com

On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath nick.pentre...@gmail.com wrote:
You can just add elasticsearch-hadoop as a dependency to your project to use 
the ESInputFormat and ESOutputFormat 
(https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics 
here: 
http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

For testing, yes I think you will need to start ES in local mode (just 
./bin/elasticsearch) and use the default config (host = localhost, port = 9200).

On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote:
That's okay, but Hadoop has ES integration. What happens if I run 
saveAsHadoopFile without Hadoop (or must I pull up Hadoop 
programmatically, if I can)?

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com

On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau hol...@pigscanfly.ca wrote:

On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.com wrote:
Hi guys, thanks for the direction. Now I have some problems/questions:
- in local (test) mode I want to use ElasticClient.local to create the ES 
connection, but in production I want to use ElasticClient.remote; to do this I want 
to pass the ElasticClient to mapPartitions, or what is the best practice?
In this case you probably want to make the ElasticClient inside of 
mapPartitions (since it isn't serializable) and if you want to use a different 
client in local mode just have a flag that controls what type of client you 
create.
- my stream output is written into Elasticsearch. How can I test 
output.saveAsHadoopFile[ESOutputFormat](-) in a local environment?
- After storing the enriched data into ES, I want to generate aggregated data 
(EsInputFormat); how can I test it locally?
I think the simplest thing to do would be to use the same client in both modes 
and just start a single-node Elasticsearch cluster.
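A non-authoritative sketch of the mapPartitions pattern described above; ElasticClient.local / ElasticClient.remote are the calls named in the question (not verified here), and localMode, esHost and enrich are assumed placeholders:

rdd.mapPartitions { partition =>
  // one client per partition, created inside the closure so nothing non-serializable is shipped
  val client = if (localMode) ElasticClient.local else ElasticClient.remote(esHost, 9300)
  // force evaluation before closing, otherwise the lazy iterator would use a closed client
  val enriched = partition.map(record => enrich(client, record)).toList
  client.close()
  enriched.iterator
}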

Thanks guys

b0c1



--
Skype: boci13, Hangout: boci.b...@gmail.com

On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau hol...@pigscanfly.ca wrote:
So I'm giving a talk at the Spark Summit on using Spark & ElasticSearch, but 
for now if you want to see a simple demo which uses elasticsearch for geo input 
you can take a look at my quick & dirty implementation with 
TopTweetsInALocation ( 
https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
 ). This approach uses the ESInputFormat which avoids the difficulty of having 
to manually create ElasticSearch clients.

This approach might not work for your data, e.g. if you need to create a query 
for each record in your RDD. If this is the case, you could instead look at 
using mapPartitions and setting up your Elasticsearch connection inside of 
that, so you could then re-use 

RE: How to turn off MetadataCleaner?

2014-05-23 Thread Adrian Mocanu
Hi TD,
I use 0.9.1. Thanks for letting me know. This issue drove me up the wall. I 
even made a method to close all that I could think of:

def stopSpark(ssc: StreamingContext) = {
  ssc.sparkContext.cleanup(500)
  ssc.sparkContext.clearFiles()
  ssc.sparkContext.clearJars()
  ssc.sparkContext.metadataCleaner.cancel()
  ssc.awaitTermination(500)
  ssc.stop(true)
}

-A
From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: May-22-14 9:50 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: How to turn off MetadataCleaner?

The cleaner should remain up while the SparkContext is still active (not 
stopped). However, since here it seems you are stopping the SparkContext 
(ssc.stop(true)), the cleaner should be stopped. There was a bug 
earlier where some of the cleaners may not have been stopped when the context 
is stopped. What version are you using? If it is 0.9.1, I can see that the 
cleaner in ShuffleBlockManager 
(https://github.com/apache/spark/blob/v0.9.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala)
is not stopped, so it is a bug.

TD

On Thu, May 22, 2014 at 9:24 AM, Adrian Mocanu amoc...@verticalscope.com wrote:
Hi
After using Spark's TestSuiteBase to run some tests I've noticed that at the 
end, after finishing all tests, the cleaner is still running and outputs the 
following periodically:
INFO  o.apache.spark.util.MetadataCleaner  - Ran metadata cleaner for 
SHUFFLE_BLOCK_MANAGER

I use the method testOperation and I've changed it so that it stores the pointer to 
ssc after running setupStreams. Then I use that pointer to turn things off, but 
the cleaner remains up.

How to shut down all of spark, including cleaner?

Here is how I changed testOperation method (changes in bold):

  def testOperation[U: ClassTag, V: ClassTag](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      expectedOutput: Seq[Seq[V]],
      numBatches: Int,
      useSet: Boolean
    ) {
    val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
    val ssc = setupStreams[U, V](input, operation)
    val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
    verifyOutput[V](output, expectedOutput, useSet)
    ssc.awaitTermination(500)
    ssc.stop(true)
  }

-Adrian




How to turn off MetadataCleaner?

2014-05-22 Thread Adrian Mocanu
Hi
After using Spark's TestSuiteBase to run some tests I've noticed that at the 
end, after finishing all tests, the cleaner is still running and outputs the 
following periodically:
INFO  o.apache.spark.util.MetadataCleaner  - Ran metadata cleaner for 
SHUFFLE_BLOCK_MANAGER

I use the method testOperation and I've changed it so that it stores the pointer to 
ssc after running setupStreams. Then I use that pointer to turn things off, but 
the cleaner remains up.

How to shut down all of spark, including cleaner?

Here is how I changed testOperation method (changes in bold):

  def testOperation[U: ClassTag, V: ClassTag](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      expectedOutput: Seq[Seq[V]],
      numBatches: Int,
      useSet: Boolean
    ) {
    val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
    val ssc = setupStreams[U, V](input, operation)
    val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
    verifyOutput[V](output, expectedOutput, useSet)
    ssc.awaitTermination(500)
    ssc.stop(true)
  }

-Adrian



tests that run locally fail when run through bamboo

2014-05-21 Thread Adrian Mocanu
I have a few test cases for Spark which extend TestSuiteBase from 
org.apache.spark.streaming.
The tests run fine on my machine but when I commit to the repo and run the tests 
automatically with Bamboo, the test cases fail with these errors.

How to fix?


21-May-2014 16:33:09
[info] StreamingZigZagSpec:
[info] - compute zigzag indicator in stream *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 1 times (most recent failure: Exception failure: java.io.StreamCorruptedException: invalid type code: AC)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[info]   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
[info]   at scala.Option.foreach(Option.scala:236)
[info]   at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
[info]   ...
[info] - compute zigzag indicator in stream with intermittent empty RDDs *** FAILED ***
[info]   Operation timed out after 10042 ms (TestSuiteBase.scala:283)
[info] - compute zigzag indicator in stream with 3 empty RDDs *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 1 times (most recent failure: Exception failure: java.io.FileNotFoundException: /tmp/spark-local-20140521163241-1707/0f/shuffle_1_1_1 (No such file or directory))
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[info]   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
[info]   at scala.Option.foreach(Option.scala:236)
[info]   at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
[info]   ...
[info] - compute zigzag indicator in stream w notification for each change  *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted: Task 141.0:0 failed 1 times (most recent failure: Exception failure: java.io.FileNotFoundException: http://10.10.1.9:62793/broadcast_1)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[info]   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
[info]   at

RE: tests that run locally fail when run through bamboo

2014-05-21 Thread Adrian Mocanu
Just found this at the top of the log:

17:14:41.124 [pool-7-thread-3-ScalaTest-running-StreamingSpikeSpec] WARN  
o.e.j.u.component.AbstractLifeCycle - FAILED 
SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in 
use
build   21-May-2014 17:14:41   java.net.BindException: Address already in use

Is there a way to set these connections up so that they don't all start on the 
same port? (That's my guess for the root cause of the issue.)
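
One thing I plan to try (untested; spark.ui.port is a standard Spark config property, but whether 0 gives a random free port in our setup is my assumption): give each test suite's SparkContext its own UI port so the suites don't all fight over 4040.

import org.apache.spark.SparkConf

// Hypothetical test setup: avoid the shared default UI port 4040.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("StreamingSpikeSpec")
  .set("spark.ui.port", "0")   // 0 to ask for an ephemeral port; a fixed per-suite port would also do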

From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
Sent: May-21-14 4:58 PM
To: u...@spark.incubator.apache.org; user@spark.apache.org
Subject: tests that run locally fail when run through bamboo

I have a few test cases for Spark which extend TestSuiteBase from 
org.apache.spark.streaming.
The tests run fine on my machine but when I commit to repo and run the tests 
automatically with bamboo the test cases fail with these errors.

How to fix?



RE: slf4j and log4j loop

2014-05-16 Thread Adrian Mocanu
Please ignore. This was sent last week; I'm not sure why it arrived so late.

-Original Message-
From: amoc [mailto:amoc...@verticalscope.com] 
Sent: May-09-14 10:13 AM
To: u...@spark.incubator.apache.org
Subject: Re: slf4j and log4j loop

Hi Patrick/Sean,
Sorry to resurrect this thread, but after upgrading to Spark 0.9.1 I still get 
this error at runtime while trying to run some tests here.
Has this actually been integrated into Spark 0.9.1?

Thanks again
-A



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/slf4j-and-log4j-loop-tp2699p5524.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


same log4j slf4j error in spark 9.1

2014-05-15 Thread Adrian Mocanu
I recall someone from the Spark team (TD?) saying that Spark 0.9.1 would change 
the logger and the circular loop error between slf4j and log4j wouldn't show up.

Yet on Spark 0.9.1 I still get
SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError.
SLF4J: See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more 
details.

Any solutions?

-Adrian


missing method in my slf4j after excluding Spark ZK log4j

2014-05-12 Thread Adrian Mocanu
Hey guys,
I've asked before, when I was on Spark 0.9 (I now use 0.9.1), about removing the log4j 
dependency and was told that it was gone. However, I still find it pulled in through the 
ZooKeeper dependency. That is fine since I exclude it myself in the sbt file, but 
another issue arises.
I wonder if anyone else has run into this.

Spark uses log4j v1.2.17 and slf4j-log4j12 1.7.2
I use slf4j 1.7.5, logback 1.0.13, and log4j-over-slf4j 1.7.5

I think my slf4j 1.7.5 doesn't agree with what ZooKeeper expects from its log4j 
v1.2.17, because I get a missing-method error:
java.lang.NoSuchMethodError: 
org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V
at 
org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)
at 
org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)
at scala.Option.map(Option.scala:145)
at 
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126)
at org.apache.spark.SparkContext.init(SparkContext.scala:139)
at 
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500)
at 
org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76)
...

Is there a way to find out what versions of slf4j I need to make it work with 
log4j 1.2.17?
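
In case it's useful context, this is the direction I'm experimenting with in build.sbt; the coordinates and version are illustrative, not a known-good recipe:

// Sketch: keep a single logging backend (logback + log4j-over-slf4j) by excluding
// the log4j pieces that Spark and ZooKeeper pull in transitively.
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "0.9.1" excludeAll(
  ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"),
  ExclusionRule(organization = "log4j", name = "log4j")
)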

-Adrian



RE: another updateStateByKey question - updated w possible Spark bug

2014-05-05 Thread Adrian Mocanu
I’ve encountered this issue again and am able to reproduce it about 10% of the 
time.

1. Here is the input:
RDD[ ("a", 126232566, 1), ("a", 126232566, 2) ]
RDD[ ("a", 126232566, 1), ("a", 126232566, 3) ]
RDD[ ("a", 126232566, 3) ]
RDD[ ("a", 126232566, 4) ]
RDD[ ("a", 126232566, 2) ]
RDD[ ("a", 126232566, 5), ("a", 126232566, 5) ]

2. Here are the actual results (printed DStream – each line is a new RDD with 
RDD Id being the last number on each line):
(((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)
(((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)
(((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20)
(((a,126232566),StateClass(10,5,ArrayBuffer())),26)   <- empty elements Seq[V]
(((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33)
(((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40)
(((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)
(((a,126232566),StateClass(26,9,ArrayBuffer())),53)   <- empty elements Seq[V]
(((a,126232566),StateClass(26,9,ArrayBuffer())),59)   <- empty elements Seq[V]

3. Here are the expected results: (all tuples from #2 except those with empty 
Seq[V] )
(((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)
(((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)
(((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20)
(((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33)
(((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40)
(((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)

4. Here is the code:
case class StateClass(sum: Integer, count: Integer, elements: Seq[Double])

val updateSumFunc = (values: Seq[(String, Long, Int)], state: Option[StateClass]) => {
//  if (values.isEmpty) {
//    // if the RDD cannot find values for this key (which is from a prev RDD),
//    // the tuple will not be shown in this RDD with values of 0
//    None
//  } else {
    val previousState = state.getOrElse(StateClass(0, 0, Seq()))
    val currentCount = values.size + previousState.count
    var currentSum = 0
    for (newValue <- values) yield ({
      currentSum = currentSum + newValue._3
    })
    currentSum = currentSum + previousState.sum
    val elements = for (newValues <- values) yield ({
      newValues._3.toDouble
    })
    Some(StateClass(currentSum, currentCount, elements))
//  }
}

val partialResultSums = inputStream.map((x: (String, Long, Int)) => ((x._1), (x._1, x._2, x._3)))  // re-map to key by the string
  .updateStateByKey[StateClass](updateSumFunc)   // update state
  .transform(rdd => rdd.map(t => (t, rdd.id)))   // add the RDD id to each tuple

partialResultSums.print()

Now this is how I generate the RDDs and I suspect the delay is why the issue 
surfaces:

rddQueue += ssc.sparkContext.makeRDD(smallWindow1)   // smallWindow1 = List[(String, Long, Int)]( ("a", 126232566, 1), ("a", 126232566, 2) )

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow2)   // smallWindow2 = List[(String, Long, Int)]( ("a", 126232566, 1), ("a", 126232566, 3) )

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow3)   // smallWindow3 = List[(String, Long, Int)]( ("a", 126232566, 3) )

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow4)   // smallWindow4 = List[(String, Long, Int)]( ("a", 126232566, 4) )

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow5)   // smallWindow5 = List[(String, Long, Int)]( ("a", 126232566, 2) )

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow6)   // smallWindow6 = List[(String, Long, Int)]( ("a", 126232566, 5), ("a", 126232566, 5) )

Thread.sleep(3100)
//ssc.awaitTermination()
ssc.stop()

In my use case, when I detect an empty Seq[V] in the updateStateByKey function I 
return None so I can filter those tuples out. However, Spark calling the 
updateStateByKey function with an empty Seq[V] when it should not messes up my logic.
I wonder how to bypass this bug/feature of Spark.
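
For reference, the guard I have in mind is a variant of the commented-out block above; it carries the previous state forward instead of dropping the key, which may or may not be acceptable (sketch only):

val updateSumFuncGuarded = (values: Seq[(String, Long, Int)], state: Option[StateClass]) => {
  if (values.isEmpty) {
    state  // no new data for this key in this batch: keep the previous state unchanged
  } else {
    val previousState = state.getOrElse(StateClass(0, 0, Seq()))
    val currentSum = values.map(_._3).sum + previousState.sum
    val currentCount = values.size + previousState.count
    Some(StateClass(currentSum, currentCount, values.map(_._3.toDouble)))
  }
}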

Thanks
-Adrian
From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: May-02-14 3:10 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: another updateStateByKey question


Could be a bug. Can you share a code with data that I can use to reproduce this?

TD
On May 2, 2014 9:49 AM, Adrian Mocanu 
amoc...@verticalscope.commailto:amoc...@verticalscope.com wrote:
Has anyone else noticed that sometimes the same tuple calls update state 
function twice?
I have 2 tuples with the same key in 1 RDD part of DStream: RDD[ (a,1), (a,2) ]
When the update function is called the first time Seq[V] has data: 1, 2 which 
is correct: StateClass(3,2, ArrayBuffer(1, 2))
Then right away (in my output I see this) the same key is used and the function 
is called again but this time Seq is empty: StateClass(3,2, ArrayBuffer( ))

In the update function I also save Seq[V] to state so I can see it in the RDD. 
I also

RE: another updateStateByKey question - updated w possible Spark bug

2014-05-05 Thread Adrian Mocanu
Forgot to mention my batch interval is 1 second:
val ssc = new StreamingContext(conf, Seconds(1))
hence the Thread.sleep(1100)

From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
Sent: May-05-14 12:06 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: RE: another updateStateByKey question - updated w possible Spark bug


RE: What is Seq[V] in updateStateByKey?

2014-05-01 Thread Adrian Mocanu
So Seq[V] contains only new tuples. I initially thought that whenever a new 
tuple was found, it would add it to Seq and call the update function 
immediately so there wouldn't be more than 1 update to Seq per function call.

Say I want to sum tuples with the same key in an RDD using updateStateByKey. 
Then (1) Seq[V] would contain the numbers for a particular key and my state S 
could be the sum? 
Or would (2) Seq contain partial sums (say, a sum per partition?) which I then 
need to sum into the final sum?

After writing this out and thinking a little more about it I think #2 is 
correct. Can you confirm?
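
To make my reading concrete, this is the kind of thing I have in mind (a minimal sketch; pairs is a hypothetical DStream[(String, Int)]):

// My understanding: Seq[V] holds the new values that arrived for a key in the current
// batch, and S (the Option[Int]) is the running sum carried over from earlier batches.
val updateFunc = (values: Seq[Int], state: Option[Int]) =>
  Some(values.sum + state.getOrElse(0))

val runningSums = pairs.updateStateByKey[Int](updateFunc)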

Thanks again!
-A

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: April-30-14 4:30 PM
To: user@spark.apache.org
Subject: Re: What is Seq[V] in updateStateByKey?

S is the previous count, if any. Seq[V] are potentially many new counts. All of 
them have to be added together to keep an accurate total.  It's as if the count 
were 3, and I tell you I've just observed 2, 5, and 1 additional occurrences -- 
the new count is 3 + (2+5+1) not
1 + 1.


I butted in since I'd like to ask a different question about the same line of 
code. Why:

  val currentCount = values.foldLeft(0)(_ + _)

instead of

  val currentCount = values.sum

This happens a few places in the code. sum seems equivalent and likely quicker. 
Same with things like filter(_ == 200).size instead of count(_ == 200)... 
pretty trivial but hey.


On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu amoc...@verticalscope.com 
wrote:
 Hi TD,

 Why does the example keep recalculating the count via fold?

 Wouldn’t it make more sense to get the last count in values Seq and 
 add 1 to it and save that as current count?



 From what Sean explained I understand that all values in Seq have the 
 same key. Then when a new value for that key is found it is added to 
 this Seq collection and the update function is called.



 Is my understanding correct?


updateStateByKey example not using correct input data?

2014-05-01 Thread Adrian Mocanu
I'm trying to understand updateStateByKey.

Here's an example I'm testing with:
Input data: DStream( RDD( ("a",2) ), RDD( ("a",3) ), RDD( ("a",4) ), RDD( ("a",5) ), RDD( ("a",6) ), RDD( ("a",7) ) )

Code:
  val updateFunc = (values: Seq[Int], state: Option[StateClass]) => {
    val previousState = state.getOrElse( StateClass(0, 0, Seq()) )
    val currentSum = values.sum + previousState.sum
    val currentCount = values.size + previousState.count

    if (currentCount == previousState.count) {
      None  // if this RDD has no change then remove the tuple
    } else {
      Some( StateClass(currentSum, currentCount, values) )
    }
  }

intStream.updateStateByKey[StateClass](updateFunc).transform(rdd => rdd.map(t => (t, rdd.id))).print()

Results:
((a,StateClass(14,5,ArrayBuffer(2.0, 3.0, 3.0, 3.0, 3.0))),12)
((a,StateClass(17,6,ArrayBuffer(3.0))),22)
((a,StateClass(20,7,ArrayBuffer(3.0))),32)

Questions:
Why does the RDD with ID=12 have these elements: (2.0, 3.0, 3.0, 3.0, 3.0)?
These do not exist in the input data, so where do these numbers come from? Well, 2 
and 3 exist, but not the extra 3's, and it's also missing 4, 5, 6, 7.
What is going on here?

-Adrian



range partitioner with updateStateByKey

2014-05-01 Thread Adrian Mocanu
If I use a range partitioner, will this make updateStateByKey take the tuples 
in order?
Right now I see them not being taken in order (most of them are ordered, but not 
all).

-Adrian



What is Seq[V] in updateStateByKey?

2014-04-29 Thread Adrian Mocanu
What is Seq[V] in updateStateByKey?
Does this store the collected tuples of the RDD in a collection?

Method signature:
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]

In my case I used Seq[Double] assuming a sequence of Doubles in the RDD; the 
moment I switched to a different type like Seq[(String, Double)] the code 
didn't compile.

-Adrian



FW: reduceByKeyAndWindow - spark internals

2014-04-25 Thread Adrian Mocanu
Any suggestions where I can find this in the documentation or elsewhere?

Thanks

From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
Sent: April-24-14 11:26 AM
To: u...@spark.incubator.apache.org
Subject: reduceByKeyAndWindow - spark internals

If I have this code:
val stream1= doublesInputStream.window(Seconds(10), Seconds(2))
val stream2= stream1.reduceByKeyAndWindow(_ + _, Seconds(10), Seconds(10))

Does reduceByKeyAndWindow merge all RDDs from stream1 that came in the 10 
second window?

For example, in the first 10 secs stream1 will have 5 RDDs. Does 
reduceByKeyAndWindow merge these 5 RDDs into 1 RDD and remove duplicates?

-Adrian



reduceByKeyAndWindow - spark internals

2014-04-24 Thread Adrian Mocanu
If I have this code:
val stream1= doublesInputStream.window(Seconds(10), Seconds(2))
val stream2= stream1.reduceByKeyAndWindow(_ + _, Seconds(10), Seconds(10))

Does reduceByKeyAndWindow merge all RDDs from stream1 that came in the 10 
second window?

For example, in the first 10 secs stream1 will have 5 RDDs. Does 
reduceByKeyAndWindow merge these 5 RDDs into 1 RDD and remove duplicates?

-Adrian



writing booleans w Calliope

2014-04-17 Thread Adrian Mocanu
Has anyone managed to write Booleans to Cassandra from an RDD with Calliope?
My Booleans give compile-time errors: expression of type List[Any] does not 
conform to expected type Types.CQLRowValues
CQLColumnValue is defined as ByteBuffer: type CQLColumnValue = ByteBuffer

For now I convert them to strings. I tried converting them to bytes, which 
compiled but gave me a runtime error since a Scala Byte is not compatible with a 
Cassandra boolean.
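
For the record, the direction I was attempting looks roughly like this (untested; it assumes Cassandra encodes a boolean as a single 0x00/0x01 byte):

import java.nio.ByteBuffer

// Sketch: hand Calliope a ByteBuffer directly instead of a Scala Boolean/Byte.
def booleanToCqlValue(b: Boolean): ByteBuffer =
  ByteBuffer.wrap(Array[Byte]((if (b) 1 else 0).toByte))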

-Adrian



RE: function state lost when next RDD is processed

2014-03-28 Thread Adrian Mocanu
I'd like to resurrect this thread since I don't have an answer yet.

From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
Sent: March-27-14 10:04 AM
To: u...@spark.incubator.apache.org
Subject: function state lost when next RDD is processed

Is there a way to pass a custom function to spark to run it on the entire 
stream? For example, say I have a function which sums up values in each RDD and 
then across RDDs.

I've tried with map, transform, reduce. They all apply my sum function on 1 
RDD. When the next RDD comes the function starts from 0 so the sum of the 
previous RDD is lost.

Does Spark support a way of passing a custom function so that its state is 
preserved across RDDs and not only within RDD?

Thanks
-Adrian



RE: function state lost when next RDD is processed

2014-03-28 Thread Adrian Mocanu
Thanks!

Ya that's what I'm doing so far, but I wanted to see if it's possible to keep 
the tuples inside Spark for fault tolerance purposes.

-A
From: Mark Hamstra [mailto:m...@clearstorydata.com]
Sent: March-28-14 10:45 AM
To: user@spark.apache.org
Subject: Re: function state lost when next RDD is processed

As long as the amount of state being passed is relatively small, it's probably 
easiest to send it back to the driver and to introduce it into RDD 
transformations as the zero value of a fold.
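
Roughly something like this, as a sketch (stream is a hypothetical DStream[(String, Double)] and the variable name is made up):

// Driver-side running total: fold each batch's RDD (fold copes with empty RDDs)
// and accumulate the per-batch result in a variable on the driver.
var runningSum = 0.0
stream.foreachRDD { rdd =>
  val batchSum = rdd.map(_._2).fold(0.0)(_ + _)
  runningSum += batchSum
  println(s"sum so far: $runningSum")
}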

On Fri, Mar 28, 2014 at 7:12 AM, Adrian Mocanu 
amoc...@verticalscope.commailto:amoc...@verticalscope.com wrote:
I'd like to resurrect this thread since I don't have an answer yet.

From: Adrian Mocanu 
[mailto:amoc...@verticalscope.commailto:amoc...@verticalscope.com]
Sent: March-27-14 10:04 AM
To: u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org
Subject: function state lost when next RDD is processed

Is there a way to pass a custom function to spark to run it on the entire 
stream? For example, say I have a function which sums up values in each RDD and 
then across RDDs.

I've tried with map, transform, reduce. They all apply my sum function on 1 
RDD. When the next RDD comes the function starts from 0 so the sum of the 
previous RDD is lost.

Does Spark support a way of passing a custom function so that its state is 
preserved across RDDs and not only within RDD?

Thanks
-Adrian




RE: Splitting RDD and Grouping together to perform computation

2014-03-28 Thread Adrian Mocanu
I think you should sort each RDD

-Original Message-
From: yh18190 [mailto:yh18...@gmail.com] 
Sent: March-28-14 4:44 PM
To: u...@spark.incubator.apache.org
Subject: Re: Splitting RDD and Grouping together to perform computation

Hi,
Thanks Nanzhu. I tried to implement your suggestion on the following scenario. I have 
an RDD of, say, 24 elements. When I partitioned it into two groups of 12 elements 
each, the order of the elements within the partitions was lost; elements are 
partitioned randomly. I need to preserve the order such that the first 12 elements 
end up in the 1st partition and the second 12 elements in the 2nd partition.
Guys, please help me maintain the order of the original sequence even after 
partitioning. Any solution?
Before Partition: RDD
64
29186
16059
9143
6439
6155
9187
18416
25565
30420
33952
38302
43712
47092
48803
52687
56286
57471
63429
70715
75995
81878
80974
71288
48556
After Partition: in group 1 with 12 elements: 64, 29186,
18416
30420
33952
38302
43712
47092
56286
81878
80974
71288
48556



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3447.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Splitting RDD and Grouping together to perform computation

2014-03-28 Thread Adrian Mocanu
I say you need to remap so you have a key for each tuple that you can sort on, 
then call rdd.sortByKey(true) like this: mystream.transform(rdd => rdd.sortByKey(true)).
For this function to be available you need to import 
org.apache.spark.rdd.OrderedRDDFunctions

-Original Message-
From: yh18190 [mailto:yh18...@gmail.com] 
Sent: March-28-14 5:02 PM
To: u...@spark.incubator.apache.org
Subject: RE: Splitting RDD and Grouping together to perform computation


Hi,
Here is my code for the given scenario. Could you please let me know where to sort? 
I mean, on what basis do we have to sort so that the partitions maintain the order 
of the original sequence?

val res2 = reduced_hccg.map(_._2)  // which gives an RDD of numbers
res2.foreach(println)
val result = res2.mapPartitions(p => {
  val l = p.toList

  val approx = new ListBuffer[(Int)]
  val detail = new ListBuffer[Double]
  for (i <- 0 until l.length - 1 by 2) {
    println(l(i), l(i + 1))
    approx += (l(i), l(i + 1))
  }
  approx.toList.iterator
  detail.toList.iterator
})
result.foreach(println)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3450.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Splitting RDD and Grouping together to perform computation

2014-03-28 Thread Adrian Mocanu
Not sure how to change your code because you'd need to generate the keys where 
you get the data. Sorry about that.
I can tell you where to put the code to remap and sort though.

import org.apache.spark.rdd.OrderedRDDFunctions
val res2 = reduced_hccg.map(_._2)
  .map(x => (newkey, x)).sortByKey(true)  // and if you want to remap them to remove the key that you used for sorting: .map(x => x._2)

res2.foreach(println)
val result = res2.mapPartitions(p => {
  val l = p.toList

  val approx = new ListBuffer[(Int)]
  val detail = new ListBuffer[Double]
  for (i <- 0 until l.length - 1 by 2) {
    println(l(i), l(i + 1))
    approx += (l(i), l(i + 1))
  }
  approx.toList.iterator
  detail.toList.iterator
})
result.foreach(println)

-Original Message-
From: yh18190 [mailto:yh18...@gmail.com] 
Sent: March-28-14 5:17 PM
To: u...@spark.incubator.apache.org
Subject: RE: Splitting RDD and Grouping together to perform computation

Hi Andriana,

Thanks for the suggestion. Could you please modify the part of my code where I need 
to do so? I apologise for the inconvenience; because I am new to Spark I couldn't 
apply it appropriately. I would be thankful to you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3452.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


StreamingContext.transform on a DStream

2014-03-27 Thread Adrian Mocanu
Found this transform fn in StreamingContext which takes in a DStream[_] and a 
function which acts on each of its RDDs
Unfortunately I can't figure out how to transform my DStream[(String,Int)] into 
DStream[_]


/** Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. */
def transform[T: ClassTag](
    dstreams: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[T]
  ): DStream[T] = {
  new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}

-Adrian



RE: StreamingContext.transform on a DStream

2014-03-27 Thread Adrian Mocanu
Please disregard I didn't see the Seq wrapper.

From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
Sent: March-27-14 11:57 AM
To: u...@spark.incubator.apache.org
Subject: StreamingContext.transform on a DStream

Found this transform fn in StreamingContext which takes in a DStream[_] and a 
function which acts on each of its RDDs
Unfortunately I can't figure out how to transform my DStream[(String,Int)] into 
DStream[_]


/** Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. */
def transform[T: ClassTag](
    dstreams: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[T]
  ): DStream[T] = {
  new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}

-Adrian



how to create a DStream from bunch of RDDs

2014-03-27 Thread Adrian Mocanu
I create several RDDs by merging several consecutive RDDs from a DStream. Is 
there a way to add these new RDDs to a DStream?
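
The only approach I can think of is the queueStream trick I already use for test input; a sketch (the merged RDDs and their element type are placeholders):

import scala.collection.mutable
import org.apache.spark.rdd.RDD

// Push the merged RDDs into a queue and expose the queue as a new DStream.
val mergedQueue = new mutable.Queue[RDD[(String, Int)]]()
val mergedStream = ssc.queueStream(mergedQueue)

mergedQueue += mergedRdd1   // hypothetical RDDs built by merging consecutive RDDs
mergedQueue += mergedRdd2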

-Adrian



closures moving averages (state)

2014-03-26 Thread Adrian Mocanu
I'm passing a moving average function during the map phase like this:
  val average = new Sma(window = 3)
  stream.map(x => average.addNumber(x))

where
  class Sma extends Serializable { .. }

I also tried to put creation of object average in an object like I saw in 
another post:
 object Average {
val smaFn = new VSRTXYSimpleMovingAverage[(String, Long)](3)
 }
Every time average.addNumber is called it runs on a new instance.
How can I preserve the state of the average object?

Thanks
-Adrian



RE: closures moving averages (state)

2014-03-26 Thread Adrian Mocanu
I tried with reduce and it gives me pretty weird results that make no sense, 
i.e. one number for an entire RDD:

  val smaStream = inputStream.reduce { case (t1, t2) =>
    val sma = average.addDataPoint(t1)
    sma
  }


I tried with transform and that worked correctly, but unfortunately it works 1 
RDD at a time, so the moving average is reset when the next consecutive RDD is 
read, as if a new instance of the Average class were created for each RDD.

Is there a way to have one global variable of a custom type (in my case the Average 
type), somewhat like accumulators, but not incrementable in parallel? Incrementing 
in parallel wouldn't make sense for a moving average.

The reason I want to apply a moving average function to a stream is so that  
the tuples remain in Spark and benefit from its fault tolerant mechanisms.

My guess is that currently this is not possible, but I'll wait for one of the 
Spark creators to comment on this.
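
The closest I can get while keeping the data inside Spark is updateStateByKey, holding the last few values per key as the state; a rough sketch (keyedDoubles is a hypothetical DStream[(String, Double)]):

case class SmaState(lastValues: Seq[Double])

// Keep the most recent `window` values for each key as Spark-managed state
// and emit the mean of whatever has been kept so far.
def smaUpdate(window: Int)(values: Seq[Double], state: Option[SmaState]): Option[SmaState] = {
  val kept = (state.map(_.lastValues).getOrElse(Seq()) ++ values).takeRight(window)
  Some(SmaState(kept))
}

val smaStream = keyedDoubles
  .updateStateByKey[SmaState](smaUpdate(3) _)
  .mapValues(s => s.lastValues.sum / s.lastValues.size)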

-A

From: Benjamin Black [mailto:b...@b3k.us]
Sent: March-26-14 11:50 AM
To: user@spark.apache.org
Subject: Re: closures  moving averages (state)

Perhaps you want reduce rather than map?

On Wednesday, March 26, 2014, Adrian Mocanu 
amoc...@verticalscope.commailto:amoc...@verticalscope.com wrote:
I'm passing a moving average function during the map phase like this:
  val average= new Sma(window=3)
stream.map(x= average.addNumber(x))

where
  class Sma extends Serializable { .. }

I also tried to put creation of object average in an object like I saw in 
another post:
 object Average {
val smaFn = new VSRTXYSimpleMovingAverage[(String, Long)](3)
 }
Every time  average.addNumber is called it is a new instance.
How can I preserve state of average object?

Thanks
-Adrian



RE: streaming questions

2014-03-26 Thread Adrian Mocanu
Hi Diana,

I'll answer Q3:

You can check if an RDD is empty in several ways.
Someone here mentioned that using an iterator was safer:
  val isEmpty = rdd.mapPartitions(iter => Iterator(!iter.hasNext)).reduce(_ && _)

You can also check with a fold or rdd.count:
  rdd.reduce(_ + _)   // can't handle an empty RDD
  rdd.fold(0)(_ + _)  // no problem with an empty RDD
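
Putting that together for your question 3, something along these lines should skip the empty batches (the output path is just an example):

wordCounts.foreachRDD { (rdd, time) =>
  // only write a directory for batches that actually contain data
  if (rdd.count() > 0) {
    rdd.saveAsTextFile(s"file:/my/path/outdir-${time.milliseconds}")
  }
}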

A
From: Diana Carroll [mailto:dcarr...@cloudera.com]
Sent: March-26-14 2:09 PM
To: user
Subject: streaming questions

I'm trying to understand Spark streaming, hoping someone can help.

I've kinda-sorta got a version of Word Count running, and it looks like this:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StreamingWordCount {

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: StreamingWordCount master hostname port")
      System.exit(1)
    }

    val master = args(0)
    val hostname = args(1)
    val port = args(2).toInt

    val ssc = new StreamingContext(master, "Streaming Word Count", Seconds(2))
    val lines = ssc.socketTextStream(hostname, port)
    val words = lines.flatMap(line => line.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

(I also have a small script that sends text to that port.)

Question 1:
When I run this, I don't get any output from the wordCounts.print as long as my 
data is still streaming.  I have to stop my streaming data script before my 
program will display the word counts.

Why is that?  What if my stream is indefinite?  I thought the point of 
Streaming was that it would process it in real time?

Question 2:
While I run this (and the stream is still sending) I get continuous warning 
messages like this:
14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623200 already exists 
on this machine; not re-adding it
14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623400 already exists 
on this machine; not re-adding it

What does that mean?

Question 3:
I tried replacing the wordCounts.print() line with 
wordCounts.saveAsTextFiles(file:/my/path/outdir).
This results in the creation of a new outdir-timestamp file being created every 
two seconds...even if there's no data during that time period.  Is there a way 
to tell it to save only if there's data?

Thanks!


RE: [bug?] streaming window unexpected behaviour

2014-03-25 Thread Adrian Mocanu
Let me rephrase that: do you think it is possible to use an accumulator to skip 
the first few incomplete RDDs?
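
Something like this driver-side counter is what I have in mind (a sketch; it uses a plain variable rather than an accumulator, and I haven't checked how it behaves with checkpointing/recovery):

// Skip output until enough batches have passed for the window to be full
// (window 10s / slide 5s => 2 batches for the bigWindow example).
var batchesSeen = 0L
val batchesPerFullWindow = 2

bigWindow.foreachRDD { rdd =>
  batchesSeen += 1
  if (batchesSeen >= batchesPerFullWindow) {
    rdd.collect().foreach(println)   // placeholder for the real per-window processing
  }
}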

-Original Message-
From: Adrian Mocanu [mailto:amoc...@verticalscope.com] 
Sent: March-25-14 9:57 AM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: RE: [bug?] streaming window unexpected behaviour

Thanks TD!
Is it possible to perhaps add another window method that does not generate 
partial windows? Or is it possible to remove the first few partial windows? 
I'm thinking of using an accumulator to count how many windows there are.

-A

-Original Message-
From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: March-24-14 6:55 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: [bug?] streaming window unexpected behaviour

Yes, I believe that is the current behavior. Essentially, the first few RDDs will 
be partial windows (assuming window duration > sliding interval).

TD


On Mon, Mar 24, 2014 at 1:12 PM, Adrian Mocanu amoc...@verticalscope.com 
wrote:
 I have what I would call unexpected behaviour when using window on a stream.

 I have 2 windowed streams with a 5s batch interval. One window stream 
 is (5s,5s)=smallWindow and the other (10s,5s)=bigWindow

 What I've noticed is that the 1st RDD produced by bigWindow is 
 incorrect and is of the size 5s not 10s. So instead of waiting 10s and 
 producing 1 RDD with size 10s, Spark produced the 1st 10s RDD of size 5s.

 Why is this happening? To me it looks like a bug; Matei or TD can you 
 verify that this is correct behaviour?





 I have the following code

 val ssc = new StreamingContext(conf, Seconds(5))



 val smallWindowStream = ssc.queueStream(smallWindowRddQueue)

 val bigWindowStream = ssc.queueStream(bigWindowRddQueue)



 val smallWindow = smallWindowReshapedStream.window(Seconds(5), Seconds(5))
   .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))

 val bigWindow = bigWindowReshapedStream.window(Seconds(10), Seconds(5))
   .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))



 -Adrian




remove duplicates

2014-03-24 Thread Adrian Mocanu
I have a DStream like this:
..RDD[a,b],RDD[b,c]..

Is there a way to remove duplicates across the entire DStream? Ie: I would like 
the output to be (by removing one of the b's):
..RDD[a],RDD[b,c]..  or ..RDD[a,b],RDD[c]..
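
For what it's worth, the closest I've come up with is below (a sketch; .distinct() only deduplicates inside whatever RDD it sees, so the window sizes control how far across the stream it reaches):

// Deduplicate within each batch:
val dedupPerBatch = stream.transform(_.distinct())

// Deduplicate across everything that falls into a 10-second window (sizes are illustrative):
val dedupPerWindow = stream.window(Seconds(10), Seconds(10)).transform(_.distinct())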

Thanks
-Adrian



[bug?] streaming window unexpected behaviour

2014-03-24 Thread Adrian Mocanu
I have what I would call unexpected behaviour when using window on a stream.
I have 2 windowed streams with a 5s batch interval. One window stream is 
(5s,5s)=smallWindow and the other (10s,5s)=bigWindow
What I've noticed is that the 1st RDD produced by bigWindow is incorrect and is 
of the size 5s not 10s. So instead of waiting 10s and producing 1 RDD with size 
10s, Spark produced the 1st 10s RDD of size 5s.
Why is this happening? To me it looks like a bug; Matei or TD can you verify 
that this is correct behaviour?


I have the following code
val ssc = new StreamingContext(conf, Seconds(5))

val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
val bigWindowStream = ssc.queueStream(bigWindowRddQueue)

val smallWindow = smallWindowReshapedStream.window(Seconds(5), Seconds(5))
  .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
val bigWindow = bigWindowReshapedStream.window(Seconds(10), Seconds(5))
  .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))

-Adrian



is collect exactly-once?

2014-03-17 Thread Adrian Mocanu
Hi
Quick question here,
I know that .foreach is not idempotent. I am wondering if collect() is idempotent, 
meaning that once I've collect()-ed, if a Spark node crashes I can't get the same 
values from the stream ever again.

Thanks
-Adrian



slf4j and log4j loop

2014-03-14 Thread Adrian Mocanu
Hi
Have you encountered an slf4j and log4j loop when using Spark? I pull in a few 
packages via sbt.
The Spark package uses slf4j-log4j12.jar and another package uses 
log4j-over-slf4j.jar, which creates the circular loop between the 2 loggers and 
thus the exception below. Do you know of a fix for this?


SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class 
path, preempting StackOverflowError.
SLF4J: See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more 
details.
An exception or error caused a run to abort.
java.lang.ExceptionInInitializerError
at org.apache.log4j.Logger.getLogger(Logger.java:40)
at 
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126)
at org.apache.spark.SparkContext.init(SparkContext.scala:139)
at 
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500)
at 
org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76)
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar 
AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See 
also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
at 
org.apache.log4j.Log4jLoggerFactory.clinit(Log4jLoggerFactory.java:51)
... 54 more


Thanks
-Adrian