Re: Spark caching questions

2014-09-10 Thread Mayur Rustagi
Cached RDDs do not survive SparkContext deletion (they are scoped on a
per-SparkContext basis).
I am not sure what you mean by disk-based cache eviction; if you cache more
RDDs than you have disk space for, the result will not be very pretty :)

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Wed, Sep 10, 2014 at 4:43 AM, Vladimir Rodionov 
vrodio...@splicemachine.com wrote:

 Hi, users

 1. Disk-based cache eviction policy? The same LRU?

 2. What is the scope of a cached RDD? Does it survive the application? What
 happens if I run the Java app next time? Will the RDD be recreated or read from the cache?

 If the answer is YES, then ...


 3. Is there any way to invalidate a cached RDD automatically? RDD
 partitions? Some API like RDD.isValid()?

 4. HadoopRDD is InputFormat-based. Some partitions (splits) may become
 invalid in the cache. Can we reload only those partitions into the cache?

 -Vladimir
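
A minimal sketch of the caching scope discussed above (paths and names are
illustrative): a cached RDD lives only as long as its SparkContext, and
unpersist() is the closest thing to manual invalidation.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("cache-scope-demo"))

// Cache an RDD; the cached blocks exist only within this SparkContext.
val lines = sc.textFile("hdfs:///data/input.txt").persist(StorageLevel.MEMORY_AND_DISK)
println(lines.count())   // the first action materializes and caches the partitions

// There is no RDD.isValid(); dropping a stale cached RDD is a manual unpersist().
lines.unpersist()

// Once the context stops, all of its cached RDDs are gone; the next run of the
// application (a new SparkContext) has to recompute or re-read the data.
sc.stop()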



Re: Spark Streaming and database access (e.g. MySQL)

2014-09-10 Thread Mayur Rustagi
I think she is checking for blanks?
But if the RDD is blank then nothing will happen, no db connections etc.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Mon, Sep 8, 2014 at 1:32 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 On Mon, Sep 8, 2014 at 4:39 PM, Sean Owen so...@cloudera.com wrote:

  if (rdd.take(1).size == 1) {
  rdd foreachPartition { iterator =>


 I was wondering: since take() is an output operation, isn't the RDD computed
 twice (once for the take(1), once during the foreachPartition iteration)? Or will only a
 single element be computed for the take(1)?

 Thanks
 Tobias
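
 For reference, a minimal sketch of the pattern under discussion, assuming a
 DStream named dstream and an illustrative JDBC endpoint (neither is from the
 thread): take(1) launches a small job that stops as soon as one element is
 found, and the connection is opened inside foreachPartition so nothing
 non-serializable is shipped from the driver.

 import java.sql.DriverManager

 dstream.foreachRDD { rdd =>
   if (rdd.take(1).nonEmpty) {            // cheap emptiness check
     rdd.foreachPartition { iterator =>
       // One connection per partition, created on the worker.
       val conn = DriverManager.getConnection("jdbc:mysql://dbhost/test", "user", "pass")
       try {
         iterator.foreach { record =>
           // write each record; statement details omitted
         }
       } finally {
         conn.close()
       }
     }
   }
 }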





Re: RDD memory questions

2014-09-10 Thread Boxian Dong
Thank you very much for your kind help. I'd like to raise some further questions:

   - If the RDD is stored in serialized format, does that mean that whenever
the RDD is processed, it will be unpacked and packed again from and back to
the JVM, even when everything is located on the same machine? 
http://apache-spark-user-list.1001560.n3.nabble.com/file/n13862/rdd_img.png 

   - Can the RDD be partially unpacked from the serialized state? Or whenever
an RDD is touched, must it be fully unpacked, and of course packed again
afterwards?

  -  When an RDD is cached, is it saved in a deserialized or a serialized
format? If it is saved in a deserialized format, is partial reading of the
RDD from the JVM into the Python runtime possible? 
Thank you very much
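
A short sketch of the storage-level distinction behind these questions (the
levels shown are standard Spark ones, and sc is assumed to be an existing
SparkContext): MEMORY_ONLY keeps deserialized objects on the JVM heap, while
MEMORY_ONLY_SER keeps serialized bytes that have to be deserialized again
whenever the partition is read; in PySpark the data additionally crosses the
JVM/Python boundary as pickled bytes.

import org.apache.spark.storage.StorageLevel

// Deserialized objects in the JVM heap: fastest to read, largest footprint.
val fast = sc.textFile("hdfs:///data/events").persist(StorageLevel.MEMORY_ONLY)

// Serialized bytes: smaller footprint, but every read of a cached partition
// pays the deserialization cost again.
val compact = sc.textFile("hdfs:///data/events").persist(StorageLevel.MEMORY_ONLY_SER)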



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-memory-questions-tp13805p13862.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Luis Ángel Vicente Sánchez
I somehow missed that parameter when I was reviewing the documentation,
that should do the trick! Thank you!

2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

  Hi Luis,



 The parameters “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
 used to remove useless, timed-out streaming data. The difference is that
 “spark.cleaner.ttl” is a time-based cleaner: it cleans not only streaming
 input data but also Spark’s useless metadata; while
 “spark.streaming.unpersist” is a reference-based cleaning mechanism:
 streaming data will be removed once it falls out of the slide duration.



 Both of these parameters can alleviate the memory occupation of Spark
 Streaming. But if data floods into Spark Streaming at start-up, as in your
 situation using Kafka, these two parameters cannot fully mitigate the
 problem. You actually need to control the input data rate so it is not
 injected so fast; you can try “spark.streaming.receiver.maxRate” to control
 the ingestion rate.
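
 A minimal sketch of setting these three properties together (the values are
 illustrative only):

 import org.apache.spark.SparkConf

 val conf = new SparkConf()
   .setAppName("streaming-app")
   // Time-based cleaner: forget metadata and old streaming input after this many seconds.
   .set("spark.cleaner.ttl", "3600")
   // Reference-based cleaning: unpersist input data once it falls out of the slide duration.
   .set("spark.streaming.unpersist", "true")
   // Throttle each receiver to at most this many records per second.
   .set("spark.streaming.receiver.maxRate", "10000")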



 Thanks

 Jerry



 *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 *Sent:* Wednesday, September 10, 2014 5:21 AM
 *To:* user@spark.apache.org
 *Subject:* spark.cleaner.ttl and spark.streaming.unpersist



 The executors of my Spark Streaming application are being killed due to
 memory issues. The memory consumption is quite high on startup because it is
 the first run and there are quite a few events on the Kafka queues, which are
 consumed at a rate of 100K events per second.

 I wonder if it is recommended to use spark.cleaner.ttl and
 spark.streaming.unpersist together to mitigate that problem. I also
 wonder whether new RDDs are being batched while an RDD is being processed.

 Regards,

 Luis



Re: groupBy gives non deterministic results

2014-09-10 Thread redocpot
Hi, 

I am using spark 1.0.0. The bug is fixed by 1.0.1.

Hao



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13864.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL -- more than two tables for join

2014-09-10 Thread boyingk...@163.com
Hi:
I have a question about Spark SQL.

First, I use a left join on two tables, like this:
sql("SELECT * FROM youhao_data left join youhao_age on 
(youhao_data.rowkey=youhao_age.rowkey)").collect().foreach(println)  
The result is what I expected.
But when I use a left join on three tables or more, like this:
sql("SELECT * FROM youhao_data left join youhao_age on 
(youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on 
(youhao_age.rowkey=youhao_totalKiloMeter.rowkey)").collect().foreach(println) 
I get the exception:
Exception in thread "main" java.lang.RuntimeException: [1.90] failure: 
``UNION'' expected but `left' found

SELECT * FROM youhao_data left join youhao_age on 
(youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on 
(youhao_age.rowkey=youhao_totalKiloMeter.rowkey)

 ^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:69)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:181)
at 
org.apache.spark.examples.sql.SparkSQLHBaseRelation$.main(SparkSQLHBaseRelation.scala:140)
at 
org.apache.spark.examples.sql.SparkSQLHBaseRelation.main(SparkSQLHBaseRelation.scala)


My questions are:
1. Is my SQL script wrong, or does Spark SQL not support joining more than two
tables?
2. If Spark SQL does not support more than two tables, what can I do for my requirement?
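
For reference, a sketch of the same three-table left join issued through a
HiveContext, whose parser handles chained joins (table and column names are
the ones from the question; hql was the Spark 1.0.x entry point and sc an
existing SparkContext):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.hql("""
  SELECT *
  FROM youhao_data
  LEFT JOIN youhao_age ON (youhao_data.rowkey = youhao_age.rowkey)
  LEFT JOIN youhao_totalKiloMeter ON (youhao_age.rowkey = youhao_totalKiloMeter.rowkey)
""").collect().foreach(println)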






boyingk...@163.com

Re: groupBy gives non deterministic results

2014-09-10 Thread Ye Xianjin
Great. And you should ask questions on the user@spark.apache.org mailing list.  I 
believe many people don't subscribe to the incubator mailing list now.

-- 
Ye Xianjin
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)


On Wednesday, September 10, 2014 at 6:03 PM, redocpot wrote:

 Hi, 
 
 I am using spark 1.0.0. The bug is fixed by 1.0.1.
 
 Hao
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13864.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com 
 (http://Nabble.com).
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 (mailto:user-unsubscr...@spark.apache.org)
 For additional commands, e-mail: user-h...@spark.apache.org 
 (mailto:user-h...@spark.apache.org)
 
 




Dependency Problem with Spark / ScalaTest / SBT

2014-09-10 Thread Thorsten Bergler

Hello,

I am writing a Spark App which is already working so far.
Now I have started to build some unit tests as well, but I am running into some 
dependency problems and I cannot find a solution right now. Perhaps 
someone could help me.


I build my Spark Project with SBT and it seems to be configured well, 
because compiling, assembling and running the built jar with 
spark-submit are working well.


Now I started with the UnitTests, which I located under /src/test/scala.

When I call test in sbt, I get the following:

14/09/10 12:22:06 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/10 12:22:06 INFO spark.HttpServer: Starting HTTP Server
[trace] Stack trace suppressed: run last test:test for the full output.
[error] Could not run test test.scala.SetSuite: 
java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse

[info] Run completed in 626 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[error] Error during tests:
[error] test.scala.SetSuite
[error] (test:test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 3 s, completed 10.09.2014 12:22:06

last test:test gives me the following:

 last test:test
[debug] Running TaskDef(test.scala.SetSuite, 
org.scalatest.tools.Framework$$anon$1@6e5626c8, false, [SuiteSelector])

java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
at org.apache.spark.HttpServer.start(HttpServer.scala:54)
at 
org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
at 
org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
at 
org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
at 
org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
at 
org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35)

at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
at org.apache.spark.SparkContext.init(SparkContext.scala:202)
at test.scala.SetSuite.init(SparkTest.scala:16)

I also noticed right now, that sbt run is also not working:

14/09/10 12:44:46 INFO spark.HttpServer: Starting HTTP Server
[error] (run-main-2) java.lang.NoClassDefFoundError: 
javax/servlet/http/HttpServletResponse

java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
at org.apache.spark.HttpServer.start(HttpServer.scala:54)
at 
org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
at 
org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
at 
org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
at 
org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
at 
org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35)

at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
at org.apache.spark.SparkContext.init(SparkContext.scala:202)
at 
main.scala.PartialDuplicateScanner$.main(PartialDuplicateScanner.scala:29)
at 
main.scala.PartialDuplicateScanner.main(PartialDuplicateScanner.scala)


Here is my Testprojekt.sbt file:

name := "Testprojekt"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= {
  Seq(
    "org.apache.lucene" % "lucene-core" % "4.9.0",
    "org.apache.lucene" % "lucene-analyzers-common" % "4.9.0",
    "org.apache.lucene" % "lucene-queryparser" % "4.9.0",
    ("org.apache.spark" %% "spark-core" % "1.0.2").
      exclude("org.mortbay.jetty", "servlet-api").
      exclude("commons-beanutils", "commons-beanutils-core").
      exclude("commons-collections", "commons-collections").
      exclude("commons-collections", "commons-collections").
      exclude("com.esotericsoftware.minlog", "minlog").
      exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish").
      exclude("org.eclipse.jetty.orbit", "javax.transaction").
      exclude("org.eclipse.jetty.orbit", "javax.servlet")
  )
}

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
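
One observation, not confirmed in this thread: the build above excludes
org.eclipse.jetty.orbit/javax.servlet from spark-core, which is where
javax.servlet.http.HttpServletResponse would normally come from, so the
NoClassDefFoundError during sbt test/run is consistent with that exclusion.
A possible workaround (a sketch only) is to add a servlet API artifact back
explicitly:

// Illustrative only: restore the servlet API removed by the exclusions above.
libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1"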







-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dependency Problem with Spark / ScalaTest / SBT

2014-09-10 Thread arthur.hk.c...@gmail.com
Hi, 

What is your SBT command and the parameters?

Arthur


On 10 Sep, 2014, at 6:46 pm, Thorsten Bergler sp...@tbonline.de wrote:

 Hello,
 
 I am writing a Spark App which is already working so far.
 Now I started to build also some UnitTests, but I am running into some 
 dependecy problems and I cannot find a solution right now. Perhaps someone 
 could help me.
 
 I build my Spark Project with SBT and it seems to be configured well, because 
 compiling, assembling and running the built jar with spark-submit are working 
 well.
 
 Now I started with the UnitTests, which I located under /src/test/scala.
 
 When I call test in sbt, I get the following:
 
 14/09/10 12:22:06 INFO storage.BlockManagerMaster: Registered BlockManager
 14/09/10 12:22:06 INFO spark.HttpServer: Starting HTTP Server
 [trace] Stack trace suppressed: run last test:test for the full output.
 [error] Could not run test test.scala.SetSuite: 
 java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
 [info] Run completed in 626 milliseconds.
 [info] Total number of tests run: 0
 [info] Suites: completed 0, aborted 0
 [info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
 [info] All tests passed.
 [error] Error during tests:
 [error] test.scala.SetSuite
 [error] (test:test) sbt.TestsFailedException: Tests unsuccessful
 [error] Total time: 3 s, completed 10.09.2014 12:22:06
 
 last test:test gives me the following:
 
  last test:test
 [debug] Running TaskDef(test.scala.SetSuite, 
 org.scalatest.tools.Framework$$anon$1@6e5626c8, false, [SuiteSelector])
 java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
at org.apache.spark.HttpServer.start(HttpServer.scala:54)
at 
 org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
at 
 org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
at 
 org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
at 
 org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
at 
 org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
at org.apache.spark.SparkContext.init(SparkContext.scala:202)
at test.scala.SetSuite.init(SparkTest.scala:16)
 
 I also noticed right now, that sbt run is also not working:
 
 14/09/10 12:44:46 INFO spark.HttpServer: Starting HTTP Server
 [error] (run-main-2) java.lang.NoClassDefFoundError: 
 javax/servlet/http/HttpServletResponse
 java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
at org.apache.spark.HttpServer.start(HttpServer.scala:54)
at 
 org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
at 
 org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
at 
 org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
at 
 org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
at 
 org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
at org.apache.spark.SparkContext.init(SparkContext.scala:202)
at 
 main.scala.PartialDuplicateScanner$.main(PartialDuplicateScanner.scala:29)
at main.scala.PartialDuplicateScanner.main(PartialDuplicateScanner.scala)
 
 Here is my Testprojekt.sbt file:
 
 name := "Testprojekt"
 
 version := "1.0"
 
 scalaVersion := "2.10.4"
 
 libraryDependencies ++= {
   Seq(
     "org.apache.lucene" % "lucene-core" % "4.9.0",
     "org.apache.lucene" % "lucene-analyzers-common" % "4.9.0",
     "org.apache.lucene" % "lucene-queryparser" % "4.9.0",
     ("org.apache.spark" %% "spark-core" % "1.0.2").
       exclude("org.mortbay.jetty", "servlet-api").
       exclude("commons-beanutils", "commons-beanutils-core").
       exclude("commons-collections", "commons-collections").
       exclude("commons-collections", "commons-collections").
       exclude("com.esotericsoftware.minlog", "minlog").
       exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish").
       exclude("org.eclipse.jetty.orbit", "javax.transaction").
       exclude("org.eclipse.jetty.orbit", "javax.servlet")
   )
 }
 
 resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
 
 
 
 
 
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dependency Problem with Spark / ScalaTest / SBT

2014-09-10 Thread Thorsten Bergler

Hi,

I just called:

 test

or

 run

Thorsten


On 10.09.2014 at 13:38, arthur.hk.c...@gmail.com wrote:

Hi,

What is your SBT command and the parameters?

Arthur


On 10 Sep, 2014, at 6:46 pm, Thorsten Bergler sp...@tbonline.de wrote:


Hello,

I am writing a Spark App which is already working so far.
Now I started to build also some UnitTests, but I am running into some 
dependecy problems and I cannot find a solution right now. Perhaps someone 
could help me.

I build my Spark Project with SBT and it seems to be configured well, because 
compiling, assembling and running the built jar with spark-submit are working 
well.

Now I started with the UnitTests, which I located under /src/test/scala.

When I call test in sbt, I get the following:

14/09/10 12:22:06 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/10 12:22:06 INFO spark.HttpServer: Starting HTTP Server
[trace] Stack trace suppressed: run last test:test for the full output.
[error] Could not run test test.scala.SetSuite: java.lang.NoClassDefFoundError: 
javax/servlet/http/HttpServletResponse
[info] Run completed in 626 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[error] Error during tests:
[error] test.scala.SetSuite
[error] (test:test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 3 s, completed 10.09.2014 12:22:06

last test:test gives me the following:


last test:test

[debug] Running TaskDef(test.scala.SetSuite, 
org.scalatest.tools.Framework$$anon$1@6e5626c8, false, [SuiteSelector])
java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
at org.apache.spark.HttpServer.start(HttpServer.scala:54)
at 
org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
at 
org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
at 
org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
at 
org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
at 
org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
at org.apache.spark.SparkContext.init(SparkContext.scala:202)
at test.scala.SetSuite.init(SparkTest.scala:16)

I also noticed right now, that sbt run is also not working:

14/09/10 12:44:46 INFO spark.HttpServer: Starting HTTP Server
[error] (run-main-2) java.lang.NoClassDefFoundError: 
javax/servlet/http/HttpServletResponse
java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
at org.apache.spark.HttpServer.start(HttpServer.scala:54)
at 
org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
at 
org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
at 
org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
at 
org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
at 
org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
at org.apache.spark.SparkContext.init(SparkContext.scala:202)
at 
main.scala.PartialDuplicateScanner$.main(PartialDuplicateScanner.scala:29)
at main.scala.PartialDuplicateScanner.main(PartialDuplicateScanner.scala)

Here is my Testprojekt.sbt file:

name := "Testprojekt"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= {
  Seq(
    "org.apache.lucene" % "lucene-core" % "4.9.0",
    "org.apache.lucene" % "lucene-analyzers-common" % "4.9.0",
    "org.apache.lucene" % "lucene-queryparser" % "4.9.0",
    ("org.apache.spark" %% "spark-core" % "1.0.2").
      exclude("org.mortbay.jetty", "servlet-api").
      exclude("commons-beanutils", "commons-beanutils-core").
      exclude("commons-collections", "commons-collections").
      exclude("commons-collections", "commons-collections").
      exclude("com.esotericsoftware.minlog", "minlog").
      exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish").
      exclude("org.eclipse.jetty.orbit", "javax.transaction").
      exclude("org.eclipse.jetty.orbit", "javax.servlet")
  )
}

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"







-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: groupBy gives non deterministic results

2014-09-10 Thread redocpot
Ah, thank you. I did not notice that.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13871.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



nested rdd operation

2014-09-10 Thread Pavlos Katsogridakis

Hi ,

I have a question on spark
this programm on spark-shell

val filerdd = sc.textFile("NOTICE", 2)
val maprdd = filerdd.map( word => filerdd.map( word2 => (word2 + word) ) )
maprdd.collect()

throws a NullPointerException.
Can somebody explain why I cannot have a nested RDD operation?

--pavlos

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



JavaPairRDD&lt;String, Integer&gt; to JavaPairRDD&lt;String, String&gt; based on key

2014-09-10 Thread Tom
Is it possible to generate a JavaPairRDD&lt;String, Integer&gt; from a
JavaPairRDD&lt;String, String&gt;, where I can also use the key values? I have
looked at for instance mapToPair, but this generates a new K/V pair based on
the original value, and does not give me information about the key.

I need this in the initialization phase, where I have two RDD's with similar
keys, but with different types of values. Generating these is computational
intensive, and if I could use the first list to generate the second, it
would save me a big map/reduce phase.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/JavaPairRDD-String-Integer-to-JavaPairRDD-String-String-based-on-key-tp13875.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL -- more than two tables for join

2014-09-10 Thread arunshell87

Hi,

I too had tried SQL queries with joins, MINUS , subqueries etc but they did
not work in Spark Sql. 

I did not find any documentation on what queries work and what do not work
in Spark SQL, may be we have to wait for the Spark book to be released in
Feb-2015.

I believe you can try HiveQL in Spark for your requirement.

Thanks,
Arun



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-more-than-two-tables-for-join-tp13865p13877.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: nested rdd operation

2014-09-10 Thread Sean Owen
You can't use an RDD inside an operation on an RDD. Here you have
filerdd in your map function. It sort of looks like you want a
cartesian product of the RDD with itself, so look at the cartesian()
method. It may not be a good idea to compute such a thing.
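
A tiny sketch of the cartesian() alternative mentioned above (reusing the file
from the original example):

val filerdd = sc.textFile("NOTICE", 2)

// Every pair of lines from the same RDD, computed as a single distributed job
// instead of nesting one RDD operation inside another (which is not supported).
val pairs = filerdd.cartesian(filerdd).map { case (word, word2) => word2 + word }
pairs.collect()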

On Wed, Sep 10, 2014 at 1:57 PM, Pavlos Katsogridakis
kats...@ics.forth.gr wrote:
 Hi ,

 I have a question on spark
 this programm on spark-shell

 val filerdd = sc.textFile("NOTICE", 2)
 val maprdd = filerdd.map( word => filerdd.map( word2 => (word2 + word) ) )
 maprdd.collect()

 throws NULL pointer exception ,
 can somebody explain why i cannot have a nested rdd operation ?

 --pavlos

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: groupBy gives non deterministic results

2014-09-10 Thread Ye Xianjin
|  Do the two mailing lists share messages ?
I don't think so.  I didn't receive this message from the user list. I am not 
in databricks, so I can't answer your other questions. Maybe Davies Liu 
dav...@databricks.com can answer you?

-- 
Ye Xianjin
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)


On Wednesday, September 10, 2014 at 9:05 PM, redocpot wrote:

 Hi, Xianjin
 
 I checked user@spark.apache.org (mailto:user@spark.apache.org), and found my 
 post there:
 http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/browser
 
 I am using nabble to send this mail, which indicates that the mail will be
 sent from my email address to the u...@spark.incubator.apache.org 
 (mailto:u...@spark.incubator.apache.org) mailing
 list.
 
 Do the two mailing lists share messages ?
 
 Do we have a nabble interface for user@spark.apache.org 
 (mailto:user@spark.apache.org) mail list ?
 
 Thank you.
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13876.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com 
 (http://Nabble.com).
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 (mailto:user-unsubscr...@spark.apache.org)
 For additional commands, e-mail: user-h...@spark.apache.org 
 (mailto:user-h...@spark.apache.org)
 
 




Re: JavaPairRDD&lt;String, Integer&gt; to JavaPairRDD&lt;String, String&gt; based on key

2014-09-10 Thread Sean Owen
So, each key-value pair gets a new value for the original key? You
want mapValues().
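
A small sketch of the idea in Scala (JavaPairRDD offers an equivalent
mapValues method; the parsing step is illustrative):

import org.apache.spark.SparkContext._   // pair-RDD functions on older Spark versions

// Derive (key, intValue) pairs from (key, stringValue) pairs without touching
// the keys and without a shuffle.
val kvStrings = sc.parallelize(Seq(("a", "42"), ("b", "7")))
val kvInts = kvStrings.mapValues(_.toInt)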

On Wed, Sep 10, 2014 at 2:01 PM, Tom thubregt...@gmail.com wrote:
 Is it possible to generate a JavaPairRDD&lt;String, Integer&gt; from a
 JavaPairRDD&lt;String, String&gt;, where I can also use the key values? I have
 looked at for instance mapToPair, but this generates a new K/V pair based on
 the original value, and does not give me information about the key.

 I need this in the initialization phase, where I have two RDD's with similar
 keys, but with different types of values. Generating these is computational
 intensive, and if I could use the first list to generate the second, it
 would save me a big map/reduce phase.

 Thanks!



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/JavaPairRDD-String-Integer-to-JavaPairRDD-String-String-based-on-key-tp13875.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Global Variables in Spark Streaming

2014-09-10 Thread Ravi Sharma
Hi Friends,

I'm using Spark Streaming as a Kafka consumer. I want to do CEP with
Spark, and for that I need to store my sequence of events so that I can
detect patterns.

My question is: how can I save my events in a Java collection temporarily, so
that I can detect patterns from *processed (temporarily stored) and upcoming
events.*


Cheers,
Ravi Sharma


Some techniques for improving application performance

2014-09-10 Thread Will Benton
Spark friends,

I recently wrote up a blog post with examples of some of the standard 
techniques for improving Spark application performance:

  
http://chapeau.freevariable.com/2014/09/improving-spark-application-performance.html

The idea is that we start with readable but poorly-performing code and 
iteratively refine it, looking at the performance and operational consequences 
of a series of simple changes.  My intention is that this would provide some 
context for folks who are new to Spark and looking to improve their prototype 
applications.

If you have additional suggestions for techniques to address, let me know!  I'd 
be happy to write a follow-up post.



best,
wb

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL -- more than two tables for join

2014-09-10 Thread arthur.hk.c...@gmail.com
Hi,

May be you can take a look about the following.

http://databricks.com/blog/2014/03/26/spark-sql-manipulating-structured-data-using-spark-2.html

Good luck.
Arthur

On 10 Sep, 2014, at 9:09 pm, arunshell87 shell.a...@gmail.com wrote:

 
 Hi,
 
 I too had tried SQL queries with joins, MINUS , subqueries etc but they did
 not work in Spark Sql. 
 
 I did not find any documentation on what queries work and what do not work
 in Spark SQL, may be we have to wait for the Spark book to be released in
 Feb-2015.
 
 I believe you can try HiveQL in Spark for your requirement.
 
 Thanks,
 Arun
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-more-than-two-tables-for-join-tp13865p13877.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Task not serializable

2014-09-10 Thread Sean Owen
You mention that you are creating a UserGroupInformation inside your
function, but something is still serializing it. You should show your
code since it may not be doing what you think.

If you instantiate an object, it happens every time your function is
called. map() is called once per data element; mapPartitions() once
per partition. It depends.

On Wed, Sep 10, 2014 at 3:25 PM, Sarath Chandra
sarathchandra.jos...@algofusiontech.com wrote:
 Hi Sean,

 The solution of instantiating the non-serializable class inside the map is
 working fine, but I hit a road block. The solution is not working for
 singleton classes like UserGroupInformation.

 In my logic as part of processing a HDFS file, I need to refer to some
 reference files which are again available in HDFS. So inside the map method
 I'm trying to instantiate UserGroupInformation to get an instance of
 FileSystem. Then using this FileSystem instance I read those reference files
 and use that data in my processing logic.

 This is throwing task not serializable exceptions for 'UserGroupInformation'
 and 'FileSystem' classes. I also tried using 'SparkHadoopUtil' instead of
 'UserGroupInformation'. But it didn't resolve the issue.

 Request you provide some pointers in this regard.

 Also I have a query - when we instantiate a class inside map method, does it
 create a new instance for every RDD it is processing?

 Thanks  Regards,
 Sarath

 On Sat, Sep 6, 2014 at 4:32 PM, Sean Owen so...@cloudera.com wrote:

 I disagree that the generally right change is to try to make the
 classes serializable. Usually, classes that are not serializable are
 not supposed to be serialized. You're using them in a way that's
 causing them to be serialized, and that's probably not desired.

 For example, this is wrong:

 val foo: SomeUnserializableManagerClass = ...
 rdd.map(d => foo.bar(d))

 This is right:

 rdd.map { d =>
   val foo: SomeUnserializableManagerClass = ...
   foo.bar(d)
 }

 In the first instance, you create the object on the driver and try to
 serialize and copy it to workers. In the second, you're creating
 SomeUnserializableManagerClass in the function and therefore on the
 worker.

 mapPartitions is better if this creation is expensive.
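
 A short sketch of that mapPartitions variant, keeping the placeholder class
 from the example above:

 rdd.mapPartitions { iter =>
   // Created once per partition, on the worker, so it is never serialized
   // and not rebuilt for every record.
   val foo: SomeUnserializableManagerClass = ...
   iter.map(d => foo.bar(d))
 }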

 On Fri, Sep 5, 2014 at 3:06 PM, Sarath Chandra
 sarathchandra.jos...@algofusiontech.com wrote:
  Hi,
 
  I'm trying to migrate a map-reduce program to work with spark. I
  migrated
  the program from Java to Scala. The map-reduce program basically loads a
  HDFS file and for each line in the file it applies several
  transformation
  functions available in various external libraries.
 
  When I execute this over spark, it is throwing me Task not
  serializable
  exceptions for each and every class being used from these from external
  libraries. I included serialization to few classes which are in my
  scope,
  but there there are several other classes which are out of my scope like
  org.apache.hadoop.io.Text.
 
  How to overcome these exceptions?
 
  ~Sarath.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark NLP

2014-09-10 Thread Paolo Platter
Hi all,

What is your preferred Scala NLP library, and why?
Are there any items on Spark’s roadmap to integrate NLP features?

I basically need to perform NER line by line, so I don’t need a deep 
integration with the distributed engine.
I only want simple dependencies and the chance to build a dictionary for 
the Italian language.

Any suggestions ?

Thanks

Paolo Platter



Re: Task not serializable

2014-09-10 Thread Sarath Chandra
Thanks Sean.
Please find attached my code. Let me know your suggestions/ideas.

Regards,

*Sarath*

On Wed, Sep 10, 2014 at 8:05 PM, Sean Owen so...@cloudera.com wrote:

 You mention that you are creating a UserGroupInformation inside your
 function, but something is still serializing it. You should show your
 code since it may not be doing what you think.

 If you instantiate an object, it happens every time your function is
 called. map() is called once per data element; mapPartitions() once
 per partition. It depends.

 On Wed, Sep 10, 2014 at 3:25 PM, Sarath Chandra
 sarathchandra.jos...@algofusiontech.com wrote:
  Hi Sean,
 
  The solution of instantiating the non-serializable class inside the map
 is
  working fine, but I hit a road block. The solution is not working for
  singleton classes like UserGroupInformation.
 
  In my logic as part of processing a HDFS file, I need to refer to some
  reference files which are again available in HDFS. So inside the map
 method
  I'm trying to instantiate UserGroupInformation to get an instance of
  FileSystem. Then using this FileSystem instance I read those reference
 files
  and use that data in my processing logic.
 
  This is throwing task not serializable exceptions for
 'UserGroupInformation'
  and 'FileSystem' classes. I also tried using 'SparkHadoopUtil' instead of
  'UserGroupInformation'. But it didn't resolve the issue.
 
  Request you provide some pointers in this regard.
 
  Also I have a query - when we instantiate a class inside map method,
 does it
  create a new instance for every RDD it is processing?
 
  Thanks  Regards,
  Sarath
 
  On Sat, Sep 6, 2014 at 4:32 PM, Sean Owen so...@cloudera.com wrote:
 
  I disagree that the generally right change is to try to make the
  classes serializable. Usually, classes that are not serializable are
  not supposed to be serialized. You're using them in a way that's
  causing them to be serialized, and that's probably not desired.
 
  For example, this is wrong:
 
  val foo: SomeUnserializableManagerClass = ...
  rdd.map(d => foo.bar(d))
 
  This is right:
 
  rdd.map { d =>
val foo: SomeUnserializableManagerClass = ...
foo.bar(d)
  }
 
  In the first instance, you create the object on the driver and try to
  serialize and copy it to workers. In the second, you're creating
  SomeUnserializableManagerClass in the function and therefore on the
  worker.
 
  mapPartitions is better if this creation is expensive.
 
  On Fri, Sep 5, 2014 at 3:06 PM, Sarath Chandra
  sarathchandra.jos...@algofusiontech.com wrote:
   Hi,
  
   I'm trying to migrate a map-reduce program to work with spark. I
   migrated
   the program from Java to Scala. The map-reduce program basically
 loads a
   HDFS file and for each line in the file it applies several
   transformation
   functions available in various external libraries.
  
   When I execute this over spark, it is throwing me Task not
   serializable
   exceptions for each and every class being used from these from
 external
   libraries. I included serialization to few classes which are in my
   scope,
   but there there are several other classes which are out of my scope
 like
   org.apache.hadoop.io.Text.
  
   How to overcome these exceptions?
  
   ~Sarath.
 
 

class MapperNew extends Serializable {

  var hadoopConf: Configuration = _;
  var recordValidator: RecordValidator = _;
  var rulesLoader: RulesLoader = _;
  var recordTransformer: TransFilEngine = _;
  var ach: ConfigurationHadoop = _;
  var file: RDD[(Long, String)] = _;

  // Configuration is passed by caller
  def run(c: Configuration): Unit = {

    hadoopConf = c;

    val sparkConf = new SparkConf()
      .setMaster(hadoopConf.get("sparkMaster"))
      .setAppName("NewMapper")
      .setSparkHome(hadoopConf.get("sparkHome"))
      .setJars(Seq("rbcommon.jar",
        "rbengine.jar"));
    val sparkContext = new SparkContext(sparkConf);
    val util = SparkHadoopUtil.get;

    util.runAsSparkUser(() => {
      file = sparkContext.newAPIHadoopFile(hadoopConf.get("inputPath"),
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text], hadoopConf).map(r => (r._1.get(), r._2.toString()));
    });

    // Works fine till this line
    println("File line count = " + file.count);

    val rulesFilePath = "hdfs://slave:54310/user/hduser/" + 	// Without this spark is trying to read from local file system
      hadoopConf.get("rulesFilePath") + "/" + 
      hadoopConf.get("rulesFile");

    var processed = file.map(line => {
      // Doesn't work: throws 'task not serializable' exception for UserGroupInformation
      val ugi = UserGroupInformation.createRemoteUser(hadoopConf.get("remoteUser"));
      // Doesn't work: throws 'task not serializable' exception for FileSystem
      val fs = ugi.doAs(new PrivilegedExceptionAction[FileSystem] {
        def run(): FileSystem = {
          FileSystem.get(hadoopConf);
        }
      });
      // RulesLoader and TransFilEngine are 

Re: flattening a list in spark sql

2014-09-10 Thread gtinside
Hi ,

Thanks it worked, really appreciate your help. I have also been trying to do
multiple Lateral Views, but it doesn't seem to be working. 

Query :
hiveContext.sql("Select t2 from fav LATERAL VIEW explode(TABS) tabs1 as t1
LATERAL VIEW explode(t1) tabs2 as t2")

Exception
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
attributes: 't2, tree:

Regards,
Gaurav






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/flattening-a-list-in-spark-sql-tp13300p13894.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark and cassandra

2014-09-10 Thread Oleg Ruchovets
Hi ,
  I am trying to evaluate different options for Spark + Cassandra and I have a couple
of additional questions.
  My aim is to use Cassandra only, without Hadoop:
  1) Is it possible to use only Cassandra as the input/output for
PySpark?
  2) In case I use Spark (Java, Scala), is it possible to use only
Cassandra for input/output, without Hadoop?
  3) I know there are a couple of strategies for storage level; in case my
data set is quite big and I do not have enough memory to process it, can I use
the DISK_ONLY option without Hadoop (having only Cassandra)?

Thanks
Oleg

On Wed, Sep 3, 2014 at 3:08 AM, Kan Zhang kzh...@apache.org wrote:

 In Spark 1.1, it is possible to read from Cassandra using Hadoop jobs. See
 examples/src/main/python/cassandra_inputformat.py for an example. You may
 need to write your own key/value converters.


 On Tue, Sep 2, 2014 at 11:10 AM, Oleg Ruchovets oruchov...@gmail.com
 wrote:

 Hi All ,
Is it possible to have cassandra as input data for PySpark. I found
 example for java -
 http://java.dzone.com/articles/sparkcassandra-stack-perform?page=0,0 and
 I am looking something similar for python.

 Thanks
 Oleg.





Cassandra connector

2014-09-10 Thread wwilkins
Hi,
I am having difficulty getting the Cassandra connector running within the spark 
shell.

My jars looks like:
[wwilkins@phel-spark-001 logs]$ ls -altr /opt/connector/
total 14588
drwxr-xr-x. 5 root root4096 Sep  9 22:15 ..
-rw-r--r--  1 root root  242809 Sep  9 22:20 
spark-cassandra-connector-master.zip
-rw-r--r--  1 root root  541332 Sep  9 22:20 cassandra-driver-core-2.0.3.jar
-rw-r--r--  1 root root 1855685 Sep  9 22:20 cassandra-thrift-2.0.9.jar
-rw-r--r--  1 root root   30085 Sep  9 22:20 commons-codec-1.2.jar
-rw-r--r--  1 root root  315805 Sep  9 22:20 commons-lang3-3.1.jar
-rw-r--r--  1 root root   60686 Sep  9 22:20 commons-logging-1.1.1.jar
-rw-r--r--  1 root root 2228009 Sep  9 22:20 guava-16.0.1.jar
-rw-r--r--  1 root root  433368 Sep  9 22:20 httpclient-4.2.5.jar
-rw-r--r--  1 root root  227275 Sep  9 22:20 httpcore-4.2.4.jar
-rw-r--r--  1 root root 1222059 Sep  9 22:20 ivy-2.3.0.jar.bak
-rw-r--r--  1 root root   38460 Sep  9 22:20 joda-convert-1.2.jar.bak
-rw-r--r--  1 root root   98818 Sep  9 22:20 joda-convert-1.6.jar
-rw-r--r--  1 root root  581571 Sep  9 22:20 joda-time-2.3.jar
-rw-r--r--  1 root root  217053 Sep  9 22:20 libthrift-0.9.1.jar
-rw-r--r--  1 root root 618 Sep  9 22:20 log4j.properties
-rw-r--r--  1 root root  165505 Sep  9 22:20 lz4-1.2.0.jar
-rw-r--r--  1 root root   85448 Sep  9 22:20 metrics-core-3.0.2.jar
-rw-r--r--  1 root root 1231993 Sep  9 22:20 netty-3.9.0.Final.jar
-rw-r--r--  1 root root   26083 Sep  9 22:20 slf4j-api-1.7.2.jar.bak
-rw-r--r--  1 root root   26084 Sep  9 22:20 slf4j-api-1.7.5.jar
-rw-r--r--  1 root root 1251514 Sep  9 22:20 snappy-java-1.0.5.jar
-rw-r--r--  1 root root  776782 Sep  9 22:20 
spark-cassandra-connector_2.10-1.0.0-beta1.jar
-rw-r--r--  1 root root  997458 Sep  9 22:20 
spark-cassandra-connector_2.10-1.0.0-SNAPSHOT.jar.bak3
-rwxr--r--  1 root root 1113208 Sep  9 22:20 
spark-cassandra-connector_2.10-1.1.0-SNAPSHOT.jar.bak
-rw-r--r--  1 root root 804 Sep  9 22:20 
spark-cassandra-connector_2.10-1.1.0-SNAPSHOT.jar.bak2


I launch the shell with this command:
/data/spark/bin/spark-shell --driver-class-path $(echo /opt/connector/*.jar 
|sed 's/ /:/g')


I run these commands:
sc.stop
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

val conf = new SparkConf()
conf.set("spark.cassandra.connection.host", "10.208.59.164")
val sc = new SparkContext(conf)
val table = sc.cassandraTable("retail", "ordercf")


And I get this problem:
scala> val table = sc.cassandraTable("retail", "ordercf")
java.lang.AbstractMethodError
at org.apache.spark.Logging$class.log(Logging.scala:52)
at 
com.datastax.spark.connector.cql.CassandraConnector$.log(CassandraConnector.scala:145)
at org.apache.spark.Logging$class.logDebug(Logging.scala:63)
at 
com.datastax.spark.connector.cql.CassandraConnector$.logDebug(CassandraConnector.scala:145)
at 
com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createCluster(CassandraConnector.scala:155)
at 
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:152)
at 
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:152)
at 
com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at 
com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at 
com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:72)
at 
com.datastax.spark.connector.cql.Schema.keyspaces$lzycompute(Schema.scala:111)
at com.datastax.spark.connector.cql.Schema.keyspaces(Schema.scala:110)
at 
com.datastax.spark.connector.cql.Schema.tables$lzycompute(Schema.scala:123)
at com.datastax.spark.connector.cql.Schema.tables(Schema.scala:122)
at 
com.datastax.spark.connector.rdd.CassandraRDD.tableDef$lzycompute(CassandraRDD.scala:196)
at 
com.datastax.spark.connector.rdd.CassandraRDD.tableDef(CassandraRDD.scala:195)
at 
com.datastax.spark.connector.rdd.CassandraRDD.init(CassandraRDD.scala:202)
at 
com.datastax.spark.connector.package$SparkContextFunctions.cassandraTable(package.scala:94)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:22)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:27)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:29)
at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:31)
at $iwC$$iwC$$iwC$$iwC.init(console:33)
at $iwC$$iwC$$iwC.init(console:35)
at $iwC$$iwC.init(console:37)
at $iwC.init(console:39)
at init(console:41)
at .init(console:45)
at .clinit(console)
at .init(console:7)
at .clinit(console)
at $print(console)
at 

Re: Cassandra connector

2014-09-10 Thread gtinside
Are you using spark 1.1 ? If yes you would have to update the datastax
cassandra connector code and remove ref to log methods from
CassandraConnector.scala

Regards,
Gaurav



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-connector-tp13896p13897.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-09-10 Thread Dibyendu Bhattacharya
Hi ,

The latest changes, with Kafka message re-play by manipulating the ZK offset,
seem to be working fine for us. This gives us some relief till the actual
issue is fixed in Spark 1.2.

I have some questions on how Spark processes the received data. The logic I
used is basically to pull messages from individual partitions using
dedicated receivers and take a union of these streams. After that I
process this union stream.

Today I wanted to test this consumer with our internal Kafka cluster, which
has around 50 million records. With this huge backlog I found Spark only
running the receiver task and not running the processing task (or rather
doing it very slowly). Is this an issue with the consumer or an issue
on the Spark side? Ideally, when receivers durably write data to the store,
the processing should start in parallel. Why does the processing task need
to wait till the receiver consumes all 50 million messages? ...Or maybe I
am doing something wrong? I can share the driver log if you want.

In the driver I can see only "storage.BlockManagerInfo: Added input..." type
messages, but I hardly see "scheduler.TaskSetManager: Starting task..."
messages. I see data getting written to the target system at a very, very
slow pace.


Regards,
Dibyendu






On Mon, Sep 8, 2014 at 12:08 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Tathagata,

 I have managed to implement the logic into the Kafka-Spark consumer to
 recover from Driver failure. This is just a interim fix till actual fix is
 done from Spark side.

 The logic is something like this.

 1. When the Individual Receivers starts for every Topic partition, it
 writes the Kafka messages along with certain meta data in Block Store. This
 meta data contains the details of message offset, partition id, topic name
 and consumer id. You can see this logic in PartitionManager.java  next()
 method.

 2.  In the driver code (Consumer.java), I am creating the union of all
 these individual D-Streams and processing the data using a forEachRDD call.
 In the driver code, I receive the RDD which contains the Kafka
 messages along with the metadata details. Periodically I
 commit the processed offset of the Kafka message into ZK.

 3. When driver stops, and restart again, the Receiver starts again, and
 this time in PartitionManager.java, I am checking what is the actual
 committed offset for the partition, and what is the actual processed
 offset of the same partition. This logic is in the PartitionManager
 constructor.

 If this is a receiver restart, and the processed offset is less than the
 committed offset, I start fetching again from the processed offset.
 This may lead to duplicate records, but our system can handle duplicates.

 I have tested with multiple driver kill/stops and I found no data loss in
 Kafka consumer.

 In the Driver code, I have not done any checkpointing yet, will test
 that tomorrow.


 One interesting thing I found: if I repartition the original stream,
 I can still see the issue of data loss with this logic. I believe that
 during re-partitioning Spark might change the order of the RDDs from the way
 they were generated from the Kafka stream. So in the re-partition case, even
 when I am committing the processed offset, as it is not in order I still see
 the issue. Not sure if this understanding is correct, but I am not able to
 find any other explanation.

 But if I do not use repartition this solution works fine.

 I can make this as configurable, so that when actual fix is available ,
 this feature in consumer can be turned off as this is an overhead for the
 consumer . Let me know what you think..

 Regards,
 Dibyendu




 On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Some thoughts on this thread to clarify the doubts.

 1. Driver recovery: The current release (1.1, to be released) does not recover the
 raw data that has been received but not processed. This is because when the
 driver dies, the executors die and so does the raw data that was stored in
 them. Only for HDFS is the data not lost on driver recovery, as the data is
 already present reliably in HDFS. This is something we want to fix by Spark
 1.2 (3 months from now). Regarding recovery by replaying the data from
 Kafka, it is possible but tricky. Our goal is to provide a strong guarantee,
 exactly-once semantics in all transformations. To guarantee this for all
 kinds of streaming computations, stateful and non-stateful, it
 is required that the data be replayed through Kafka in exactly the same order,
 and the underlying blocks of data in Spark be regenerated in the exact way
 they would have been if there was no driver failure. This is quite tricky to
 implement; it requires manipulation of ZooKeeper offsets, etc., which is hard to
 do with the high-level consumer that KafkaUtils uses. Dibyendu's low-level
 Kafka receiver may enable such approaches in the future. For now we
 definitely plan to solve the first problem very very soon.

 3. 

Re: groupBy gives non deterministic results

2014-09-10 Thread Davies Liu
I think the mails to spark.incubator.apache.org will be forwarded to
spark.apache.org.

Here is the header of the first mail:

from: redocpot julien19890...@gmail.com
to: u...@spark.incubator.apache.org
date: Mon, Sep 8, 2014 at 7:29 AM
subject: groupBy gives non deterministic results
mailing list: user.spark.apache.org Filter messages from this mailing list
mailed-by: spark.apache.org

I only subscribe to spark.apache.org, and I do see all the mails from it.

On Wed, Sep 10, 2014 at 6:29 AM, Ye Xianjin advance...@gmail.com wrote:
 |  Do the two mailing lists share messages ?
 I don't think so.  I didn't receive this message from the user list. I am
 not in databricks, so I can't answer your other questions. Maybe Davies Liu
 dav...@databricks.com can answer you?

 --
 Ye Xianjin
 Sent with Sparrow

 On Wednesday, September 10, 2014 at 9:05 PM, redocpot wrote:

 Hi, Xianjin

 I checked user@spark.apache.org, and found my post there:
 http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/browser

 I am using nabble to send this mail, which indicates that the mail will be
 sent from my email address to the u...@spark.incubator.apache.org mailing
 list.

 Do the two mailing lists share messages ?

 Do we have a nabble interface for user@spark.apache.org mail list ?

 Thank you.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13876.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Global Variables in Spark Streaming

2014-09-10 Thread Akhil Das
Yes, your understanding is correct. In that case the easiest option would be
to serialize the object and dump it somewhere in HDFS so that you will be
able to recreate/update the object from the file.

We have something similar which you can find over BroadCastServer
https://github.com/sigmoidanalytics/spork/blob/spork-0.9/src/org/apache/pig/backend/hadoop/executionengine/spark/BroadCastServer.java
and BroadCastClient
https://github.com/sigmoidanalytics/spork/blob/spork-0.9/src/org/apache/pig/backend/hadoop/executionengine/spark/BroadCastClient.java
which we use internally to pass/update Objects between master and worker
nodes.

Thanks
Best Regards
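
For reference, a minimal sketch of the read-only broadcast behaviour being
discussed, assuming an existing stream of event names called eventStream (the
lookup map is illustrative):

// Broadcast a reference dataset once; workers read it but cannot push changes back.
val patterns = Map("login" -> 1, "purchase" -> 2)
val bcPatterns = sc.broadcast(patterns)

val tagged = eventStream.map { event =>
  // bcPatterns.value is a local, read-only copy on each executor.
  (event, bcPatterns.value.getOrElse(event, 0))
}

// To change the values, rebuild and re-broadcast from the driver, or keep the
// updated object in external storage (e.g. HDFS) as suggested above.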

On Wed, Sep 10, 2014 at 7:50 PM, Ravi Sharma raviprincesha...@gmail.com
wrote:

 Akhil, by using a broadcast variable will I be able to change the value of
 the broadcast variable?
 As per my understanding it creates a final variable to access the value
 across the cluster.

 Please correct me if I'm wrong.

 Thanks,

 Cheers,
 Ravi Sharma

 On Wed, Sep 10, 2014 at 7:31 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Have a look at Broadcasting variables
 http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables


 Thanks
 Best Regards

 On Wed, Sep 10, 2014 at 7:25 PM, Ravi Sharma raviprincesha...@gmail.com
 wrote:

 Hi Friends,

 I'm using spark streaming as for kafka consumer. I want to do the CEP by
 spark. So as for that I need to store my sequence of events. so that I cant
 detect some pattern.

 My question is How can I save my events in java collection temporary ,
 So that i can detect pattern by *processed(temporary stored) and
 upcoming events.*


 Cheers,
 Ravi Sharma






Re: flattening a list in spark sql

2014-09-10 Thread gtinside
My bad, please ignore, it works !!!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/flattening-a-list-in-spark-sql-tp13300p13901.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL -- more than two tables for join

2014-09-10 Thread Michael Armbrust
What version of Spark SQL are you running here?  I think a lot of your
concerns have likely been addressed in more recent versions of the code /
documentation.  (Spark 1.1 should be published in the next few days)

In particular, for serious applications you should use a HiveContext and
HiveQL as this is a much more complete implementation of a SQL Parser.  The
one in SQL context is only suggested if the Hive dependencies conflict with
your application.


 1)  spark sql does not support multiple join


This is not true.  What problem were you running into?


 2)  spark left join: has performance issue


Can you describe your data and query more?


 3)  spark sql’s cache table: does not support two-tier query


I'm not sure what you mean here.


 4)  spark sql does not support repartition


You can repartition SchemaRDDs in the same way as normal RDDs.
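
A one-line sketch of that, assuming a SchemaRDD named results obtained from an
existing context:

val results = hiveContext.sql("SELECT * FROM some_table")
// SchemaRDD inherits the normal RDD operations, so repartition works directly.
val rebalanced = results.repartition(200)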


Re: Spark HiveQL support plan

2014-09-10 Thread Michael Armbrust
HiveQL is the default language for the JDBC server which will be available
as part of the 1.1 release (coming very soon!). Adding support for calling
MLlib and other spark libraries is on the roadmap, but not possible at this
moment.

On Tue, Sep 9, 2014 at 1:45 PM, XUE, Xiaohui xiaohui@sap.com wrote:


  Hi,

  In Spark website, there’s a plan to support HiveQL on top of Spark SQL
 and also to support JDBC/ODBC.

  I would like to know if in this “HiveQL” supported by Spark (or Spark
 SQL accessible through JDBC/ODBC), is there a plan to add extensions to
 leverage different Spark features like machine learning and stream?
 I’m asking this as we’re considering using Spark from a remote machine, so
  this would be easier through JDBC/ODBC. But it would also be good to take
  advantage of Spark features other than the ones that exist in HiveQL.

  Thanks,
 Xiaohui Xue



RE: Cassandra connector

2014-09-10 Thread Wade Wilkins
Thank you!

I am running spark 1.0 but, your suggestion worked for me.
I rem'ed out all 
//logDebug
In both
CassandraConnector.scala
and
Schema.scala

I am moving again. 

Regards,
Wade

-Original Message-
From: gtinside [mailto:gtins...@gmail.com] 
Sent: Wednesday, September 10, 2014 8:49 AM
To: u...@spark.incubator.apache.org
Subject: Re: Cassandra connector

Are you using spark 1.1 ? If yes you would have to update the datastax 
cassandra connector code and remove ref to log methods from 
CassandraConnector.scala

Regards,
Gaurav



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-connector-tp13896p13897.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to run python examples in spark 1.1?

2014-09-10 Thread freedafeng
Just want to provide more information on how I ran the examples.

Environment: Cloudera quick start Vm 5.1.0 (HBase 0.98.1 installed). I
created a table called 'data1', and 'put' two records in it. I can see the
table and data are fine in hbase shell. 

I cloned spark repo and checked out to 1.1 branch, built it by running
sbt/sbt assembly/assembly
sbt/sbt examples/assembly.

The script is basically,

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":

    conf = SparkConf().setAppName('testspark_similar_users_v2')

    sc = SparkContext(conf=conf, batchSize=512)

    # read the HBase table via TableInputFormat and the example converters
    conf2 = {"hbase.zookeeper.quorum": "localhost",
             "hbase.mapreduce.inputtable": 'data1'}
    hbase_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
        valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
        conf=conf2)
    output = hbase_rdd.collect()
    for (k, v) in output:
        print (k, v)

    sc.stop()

The error message is,

14/09/10 09:41:52 INFO ZooKeeper: Initiating client connection,
connectString=localhost:2181 sessionTimeout=18 watcher=hconnection
14/09/10 09:41:52 INFO RecoverableZooKeeper: The identifier of this process
is 25963@quickstart.cloudera
14/09/10 09:41:52 INFO ClientCnxn: Opening socket connection to server
quickstart.cloudera/127.0.0.1:2181. Will not attempt to authenticate using
SASL (unknown error)
14/09/10 09:41:52 INFO ClientCnxn: Socket connection established to
quickstart.cloudera/127.0.0.1:2181, initiating session
14/09/10 09:41:52 INFO ClientCnxn: Session establishment complete on server
quickstart.cloudera/127.0.0.1:2181, sessionid = 0x1485b365c450016,
negotiated timeout = 4
14/09/10 09:52:32 ERROR TableInputFormat:
org.apache.hadoop.hbase.client.NoServerForRegionException: Unable to find
region for data1,,99 after 10 tries.
at
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:980)
at
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:885)
at
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:987)
at
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:889)
at
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:846)
at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:234)
at org.apache.hadoop.hbase.client.HTable.init(HTable.java:174)
at org.apache.hadoop.hbase.client.HTable.init(HTable.java:133)
at
org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:96)
at 
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:90)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
at org.apache.spark.rdd.RDD.first(RDD.scala:1091)
at
org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:70)
at
org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:441)
at 
org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

Traceback (most recent call last):
  File /home/cloudera/workspace/recom_env_testing/bin/sparkhbasecheck.py,
line 71, in module

Re: Global Variables in Spark Streaming

2014-09-10 Thread Santiago Mola
Hi Ravi,

2014-09-10 15:55 GMT+02:00 Ravi Sharma raviprincesha...@gmail.com:


 I'm using Spark Streaming as a Kafka consumer. I want to do CEP with
Spark, so I need to store my sequence of events so that I can
detect patterns.


Depending on what you're trying to accomplish, you might implement this
using Spark Streaming only, by using the updateStateByKey transformation.
[1]

This will allow you to maintain global states that you can combine with
other streaming operations. We have successfully used this approach to
detect patterns in log sequences with Spark Streaming.

[1]
http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations
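
For illustration, a rough sketch of keeping a bounded list of recent events per key
(the Event type, the history size and matchesPattern are placeholders; stateful
operations also need ssc.checkpoint(...) to be set):

val events: DStream[(String, Event)] = ...   // e.g. keyed by sensor or session id
val updateFunc = (newEvents: Seq[Event], state: Option[List[Event]]) => {
  val history = (state.getOrElse(Nil) ++ newEvents).takeRight(100)  // keep the last 100 events
  Some(history)                              // returning None would drop the key
}
val sequences = events.updateStateByKey(updateFunc)
val matches = sequences.filter { case (_, history) => matchesPattern(history) }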

Best,
-- 
Santiago M. Mola
sm...@stratio.com




Re: groupBy gives non deterministic results

2014-09-10 Thread Ye Xianjin
Well, that's weird. I don't see this thread in my mailbox as being sent to the user
list. Maybe that's because I also subscribe to the incubator mailing list? I do see mails
sent to the incubator list with no replies; I thought that was because
people don't subscribe to the incubator list anymore.

-- 
Ye Xianjin
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)


On Thursday, September 11, 2014 at 12:12 AM, Davies Liu wrote:

 I think the mails to spark.incubator.apache.org 
 (http://spark.incubator.apache.org) will be forwarded to
 spark.apache.org (http://spark.apache.org).
 
 Here is the header of the first mail:
 
 from: redocpot julien19890...@gmail.com (mailto:julien19890...@gmail.com)
 to: u...@spark.incubator.apache.org (mailto:u...@spark.incubator.apache.org)
 date: Mon, Sep 8, 2014 at 7:29 AM
 subject: groupBy gives non deterministic results
 mailing list: user.spark.apache.org (http://user.spark.apache.org) Filter 
 messages from this mailing list
 mailed-by: spark.apache.org (http://spark.apache.org)
 
 I only subscribe spark.apache.org (http://spark.apache.org), and I do see all 
 the mails from him.
 
 On Wed, Sep 10, 2014 at 6:29 AM, Ye Xianjin advance...@gmail.com 
 (mailto:advance...@gmail.com) wrote:
  | Do the two mailing lists share messages ?
  I don't think so. I didn't receive this message from the user list. I am
  not in databricks, so I can't answer your other questions. Maybe Davies Liu
  dav...@databricks.com (mailto:dav...@databricks.com) can answer you?
  
  --
  Ye Xianjin
  Sent with Sparrow
  
  On Wednesday, September 10, 2014 at 9:05 PM, redocpot wrote:
  
  Hi, Xianjin
  
  I checked user@spark.apache.org (mailto:user@spark.apache.org), and found 
  my post there:
  http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/browser
  
  I am using nabble to send this mail, which indicates that the mail will be
  sent from my email address to the u...@spark.incubator.apache.org 
  (mailto:u...@spark.incubator.apache.org) mailing
  list.
  
  Do the two mailing lists share messages ?
  
  Do we have a nabble interface for user@spark.apache.org 
  (mailto:user@spark.apache.org) mail list ?
  
  Thank you.
  
  
  
  
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13876.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com 
  (http://Nabble.com).
  
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
  (mailto:user-unsubscr...@spark.apache.org)
  For additional commands, e-mail: user-h...@spark.apache.org 
  (mailto:user-h...@spark.apache.org)
  
 
 
 




Re: Task not serializable

2014-09-10 Thread Marcelo Vanzin
You're using hadoopConf, a Configuration object, in your closure.
That type is not serializable.

You can use  -Dsun.io.serialization.extendedDebugInfo=true to debug
serialization issues.

On Wed, Sep 10, 2014 at 8:23 AM, Sarath Chandra
sarathchandra.jos...@algofusiontech.com wrote:
 Thanks Sean.
 Please find attached my code. Let me know your suggestions/ideas.

 Regards,
 Sarath

 On Wed, Sep 10, 2014 at 8:05 PM, Sean Owen so...@cloudera.com wrote:

 You mention that you are creating a UserGroupInformation inside your
 function, but something is still serializing it. You should show your
 code since it may not be doing what you think.

 If you instantiate an object, it happens every time your function is
 called. map() is called once per data element; mapPartitions() once
 per partition. It depends.

 On Wed, Sep 10, 2014 at 3:25 PM, Sarath Chandra
 sarathchandra.jos...@algofusiontech.com wrote:
  Hi Sean,
 
  The solution of instantiating the non-serializable class inside the map
  is
  working fine, but I hit a road block. The solution is not working for
  singleton classes like UserGroupInformation.
 
  In my logic as part of processing a HDFS file, I need to refer to some
  reference files which are again available in HDFS. So inside the map
  method
  I'm trying to instantiate UserGroupInformation to get an instance of
  FileSystem. Then using this FileSystem instance I read those reference
  files
  and use that data in my processing logic.
 
  This is throwing task not serializable exceptions for
  'UserGroupInformation'
  and 'FileSystem' classes. I also tried using 'SparkHadoopUtil' instead
  of
  'UserGroupInformation'. But it didn't resolve the issue.
 
  Request you provide some pointers in this regard.
 
  Also I have a query - when we instantiate a class inside map method,
  does it
  create a new instance for every RDD it is processing?
 
  Thanks  Regards,
  Sarath
 
  On Sat, Sep 6, 2014 at 4:32 PM, Sean Owen so...@cloudera.com wrote:
 
  I disagree that the generally right change is to try to make the
  classes serializable. Usually, classes that are not serializable are
  not supposed to be serialized. You're using them in a way that's
  causing them to be serialized, and that's probably not desired.
 
  For example, this is wrong:
 
  val foo: SomeUnserializableManagerClass = ...
  rdd.map(d => foo.bar(d))
 
  This is right:
 
  rdd.map { d =>
val foo: SomeUnserializableManagerClass = ...
foo.bar(d)
  }
 
  In the first instance, you create the object on the driver and try to
  serialize and copy it to workers. In the second, you're creating
  SomeUnserializableManagerClass in the function and therefore on the
  worker.
 
  mapPartitions is better if this creation is expensive.
 
  On Fri, Sep 5, 2014 at 3:06 PM, Sarath Chandra
  sarathchandra.jos...@algofusiontech.com wrote:
   Hi,
  
   I'm trying to migrate a map-reduce program to work with spark. I
   migrated
   the program from Java to Scala. The map-reduce program basically
   loads a
   HDFS file and for each line in the file it applies several
   transformation
   functions available in various external libraries.
  
   When I execute this over spark, it is throwing me Task not
   serializable
   exceptions for each and every class being used from these from
   external
   libraries. I included serialization to few classes which are in my
   scope,
   but there there are several other classes which are out of my scope
   like
   org.apache.hadoop.io.Text.
  
   How to overcome these exceptions?
  
   ~Sarath.
 
 




 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to scale more consumer to Kafka stream

2014-09-10 Thread Tim Smith
How are you creating your kafka streams in Spark?

If you have 10 partitions for a topic, you can call createStream ten
times to create 10 parallel receivers/executors and then use union to
combine all the dStreams.
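
A rough sketch of that pattern (the ZK quorum, group id and topic name are placeholders,
and ssc is your StreamingContext):

import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", Map("my-topic" -> 1))
}
val unified = ssc.union(kafkaStreams)   // one dStream backed by 10 parallel receivers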



On Wed, Sep 10, 2014 at 7:16 AM, richiesgr richie...@gmail.com wrote:

 Hi (my previous post has been used by someone else)

 I'm building an application that reads events from a Kafka stream. In production
 we have 5 consumers that share 10 partitions.
 But with Spark Streaming's Kafka integration only 1 worker acts as a consumer and then
 distributes
 the tasks to the workers, so I can have only 1 machine acting as consumer, but I
 need more because only 1 consumer means lag.

 Do you have any idea what I can do? Another interesting point: the master
 is not loaded at all; I can't get more than 10% CPU out of it.

 I've tried to increase the queued.max.message.chunks on the kafka client to
 read more records thinking it'll speed up the read but I only get

 ERROR consumer.ConsumerFetcherThread:

 [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372],
 Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73; ClientId:

 SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372;
 ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] -
 PartitionFetchInfo(929838589,1048576),[IA2,6] -
 PartitionFetchInfo(929515796,1048576),[IA2,9] -
 PartitionFetchInfo(929577946,1048576),[IA2,8] -
 PartitionFetchInfo(930751599,1048576),[IA2,2] -
 PartitionFetchInfo(926457704,1048576),[IA2,5] -
 PartitionFetchInfo(930774385,1048576),[IA2,0] -
 PartitionFetchInfo(929913213,1048576),[IA2,3] -
 PartitionFetchInfo(929268891,1048576),[IA2,4] -
 PartitionFetchInfo(929949877,1048576),[IA2,1] -
 PartitionFetchInfo(930063114,1048576)
 java.lang.OutOfMemoryError: Java heap space

 Does anyone have any ideas?
 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
the receivers die within an hour because Yarn kills the containers for high
memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
don't think stale RDDs are an issue here. I did a jmap -histo on a couple
of running receiver processes and in a heap of 30G, roughly ~16G is taken
by [B which is byte arrays.

Still investigating more and would appreciate pointers for troubleshooting.
I have dumped the heap of a receiver and will try to go over it.




On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez 
langel.gro...@gmail.com wrote:

 I somehow missed that parameter when I was reviewing the documentation,
 that should do the trick! Thank you!

 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

  Hi Luis,



 The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
 used to remove useless timeout streaming data, the difference is that
 “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
 input data, but also Spark’s useless metadata; while
 “spark.streaming.unpersist” is reference-based cleaning mechanism,
 streaming data will be removed when out of slide duration.



 Both these two parameter can alleviate the memory occupation of Spark
 Streaming. But if the data is flooded into Spark Streaming when start up
 like your situation using Kafka, these two parameters cannot well mitigate
 the problem. Actually you need to control the input data rate to not inject
 so fast, you can try “spark.streaming.receiver.maxRate” to control the
 inject rate.



 Thanks

 Jerry



 *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 *Sent:* Wednesday, September 10, 2014 5:21 AM
 *To:* user@spark.apache.org
 *Subject:* spark.cleaner.ttl and spark.streaming.unpersist



 The executors of my spark streaming application are being killed due to
 memory issues. The memory consumption is quite high on startup because is
 the first run and there are quite a few events on the kafka queues that are
 consumed at a rate of 100K events per sec.

 I wonder if it's recommended to use spark.cleaner.ttl and
 spark.streaming.unpersist together to mitigate that problem. And I also
 wonder if new RDD are being batched while a RDD is being processed.

 Regards,

 Luis





Re: how to choose right DStream batch interval

2014-09-10 Thread Tim Smith
http://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617

Slide 39 covers it.

On Tue, Sep 9, 2014 at 9:23 PM, qihong qc...@pivotal.io wrote:

 Hi Mayur,

 Thanks for your response. I did write a simple test that set up a DStream
 with
 5 batches; The batch duration is 1 second, and the 3rd batch will take
 extra
 2 seconds, the output of the test shows that the 3rd batch causes backlog,
 and spark streaming does catch up on 4th and 5th batch (DStream.print
 was modified to output system time)

 ---
 Time: 1409959708000 ms, system time: 1409959708269
 ---
 1155
 ---
 Time: 1409959709000 ms, system time: 1409959709033
 ---
 2255
 delay 2000 ms
 ---
 Time: 140995971 ms, system time: 1409959712036
 ---
 3355
 ---
 Time: 1409959711000 ms, system time: 1409959712059
 ---
 4455
 ---
 Time: 1409959712000 ms, system time: 1409959712083
 ---
 

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-choose-right-DStream-batch-interval-tp13578p13855.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Yana Kadiyska
Tim, I asked a similar question twice:
here
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
and here
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html

and have not yet received any responses. I noticed that the heapdump only
contains a very large byte array consuming about 66% (the second link
contains a picture of my heap -- I ran with a small heap to be able to get
the failure quickly)

I don't have solutions but wanted to affirm that I've observed a similar
situation...

On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith secs...@gmail.com wrote:

 I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
 the receivers die within an hour because Yarn kills the containers for high
 memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
 don't think stale RDDs are an issue here. I did a jmap -histo on a couple
 of running receiver processes and in a heap of 30G, roughly ~16G is taken
 by [B which is byte arrays.

 Still investigating more and would appreciate pointers for
 troubleshooting. I have dumped the heap of a receiver and will try to go
 over it.




 On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez 
 langel.gro...@gmail.com wrote:

 I somehow missed that parameter when I was reviewing the documentation,
 that should do the trick! Thank you!

 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

  Hi Luis,



 The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
 used to remove useless timeout streaming data, the difference is that
 “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
 input data, but also Spark’s useless metadata; while
 “spark.streaming.unpersist” is reference-based cleaning mechanism,
 streaming data will be removed when out of slide duration.



 Both these two parameter can alleviate the memory occupation of Spark
 Streaming. But if the data is flooded into Spark Streaming when start up
 like your situation using Kafka, these two parameters cannot well mitigate
 the problem. Actually you need to control the input data rate to not inject
 so fast, you can try “spark.streaming.receiver.maxRate” to control the
 inject rate.



 Thanks

 Jerry



 *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 *Sent:* Wednesday, September 10, 2014 5:21 AM
 *To:* user@spark.apache.org
 *Subject:* spark.cleaner.ttl and spark.streaming.unpersist



 The executors of my spark streaming application are being killed due to
 memory issues. The memory consumption is quite high on startup because is
 the first run and there are quite a few events on the kafka queues that are
 consumed at a rate of 100K events per sec.

 I wonder if it's recommended to use spark.cleaner.ttl and
 spark.streaming.unpersist together to mitigate that problem. And I also
 wonder if new RDD are being batched while a RDD is being processed.

 Regards,

 Luis






Re: spark-streaming Could not compute split exception

2014-09-10 Thread Tim Smith
I had a similar issue and many others - all were basically symptoms for
yarn killing the container for high memory usage. Haven't gotten to root
cause yet.

On Tue, Sep 9, 2014 at 3:18 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Your executor is exiting or crashing unexpectedly:

 On Tue, Sep 9, 2014 at 3:13 PM, Penny Espinoza
 pesp...@societyconsulting.com wrote:
  org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
  code from container container_1410224367331_0006_01_03 is : 1
  2014-09-09 21:47:26,345 WARN
  org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor:
  Exception from container-launch with container ID:
  container_1410224367331_0006_01_03 and exit code: 1

 You can check the app logs (yarn logs --applicationId [id]) and see
 why the container is exiting. There's probably an exception happening
 somewhere.


 --
 Marcelo

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Hadoop Distributed Cache

2014-09-10 Thread Maximo Gurmendez
Hi,
  As part of SparkContext.newAPIHadoopRDD(), would Spark support an InputFormat
that uses Hadoop’s distributed cache?
Thanks,
   Máximo
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[spark upgrade] Error communicating with MapOutputTracker when running test cases in latest spark

2014-09-10 Thread Adrian Mocanu
I use
https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
to help me with testing.

In Spark 0.9.1 my tests depending on TestSuiteBase worked fine. As soon as I
switched to the latest (1.0.1) all tests fail. My sbt import is: "org.apache.spark"
%% "spark-core" % "1.1.0-SNAPSHOT" % "provided"

One exception I get is:
Error communicating with MapOutputTracker
org.apache.spark.SparkException: Error communicating with MapOutputTracker 

How can I fix this?

Found a thread on this error but not very helpful: 
http://mail-archives.apache.org/mod_mbox/spark-dev/201405.mbox/%3ctencent_6b37d69c54f76819509a5...@qq.com%3E

-Adrian



Re: pyspark and cassandra

2014-09-10 Thread Kan Zhang
Thanks for the clarification, Yadid. By Hadoop jobs, I meant Spark jobs
that use Hadoop inputformats (as shown in the cassandra_inputformat.py
 example).

A future possibility of accessing Cassandra from PySpark is when SparkSQL
supports Cassandra as a data source.

On Wed, Sep 10, 2014 at 11:37 AM, yadid ayzenberg ayz...@gmail.com wrote:


 You do not need to actually use Hadoop to read from cassandra. The hadoop
 inputformat is a standard way for hadoop jobs to read data from various
 sources. Spark can utilize input formats as well.
 The storage level has nothing to do with source of the data - be it
 cassandra or a file system such as HDFS. By using DISK_ONLY you are telling
 spark to cache the RDDs on disk only (and not memory).
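
 For reference, in the Scala API that is simply (the PySpark call is analogous):

 import org.apache.spark.storage.StorageLevel
 rdd.persist(StorageLevel.DISK_ONLY)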

 On Wed, Sep 10, 2014 at 11:31 AM, Oleg Ruchovets oruchov...@gmail.com
 wrote:

 Hi ,
   I'm trying to evaluate different options for Spark + Cassandra and I have a
  couple of additional questions.
   My aim is to use Cassandra only, without Hadoop:
   1) Is it possible to use only Cassandra as the input/output for
  PySpark?
   2) In case I use Spark (Java/Scala), is it possible to use only
  Cassandra for input/output, without Hadoop?
   3) I know there are a couple of strategies for the storage level; in case my
  data set is quite big and I don't have enough memory to process it - can I use
  the DISK_ONLY option without Hadoop (having only Cassandra)?

 Thanks
 Oleg

 On Wed, Sep 3, 2014 at 3:08 AM, Kan Zhang kzh...@apache.org wrote:

 In Spark 1.1, it is possible to read from Cassandra using Hadoop jobs.
 See examples/src/main/python/cassandra_inputformat.py for an example.
 You may need to write your own key/value converters.


 On Tue, Sep 2, 2014 at 11:10 AM, Oleg Ruchovets oruchov...@gmail.com
 wrote:

 Hi All ,
Is it possible to have cassandra as input data for PySpark. I found
 example for java -
 http://java.dzone.com/articles/sparkcassandra-stack-perform?page=0,0
 and I am looking something similar for python.

 Thanks
 Oleg.







Re: Table not found: using jdbc console to query sparksql hive thriftserver

2014-09-10 Thread alexandria1101
I used the hiveContext to register the tables and the tables are still not
being found by the thrift server.  Do I have to pass the hiveContext to JDBC
somehow?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using-jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Accumulo and Spark

2014-09-10 Thread Megavolt
I've been doing some Googling and haven't found much info on how to
incorporate Spark and Accumulo.  Does anyone know of some examples of how to
tie Spark to Accumulo (for both fetching data and dumping results)?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Accumulo-and-Spark-tp13923.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Table not found: using jdbc console to query sparksql hive thriftserver

2014-09-10 Thread Denny Lee
Actually, when registering the table, it is only available within the sc 
context you are running it in. For Spark 1.1, the method name is changed to 
registerTempTable to better reflect that. 

The Thrift server process runs under a different process meaning that it cannot 
see any of the tables generated within the sc context. You would need to save 
the sc table into Hive and then the Thrift process would be able to see them.
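
A minimal sketch of that (assuming Spark 1.1 and that the application and the Thrift
server share the same Hive metastore; the table name is illustrative):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val results = hiveContext.sql("SELECT ...")   // or any SchemaRDD built from hiveContext
results.saveAsTable("my_hive_table")          // persisted through the metastore, so the Thrift server can see it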

HTH!

 On Sep 10, 2014, at 13:08, alexandria1101 alexandria.shea...@gmail.com 
 wrote:
 
 I used the hiveContext to register the tables and the tables are still not
 being found by the thrift server.  Do I have to pass the hiveContext to JDBC
 somehow?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using-jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13922.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Table not found: using jdbc console to query sparksql hive thriftserver

2014-09-10 Thread Du Li
Hi Denny,

There is a related question by the way.

I have a program that reads in a stream of RDD's, each of which is to be
loaded into a hive table as one partition. Currently I do this by first
writing the RDD's to HDFS and then loading them to hive, which requires
multiple passes of HDFS I/O and serialization/deserialization.

I wonder if it is possible to do it more efficiently with Spark 1.1
streaming + SQL, e.g., by registering the RDDs into a hive context so that
the data is loaded directly into the hive table in cache and meanwhile
visible to jdbc/odbc clients. In the spark source code, the method
registerTempTable you mentioned works on SqlContext instead of HiveContext.

Thanks,
Du



On 9/10/14, 1:21 PM, Denny Lee denny.g@gmail.com wrote:

Actually, when registering the table, it is only available within the sc
context you are running it in. For Spark 1.1, the method name is changed
to registerTempTable to better reflect that.

The Thrift server process runs under a different process meaning that it
cannot see any of the tables generated within the sc context. You would
need to save the sc table into Hive and then the Thrift process would be
able to see them.

HTH!

 On Sep 10, 2014, at 13:08, alexandria1101
alexandria.shea...@gmail.com wrote:
 
 I used the hiveContext to register the tables and the tables are still
not
 being found by the thrift server.  Do I have to pass the hiveContext to
JDBC
 somehow?
 
 
 
 --
 View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using
-jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13922.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple2

2014-09-10 Thread Jeffrey Picard
Hey guys,

After rebuilding from the master branch this morning, I’ve started to see these 
errors that I’ve never gotten before while running connected components. Anyone 
seen this before?

14/09/10 20:38:53 INFO collection.ExternalSorter: Thread 87 spilling in-memory 
batch of 1020 MB to disk (1 spill so far)
14/09/10 20:38:53 INFO collection.ExternalSorter: Thread 58 spilling in-memory 
batch of 1020 MB to disk (1 spill so far)
14/09/10 20:38:53 INFO collection.ExternalSorter: Thread 57 spilling in-memory 
batch of 1020 MB to disk (1 spill so far)
14/09/10 20:38:53 INFO collection.ExternalSorter: Thread 60 spilling in-memory 
batch of 1020 MB to disk (1 spill so far)
14/09/10 20:39:15 ERROR executor.Executor: Exception in task 275.0 in stage 3.0 
(TID 994)
java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple2
at 
org.apache.spark.graphx.impl.RoutingTableMessageSerializer$$anon$1$$anon$2.writeObject(Serializers.scala:39)
at 
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
at 
org.apache.spark.util.collection.ExternalSorter.spillToMergeableFile(ExternalSorter.scala:329)
at 
org.apache.spark.util.collection.ExternalSorter.spill(ExternalSorter.scala:271)
at 
org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:249)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:220)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/09/10 20:39:15 ERROR executor.Executor: Exception in task 176.0 in stage 3.0 
(TID 894)
java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple2
at 
org.apache.spark.graphx.impl.RoutingTableMessageSerializer$$anon$1$$anon$2.writeObject(Serializers.scala:39)
at 
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
at 
org.apache.spark.util.collection.ExternalSorter.spillToMergeableFile(ExternalSorter.scala:329)
at 
org.apache.spark.util.collection.ExternalSorter.spill(ExternalSorter.scala:271)
at 
org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:249)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:220)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: how to setup steady state stream partitions

2014-09-10 Thread Anton Brazhnyk
Just a guess.
updateStateByKey has overloaded variants with partitioner as parameter. Can it 
help?
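
I.e., something along these lines (a sketch, reusing the names from your snippet):

val stateDStream = partitionedDStream.updateStateByKey(updateFunction, new MyPartitioner(...))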

-Original Message-
From: qihong [mailto:qc...@pivotal.io] 
Sent: Tuesday, September 09, 2014 9:13 PM
To: u...@spark.incubator.apache.org
Subject: Re: how to setup steady state stream partitions

Thanks for your response. I do have something like:

val inputDStream = ...
val keyedDStream = inputDStream.map(...)  // use sensorId as key
val partitionedDStream = keyedDStream.transform(rdd => rdd.partitionBy(new MyPartitioner(...)))
val stateDStream = partitionedDStream.updateStateByKey[...](updateFunction)

The partitionedDStream does have steady partitions, but stateDStream does not 
have steady partitions, i.e., partition 0 of partitionedDStream contains 
only data for sensors 0 to 999, but partition 0 of stateDStream contains 
data for only some of the sensors in the 0 to 999 range, plus lots of sensors from other 
partitions of partitionedDStream. 

I wish partition 0 of stateDStream contained only the data from 
partition 0 of partitionedDStream, partition 1 of stateDStream only data from 
partition 1 of partitionedDStream, and so on. Does anyone know how to implement 
that?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-setup-steady-state-stream-partitions-tp13850p13853.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DELIVERY FAILURE: Error transferring to QCMBSJ601.HERMES.SI.SOCGEN; Maximum hop count exceeded. Message probably in a routing loop.

2014-09-10 Thread Andrew Or
Great, good to know it's not just me...

2014-09-10 11:25 GMT-07:00 Marcelo Vanzin van...@cloudera.com:

 Yes please pretty please. This is really annoying.

 On Sun, Sep 7, 2014 at 6:31 AM, Ognen Duzlevski ognen.duzlev...@gmail.com
  wrote:


 I keep getting below reply every time I send a message to the Spark user
 list? Can this person be taken off the list by powers that be?
 Thanks!
 Ognen

  Forwarded Message   Subject: DELIVERY FAILURE: Error
 transferring to QCMBSJ601.HERMES.SI.SOCGEN; Maximum hop count exceeded.
 Message probably in a routing loop.  Date: Sun, 7 Sep 2014 08:29:23 -0500  
 From:
 postmas...@sgcib.com  To: ognen.duzlev...@gmail.com

 Your message

   Subject: Re: Adding quota to the ephemeral hdfs on a standalone spark 
 cluster on ec2

 was not delivered to:

   pierre.lanvin-...@sgcib.com

 because:

   Error transferring to QCMBSJ601.HERMES.SI.SOCGEN; Maximum hop count 
 exceeded.  Message probably in a routing loop.






 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 Marcelo



Re: Accumulo and Spark

2014-09-10 Thread Russ Weeks
It's very straightforward to set up a Hadoop RDD to use
AccumuloInputFormat. Something like this will do the trick:

private JavaPairRDD<Key, Value> newAccumuloRDD(JavaSparkContext sc,
        AgileConf agileConf, String appName, Authorizations auths)
        throws IOException, AccumuloSecurityException {
    Job hadoopJob = Job.getInstance(agileConf, appName);
    // configureAccumuloInput is exactly the same as for an MR job:
    // it sets the zookeeper instance, credentials, table name, auths etc.
    configureAccumuloInput(hadoopJob, ACCUMULO_TABLE, auths);
    return sc.newAPIHadoopRDD(hadoopJob.getConfiguration(),
            AccumuloInputFormat.class, Key.class, Value.class);
}

There's tons of docs around how to operate on a JavaPairRDD. But you're
right, there's hardly anything at all re. how to plug accumulo into spark.

-Russ

On Wed, Sep 10, 2014 at 1:17 PM, Megavolt jbru...@42six.com wrote:

 I've been doing some Googling and haven't found much info on how to
 incorporate Spark and Accumulo.  Does anyone know of some examples of how
 to
 tie Spark to Accumulo (for both fetching data and dumping results)?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Accumulo-and-Spark-tp13923.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark + AccumuloInputFormat

2014-09-10 Thread Russ Weeks
To answer my own question... I didn't realize that I was responsible for
telling Spark how much parallelism I wanted for my job. I figured that
between Spark and Yarn they'd figure it out for themselves.

Adding --executor-memory 3G --num-executors 24 to my spark-submit command
took the query time down to 30s from 18 minutes and I'm seeing much better
utilization of my accumulo tablet servers.

-Russ

On Tue, Sep 9, 2014 at 5:13 PM, Russ Weeks rwe...@newbrightidea.com wrote:

 Hi,

 I'm trying to execute Spark SQL queries on top of the AccumuloInputFormat.
 Not sure if I should be asking on the Spark list or the Accumulo list, but
 I'll try here. The problem is that the workload to process SQL queries
 doesn't seem to be distributed across my cluster very well.

 My Spark SQL app is running in yarn-client mode. The query I'm running is
 select count(*) from audit_log (or a similarly simple query) where my
 audit_log table has 14.3M rows, 504M key value pairs spread fairly evenly
 across 8 tablet servers. Looking at the Accumulo monitor app, I only ever
 see a maximum of 2 tablet servers with active scans. Since the data is
 spread across all the tablet servers, I hoped to see 8!

 I realize there are a lot of moving parts here but I'd appreciate any advice about
 where to start looking.

 Using Spark 1.0.1 with Accumulo 1.6.

 Thanks!
 -Russ



RE: how to setup steady state stream partitions

2014-09-10 Thread qihong
Thanks for your response! I found that too, and it does the trick! Here's
refined code:

val inputDStream = ... 
val keyedDStream = inputDStream.map(...)  // use sensorId as key 
val partitionedDStream = keyedDStream.transform(rdd => rdd.partitionBy(new MyPartitioner(...)))
val stateDStream = partitionedDStream.updateStateByKey[...](updateFunction,
  new MyPartitioner(...))



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-setup-steady-state-stream-partitions-tp13850p13931.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is the structure for a jar file for running Spark applications the same as that for Hadoop

2014-09-10 Thread Marcelo Vanzin
On Mon, Sep 8, 2014 at 11:15 PM, Sean Owen so...@cloudera.com wrote:
 This structure is not specific to Hadoop, but in theory works in any
 JAR file. You can put JARs in JARs and refer to them with Class-Path
 entries in META-INF/MANIFEST.MF.

Funny that you mention that, since someone internally asked the same
question, and I spent some time looking at it.

That's not actually how Class-Path works in the manifest. You can't
have jars inside other jars; the Class-Path items reference things in
the filesystem itself. So that solution doesn't work.

It would be nice to add the feature Steve is talking about, though.

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD memory questions

2014-09-10 Thread Davies Liu
On Wed, Sep 10, 2014 at 1:05 AM, Boxian Dong box...@indoo.rs wrote:
 Thank you very much for your kindly help. I rise some another questions:

- If the RDD is stored in serialized format, is that means that whenever
 the RDD is processed, it will be unpacked and packed again from and back to
 the JVM even they are located on the same machine?
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n13862/rdd_img.png

In PySpark, yes. But in Spark generally, no: in Scala you have several choices
for how to cache an RDD,
serialized or not.

- Can the RDD be partially unpacked from the serialized state? or when
 every a RDD is touched, it must be fully unpacked, and of course pack again
 afterword.

The items in an RDD are deserialized batch by batch, so if you call rdd.take(),
only the first small batch of items is deserialized.

The RDD cache is kept in the JVM; you do not need to pack it again after
visiting it.

   -  When a RDD is cached, is it saved in a unserialized format or
 serialized format? If it's saved in a unserialized format, is the partially
 reading of RDD from JVM to PYTHON runtime possible?

For PySpark, they are all saved in serialized format. During a transformation
of an RDD, you can only see the current partition; you cannot access other
partitions or other RDDs.

RDDs are always read-only, so you cannot modify them at any time
(all modifications will be dropped).

 Thank you very much



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-memory-questions-tp13805p13862.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is the structure for a jar file for running Spark applications the same as that for Hadoop

2014-09-10 Thread Sean Owen
Hm, so it is:
http://docs.oracle.com/javase/tutorial/deployment/jar/downman.html

I'm sure I've done this before though and thought it was via this mechanism. It
must be something custom.

What's the Hadoop jar structure in question then? Is it something special
like a WAR file? I confess I had never heard of this so thought this was
about generic JAR stuff.

Is the question about a lib dir in the Hadoop home dir?
On Sep 10, 2014 11:34 PM, Marcelo Vanzin van...@cloudera.com wrote:

 On Mon, Sep 8, 2014 at 11:15 PM, Sean Owen so...@cloudera.com wrote:
  This structure is not specific to Hadoop, but in theory works in any
  JAR file. You can put JARs in JARs and refer to them with Class-Path
  entries in META-INF/MANIFEST.MF.

 Funny that you mention that, since someone internally asked the same
 question, and I spend some time looking at it.

 That's not actually how Class-Path works in the manifest. You can't
 have jars inside other jars; the Class-Path items reference things in
 the filesystem itself. So that solution doesn't work.

 It would be nice to add the feature Steve is talking about, though.

 --
 Marcelo



Re: Is the structure for a jar file for running Spark applications the same as that for Hadoop

2014-09-10 Thread Steve Lewis
In modern projects there are a bazillion dependencies. When I use Hadoop I
just put them in a lib directory in the jar. If I have a project that
depends on 50 jars, I need a way to deliver them to Spark - maybe wordcount
can be written without dependencies, but real projects need to deliver their
dependencies to the cluster.

On Wed, Sep 10, 2014 at 11:44 PM, Sean Owen so...@cloudera.com wrote:

 Hm, so it is:
 http://docs.oracle.com/javase/tutorial/deployment/jar/downman.html

 I'm sure I've done this before though and thought is was this mechanism.
 It must be something custom.

 What's the Hadoop jar structure in question then? Is it something special
 like a WAR file? I confess I had never heard of this so thought this was
 about generic JAR stuff.

 Is the question about a lib dir in the Hadoop home dir?
 On Sep 10, 2014 11:34 PM, Marcelo Vanzin van...@cloudera.com wrote:

 On Mon, Sep 8, 2014 at 11:15 PM, Sean Owen so...@cloudera.com wrote:
  This structure is not specific to Hadoop, but in theory works in any
  JAR file. You can put JARs in JARs and refer to them with Class-Path
  entries in META-INF/MANIFEST.MF.

 Funny that you mention that, since someone internally asked the same
 question, and I spend some time looking at it.

 That's not actually how Class-Path works in the manifest. You can't
 have jars inside other jars; the Class-Path items reference things in
 the filesystem itself. So that solution doesn't work.

 It would be nice to add the feature Steve is talking about, though.

 --
 Marcelo




-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com


Re: Is the structure for a jar file for running Spark applications the same as that for Hadoop

2014-09-10 Thread Marcelo Vanzin
On Wed, Sep 10, 2014 at 3:44 PM, Sean Owen so...@cloudera.com wrote:
 What's the Hadoop jar structure in question then? Is it something special
 like a WAR file? I confess I had never heard of this so thought this was
 about generic JAR stuff.

What I've been told (and Steve's e-mail alludes to) is that you can
put jars inside a lib/ directory in your jar (so jars-within-a-jar),
and the MR app classloader will automatically add those to your app's
class path (by using a special class loader or exploding the jar or
something, don't know the exact mechanism).
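
For comparison, the closest thing on the Spark side today is shipping the extra jars
explicitly; a sketch of that, with placeholder paths:

val conf = new SparkConf()
  .setAppName("my-app")
  .setJars(Seq("/path/to/dep1.jar", "/path/to/dep2.jar"))   // or pass them via spark-submit --jars

Spark then distributes those jars to the executors itself.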

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PrintWriter error in foreach

2014-09-10 Thread Daniil Osipov
Try providing full path to the file you want to write, and make sure the
directory exists and is writable by the Spark process.

On Wed, Sep 10, 2014 at 3:46 PM, Arun Luthra arun.lut...@gmail.com wrote:

 I have a spark program that worked in local mode, but throws an error in
 yarn-client mode on a cluster. On the edge node in my home directory, I
 have an output directory (called transout) which is ready to receive files.
 The spark job I'm running is supposed to write a few hundred files into
 that directory, once for each iteration of a foreach function. This works
 in local mode, and my only guess as to why this would fail in yarn-client
 mode is that the RDD is distributed across many nodes and the program is
 trying to use the PrintWriter on the datanodes, where the output directory
 doesn't exist. Is this what's happening? Any proposed solution?

 abbreviation of the code:

 import java.io.PrintWriter
 ...
 rdd.foreach { id =>
   val outFile = new PrintWriter("transoutput/output.%s".format(id))
   outFile.println("test")
   outFile.close()
 }

 Error:

 14/09/10 16:57:09 WARN TaskSetManager: Lost TID 1826 (task 0.0:26)
 14/09/10 16:57:09 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException
 java.io.FileNotFoundException: transoutput/input.598718 (No such file or
 directory)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:194)
 at java.io.FileOutputStream.init(FileOutputStream.java:84)
 at java.io.PrintWriter.init(PrintWriter.java:146)
 at
 com.att.bdcoe.cip.ooh.TransformationLayer$$anonfun$main$3.apply(TransformLayer.scala:98)
 at
 com.att.bdcoe.cip.ooh.TransformationLayer$$anonfun$main$3.apply(TransformLayer.scala:95)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:703)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:703)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:662)



GraphX : AssertionError

2014-09-10 Thread Vipul Pandey
Hi,

I have a small graph with about 3.3M vertices and close to  7.5M edges. It's a 
pretty innocent graph with the max degree of 8.
Unfortunately, graph.triangleCount is failing on me with the exception below. 
I'm running a spark-shell on CDH5.1 with the following params : 
SPARK_DRIVER_MEM=10g ADD_JARS=./path/to/my-jar-with-dependencies.jar 
SPARK_WORKER_INSTANCES=120  SPARK_WORKER_MEMORY=5g  
SPARK_YARN_APP_NAME=VipulsSparkShell MASTER=yarn-client spark-shell

Any clue anyone?
Vipul

 
14/09/10 16:12:22 INFO cluster.YarnClientClusterScheduler: Stage 80 was 
cancelled
14/09/10 16:12:22 INFO scheduler.TaskSetManager: Loss was due to 
java.lang.AssertionError: assertion failed [duplicate 8]
14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 326 was killed.
14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 325 was killed.
14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 320 was killed.
14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 324 was killed.
14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 322 was killed.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 80.0:6 
failed 4 times, most recent failure: Exception failure in TID 321 on host 
abc.xyz.com: java.lang.AssertionError: assertion failed
scala.Predef$.assert(Predef.scala:165)

org.apache.spark.graphx.lib.TriangleCount$$anonfun$run$1.apply(TriangleCount.scala:89)

org.apache.spark.graphx.lib.TriangleCount$$anonfun$run$1.apply(TriangleCount.scala:86)

org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:125)
org.apache.spark.graphx.VertexRDD$$anonfun$3.apply(VertexRDD.scala:192)
org.apache.spark.graphx.VertexRDD$$anonfun$3.apply(VertexRDD.scala:189)

org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:87)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PrintWriter error in foreach

2014-09-10 Thread Arun Luthra
Ok, so I don't think the workers on the data nodes will be able to see my
output directory on the edge node. I don't think stdout will work either,
so I'll write to HDFS via rdd.saveAsTextFile(...)
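
Something along these lines should work (a sketch; the HDFS path is a placeholder):

val lines = rdd.map(id => "%s\ttest".format(id))        // one output line per record
lines.saveAsTextFile("hdfs:///user/someuser/transout")  // writes part-* files under that directory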

On Wed, Sep 10, 2014 at 3:51 PM, Daniil Osipov daniil.osi...@shazam.com
wrote:

 Try providing full path to the file you want to write, and make sure the
 directory exists and is writable by the Spark process.

 On Wed, Sep 10, 2014 at 3:46 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 I have a spark program that worked in local mode, but throws an error in
 yarn-client mode on a cluster. On the edge node in my home directory, I
 have an output directory (called transout) which is ready to receive files.
 The spark job I'm running is supposed to write a few hundred files into
 that directory, once for each iteration of a foreach function. This works
 in local mode, and my only guess as to why this would fail in yarn-client
 mode is that the RDD is distributed across many nodes and the program is
 trying to use the PrintWriter on the datanodes, where the output directory
 doesn't exist. Is this what's happening? Any proposed solution?

 abbreviation of the code:

 import java.io.PrintWriter
 ...
  rdd.foreach { id =>
    val outFile = new PrintWriter("transoutput/output.%s".format(id))
    outFile.println("test")
    outFile.close()
  }

 Error:

 14/09/10 16:57:09 WARN TaskSetManager: Lost TID 1826 (task 0.0:26)
 14/09/10 16:57:09 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException
 java.io.FileNotFoundException: transoutput/input.598718 (No such file or
 directory)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:194)
 at java.io.FileOutputStream.init(FileOutputStream.java:84)
 at java.io.PrintWriter.init(PrintWriter.java:146)
 at com.att.bdcoe.cip.ooh.TransformationLayer$$anonfun$main$3.apply(TransformLayer.scala:98)
 at com.att.bdcoe.cip.ooh.TransformationLayer$$anonfun$main$3.apply(TransformLayer.scala:95)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:703)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:703)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:662)





Re: Re: Spark SQL -- more than two tables for join

2014-09-10 Thread boyingk...@163.com
Hi Michael,

I think Arthur.hk.chan isn't around right now, so I can share some details:
1) My Spark version is 1.0.1.
2) When I use a multi-table join, like this:
sql("SELECT * FROM youhao_data left join youhao_age on 
(youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on 
(youhao_age.rowkey=youhao_totalKiloMeter.rowkey)")

   youhao_data, youhao_age and youhao_totalKiloMeter were registered via registerAsTable.

I get this exception:
Exception in thread main java.lang.RuntimeException: [1.90] failure: 
``UNION'' expected but `left' found

SELECT * FROM youhao_data left join youhao_age on 
(youhao_data.rowkey=youhao_age.rowkey) left join youhao_totalKiloMeter on 
(youhao_age.rowkey=youhao_totalKiloMeter.rowkey)

 ^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:69)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:181)
at 
org.apache.spark.examples.sql.SparkSQLHBaseRelation$.main(SparkSQLHBaseRelation.scala:140)
at 
org.apache.spark.examples.sql.SparkSQLHBaseRelation.main(SparkSQLHBaseRelation.scala)



boyingk...@163.com

From: Michael Armbrust
Date: 2014-09-11 00:28
To: arthur.hk.c...@gmail.com
CC: arunshell87; u...@spark.incubator.apache.org
Subject: Re: Spark SQL -- more than two tables for join
What version of Spark SQL are you running here?  I think a lot of your concerns 
have likely been addressed in more recent versions of the code / documentation. 
 (Spark 1.1 should be published in the next few days)


In particular, for serious applications you should use a HiveContext and HiveQL, 
as this is a much more complete implementation of a SQL parser. The parser in 
SQLContext is only suggested if the Hive dependencies conflict with your 
application.
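For example, a rough sketch against the tables from the earlier message,
assuming they are registered against the HiveContext and sc is the existing
SparkContext (in Spark SQL 1.0.x the HiveQL entry point is hql; later releases
accept plain sql):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// Multi-way LEFT JOIN expressed in HiveQL instead of the basic SQLContext parser.
val joined = hiveContext.hql(
  "SELECT * FROM youhao_data d " +
  "LEFT JOIN youhao_age a ON d.rowkey = a.rowkey " +
  "LEFT JOIN youhao_totalKiloMeter t ON a.rowkey = t.rowkey")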

1)  spark sql does not support multiple join



This is not true.  What problem were you running into?

2)  spark left join: has performance issue



Can you describe your data and query more?

3)  spark sql’s cache table: does not support two-tier query



I'm not sure what you mean here.

4)  spark sql does not support repartition


You can repartition SchemaRDDs in the same way as normal RDDs.
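For example, a quick sketch (sqlContext and the table name are placeholders):

// SchemaRDD is an ordinary RDD[Row], so the usual partitioning operations apply.
val results = sqlContext.sql("SELECT * FROM some_table")
val spread  = results.repartition(64)   // full shuffle into 64 partitions
val fewer   = results.coalesce(8)       // shrink the partition count without a full shuffle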

Re: How to scale more consumer to Kafka stream

2014-09-10 Thread Dibyendu Bhattacharya
Hi,

You can use this Kafka Spark Consumer.
https://github.com/dibbhatt/kafka-spark-consumer

This consumer does exactly that: it creates parallel receivers for every Kafka
topic partition. See Consumer.java under the consumer.kafka.client
package for an example of how to use it.

There is some discussion on this Consumer you can find it here :
https://mail.google.com/mail/u/1/?tab=wm#search/kafka+spark+consumer/14797b2cbbaa8689

Regards,
Dib


On Wed, Sep 10, 2014 at 11:47 PM, Tim Smith secs...@gmail.com wrote:

 How are you creating your Kafka streams in Spark?

 If you have 10 partitions for a topic, you can call createStream ten
 times to create 10 parallel receivers/executors and then use union to
 combine all the DStreams, as in the sketch below.
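 A minimal sketch of that pattern (ssc, zkQuorum, the group id and the topic
 name below are assumptions; adjust them to your setup):

 import org.apache.spark.streaming.kafka.KafkaUtils

 // One receiver per createStream call; ten calls give ten parallel receivers
 // for a topic with ten partitions.
 val kafkaStreams = (1 to 10).map { _ =>
   KafkaUtils.createStream(ssc, zkQuorum, "my-consumer-group", Map("my-topic" -> 1))
 }
 // Union the parallel streams back into a single DStream for downstream processing.
 val unified = ssc.union(kafkaStreams)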



 On Wed, Sep 10, 2014 at 7:16 AM, richiesgr richie...@gmail.com wrote:

 Hi (my previous post has been used by someone else)

 I'm building an application that reads events from a Kafka stream. In production
 we have 5 consumers that share 10 partitions.
 But with Spark Streaming's Kafka receiver, only 1 worker acts as a consumer and then
 distributes the tasks to the other workers, so I can have only 1 machine acting as
 consumer, but I need more because only 1 consumer means lag.

 Do you have any idea what I can do? Another interesting point: the master
 is not loaded at all; I can't get it above 10% CPU.

 I've tried to increase queued.max.message.chunks on the Kafka client to
 read more records, thinking it would speed up the reads, but I only get

 ERROR consumer.ConsumerFetcherThread:

 [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372],
 Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73;
 ClientId:

 SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372;
 ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] -
 PartitionFetchInfo(929838589,1048576),[IA2,6] -
 PartitionFetchInfo(929515796,1048576),[IA2,9] -
 PartitionFetchInfo(929577946,1048576),[IA2,8] -
 PartitionFetchInfo(930751599,1048576),[IA2,2] -
 PartitionFetchInfo(926457704,1048576),[IA2,5] -
 PartitionFetchInfo(930774385,1048576),[IA2,0] -
 PartitionFetchInfo(929913213,1048576),[IA2,3] -
 PartitionFetchInfo(929268891,1048576),[IA2,4] -
 PartitionFetchInfo(929949877,1048576),[IA2,1] -
 PartitionFetchInfo(930063114,1048576)
 java.lang.OutOfMemoryError: Java heap space

 Does anyone have ideas?
 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
I switched from YARN to standalone mode and haven't had an OOM issue yet.
However, now I have Akka issues killing the executor:

2014-09-11 02:43:34,543 INFO akka.actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.2.16.8%3A44405-6#1549270895]
was not delivered. [2] dead letters encountered. This logging can be
turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Before I switched from YARN to standalone, I tried looking at the heaps of
running executors. What I found odd was that while both jmap
-histo:live and jmap -histo showed heap usage in the few hundreds of MBytes,
YARN kept showing memory utilization of several gigabytes -
eventually leading to the container being killed.

I would appreciate it if someone could duplicate what I am seeing. Basically:
1. Tail your YARN container logs and see what they report as
memory used by the JVM.
2. In parallel, run jmap -histo:live <pid> or jmap -histo <pid> on
the executor process.

They should be about the same, right?

Also, in the heap dump, 99% of the heap seems to be occupied with
unreachable objects (and most of it is byte arrays).




On Wed, Sep 10, 2014 at 12:06 PM, Tim Smith secs...@gmail.com wrote:
 Actually, I am not doing any explicit shuffle/updateByKey or other
 transform functions. In my program flow, I take in data from Kafka,
 match each message against a list of regexes, and if a message matches a
 regex I extract the groups, stuff them into JSON and push them back out to
 Kafka (on a different topic). So there is really no dependency between two
 messages in terms of processing. Here's my container histogram:
 http://pastebin.com/s3nAT3cY

 Essentially, my app is a cluster grep on steroids.



 On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska yana.kadiy...@gmail.com 
 wrote:
 Tim, I asked a similar question twice:
 here
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
 and here
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html

 and have not yet received any responses. I noticed that the heap dump only
 contains a very large byte array consuming about 66% (the second link
 contains a picture of my heap -- I ran with a small heap to be able to get
 the failure quickly).

 I don't have solutions but wanted to affirm that I've observed a similar
 situation...

 On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith secs...@gmail.com wrote:

 I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
 the receivers die within an hour because YARN kills the containers for high
 memory usage. I set spark.cleaner.ttl to 30 seconds but that didn't help. So I
 don't think stale RDDs are an issue here. I did a jmap -histo on a couple
 of running receiver processes, and in a heap of 30G roughly ~16G is taken by
 [B, which is byte arrays.

 Still investigating more and would appreciate pointers for
 troubleshooting. I have dumped the heap of a receiver and will try to go
 over it.




 On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez
 langel.gro...@gmail.com wrote:

 I somehow missed that parameter when I was reviewing the documentation,
 that should do the trick! Thank you!

 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

 Hi Luis,



 The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
 used to remove useless timeout streaming data, the difference is that
 “spark.cleaner.ttl” is time-based cleaner, it does not only clean 
 streaming
 input data, but also Spark’s useless metadata; while
 “spark.streaming.unpersist” is reference-based cleaning mechanism, 
 streaming
 data will be removed when out of slide duration.



  Both of these parameters can alleviate the memory occupation of Spark
  Streaming. But if data floods into Spark Streaming at startup,
  as in your situation with Kafka, these two parameters cannot fully mitigate
  the problem. You actually need to control the input data rate so it is not
  injected so fast; you can try “spark.streaming.receiver.maxRate” to control
  the inject rate.
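
  A minimal sketch of how these settings can be applied on a SparkConf before
  the StreamingContext is created (the app name, batch interval and values
  below are placeholders, not recommendations):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf()
    .setAppName("streaming-app")                       // placeholder app name
    .set("spark.cleaner.ttl", "3600")                  // time-based cleanup, in seconds
    .set("spark.streaming.unpersist", "true")          // drop input data once out of the window
    .set("spark.streaming.receiver.maxRate", "10000")  // max records per second per receiver
  val ssc = new StreamingContext(conf, Seconds(10))    // placeholder batch interval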



 Thanks

 Jerry



 From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 Sent: Wednesday, September 10, 2014 5:21 AM
 To: user@spark.apache.org
 Subject: spark.cleaner.ttl and spark.streaming.unpersist



 The executors of my spark streaming application are being killed due to
 memory issues. The memory consumption is quite high on startup because it is
 the first run and there are quite a few events on the Kafka queues that
 are consumed at a rate of 100K events per sec.

 I wonder if it's recommended to use spark.cleaner.ttl and
 spark.streaming.unpersist together to mitigate that problem. And I also
 wonder if new RDD are being batched while a RDD is being 

Setting up jvm in pyspark from shell

2014-09-10 Thread Mohit Singh
Hi,
  I am using the pyspark shell and am trying to create an RDD from a numpy matrix:
rdd = sc.parallelize(matrix)
I am getting the following error:
JVMDUMP039I Processing dump event systhrow, detail
java/lang/OutOfMemoryError at 2014/09/10 22:41:44 - please wait.
JVMDUMP032I JVM requested Heap dump using
'/global/u2/m/msingh/heapdump.20140910.224144.29660.0005.phd' in response
to an event
JVMDUMP010I Heap dump written to
/global/u2/m/msingh/heapdump.20140910.224144.29660.0005.phd
JVMDUMP032I JVM requested Java dump using
'/global/u2/m/msingh/javacore.20140910.224144.29660.0006.txt' in response
to an event
JVMDUMP010I Java dump written to
/global/u2/m/msingh/javacore.20140910.224144.29660.0006.txt
JVMDUMP032I JVM requested Snap dump using
'/global/u2/m/msingh/Snap.20140910.224144.29660.0007.trc' in response to an
event
JVMDUMP010I Snap dump written to
/global/u2/m/msingh/Snap.20140910.224144.29660.0007.trc
JVMDUMP013I Processed dump event systhrow, detail
java/lang/OutOfMemoryError.
Exception AttributeError: 'SparkContext' object has no attribute '_jsc'
in <bound method SparkContext.__del__ of <pyspark.context.SparkContext
object at 0x11f9450>> ignored
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line 271,
in parallelize
    jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
  File
"/usr/common/usg/spark/1.0.2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 537, in __call__
  File
"/usr/common/usg/spark/1.0.2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py",
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
: java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)
at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:618)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:804)

I did try setSystemProperty:
sc.setSystemProperty("spark.executor.memory", "20g")
How do I increase the JVM heap from the shell?

-- 
Mohit

When you want success as badly as you want the air, then you will get it.
There is no other secret of success.
-Socrates