RE: ElasticSearch for Spark times out
Hi

The gist of it is this: I have data indexed into ES. Each index stores monthly data, and the query gets data for some date range (across several ES indexes, or within one index if the date range falls within a single month). I then merge these RDDs into an uberRdd, perform some operations, and print the result with foreach. The simplified code is below.

{ // esRdds: List[RDD] contains mentions count per post
  val esRdds = (startDate.getYear to endDate.getYear).flatMap { year =>
    val sMonth = if (year == startDate.getYear) startDate.getMonthOfYear else 1
    val eMonth = if (year == endDate.getYear) endDate.getMonthOfYear else 12
    (sMonth to eMonth).map { i =>
      sc.esRDD(s"$year-${i.formatted("%02d")}_nlpindex/nlp",
               ESQueries.generateQueryString(Some(startDate), Some(endDate), mentionsToFind, siteNames))
        .map { case (str, map) => unwrapAndCountMentionsPerPost(map) }
    }
  }
  var uberRdd = esRdds(0)
  for (rdd <- esRdds) { uberRdd = uberRdd ++ rdd }
  uberRdd.map ... join ... foreach(x => println(x))
}

From: Jeetendra Gangele [mailto:gangele...@gmail.com] Sent: April 22, 2015 2:52 PM To: Adrian Mocanu Cc: u...@spark.incubator.apache.org Subject: Re: ElasticSearch for Spark times out

will you be able to paste the code?

On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com wrote: Hi I use the ElasticSearch package for Spark and very often it times out reading data from ES into an RDD. How can I keep the connection alive (why doesn't it? Bug?) Here's the exception I get: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library.jar:na] at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65) ~[spark-core_2.10-1.1.0.jar:1.1.0] at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.Task.run(Task.scala:54) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) ~[spark-core_2.10-1.1.0.jar:1.1.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_75] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_75] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75] Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.7.0_75] at java.net.SocketInputStream.read(SocketInputStream.java:152) ~[na:1.7.0_75] at java.net.SocketInputStream.read
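Editor's note: chaining ++ over many per-month RDDs builds a deeply nested union. Spark's SparkContext.union flattens the whole sequence in one step, which is usually cheaper and keeps the lineage shallow. A hedged sketch reusing the esRdds name from the snippet above (everything else is illustrative):

// One UnionRDD over all monthly RDDs instead of folding with ++ one at a time.
val uberRdd = sc.union(esRdds)
uberRdd.foreach(x => println(x))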
ElasticSearch for Spark times out
Hi I use the ElasticSearch package for Spark and very often it times out reading data from ES into an RDD. How can I keep the connection alive (why doesn't it? Bug?) Here's the exception I get: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library.jar:na] at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.Task.run(Task.scala:54) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) ~[spark-core_2.10-1.1.0.jar:1.1.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_75] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_75] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75] Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.7.0_75] at java.net.SocketInputStream.read(SocketInputStream.java:152) ~[na:1.7.0_75] at java.net.SocketInputStream.read(SocketInputStream.java:122) ~[na:1.7.0_75] at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) ~[na:1.7.0_75] at java.io.BufferedInputStream.read(BufferedInputStream.java:334) ~[na:1.7.0_75] at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69) ~[commons-httpclient-3.1.jar:na] at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170) 
~[commons-httpclient-3.1.jar:na] at java.io.FilterInputStream.read(FilterInputStream.java:133) ~[na:1.7.0_75] at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108) ~[commons-httpclient-3.1.jar:na] at org.elasticsearch.hadoop.rest.DelegatingInputStream.read(DelegatingInputStream.java:57) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.codehaus.jackson.impl.Utf8StreamParser.loadMore(Utf8StreamParser.java:172) ~[jackson-core-asl-1.9.11.jar:1.9.11] at org.codehaus.jackson.impl.Utf8StreamParser.parseEscapedFieldName(Utf8StreamParser.java:1502) ~[jackson-core-asl-1.9.11.jar:1.9.11] at org.codehaus.jackson.impl.Utf8StreamParser.slowParseFieldName(Utf8StreamParser.java:1404) ~[jackson-core-asl-1.9.11.jar:1.9.11] at
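Editor's note: elasticsearch-hadoop exposes settings for the scroll reads that time out here; raising the HTTP timeout or shrinking the scroll page size often helps when ES is slow to answer. A hedged sketch, assuming the es.* keys are read from the SparkConf as the elasticsearch-hadoop configuration docs describe (node address and values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("es-read")
  .set("es.nodes", "localhost:9200")   // assumption: point at your ES cluster
  .set("es.http.timeout", "5m")        // raise the read/connect timeout (default is 1m)
  .set("es.scroll.size", "50")         // smaller scroll pages return faster per request
val sc = new SparkContext(conf)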
RE: ElasticSearch for Spark times out
Hi Thanks for the help. My ES is up. Out of curiosity, do you know what the timeout value is? There are probably other things happening to cause the timeout; I don't think my ES is that slow but it's possible that ES is taking too long to find the data. What I see happening is that it uses scroll to get the data from ES; about 150 items at a time. Usual delay when I perform the same query from a browser plugin ranges from 1-5sec. Thanks From: Jeetendra Gangele [mailto:gangele...@gmail.com] Sent: April 22, 2015 3:09 PM To: Adrian Mocanu Cc: u...@spark.incubator.apache.org Subject: Re: ElasticSearch for Spark times out Basically ready timeout means hat no data arrived within the specified receive timeout period. Few thing I would suggest 1.are your ES cluster Up and running? 2. if 1 is yes then reduce the size of the Index make it few kbps and then test? On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.commailto:amoc...@verticalscope.com wrote: Hi I use the ElasticSearch package for Spark and very often it times out reading data from ES into an RDD. How can I keep the connection alive (why doesn't it? Bug?) Here's the exception I get: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library.jar:na] at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.Task.run(Task.scala:54) ~[spark-core_2.10-1.1.0.jar:1.1.0] at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) ~[spark-core_2.10-1.1.0.jar:1.1.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_75] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_75] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75] Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.7.0_75] at java.net.SocketInputStream.read(SocketInputStream.java:152) ~[na:1.7.0_75] at java.net.SocketInputStream.read(SocketInputStream.java:122) ~[na:1.7.0_75] at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) ~[na:1.7.0_75] at java.io.BufferedInputStream.read(BufferedInputStream.java:334) ~[na:1.7.0_75] at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69) ~[commons-httpclient-3.1.jar:na
EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out
Hi I need help fixing a time out exception thrown from ElasticSearch Hadoop. The ES cluster is up all the time. I use ElasticSearch Hadoop to read data from ES into RDDs. I get a collection of these RDD which I traverse (with foreachRDD) and create more RDDs from each one RDD in the collection. The resulting RDDs I put in a Queue from which I create a DStream. After about 10 minutes of running, the program's debug output hangs for a bit then throws: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out This is the output: [data from elastic search like the next line (in green)] 13:55:26.620 [Executor task launch worker-0] DEBUG httpclient.wire.content - toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista],timestamp:[1373976139000],links.entity.rank:[0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9]}}]}} 13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpMethodBase - Resorting to protocol version default close connection policy 13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpMethodBase - Should NOT close connection, using HTTP/1.1 13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpConnection - Releasing connection back to connection manager. 13:55:26.631 [Executor task launch worker-0] ERROR org.apache.spark.executor.Executor - Exception in task 1.0 in stage 4.0 (TID 10) org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:245) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:203) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:277) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:200) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:156) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library.jar:na] at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.scheduler.Task.run(Task.scala:54) ~[spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) ~[spark-core_2.10-1.1.0.jar:1.1.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_75]
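Editor's note: the pipeline described above (ES reads producing RDDs that are queued and turned into a DStream) maps onto StreamingContext.queueStream. A minimal sketch with assumed names (esDerivedRdds stands for the RDDs built from the Elasticsearch reads; the batch interval is illustrative):

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val rddQueue = new mutable.Queue[RDD[(String, Long)]]()
val stream = ssc.queueStream(rddQueue)
stream.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))

esDerivedRdds.foreach(rddQueue += _)   // enqueue the RDDs produced from ES
ssc.start()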
Spark log shows only this line repeated: RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time X
Here's my log output from a streaming job. What is this? 09:54:27.504 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067504 09:54:27.505 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067505 09:54:27.506 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067506 09:54:27.508 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067507 09:54:27.508 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067508 09:54:27.509 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067509 09:54:27.510 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067510 09:54:27.511 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067511 09:54:27.512 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067512 09:54:27.513 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067513 09:54:27.514 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067514 09:54:27.515 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067515 09:54:27.516 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067516 09:54:27.517 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067517 09:54:27.518 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067518 09:54:27.519 [RecurringTimer - JobGenerator] DEBUG o.a.s.streaming.util.RecurringTimer - Callback for JobGenerator called at time 1427378067519 09:54:27.520 [Recurri ...
RE: EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out
I also get stack overflow every now and then without having any recursive calls: java.lang.StackOverflowError: null at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1479) ~[na:1.7.0_75] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[na:1.7.0_75] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[na:1.7.0_75] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[na:1.7.0_75] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[na:1.7.0_75] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[na:1.7.0_75] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[na:1.7.0_75] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[na:1.7.0_75] at scala.collection.immutable.$colon$colon.writeObject(List.scala:379) ~[scala-library.jar:na] at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) ~[na:na] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_75] this is a huge stack trace... but it keeps repeating What could this be from? From: Adrian Mocanu [mailto:amoc...@verticalscope.com] Sent: March 26, 2015 2:10 PM To: u...@spark.incubator.apache.org; user@spark.apache.org Subject: EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out Hi I need help fixing a time out exception thrown from ElasticSearch Hadoop. The ES cluster is up all the time. I use ElasticSearch Hadoop to read data from ES into RDDs. I get a collection of these RDD which I traverse (with foreachRDD) and create more RDDs from each one RDD in the collection. The resulting RDDs I put in a Queue from which I create a DStream. After about 10 minutes of running, the program's debug output hangs for a bit then throws: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out This is the output: [data from elastic search like the next line (in green)] 13:55:26.620 [Executor task launch worker-0] DEBUG httpclient.wire.content - toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista,toyota,toyota classic,toyota sprinter,toyota crown,toyota tundra,toyota prius,toyota aa,toyota stout,toyota camry,toyota vista],timestamp:[1373976139000],links.entity.rank:[0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9]}}]}} 13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpMethodBase - Resorting to protocol version default close connection policy 13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpMethodBase - Should NOT close connection, using HTTP/1.1 13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpConnection - Releasing connection back to connection manager. 
13:55:26.631 [Executor task launch worker-0] ERROR org.apache.spark.executor.Executor - Exception in task 1.0 in stage 4.0 (TID 10) org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:245) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:203) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:277) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:200) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:156) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.scroll
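Editor's note: the repeating scala.collection.immutable.$colon$colon.writeObject frames in the StackOverflowError below usually mean task serialization is walking a very long chain, either a deeply nested RDD lineage (for example from repeatedly unioning with ++) or a huge List captured in a closure. Checkpointing, or building unions in one call, keeps the graph shallow. A hedged sketch with illustrative names and paths:

// Truncate the lineage so serializing tasks does not recurse through a deep RDD graph.
sc.setCheckpointDir("/tmp/spark-checkpoints")   // assumption: any reliable local/HDFS path
val merged = sc.union(esRdds)                   // one UnionRDD instead of esRdds.reduce(_ ++ _)
merged.checkpoint()                             // cut the dependency chain here
merged.count()                                  // an action forces the checkpoint to materialize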
writing DStream RDDs to the same file
Hi Is there a way to write all RDDs in a DStream to the same file? I tried this and got an empty file. I think it's because the file is not closed, i.e. ESMinibatchFunctions.writer.close() executes before the stream is created. Here's my code:

myStream.foreachRDD(rdd => {
  rdd.foreach(x => { ESMinibatchFunctions.writer.append(rdd.collect()(0).toString() + " the data ") })
  //localRdd = localRdd.union(rdd)
  //localArray = localArray ++ rdd.collect()
})
ESMinibatchFunctions.writer.close()

object ESMinibatchFunctions { val writer = new PrintWriter("c:/delme/exxx.txt") }
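Editor's note: two things bite in the snippet above: the writer is closed on the driver before any batch has run, and the append happens inside rdd.foreach, which executes on executors where the driver-side PrintWriter is not usable. A hedged sketch that appends on the driver inside foreachRDD and closes only after the streaming context finishes (file path and names are assumptions):

import java.io.{FileWriter, PrintWriter}

object SingleFileSink {
  // Opened lazily on the driver; append mode so every batch adds to the same file.
  lazy val writer = new PrintWriter(new FileWriter("c:/delme/exxx.txt", true))
}

myStream.foreachRDD { rdd =>
  // collect() brings the batch to the driver, where the writer lives.
  rdd.collect().foreach(x => SingleFileSink.writer.println(x))
  SingleFileSink.writer.flush()
}

ssc.start()
ssc.awaitTermination()        // only close once the stream has actually finished
SingleFileSink.writer.close()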
updateStateByKey - Seq[V] order
Hi Does updateStateByKey pass elements to updateFunc (in Seq[V]) in the order in which they appear in the RDD? My guess is no, which means updateFunc needs to be commutative. Am I correct? I've asked this question before but there were no takers. Here are the Scala docs for updateStateByKey:

/**
 * Return a new state DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 * @param updateFunc State update function. If `this` function returns None, then
 * corresponding state key-value pair will be eliminated.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = {
  updateStateByKey(updateFunc, defaultPartitioner())
}
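Editor's note: if no ordering is guaranteed, the safest update functions only combine the batch values with an order-insensitive (commutative, associative) operation. A minimal sketch of such an update, assuming a DStream[(String, Int)] called pairStream (the name is illustrative, not from this thread):

// Summing is order-independent, so it does not matter how the Seq is ordered.
val updateCount: (Seq[Int], Option[Long]) => Option[Long] =
  (values, state) => Some(state.getOrElse(0L) + values.sum)

val counts = pairStream.updateStateByKey(updateCount)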
partitionBy not working w HashPartitioner
Here's my use case: I read an array into an RDD and I use a hash partitioner to partition the RDD. This is the array type: Array[(String, Iterable[(Long, Int)])]

topK: Array[(String, Iterable[(Long, Int)])] = ...
import org.apache.spark.HashPartitioner
val hashPartitioner = new HashPartitioner(10)
val resultRdd = sc.parallelize(topK).partitionBy(hashPartitioner).sortByKey().saveAsTextFile(fileName)

I also tried:

val resultRdd = sc.parallelize(topK, 10).sortByKey().saveAsTextFile(fileName)

The results: I do get 10 partitions. However, the first partition always contains data for the first 2 keys in the RDD, then each following partition contains data for 1 key in the RDD (as expected), and the last file is empty since the first file contained 2 keys. The question: how do I make Spark write 1 file per key? Is the behaviour I'm currently seeing a bug? -Adrian
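Editor's note: a HashPartitioner with 10 partitions can put two keys into the same bucket (two keys hashing to the same value modulo 10), which is exactly the symptom above: one file with two keys and one empty file. One file per key needs a partitioner with exactly one partition per key. A hedged sketch, assuming the keys are known up front (topK and fileName are reused from the message above):

import org.apache.spark.Partitioner

// One partition per distinct key, so saveAsTextFile emits one part-file per key.
class KeyPerPartition(keys: Seq[String]) extends Partitioner {
  private val index = keys.zipWithIndex.toMap
  override def numPartitions: Int = keys.size
  override def getPartition(key: Any): Int = index(key.asInstanceOf[String])
}

val keys = topK.map(_._1).toSeq
sc.parallelize(topK).partitionBy(new KeyPerPartition(keys)).saveAsTextFile(fileName)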
how to print RDD by key into file with groupByKey
Hi I have an RDD: RDD[(String, scala.Iterable[(Long, Int)])] which I want to print into a file, a file for each key string. I tried to trigger a repartition of the RDD by doing a group by on it. The grouping gives RDD[(String, scala.Iterable[Iterable[(Long, Int)]])] so I flattened that: rdd.groupByKey().mapValues(x => x.flatten) However, when I print with saveAsTextFile I get only 2 files. I was under the impression that groupBy repartitions the data by key and saveAsTextFile makes a file per partition. What am I doing wrong here? Thanks Adrian
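Editor's note: groupByKey does shuffle data so each key's values are co-located, but saveAsTextFile writes one file per partition, and the number of partitions stays at the parent's default rather than becoming one per key. When there are only a handful of keys, a simple (if not scalable) way to get one output per key is to collect the keys and save a filtered RDD for each. A hedged sketch with assumed names and paths:

// rdd: RDD[(String, Iterable[(Long, Int)])] from the message above, assumed in scope.
val keys = rdd.keys.distinct().collect()
keys.foreach { k =>
  // One output directory per key; the path prefix is an assumption.
  rdd.filter(_._1 == k).saveAsTextFile(s"/tmp/out/$k")
}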
does updateStateByKey return Seq() ordered?
I was looking at updateStateByKey documentation, It passes in a values Seq which contains values that have the same key. I would like to know if there is any ordering to these values. My feeling is that there is no ordering, but maybe it does preserve RDD ordering. Example: RDD[ (a,2), (a,3), (a,1)] Can values be unordered like Seq ((a,3),(a,1),(a,2) ) ? -Adrian
Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions
Hi I get this exception when I run a Spark test case on my local machine: An exception or error caused a run to abort: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions; java.lang.NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions; In my test case I have these Spark related imports imports: import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.TestSuiteBase import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions -Adrian
RE: Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions
I use Spark 1.1.0-SNAPSHOT and the test I'm running is in local mode. My test case uses org.apache.spark.streaming.TestSuiteBase

val spark = "org.apache.spark" %% "spark-core" % "1.1.0-SNAPSHOT" % "provided" excludeAll(
val sparkStreaming = "org.apache.spark" % "spark-streaming_2.10" % "1.1.0-SNAPSHOT" % "provided" excludeAll(
val sparkCassandra = "com.tuplejump" % "calliope_2.10" % "0.9.0-C2-EA" exclude("org.apache.cassandra", "cassandra-all") exclude("org.apache.cassandra", "cassandra-thrift")
val casAll = "org.apache.cassandra" % "cassandra-all" % "2.0.3" intransitive()
val casThrift = "org.apache.cassandra" % "cassandra-thrift" % "2.0.3" intransitive()
val sparkStreamingFromKafka = "org.apache.spark" % "spark-streaming-kafka_2.10" % "0.9.1" excludeAll(

-Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: January-22-15 11:39 AM To: Adrian Mocanu Cc: u...@spark.incubator.apache.org Subject: Re: Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions

NoSuchMethodError almost always means that you have compiled some code against one version of a library but are running against another. I wonder if you are including different versions of Spark in your project, or running against a cluster on an older version?

On Thu, Jan 22, 2015 at 3:57 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi I get this exception when I run a Spark test case on my local machine: An exception or error caused a run to abort: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions; java.lang.NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions; In my test case I have these Spark-related imports: import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.TestSuiteBase import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions -Adrian
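Editor's note: Sean's diagnosis matches the dependency list above: spark-core and spark-streaming are on 1.1.0-SNAPSHOT while spark-streaming-kafka is on 0.9.1, so the code can compile against one StreamingContext and run against another. A hedged build.sbt sketch that pins every Spark artifact to one shared version (the version number is illustrative):

// Keep all Spark modules on the same version to avoid NoSuchMethodError at runtime.
val sparkVersion = "1.1.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming"       % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
)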
RE: Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions
I use Spark 1.1.0-SNAPSHOT:

val spark = "org.apache.spark" %% "spark-core" % "1.1.0-SNAPSHOT" % "provided" excludeAll(

-Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: January-22-15 11:39 AM To: Adrian Mocanu Cc: u...@spark.incubator.apache.org Subject: Re: Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions

NoSuchMethodError almost always means that you have compiled some code against one version of a library but are running against another. I wonder if you are including different versions of Spark in your project, or running against a cluster on an older version?

On Thu, Jan 22, 2015 at 3:57 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi I get this exception when I run a Spark test case on my local machine: An exception or error caused a run to abort: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions; java.lang.NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions; In my test case I have these Spark-related imports: import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.TestSuiteBase import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions -Adrian
reconciling best effort real time with delayed aggregation
Hi I have a question regarding design trade-offs and best practices. I'm working on a real-time analytics system. For simplicity, I have data with timestamps (the key) and counts (the value). I use DStreams for the real-time aspect. Tuples with the same timestamp can be spread across various RDDs, and I just aggregate each RDD by timestamp and increment counters in Cassandra; this gives correct aggregation counts based on the data timestamp.

At the same time as the tuple aggregations are saved into Cassandra, I also show the aggregations on a chart and pass the data through some more complicated math formulas (they output DStreams) which involve using updateStateByKey. These other operations are similar to a moving average in that, if data comes late, you have to recalculate all moving averages starting from the date of the last delayed tuple; such is the nature of these calculations. The calculations are not saved in the db but recalculated every time data is loaded from the db.

Now, in real time I do things on a best-effort basis, but the database will always have correct aggregations (even if tuples come in late for some early timestamp, Cassandra will easily increment a counter with the amount from this late tuple). In real time, when a tuple with the same timestamp belongs to several RDDs, I don't aggregate by tuple timestamp (because that would mean reapplying the math formulas from the timestamp of the last tuple, and that is too much overhead); instead I aggregate by RDD time, which is the system time when the RDD is created. This is good enough for real time. So now you can see that the db (the truth provider) is different from the real-time streaming results (best effort).

My questions: 1. From your experience, is the design I just described appropriate? 2. I'm curious how others have solved the problem of reconciling differences between their real-time processing and batch mode. I think I read on the mailing list (several months ago) that someone re-does the aggregation step an hour after data is received (i.e. the aggregation DStream job is always an hour behind, so that late tuples have time to propagate to the db). 3. In case the source of data fails and is restarted later, my design will give duplicates unless the tuples in the database are deleted for the timestamps contained in the data I am re-streaming. Is there a better way to avoid duplicates when running the same job twice or as part of a bigger job? (idempotency) Thanks -Adrian
RE: RowMatrix.multiply() ?
I’m resurrecting this thread because I’m interested in doing a transpose on a RowMatrix. There is this other thread too: http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-multiplication-in-spark-td12562.html which points to https://issues.apache.org/jira/browse/SPARK-3434, which is still in progress at this time. Is this the correct Jira issue for the transpose operation? ETA? Thanks a lot! -A

From: Reza Zadeh [mailto:r...@databricks.com] Sent: October-15-14 1:48 PM To: ll Cc: u...@spark.incubator.apache.org Subject: Re: RowMatrix.multiply() ?

Hi, We are currently working on distributed matrix operations. Two RowMatrices cannot currently be multiplied together. Neither can they be added. The functionality will be added soon. You can of course achieve this yourself by using IndexedRowMatrix and doing one join per operation you requested. Best, Reza

On Wed, Oct 15, 2014 at 8:50 AM, ll duy.huynh@gmail.com wrote: hi.. it looks like RowMatrix.multiply() takes a local Matrix as a parameter and returns the result as a distributed RowMatrix. how do you perform this series of multiplications if A, B, C, and D are all RowMatrix? (((A x B) x C) x D) thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RowMatrix-multiply-tp16509.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
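Editor's note: until SPARK-3434 lands, a transpose can be emulated with MLlib's CoordinateMatrix by swapping the row and column index of every entry. This is a hedged sketch against the MLlib 1.x distributed matrix API, not code from this thread; coordMat is an assumed CoordinateMatrix view of the data:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}

// Swap (i, j) on every entry to get the transpose, then convert back to a RowMatrix.
def transpose(coordMat: CoordinateMatrix): CoordinateMatrix = {
  val swapped = coordMat.entries.map(e => MatrixEntry(e.j, e.i, e.value))
  new CoordinateMatrix(swapped, coordMat.numCols(), coordMat.numRows())
}

val transposedRowMat: RowMatrix = transpose(coordMat).toRowMatrix()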
using RDD result in another RDD
Hi I'd like to use the result of one RDD, RDD1, in another RDD, RDD2. Normally I would use something like a barrier to make the 2nd RDD wait till the computation of the 1st RDD is done, then include the result from RDD1 in the closure for RDD2. Currently I create another RDD, RDD3, out of the result of RDD1 and then do a Cartesian product of RDD2 and RDD3. NB: This operation is slow and expands partitions from 270 to 1200. This is a simplified example but I think it should help.

What I want to do (pseudocode):
val a: Int = RDD1.reduce(..)
RDD2.map(x => x * a)

What I use right now (pseudocode):
val a: Int = RDD1.reduce(..)
RDD3 = makeRDD(a)
RDD2.cartesianProduct(RDD3)

How do I structure this type of operation so I don't need the barrier to block computing RDD2 until RDD1 is done? -Adrian
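Editor's note: reduce is an action, so it already provides the barrier: it returns its result to the driver before the next statement runs, and that plain value can be captured in RDD2's closure (or broadcast if it is large). A minimal sketch with illustrative names rdd1/rdd2:

// reduce() blocks on the driver until rdd1 is computed, so `a` is just an Int here.
val a: Int = rdd1.reduce(_ + _)
val scaled = rdd2.map(x => x * a)

// For bigger results, broadcast instead of capturing the value directly:
val aBc = sc.broadcast(a)
val scaled2 = rdd2.map(x => x * aBc.value)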
RE: overloaded method value updateStateByKey ... cannot be applied to ... when Key is a Tuple2
My understanding is that the reason you have an Option is so you can filter out tuples when None is returned. This way your state data won't grow forever.

-Original Message- From: spr [mailto:s...@yarcdata.com] Sent: November-12-14 2:25 PM To: u...@spark.incubator.apache.org Subject: Re: overloaded method value updateStateByKey ... cannot be applied to ... when Key is a Tuple2

After comparing with previous code, I got it to work by making the return a Some instead of a Tuple2. Perhaps some day I will understand this.

spr wrote: --code
val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
  val currentCount = if (values.isEmpty) 0 else values.map(x => x._1).sum
  val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map(x => x._2).min
  val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))
  // (currentCount + previousCount, Seq(minTime, newMinTime).min)   == old
  Some(currentCount + previousCount, Seq(minTime, newMinTime).min)  // == new
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/overloaded-method-value-updateStateByKey-cannot-be-applied-to-when-Key-is-a-Tuple2-tp18644p18750.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: overloaded method value updateStateByKey ... cannot be applied to ... when Key is a Tuple2
You are correct; the filtering I’m talking about is done implicitly. You don’t have to do it yourself. Spark will do it for you and remove those entries from the state collection.

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com] Sent: November-12-14 3:50 PM To: Adrian Mocanu Cc: spr; u...@spark.incubator.apache.org Subject: Re: overloaded method value updateStateByKey ... cannot be applied to ... when Key is a Tuple2

Adrian, do you know if this is documented somewhere? I was also under the impression that setting a key's value to None would cause the key to be discarded (without any explicit filtering on the user's part) but cannot find any official documentation to that effect.

On Wed, Nov 12, 2014 at 2:43 PM, Adrian Mocanu amoc...@verticalscope.com wrote: My understanding is that the reason you have an Option is so you can filter out tuples when None is returned. This way your state data won't grow forever.

-Original Message- From: spr [mailto:s...@yarcdata.com] Sent: November-12-14 2:25 PM To: u...@spark.incubator.apache.org Subject: Re: overloaded method value updateStateByKey ... cannot be applied to ... when Key is a Tuple2

After comparing with previous code, I got it to work by making the return a Some instead of a Tuple2. Perhaps some day I will understand this.

spr wrote: --code
val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
  val currentCount = if (values.isEmpty) 0 else values.map(x => x._1).sum
  val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map(x => x._2).min
  val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))
  // (currentCount + previousCount, Seq(minTime, newMinTime).min)   == old
  Some(currentCount + previousCount, Seq(minTime, newMinTime).min)  // == new
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/overloaded-method-value-updateStateByKey-cannot-be-applied-to-when-Key-is-a-Tuple2-tp18644p18750.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
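Editor's note: returning None from the update function is what keeps the state table from growing without bound. A hedged sketch of an update function that expires idle keys (the timeout value and state layout are assumptions for illustration, not taken from this thread):

// State is (count, lastSeenMillis); returning None drops the key from the state DStream.
val expireAfterMs = 60000L   // assumption: one-minute expiry
def updateWithExpiry(values: Seq[Int], state: Option[(Int, Long)]): Option[(Int, Long)] = {
  val now = System.currentTimeMillis
  state match {
    case Some((_, lastSeen)) if values.isEmpty && now - lastSeen > expireAfterMs =>
      None   // key had no new data and is stale: remove it
    case _ =>
      val count = state.map(_._1).getOrElse(0) + values.sum
      Some((count, now))
  }
}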
[spark upgrade] Error communicating with MapOutputTracker when running test cases in latest spark
I use https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala to help me with testing. In Spark 0.9.1 my tests depending on TestSuiteBase worked fine. As soon as I switched to the latest (1.0.1), all tests fail. My sbt import is: "org.apache.spark" %% "spark-core" % "1.1.0-SNAPSHOT" % "provided" One exception I get is: Error communicating with MapOutputTracker org.apache.spark.SparkException: Error communicating with MapOutputTracker How can I fix this? I found a thread on this error but it was not very helpful: http://mail-archives.apache.org/mod_mbox/spark-dev/201405.mbox/%3ctencent_6b37d69c54f76819509a5...@qq.com%3E -Adrian
RE: ElasticSearch enrich
b0c1http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=user_nodesuser=1215, could you post your code? I am interested in your solution. Thanks Adrian From: boci [mailto:boci.b...@gmail.com] Sent: June-26-14 6:17 PM To: user@spark.apache.org Subject: Re: ElasticSearch enrich Wow, thanks your fast answer, it's help a lot... b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.commailto:boci.b...@gmail.com On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.camailto:hol...@pigscanfly.ca wrote: Hi b0c1, I have an example of how to do this in the repo for my talk as well, the specific example is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and then call saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD. e.g. stream.foreachRDD{(data, time) = val jobconf = ... data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf) } Hope that helps :) Cheers, Holden :) On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.commailto:boci.b...@gmail.com wrote: Thanks. I without local option I can connect with es remote, now I only have one problem. How can I use elasticsearch-hadoop with spark streaming? I mean DStream doesn't have saveAsHadoopFiles method, my second problem the output index is depend by the input data. Thanks -- Skype: boci13, Hangout: boci.b...@gmail.commailto:boci.b...@gmail.com On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath nick.pentre...@gmail.commailto:nick.pentre...@gmail.com wrote: You can just add elasticsearch-hadoop as a dependency to your project to user the ESInputFormat and ESOutputFormat (https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html For testing, yes I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200). On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.commailto:boci.b...@gmail.com wrote: That's okay, but hadoop has ES integration. what happened if I run saveAsHadoopFile without hadoop (or I must need to pull up hadoop programatically? (if I can)) b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.commailto:boci.b...@gmail.com On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau hol...@pigscanfly.camailto:hol...@pigscanfly.ca wrote: On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.commailto:boci.b...@gmail.com wrote: Hi guys, thanks the direction now I have some problem/question: - in local (test) mode I want to use ElasticClient.local to create es connection, but in prodution I want to use ElasticClient.remote, to this I want to pass ElasticClient to mapPartitions, or what is the best practices? In this case you probably want to make the ElasticClient inside of mapPartitions (since it isn't serializable) and if you want to use a different client in local mode just have a flag that control what type of client you create. - my stream output is write into elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat](-) in local environment? - After store the enriched data into ES, I want to generate aggregated data (EsInputFormat) how can I test it in local? I think the simplest thing to do would be use the same client in mode and just start single node elastic search cluster. 
Thanks guys b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.commailto:boci.b...@gmail.com On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau hol...@pigscanfly.camailto:hol...@pigscanfly.ca wrote: So I'm giving a talk at the Spark summit on using Spark ElasticSearch, but for now if you want to see a simple demo which uses elasticsearch for geo input you can take a look at my quick dirty implementation with TopTweetsInALocation ( https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala ). This approach uses the ESInputFormat which avoids the difficulty of having to manually create ElasticSearch clients. This approach might not work for your data, e.g. if you need to create a query for each record in your RDD. If this is the case, you could instead look at using mapPartitions and setting up your Elasticsearch connection inside of that, so you could then re-use
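Editor's note: since DStream has no saveAsHadoopDataset, the write happens per micro-batch inside foreachRDD, as Holden describes above. A hedged alternative sketch using elasticsearch-spark's RDD helper saveToEs (assuming the elasticsearch-spark / elasticsearch-hadoop artifact is on the classpath; the stream name and the "index/type" resource string are illustrative):

import org.elasticsearch.spark._   // brings saveToEs into scope on RDDs

// tweets: a DStream of Maps or case classes; the name is an assumption.
tweets.foreachRDD { rdd =>
  rdd.saveToEs("tweets/tweet")     // writes this batch to the tweets index, tweet type
}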
RE: How to turn off MetadataCleaner?
Hi TD, I use 0.9.1. Thanks for letting me know. This issue drove me up the wall. I even made a method to close all that I could think of: def stopSpark(ssc: StreamingContext) = { ssc.sparkContext.cleanup(500) ssc.sparkContext.clearFiles() ssc.sparkContext.clearJars() ssc.sparkContext.metadataCleaner.cancel() ssc.awaitTermination(500) ssc.stop(true) } -A From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: May-22-14 9:50 PM To: user@spark.apache.org Cc: u...@spark.incubator.apache.org Subject: Re: How to turn off MetadataCleaner? The cleaner should remain up while the sparkcontext is still active (not stopped). However, here it seems you are stopping the sparkContext (ssc.stop(true)), the cleaner should be stopped. However, there was a bug earlier where some of the cleaners may not have been stopped when the context is stopped. What version are you using. If it is 0.9.1, I can see that the cleaner in ShuffleBlockManagerhttps://github.com/apache/spark/blob/v0.9.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala is not stopped, so it is a bug. TD On Thu, May 22, 2014 at 9:24 AM, Adrian Mocanu amoc...@verticalscope.commailto:amoc...@verticalscope.com wrote: Hi After using sparks TestSuiteBase to run some tests I’ve noticed that at the end, after finishing all tests the cleaner is still running and outputs the following perdiodically: INFO o.apache.spark.util.MetadataCleaner - Ran metadata cleaner for SHUFFLE_BLOCK_MANAGER I use method testOperation and I’ve changed it so that it stores the pointer to ssc after running setupStreams. Then using that pointer to turn things off, but the cleaner remains up. How to shut down all of spark, including cleaner? Here is how I changed testOperation method (changes in bold): def testOperation[U: ClassTag, V: ClassTag]( input: Seq[Seq[U]], operation: DStream[U] = DStream[V], expectedOutput: Seq[Seq[V]], numBatches: Int, useSet: Boolean ) { val numBatches_ = if (numBatches 0) numBatches else expectedOutput.size val ssc = setupStreams[U, V](input, operation) val output = runStreams[V](ssc, numBatches_, expectedOutput.size) verifyOutput[V](output, expectedOutput, useSet) ssc.awaitTermination(500) ssc.stop(true) } -Adrian
How to turn off MetadataCleaner?
Hi After using Spark's TestSuiteBase to run some tests I've noticed that at the end, after finishing all tests, the cleaner is still running and outputs the following periodically: INFO o.apache.spark.util.MetadataCleaner - Ran metadata cleaner for SHUFFLE_BLOCK_MANAGER I use the method testOperation and I've changed it so that it stores the pointer to ssc after running setupStreams. I then use that pointer to turn things off, but the cleaner remains up. How do I shut down all of Spark, including the cleaner? Here is how I changed the testOperation method (changes in bold):

def testOperation[U: ClassTag, V: ClassTag](
    input: Seq[Seq[U]],
    operation: DStream[U] => DStream[V],
    expectedOutput: Seq[Seq[V]],
    numBatches: Int,
    useSet: Boolean
  ) {
  val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
  val ssc = setupStreams[U, V](input, operation)
  val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
  verifyOutput[V](output, expectedOutput, useSet)
  ssc.awaitTermination(500)
  ssc.stop(true)
}

-Adrian
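Editor's note: ssc.stop(true) is supposed to stop the underlying SparkContext as well; when timers such as the MetadataCleaner keep firing, explicitly stopping the SparkContext the test created is the usual belt-and-braces step. A hedged sketch (this is not the TestSuiteBase API, just the generic shutdown calls):

// Stop streaming first, then make sure the core context is really gone.
ssc.stop(true)            // true = also stop the SparkContext owned by the StreamingContext
ssc.sparkContext.stop()   // defensive extra stop on older releases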
tests that run locally fail when run through bamboo
I have a few test cases for Spark which extend TestSuiteBase from org.apache.spark.streaming. The tests run fine on my machine but when I commit to repo and run the tests automatically with bamboo the test cases fail with these errors. How to fix? 21-May-2014 16:33:09 [info] StreamingZigZagSpec: 21-May-2014 16:33:09 [info] - compute zigzag indicator in stream *** FAILED *** 21-May-2014 16:33:09 [info] org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 1 times (most recent failure: Exception failure: java.io.StreamCorruptedException: invalid type code: AC) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at scala.Option.foreach(Option.scala:236) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) 21-May-2014 16:33:09 [info] ... 
21-May-2014 16:33:09 [info] - compute zigzag indicator in stream with intermittent empty RDDs *** FAILED *** 21-May-2014 16:33:09 [info] Operation timed out after 10042 ms (TestSuiteBase.scala:283) 21-May-2014 16:33:09 [info] - compute zigzag indicator in stream with 3 empty RDDs *** FAILED *** 21-May-2014 16:33:09 [info] org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 1 times (most recent failure: Exception failure: java.io.FileNotFoundException: /tmp/spark-local-20140521163241-1707/0f/shuffle_1_1_1 (No such file or directory)) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at scala.Option.foreach(Option.scala:236) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) 21-May-2014 16:33:09 [info] ... 21-May-2014 16:33:09 [info] - compute zigzag indicator in stream w notification for each change *** FAILED *** 21-May-2014 16:33:09 [info] org.apache.spark.SparkException: Job aborted: Task 141.0:0 failed 1 times (most recent failure: Exception failure: java.io.FileNotFoundException: http://10.10.1.9:62793/broadcast_1) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at
RE: tests that run locally fail when run through bamboo
Just found this at the top of the log: 17:14:41.124 [pool-7-thread-3-ScalaTest-running-StreamingSpikeSpec] WARN o.e.j.u.component.AbstractLifeCycle - FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use build 21-May-2014 17:14:41 java.net.BindException: Address already in use Is there a way to set these connection up so that they don't all start on the same port (that's my guess for the root cause of the issue) From: Adrian Mocanu [mailto:amoc...@verticalscope.com] Sent: May-21-14 4:58 PM To: u...@spark.incubator.apache.org; user@spark.apache.org Subject: tests that run locally fail when run through bamboo I have a few test cases for Spark which extend TestSuiteBase from org.apache.spark.streaming. The tests run fine on my machine but when I commit to repo and run the tests automatically with bamboo the test cases fail with these errors. How to fix? 21-May-2014 16:33:09 [info] StreamingZigZagSpec: 21-May-2014 16:33:09 [info] - compute zigzag indicator in stream *** FAILED *** 21-May-2014 16:33:09 [info] org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 1 times (most recent failure: Exception failure: java.io.StreamCorruptedException: invalid type code: AC) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at scala.Option.foreach(Option.scala:236) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) 21-May-2014 16:33:09 [info] ... 
21-May-2014 16:33:09 [info] - compute zigzag indicator in stream with intermittent empty RDDs *** FAILED *** 21-May-2014 16:33:09 [info] Operation timed out after 10042 ms (TestSuiteBase.scala:283) 21-May-2014 16:33:09 [info] - compute zigzag indicator in stream with 3 empty RDDs *** FAILED *** 21-May-2014 16:33:09 [info] org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 1 times (most recent failure: Exception failure: java.io.FileNotFoundException: /tmp/spark-local-20140521163241-1707/0f/shuffle_1_1_1 (No such file or directory)) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at scala.Option.foreach(Option.scala:236) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) 21-May-2014 16:33:09 [info] ... 21-May-2014 16:33:09 [info] - compute zigzag indicator in stream w notification for each change *** FAILED *** 21-May-2014 16:33:09 [info] org.apache.spark.SparkException: Job aborted: Task 141.0:0 failed 1 times (most recent failure: Exception failure: java.io.FileNotFoundException: http://10.10.1.9:62793/broadcast_1) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply
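The BindException on port 4040 is the Spark web UI: when several test JVMs (or several StreamingContexts within one build) come up on the same Bamboo agent they race for the default UI port. A hedged sketch of one way around it, assuming a plain SparkConf-based test setup (in the Spark versions I've used, setting spark.ui.port to 0 asks for an ephemeral port; alternatively give each build plan its own fixed port):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// build the conf the test suite hands to its contexts so each run binds the UI to a free port
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("StreamingZigZagSpec")
  .set("spark.ui.port", "0") // 0 = pick a random free port instead of the default 4040

val ssc = new StreamingContext(conf, Seconds(1))

How this conf gets wired into TestSuiteBase depends on the suite; the point is only that the UI port must not be shared across concurrently running builds.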
RE: slf4j and log4j loop
Please ignore. This was sent last week; not sure why it arrived so late. -Original Message- From: amoc [mailto:amoc...@verticalscope.com] Sent: May-09-14 10:13 AM To: u...@spark.incubator.apache.org Subject: Re: slf4j and log4j loop Hi Patrick/Sean, Sorry to resurrect this thread, but after upgrading to Spark 0.9.1 I still get this error at runtime ...trying to run some tests here. Has this fix actually been integrated into Spark 0.9.1? Thanks again -A -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/slf4j-and-log4j-loop-tp2699p5524.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
same log4j slf4j error in spark 9.1
I recall someone from the Spark team (TD?) saying that Spark 0.9.1 would change the logger and the circular-loop error between slf4j and log4j wouldn't show up. Yet on Spark 0.9.1 I still get: SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. SLF4J: See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details. Any solutions? -Adrian
missing method in my slf4j after excluding Spark ZK log4j
Hey guys, I've asked before (about Spark 0.9; I now use 0.9.1) about removing the log4j dependency and was told that it was gone. However, I still find it pulled in via ZooKeeper. This is fine since I exclude it myself in the sbt file, but another issue arises; I wonder if anyone else has run into this. Spark uses log4j v1.2.17 and slf4j-log4j12 1.7.2; I use slf4j 1.7.5, logback 1.0.13, and log4j-over-slf4j 1.7.5. I think my slf4j 1.7.5 doesn't agree with what ZooKeeper expects from its log4j v1.2.17, because I get a missing-method error: java.lang.NoSuchMethodError: org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58) at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58) at scala.Option.map(Option.scala:145) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126) at org.apache.spark.SparkContext.<init>(SparkContext.scala:139) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500) at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:76) ... Is there a way to find out what version of slf4j I need to make it work with log4j 1.2.17? -Adrian
RE: another updateStateByKey question - updated w possible Spark bug
I’ve encountered this issue again and am able to reproduce it about 10% of the time.

1. Here is the input:
RDD[ (a, 126232566, 1), (a, 126232566, 2) ]
RDD[ (a, 126232566, 1), (a, 126232566, 3) ]
RDD[ (a, 126232566, 3) ]
RDD[ (a, 126232566, 4) ]
RDD[ (a, 126232566, 2) ]
RDD[ (a, 126232566, 5), (a, 126232566, 5) ]

2. Here are the actual results (printed DStream – each line is a new RDD, with the RDD id being the last number on each line):
(((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)
(((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)
(((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20)
(((a,126232566),StateClass(10,5,ArrayBuffer())),26) <- empty elements Seq[V]
(((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33)
(((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40)
(((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)
(((a,126232566),StateClass(26,9,ArrayBuffer())),53) <- empty elements Seq[V]
(((a,126232566),StateClass(26,9,ArrayBuffer())),59) <- empty elements Seq[V]

3. Here are the expected results (all tuples from #2 except those with an empty Seq[V]):
(((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)
(((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)
(((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20)
(((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33)
(((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40)
(((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)

4. Here is the code:
case class StateClass(sum: Integer, count: Integer, elements: Seq[Double])

val updateSumFunc = (values: Seq[(String, Long, Int)], state: Option[StateClass]) => {
//  if (values.isEmpty) {
//    // if the RDD has no values for this key (which is from a prev RDD),
//    // the tuple will not be shown in this RDD with values of 0
//    None
//  } else {
    val previousState = state.getOrElse(StateClass(0, 0, Seq()))
    val currentCount = values.size + previousState.count
    var currentSum = 0
    for (newValue <- values) yield ({
      currentSum = currentSum + newValue._3
    })
    currentSum = currentSum + previousState.sum
    val elements = for (newValues <- values) yield ({
      newValues._3.toDouble
    })
    Some(StateClass(currentSum, currentCount, elements))
//  }
}

val partialResultSums = inputStream.map((x: (String, Long, Int)) => ((x._1), (x._1, x._2, x._3))) // re-map
  .updateStateByKey[StateClass](updateSumFunc) // update state
  .transform(rdd => rdd.map(t => (t, rdd.id))) // add RDD id to RDD tuples

partialResultSums.print()

Now this is how I generate the RDDs, and I suspect the delay is why the issue surfaces:

rddQueue += ssc.sparkContext.makeRDD(smallWindow1) // smallWindow1 = List[(String, Long, Int)]((a, 126232566, 1), (a, 126232566, 2))
Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow2) // smallWindow2 = List[(String, Long, Int)]((a, 126232566, 1), (a, 126232566, 3))
Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow3) // smallWindow3 = List[(String, Long, Int)]((a, 126232566, 3))
Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow4) // smallWindow4 = List[(String, Long, Int)]((a, 126232566, 4))
Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow5) // smallWindow5 = List[(String, Long, Int)]((a, 126232566, 2))
Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow6) // smallWindow6 = List[(String, Long, Int)]((a, 126232566, 5), (a, 126232566, 5))
Thread.sleep(3100)
//ssc.awaitTermination()
ssc.stop()

In my use case, when I detect an empty Seq[V] in the updateStateByKey function I return None so I can filter those tuples out.
However, given that Spark calls the updateStateByKey function with an empty Seq[V] when it should not, my logic gets messed up. I wonder how to bypass this bug/feature of Spark. Thanks -Adrian From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: May-02-14 3:10 PM To: user@spark.apache.org Cc: u...@spark.incubator.apache.org Subject: Re: another updateStateByKey question Could be a bug. Can you share code with data that I can use to reproduce this? TD On May 2, 2014 9:49 AM, Adrian Mocanu amoc...@verticalscope.com wrote: Has anyone else noticed that sometimes the same tuple calls the update state function twice? I have 2 tuples with the same key in 1 RDD that is part of a DStream: RDD[ (a,1), (a,2) ] When the update function is called the first time, Seq[V] has data 1, 2, which is correct: StateClass(3,2, ArrayBuffer(1, 2)) Then right away (I see this in my output) the same key is used and the function is called again, but this time Seq is empty: StateClass(3,2, ArrayBuffer( )) In the update function I also save Seq[V] to state so I can see it in the RDD. I also
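Until the empty-Seq behaviour is clarified, one workaround is to short-circuit the update function when it is handed no new values, so it either keeps the previous state as-is or drops the key, instead of emitting a record with an empty elements buffer. A minimal sketch against the StateClass above (assuming that keeping or dropping the old state is acceptable for the use case):

val updateSumFuncGuarded = (values: Seq[(String, Long, Int)], state: Option[StateClass]) => {
  if (values.isEmpty) {
    state // no new data for this key in this batch: keep the old state (return None here to drop the key instead)
  } else {
    val previousState = state.getOrElse(StateClass(0, 0, Seq()))
    Some(StateClass(
      previousState.sum + values.map(_._3).sum,
      previousState.count + values.size,
      values.map(_._3.toDouble)))
  }
}

Either way the function is still invoked with an empty Seq[V] for every key that already has state; the guard only controls what gets emitted for it.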
RE: another updateStateByKey question - updated w possible Spark bug
Forgot to mention my batch interval is 1 second: val ssc = new StreamingContext(conf, Seconds(1)) hence the Thread.sleep(1100) From: Adrian Mocanu [mailto:amoc...@verticalscope.com] Sent: May-05-14 12:06 PM To: user@spark.apache.org Cc: u...@spark.incubator.apache.org Subject: RE: another updateStateByKey question - updated w possible Spark bug I’ve encountered this issue again and am able to reproduce it about 10% of the time. 1. Here is the input: RDD[ (a, 126232566, 1), (a, 126232566, 2) ] RDD[ (a, 126232566, 1), (a, 126232566, 3) ] RDD[ (a, 126232566, 3) ] RDD[ (a, 126232566, 4) ] RDD[ (a, 126232566, 2) ] RDD[ (a, 126232566, 5), (a, 126232566, 5) ] 2. Here are the actual results (printed DStream – each line is a new RDD with RDD Id being the last number on each line): (((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6) (((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13) (((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20) (((a,126232566),StateClass(10,5,ArrayBuffer())),26) -empty elements Seq[V] (((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33) (((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40) (((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47) (((a,126232566),StateClass(26,9,ArrayBuffer())),53) -empty elements Seq[V] (((a,126232566),StateClass(26,9,ArrayBuffer())),59) -empty elements Seq[V] 3. Here are the expected results: (all tuples from #2 except those with empty Seq[V] ) (((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6) (((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13) (((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20) (((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33) (((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40) (((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47) 4. 
Here is the code: case class StateClass(sum:Integer, count:Integer, elements:Seq[Double]) val updateSumFunc = (values: Seq[(String, Long, Int)], state: Option[StateClass]) = { // if (values.isEmpty) { //// if RDD cannot find values for this key (which is from prev RDD, //// the tuple will not be shown in this RDD w values of 0 //None // } else { val previousState = state.getOrElse(StateClass(0, 0, Seq())) val currentCount = values.size + previousState.count var currentSum=0 for (newValue - values) yield ({ currentSum = currentSum + newValue._3 }) currentSum= currentSum +previousState.sum val elements = for (newValues - values) yield ({ newValues._3.toDouble }) Some(StateClass(currentSum, currentCount, elements)) // } } val partialResultSums= inputStream.map((x:(String, Long, Int)) =((x._1), (x._1, x._2, x._3))) //re map .updateStateByKey[StateClass](updateSumFunc) //update state .transform(rdd=rdd.map(t=(t,rdd.id))) //add RDD ID to RDD tuples partialResultSums.print() Now this is how I generate the RDDs and I suspect the delay is why the issue surfaces: rddQueue += ssc.sparkContext.makeRDD(smallWindow1) // smallWindow1 = List[(String, Long, Int)]( (a, 126232566, 1), (a, 126232566, 2) ) Thread.sleep(1100) rddQueue += ssc.sparkContext.makeRDD(smallWindow2) // smallWindow2= List[(String, Long, Int)]((a, 126232566, 1), (a, 126232566, 3)) Thread.sleep(1100) rddQueue += ssc.sparkContext.makeRDD(smallWindow3) // smallWindow3= List[(String, Long, Int)]((a, 126232566, 3)) Thread.sleep(1100) rddQueue += ssc.sparkContext.makeRDD(smallWindow4) // smallWindow4= List[(String, Long, Int)]((a, 126232566, 4)) Thread.sleep(1100) rddQueue += ssc.sparkContext.makeRDD(smallWindow5) // smallWindow5= List[(String, Long, Int)]((a, 126232566, 2)) Thread.sleep(1100) rddQueue += ssc.sparkContext.makeRDD(smallWindow6) // smallWindow6= List[(String, Long, Int)]((a, 126232566, 5), (a, 126232566, 5)) Thread.sleep(3100) //ssc.awaitTermination() ssc.stop() In my use case when I detect an empty Seq[V] in updateStateByKey function I return None so I can filter the tuples out. However, given that Spark calls updateStateByKey function with empty Seq[V] when it should not, messes my logic up. I wonder how to bypass this bug/feature of Spark. Thanks -Adrian From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: May-02-14 3:10 PM To: user@spark.apache.orgmailto:user@spark.apache.org Cc: u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: Re: another updateStateByKey question Could be a bug. Can you share a code with data that I can use to reproduce this? TD On May 2, 2014 9:49 AM, Adrian Mocanu amoc...@verticalscope.commailto:amoc...@verticalscope.com wrote: Has anyone else noticed that sometimes the same tuple calls update state function twice? I have 2 tuples with the same key
RE: What is Seq[V] in updateStateByKey?
So Seq[V] contains only new tuples. I initially thought that whenever a new tuple was found, it would add it to Seq and call the update function immediately so there wouldn't be more than 1 update to Seq per function call. Say I want to sum tuples with the same key is an RDD using updateStateByKey, Then (1) Seq[V] would contain the numbers for a particular key and my S state could be the sum? Or would (2) Seq contain partial sums (say sum per partition?) which I then need to sum into the final sum? After writing this out and thinking a little more about it I think #2 is correct. Can you confirm? Thanks again! -A -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: April-30-14 4:30 PM To: user@spark.apache.org Subject: Re: What is Seq[V] in updateStateByKey? S is the previous count, if any. Seq[V] are potentially many new counts. All of them have to be added together to keep an accurate total. It's as if the count were 3, and I tell you I've just observed 2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not 1 + 1. I butted in since I'd like to ask a different question about the same line of code. Why: val currentCount = values.foldLeft(0)(_ + _) instead of val currentCount = values.sum This happens a few places in the code. sum seems equivalent and likely quicker. Same with things like filter(_ == 200).size instead of count(_ == 200)... pretty trivial but hey. On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi TD, Why does the example keep recalculating the count via fold? Wouldn’t it make more sense to get the last count in values Seq and add 1 to it and save that as current count? From what Sean explained I understand that all values in Seq have the same key. Then when a new value for that key is found it is added to this Seq collection and the update function is called. Is my understanding correct?
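For reference, the minimal running-sum-per-key shape of updateStateByKey looks like the sketch below; per Sean's explanation, Seq[V] holds the individual new values observed for that key in the current batch (closer to option #1 than to per-partition partial sums). The stream name is made up:

// countsStream: DStream[(String, Int)]; stateful ops also need ssc.checkpoint(...) to be set
val runningSums = countsStream.updateStateByKey[Int] { (newValues: Seq[Int], total: Option[Int]) =>
  Some(total.getOrElse(0) + newValues.sum) // fold this batch's values for the key into the previous total
}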
updateStateByKey example not using correct input data?
I'm trying to understand updateStateByKey. Here's an example I'm testing with:

Input data: DStream( RDD( (a,2) ), RDD( (a,3) ), RDD( (a,4) ), RDD( (a,5) ), RDD( (a,6) ), RDD( (a,7) ) )

Code:
val updateFunc = (values: Seq[Int], state: Option[StateClass]) => {
  val previousState = state.getOrElse(StateClass(0, 0, Seq()))
  val currentSum = values.sum + previousState.sum
  val currentCount = values.size + previousState.count
  if (currentCount == previousState.count) {
    None // if this RDD has no change then remove the tuple
  } else {
    Some(StateClass(currentSum, currentCount, values))
  }
}
intStream.updateStateByKey[StateClass](updateFunc).transform(rdd => rdd.map(t => (t, rdd.id))).print()

Results:
((a,StateClass(14,5,ArrayBuffer(2.0, 3.0, 3.0, 3.0, 3.0))),12)
((a,StateClass(17,6,ArrayBuffer(3.0))),22)
((a,StateClass(20,7,ArrayBuffer(3.0))),32)

Questions: Why does the RDD with ID=12 have these elements: (2.0, 3.0, 3.0, 3.0, 3.0)? These do not exist in the input data, so where do these numbers come from? Well, 2 and 3 exist, but not the other 3's, and 4, 5, 6, 7 are missing too. What is going on here? -Adrian
range partitioner with updateStateByKey
If I use a range partitioner, will this make updateStateByKey take the tuples in order? Right now I see them not being taken in order (most of them are ordered but not all) -Adrian
What is Seq[V] in updateStateByKey?
What is Seq[V] in updateStateByKey? Does this store the collected tuples of the RDD in a collection? Method signature:
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
In my case I used Seq[Double], assuming a sequence of Doubles in the RDD; the moment I switched to a different type like Seq[(String, Double)] the code didn't compile. -Adrian
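Concretely, V is the value type of the pair DStream the method is called on, which is why switching the Seq element type without changing the stream type does not compile. A small hedged illustration (function names are made up):

// stream of (key, Double): V = Double, so the update function receives Seq[Double]
def updateSum(newValues: Seq[Double], state: Option[Double]): Option[Double] =
  Some(state.getOrElse(0.0) + newValues.sum)

// stream of (key, (String, Double)): V = (String, Double), so it receives Seq[(String, Double)]
def updatePairSum(newValues: Seq[(String, Double)], state: Option[Double]): Option[Double] =
  Some(state.getOrElse(0.0) + newValues.map(_._2).sum)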
FW: reduceByKeyAndWindow - spark internals
Any suggestions where I can find this in the documentation or elsewhere? Thanks From: Adrian Mocanu [mailto:amoc...@verticalscope.com] Sent: April-24-14 11:26 AM To: u...@spark.incubator.apache.org Subject: reduceByKeyAndWindow - spark internals If I have this code:
val stream1 = doublesInputStream.window(Seconds(10), Seconds(2))
val stream2 = stream1.reduceByKeyAndWindow(_ + _, Seconds(10), Seconds(10))
Does reduceByKeyAndWindow merge all RDDs from stream1 that came in the 10-second window? For example, in the first 10 secs stream1 will have 5 RDDs. Does reduceByKeyAndWindow merge these 5 RDDs into 1 RDD and remove duplicates? -Adrian
reduceByKeyAndWindow - spark internals
If I have this code:
val stream1 = doublesInputStream.window(Seconds(10), Seconds(2))
val stream2 = stream1.reduceByKeyAndWindow(_ + _, Seconds(10), Seconds(10))
Does reduceByKeyAndWindow merge all RDDs from stream1 that came in the 10-second window? For example, in the first 10 secs stream1 will have 5 RDDs. Does reduceByKeyAndWindow merge these 5 RDDs into 1 RDD and remove duplicates? -Adrian
writing booleans w Calliope
Has anyone managed to write Booleans to Cassandra from an RDD with Calliope? My Booleans give compile-time errors: expression of type List[Any] does not conform to expected type Types.CQLRowValues. CQLColumnValue is defined as ByteBuffer: type CQLColumnValue = ByteBuffer. For now I convert them to String. I tried converting them to bytes; that compiled but gave me a runtime error since a Scala Byte is not compatible with a Cassandra Boolean. -Adrian
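Since CQLColumnValue is just a ByteBuffer, another option is to serialize the Boolean yourself in the single-byte form Cassandra's native boolean type uses (0x01 for true, 0x00 for false) rather than round-tripping through a String. A hedged sketch; the exact encoding expected should be double-checked against the Calliope and Cassandra versions in use:

import java.nio.ByteBuffer

// encode a Scala Boolean the way Cassandra's boolean type is (believed to be) serialized: one byte, 0 or 1
def booleanToCql(b: Boolean): ByteBuffer =
  ByteBuffer.wrap(Array[Byte](if (b) 1.toByte else 0.toByte))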
RE: function state lost when next RDD is processed
I'd like to resurrect this thread since I don't have an answer yet. From: Adrian Mocanu [mailto:amoc...@verticalscope.com] Sent: March-27-14 10:04 AM To: u...@spark.incubator.apache.org Subject: function state lost when next RDD is processed Is there a way to pass a custom function to spark to run it on the entire stream? For example, say I have a function which sums up values in each RDD and then across RDDs. I've tried with map, transform, reduce. They all apply my sum function on 1 RDD. When the next RDD comes the function starts from 0 so the sum of the previous RDD is lost. Does Spark support a way of passing a custom function so that its state is preserved across RDDs and not only within RDD? Thanks -Adrian
RE: function state lost when next RDD is processed
Thanks! Ya that's what I'm doing so far, but I wanted to see if it's possible to keep the tuples inside Spark for fault tolerance purposes. -A From: Mark Hamstra [mailto:m...@clearstorydata.com] Sent: March-28-14 10:45 AM To: user@spark.apache.org Subject: Re: function state lost when next RDD is processed As long as the amount of state being passed is relatively small, it's probably easiest to send it back to the driver and to introduce it into RDD transformations as the zero value of a fold. On Fri, Mar 28, 2014 at 7:12 AM, Adrian Mocanu amoc...@verticalscope.commailto:amoc...@verticalscope.com wrote: I'd like to resurrect this thread since I don't have an answer yet. From: Adrian Mocanu [mailto:amoc...@verticalscope.commailto:amoc...@verticalscope.com] Sent: March-27-14 10:04 AM To: u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org Subject: function state lost when next RDD is processed Is there a way to pass a custom function to spark to run it on the entire stream? For example, say I have a function which sums up values in each RDD and then across RDDs. I've tried with map, transform, reduce. They all apply my sum function on 1 RDD. When the next RDD comes the function starts from 0 so the sum of the previous RDD is lost. Does Spark support a way of passing a custom function so that its state is preserved across RDDs and not only within RDD? Thanks -Adrian
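A rough sketch of the driver-side version of this, for a simple running sum (stream and variable names are made up; the state lives on the driver, so it does not get Spark's checkpointing - updateStateByKey is the usual choice when the state must stay inside Spark). Folding each batch with an identity zero and adding on the driver avoids the fold's zero value being applied once per partition:

var runningTotal = 0.0 // driver-side state, lives as long as the driver does

doubleStream.foreachRDD { rdd =>
  val batchTotal = rdd.fold(0.0)(_ + _) // fold with an identity zero, safe on empty RDDs
  runningTotal += batchTotal
  println(s"sum so far across all RDDs: $runningTotal")
}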
RE: Splitting RDD and Grouping together to perform computation
I think you should sort each RDD. -Original Message- From: yh18190 [mailto:yh18...@gmail.com] Sent: March-28-14 4:44 PM To: u...@spark.incubator.apache.org Subject: Re: Splitting RDD and Grouping together to perform computation Hi, Thanks Nanzhu. I tried to implement your suggestion on the following scenario: I have an RDD of, say, 24 elements. When I partitioned it into two groups of 12 elements each, there is a loss of order of the elements in the partitions; elements are partitioned randomly. I need to preserve the order such that the first 12 elements are in the 1st partition and the 2nd 12 elements in the 2nd partition. Guys, please help me: how do I maintain the order of the original sequence even after partitioning? Any solution? Before partition: RDD 64 29186 16059 9143 6439 6155 9187 18416 25565 30420 33952 38302 43712 47092 48803 52687 56286 57471 63429 70715 75995 81878 80974 71288 48556 After partition: in group 1 with 12 elements 64, 29186, 18416 30420 33952 38302 43712 47092 56286 81878 80974 71288 48556 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3447.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Splitting RDD and Grouping together to perform computation
I say you need to remap so you have a key for each tuple that you can sort on. Then call rdd.sortByKey(true), like this: mystream.transform(rdd => rdd.sortByKey(true)) For this function to be available you need to import org.apache.spark.rdd.OrderedRDDFunctions. -Original Message- From: yh18190 [mailto:yh18...@gmail.com] Sent: March-28-14 5:02 PM To: u...@spark.incubator.apache.org Subject: RE: Splitting RDD and Grouping together to perform computation Hi, Here is my code for the given scenario. Could you please let me know where to sort? I mean, on what basis do we have to sort, so that they maintain the order in the partition as that of the original sequence?
val res2 = reduced_hccg.map(_._2) // which gives an RDD of numbers
res2.foreach(println)
val result = res2.mapPartitions(p => {
  val l = p.toList
  val approx = new ListBuffer[(Int)]
  val detail = new ListBuffer[Double]
  for (i <- 0 until l.length - 1 by 2) {
    println(l(i), l(i + 1))
    approx += (l(i), l(i + 1))
  }
  approx.toList.iterator
  detail.toList.iterator
})
result.foreach(println)
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3450.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Splitting RDD and Grouping together to perform computation
Not sure how to change your code because you'd need to generate the keys where you get the data. Sorry about that. I can tell you where to put the code to remap and sort, though:
import org.apache.spark.rdd.OrderedRDDFunctions
val res2 = reduced_hccg.map(_._2)
  .map(x => (newkey, x)).sortByKey(true)
  // and if you want, remap them to remove the key that you used for sorting:
  .map(x => x._2)
res2.foreach(println)
val result = res2.mapPartitions(p => {
  val l = p.toList
  val approx = new ListBuffer[(Int)]
  val detail = new ListBuffer[Double]
  for (i <- 0 until l.length - 1 by 2) {
    println(l(i), l(i + 1))
    approx += (l(i), l(i + 1))
  }
  approx.toList.iterator
  detail.toList.iterator
})
result.foreach(println)
-Original Message- From: yh18190 [mailto:yh18...@gmail.com] Sent: March-28-14 5:17 PM To: u...@spark.incubator.apache.org Subject: RE: Splitting RDD and Grouping together to perform computation Hi Adrian, Thanks for the suggestion. Could you please modify the part of my code where I need to do so? I apologise for the inconvenience; because I am new to Spark I couldn't apply it appropriately. I would be thankful to you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3452.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
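If there is no natural key in the data, one way to manufacture one is the element's original position, then sort and split on that index. A hedged sketch (zipWithIndex on RDDs exists in newer Spark releases; on older ones the index has to be attached where the data is read):

import org.apache.spark.SparkContext._ // brings sortByKey into scope on pair RDDs

val indexed = res2.zipWithIndex().map { case (value, idx) => (idx, value) }
val ordered = indexed.sortByKey(true).map(_._2) // values back in their original order

// split by position instead of relying on how partitions happen to be filled
val n = res2.count()
val firstHalf  = indexed.filter { case (idx, _) => idx <  n / 2 }.map(_._2)
val secondHalf = indexed.filter { case (idx, _) => idx >= n / 2 }.map(_._2)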
StreamingContext.transform on a DStream
Found this transform fn in StreamingContext which takes in a DStream[_] and a function which acts on each of its RDDs. Unfortunately I can't figure out how to transform my DStream[(String,Int)] into DStream[_]
/** Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. */
def transform[T: ClassTag](
    dstreams: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[T]
  ): DStream[T] = {
  new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
-Adrian
RE: StreamingContext.transform on a DStream
Please disregard; I didn't see the Seq wrapper. From: Adrian Mocanu [mailto:amoc...@verticalscope.com] Sent: March-27-14 11:57 AM To: u...@spark.incubator.apache.org Subject: StreamingContext.transform on a DStream Found this transform fn in StreamingContext which takes in a DStream[_] and a function which acts on each of its RDDs. Unfortunately I can't figure out how to transform my DStream[(String,Int)] into DStream[_]
/** Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. */
def transform[T: ClassTag](
    dstreams: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[T]
  ): DStream[T] = {
  new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
-Adrian
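For completeness, using the Seq-based variant with a single DStream looks roughly like the sketch below; the cast back from RDD[_] is the awkward part, and for one stream DStream.transform is simpler (names are illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

// pairStream: DStream[(String, Int)]
val doubled = ssc.transform(Seq(pairStream), (rdds: Seq[RDD[_]], time: Time) =>
  rdds.head.asInstanceOf[RDD[(String, Int)]].map { case (k, v) => (k, v * 2) }
)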
how to create a DStream from bunch of RDDs
I create several RDDs by merging several consecutive RDDs from a DStream. Is there a way to add these new RDDs to a DStream? -Adrian
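One option already used elsewhere in these threads is queueStream: push the merged RDDs into a queue and let the context pull one per batch interval. A minimal sketch (assumes the merged RDDs are built on the driver; names are made up):

import scala.collection.mutable
import org.apache.spark.rdd.RDD

val rddQueue = new mutable.Queue[RDD[(String, Int)]]()
val mergedStream = ssc.queueStream(rddQueue, oneAtATime = true) // one queued RDD per batch

// later, as each merged RDD is produced:
rddQueue += mergedRdd1
rddQueue += mergedRdd2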
closures moving averages (state)
I'm passing a moving average function during the map phase like this:
val average = new Sma(window = 3)
stream.map(x => average.addNumber(x))
where class Sma extends Serializable { .. } I also tried to put the creation of the average object in an object, like I saw in another post:
object Average { val smaFn = new VSRTXYSimpleMovingAverage[(String, Long)](3) }
Every time average.addNumber is called it is a new instance. How can I preserve the state of the average object? Thanks -Adrian
RE: closures moving averages (state)
Tried with reduce and it's giving me pretty weird results that make no sense, i.e. 1 number for an entire RDD:
val smaStream = inputStream.reduce { case (t1, t2) => {
  val sma = average.addDataPoint(t1)
  sma
}}
Tried with transform and that worked correctly, but unfortunately it works 1 RDD at a time, so the moving average is reset when the next consecutive RDD is read... as if a new instance of the Average class is created for each RDD. Is there a way to have 1 global variable of a custom type (in my case the Average type)... somewhat like accumulators, but not incrementable in parallel - that wouldn't make sense for a moving average. The reason I want to apply a moving average function to a stream is so that the tuples remain in Spark and benefit from its fault-tolerance mechanisms. My guess is that currently this is not possible, but I'll wait for one of the Spark creators to comment on this. -A From: Benjamin Black [mailto:b...@b3k.us] Sent: March-26-14 11:50 AM To: user@spark.apache.org Subject: Re: closures moving averages (state) Perhaps you want reduce rather than map? On Wednesday, March 26, 2014, Adrian Mocanu amoc...@verticalscope.com wrote: I'm passing a moving average function during the map phase like this: val average = new Sma(window = 3) stream.map(x => average.addNumber(x)) where class Sma extends Serializable { .. } I also tried to put the creation of the average object in an object, like I saw in another post: object Average { val smaFn = new VSRTXYSimpleMovingAverage[(String, Long)](3) } Every time average.addNumber is called it is a new instance. How can I preserve the state of the average object? Thanks -Adrian
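One way to keep the moving-average state inside Spark (and therefore checkpointed) rather than in a driver-side object is to let updateStateByKey hold the last N values per key and derive the average from that state. A rough sketch, not the original Sma/VSRTXYSimpleMovingAverage implementation; it assumes a keyed DStream[(String, Double)] and ssc.checkpoint(...) being set:

val window = 3

// state = the most recent `window` values seen for the key
val smaStream = keyedDoubles.updateStateByKey[Seq[Double]] { (newValues: Seq[Double], state: Option[Seq[Double]]) =>
  Some((state.getOrElse(Seq.empty[Double]) ++ newValues).takeRight(window))
}.filter(_._2.nonEmpty)
 .mapValues(vs => vs.sum / vs.size)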
RE: streaming questions
Hi Diana, I'll answer Q3: You can check if an RDD is empty in several ways. Someone here mentioned that using an iterator was safer:
val isEmpty = rdd.mapPartitions(iter => Iterator(!iter.hasNext)).reduce(_ && _)
You can also check with a fold or rdd.count:
rdd.reduce(_ + _) // can't handle an empty RDD
rdd.fold(0)(_ + _) // no problem with an empty RDD
A
From: Diana Carroll [mailto:dcarr...@cloudera.com] Sent: March-26-14 2:09 PM To: user Subject: streaming questions I'm trying to understand Spark streaming, hoping someone can help. I've kinda-sorta got a version of Word Count running, and it looks like this:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
object StreamingWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: StreamingWordCount <master> <hostname> <port>")
      System.exit(1)
    }
    val master = args(0)
    val hostname = args(1)
    val port = args(2).toInt
    val ssc = new StreamingContext(master, "Streaming Word Count", Seconds(2))
    val lines = ssc.socketTextStream(hostname, port)
    val words = lines.flatMap(line => line.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
(I also have a small script that sends text to that port.) Question 1: When I run this, I don't get any output from the wordCounts.print as long as my data is still streaming. I have to stop my streaming data script before my program will display the word counts. Why is that? What if my stream is indefinite? I thought the point of Streaming was that it would process it in real time? Question 2: While I run this (and the stream is still sending) I get continuous warning messages like this: 14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623200 already exists on this machine; not re-adding it 14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623400 already exists on this machine; not re-adding it What does that mean? Question 3: I tried replacing the wordCounts.print() line with wordCounts.saveAsTextFiles("file:/my/path/outdir"). This results in a new outdir-timestamp file being created every two seconds... even if there's no data during that time period. Is there a way to tell it to save only if there's data? Thanks!
RE: [bug?] streaming window unexpected behaviour
Let me rephrase that: do you think it is possible to use an accumulator to skip the first few incomplete RDDs? -Original Message- From: Adrian Mocanu [mailto:amoc...@verticalscope.com] Sent: March-25-14 9:57 AM To: user@spark.apache.org Cc: u...@spark.incubator.apache.org Subject: RE: [bug?] streaming window unexpected behaviour Thanks TD! Is it possible to perhaps add another window method that doesn't generate partial windows? Or, is it possible to remove the first few partial windows? I'm thinking of using an accumulator to count how many windows there are. -A -Original Message- From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: March-24-14 6:55 PM To: user@spark.apache.org Cc: u...@spark.incubator.apache.org Subject: Re: [bug?] streaming window unexpected behaviour Yes, I believe that is current behavior. Essentially, the first few RDDs will be partial windows (assuming window duration > sliding interval). TD On Mon, Mar 24, 2014 at 1:12 PM, Adrian Mocanu amoc...@verticalscope.com wrote: I have what I would call unexpected behaviour when using window on a stream. I have 2 windowed streams with a 5s batch interval. One window stream is (5s,5s)=smallWindow and the other (10s,5s)=bigWindow. What I've noticed is that the 1st RDD produced by bigWindow is incorrect and is of size 5s, not 10s. So instead of waiting 10s and producing 1 RDD of size 10s, Spark produced the 1st 10s RDD with size 5s. Why is this happening? To me it looks like a bug; Matei or TD, can you verify that this is correct behaviour? I have the following code:
val ssc = new StreamingContext(conf, Seconds(5))
val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
val bigWindowStream = ssc.queueStream(bigWindowRddQueue)
val smallWindow = smallWindowReshapedStream.window(Seconds(5), Seconds(5))
  .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
val bigWindow = bigWindowReshapedStream.window(Seconds(10), Seconds(5))
  .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
-Adrian
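Since DStream.transform runs its function on the driver once per batch, a plain driver-side counter may be enough here (a regular accumulator is awkward because it is meant to be incremented inside tasks). A hedged sketch that blanks out the first partial windows of the bigWindow stream from the code above, where numToSkip would be windowDuration / slideDuration - 1:

import java.util.concurrent.atomic.AtomicInteger

val windowsSeen = new AtomicInteger(0)
val numToSkip = 1 // for a (10s, 5s) window, the first window covers only 5s of data

val fullWindowsOnly = bigWindow.transform { rdd =>
  if (windowsSeen.getAndIncrement() < numToSkip) rdd.filter(_ => false) // emit an empty RDD for the partial window
  else rdd
}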
remove duplicates
I have a DStream like this: ..RDD[a,b],RDD[b,c].. Is there a way to remove duplicates across the entire DStream? Ie: I would like the output to be (by removing one of the b's): ..RDD[a],RDD[b,c].. or ..RDD[a,b],RDD[c].. Thanks -Adrian
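Within one RDD a transform with distinct() is enough, but de-duplicating across the whole DStream needs state. A speculative sketch using updateStateByKey to emit each element only the first time it is ever seen (the seen-set never shrinks, so this only fits a bounded element space, and it needs ssc.checkpoint(...)):

// stream: the DStream whose elements should appear only once across all RDDs
val firstOccurrences = stream.map(x => (x, ()))
  .updateStateByKey[(Boolean, Boolean)] { (hits: Seq[Unit], st: Option[(Boolean, Boolean)]) =>
    val seenBefore = st.exists(_._1)
    Some((true, hits.nonEmpty && !seenBefore)) // (everSeen, firstSeenInThisBatch)
  }
  .filter { case (_, (_, isNew)) => isNew }
  .map { case (element, _) => element }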
[bug?] streaming window unexpected behaviour
I have what I would call unexpected behaviour when using window on a stream. I have 2 windowed streams with a 5s batch interval. One window stream is (5s,5s)=smallWindow and the other (10s,5s)=bigWindow. What I've noticed is that the 1st RDD produced by bigWindow is incorrect and is of size 5s, not 10s. So instead of waiting 10s and producing 1 RDD of size 10s, Spark produced the 1st 10s RDD with size 5s. Why is this happening? To me it looks like a bug; Matei or TD, can you verify that this is correct behaviour? I have the following code:
val ssc = new StreamingContext(conf, Seconds(5))
val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
val bigWindowStream = ssc.queueStream(bigWindowRddQueue)
val smallWindow = smallWindowReshapedStream.window(Seconds(5), Seconds(5))
  .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
val bigWindow = bigWindowReshapedStream.window(Seconds(10), Seconds(5))
  .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
-Adrian
is collect exactly-once?
Hi, Quick question here: I know that .foreach is not idempotent. I am wondering if collect() is idempotent? Meaning that once I've collect()-ed, if a Spark node crashes, I can't get the same values from the stream ever again. Thanks -Adrian
slf4j and log4j loop
Hi, Have you encountered an slf4j and log4j loop when using Spark? I pull a few packages via sbt. The Spark package uses slf4j-log4j12.jar and another package uses log4j-over-slf4j.jar, which creates the circular loop between the 2 loggers and thus the exception below. Do you know of a fix for this? SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. SLF4J: See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details. An exception or error caused a run to abort. java.lang.ExceptionInInitializerError at org.apache.log4j.Logger.getLogger(Logger.java:40) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126) at org.apache.spark.SparkContext.<init>(SparkContext.scala:139) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500) at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:76) ... Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details. at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:51) ... 54 more Thanks -Adrian
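The usual way out is to make sure only one of the two bridge jars ends up on the classpath, i.e. exclude slf4j-log4j12 from whichever dependencies drag it in (or drop log4j-over-slf4j, if log4j is the backend that should win). A hedged sbt sketch with illustrative coordinates, not the actual build file:

libraryDependencies ++= Seq(
  // keep logback + log4j-over-slf4j, and stop Spark from bringing in the slf4j -> log4j binding
  ("org.apache.spark" %% "spark-streaming" % "0.9.1")
    .exclude("org.slf4j", "slf4j-log4j12"),
  "ch.qos.logback" % "logback-classic" % "1.0.13",
  "org.slf4j" % "log4j-over-slf4j" % "1.7.5"
)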