Re: Using Neo4j with Apache Spark
On Thu, 12 Mar 2015 00:48:12 -0700 d34th4ck3r gautam1...@gmail.com wrote: I'm trying to use Neo4j with Apache Spark Streaming, but serializability is proving to be an issue. Basically, I want Apache Spark to parse and bundle my data in real time. After the data has been bundled, it should be stored in the database, Neo4j. However, I am getting this error: [...]

Hi,

It seems some things in your task aren't serializable. A quick look at the code suggests graphDB as a likely problem. If you want to create it in one place (the driver) and fetch it later in a task, you can do something like this:

- Create a container class that you will broadcast:

class LazyGraphDB extends Serializable {
  @transient lazy val graphDB = new GraphDatabase()
}

- Then, in the driver code:

val graphDbBc = sc.broadcast(new LazyGraphDB)

- And in the task where you'd like to use it, just write:

graphDbBc.value.graphDB...

Just remember about all the @transient and lazy modifiers.

Regards,
Marcin
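For reference, a self-contained sketch of the pattern Marcin describes, assuming a Scala Spark job with sc and an rdd of string keys in scope; GraphDatabase here is a placeholder for the real, non-serializable client, not a Neo4j API:

// Placeholder standing in for the non-serializable client (e.g. an embedded Neo4j instance).
class GraphDatabase { def put(key: String): Unit = () }

class LazyGraphDB extends Serializable {
  // @transient: the field is never shipped with the closure;
  // lazy: it is re-created on first use in each executor JVM.
  @transient lazy val graphDB = new GraphDatabase()
}

val graphDbBc = sc.broadcast(new LazyGraphDB)
rdd.foreach { key =>
  graphDbBc.value.graphDB.put(key)  // lazy val initializes once per JVM on first use
}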
Re: Using Neo4j with Apache Spark
What is the GraphDatabaseService object that you are using? Instead of creating it on the driver (outside foreachRDD), can you create it inside RDD.foreach? In general, the right pattern for doing this is in the programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd So you should be doing something like this (sorry for writing it in Scala):

dstream.foreachRDD((rdd: RDD, time: Time) => {
  rdd.foreachPartition(iterator => {
    // Create a GraphDatabaseService object, or fetch it from a pool of GraphDatabaseService objects
    // Use it to send the whole partition to Neo4j
    // Destroy the object or release it to the pool
  })
})

On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj gautam1...@gmail.com wrote: Neo4j is running externally; it has nothing to do with the Spark processes. Basically, the problem is that I'm unable to figure out a way to store the output of Spark in the database, as Spark Streaming requires the Neo4j Core Java API to be serializable as well. The answer points to using the REST API, but its performance is really poor compared to the Core Java API: http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/

On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das t...@databricks.com wrote: Well, the answers you got there are correct as well. Unfortunately I am not familiar enough with Neo4j to comment any further. Is the Neo4j graph database running externally (outside the Spark cluster), within the driver process, or on all the executors? Can you clarify that? TD

On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj gautam1...@gmail.com wrote: Alright, I have also asked this question on StackOverflow: http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark The code there is pretty neat.

On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote: I am not sure if you realized, but the code snippet is pretty mangled up in the email we received. It might be a good idea to put the code in a pastebin or gist; much, much easier for everyone to read.

On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com wrote: I'm trying to use Neo4j with Apache Spark Streaming, but serializability is proving to be an issue. Basically, I want Apache Spark to parse and bundle my data in real time; after the data has been bundled, it should be stored in the database, Neo4j.
However, I am getting this error: org.apache.spark.SparkException: Task not serializable ... Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase [full stack trace in the original post]
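A self-contained rendering of the foreachPartition pattern TD describes above, assuming dstream is a DStream[String]; Client and ClientPool are stand-ins for GraphDatabaseService and a real pooling library, not a Neo4j API:

import java.util.concurrent.ConcurrentLinkedQueue

// Placeholder for the non-serializable store client.
class Client { def write(record: String): Unit = () }

// A trivial per-JVM pool; each executor JVM gets its own instance of this object.
object ClientPool {
  private val pool = new ConcurrentLinkedQueue[Client]()
  def borrow(): Client = Option(pool.poll()).getOrElse(new Client)
  def release(c: Client): Unit = pool.offer(c)
}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val client = ClientPool.borrow()   // created/fetched on the executor, never serialized
    partition.foreach(client.write)    // send the whole partition to the store
    ClientPool.release(client)
  }
}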
Re: Using Neo4j with Apache Spark
I tried that too; it results in the same serializability issue. The GraphDatabaseService I'm using is created by GraphDatabaseFactory(): http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html

On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das t...@databricks.com wrote: What is the GraphDatabaseService object that you are using? Instead of creating it on the driver (outside foreachRDD), can you create it inside RDD.foreach? In general, the right pattern for doing this is in the programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd [foreachPartition pattern and the rest of the quoted thread as in the previous message]
However, I am getting this error: org.apache.spark.SparkException: Task not serializable ... Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase [full stack trace in the original post]
Re: Can't cache RDD of collaborative filtering on MLlib
I got an answer from a mail posted to the ML.

--- Summary ---
cache() is lazy, so you can call RDD.count() (or any other action) explicitly to force the RDD to be loaded into memory.
---

I tried it: the two RDDs were cached and the speed improved. Thank you.
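A minimal illustration of the tip above (spark-shell style, with sc in scope; the data is illustrative). cache() only marks the RDD; an action such as count() materializes it:

val ratings = sc.parallelize(1 to 1000000).cache()  // cache() just marks the RDD as cacheable
ratings.count()                                     // an action forces it into memory
ratings.reduce(_ + _)                               // later passes (e.g. ALS iterations) now hit the cache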
connecting spark application with SAP hana
Hi experts! Is there any way to connect to SAP HANA from a Spark application and read data from HANA tables? Thanks
Compilation error
I am trying out the streaming example as documented, and I am using Spark 1.2.1 streaming from Maven for Java. When I add this code I get a compilation error, and Eclipse is not able to recognize Tuple2. I also don't see any import for the scala.Tuple2 class. http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#a-quick-example

private void map(JavaReceiverInputDStream<String> lines) {
  JavaDStream<String> words = lines.flatMap(
    new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Arrays.asList(x.split(" "));
      }
    });

  // Count each word in each batch
  JavaPairDStream<String, Integer> pairs = words.mapToPair(
    new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) throws Exception {
        return new Tuple2<String, Integer>(s, 1);
      }
    });
}
Using Neo4j with Apache Spark
I'm trying to use Neo4j with Apache Spark Streaming, but serializability is proving to be an issue. Basically, I want Apache Spark to parse and bundle my data in real time. After the data has been bundled, it should be stored in the database, Neo4j. However, I am getting this error:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
at twoGrams.Main$4.call(Main.java:102)
at twoGrams.Main$4.call(Main.java:1)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 17 more

Here is my code (output is a stream of type JavaPairDStream<String, ArrayList<String>>):

output.foreachRDD(
  new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
    @Override
    public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
      arg0.foreach(
        new VoidFunction<Tuple2<String, ArrayList<String>>>() {
          @Override
          public void call(Tuple2<String, ArrayList<String>> arg0) throws Exception {
            try (Transaction tx = graphDB.beginTx()) {
              if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
                System.out.println("Already in Database: " + arg0._1);
              else {
                Neo4jOperations.createHMac(graphDB, arg0._1);
              }
              tx.success();
            }
          }
        });
      return null;
    }
  });

Neo4jOperations class:

public class Neo4jOperations {
  public static Node
Re: Solve least square problem of the form min norm(A x - b)^2 + lambda * n * norm(x)^2 ?
There are a couple of differences between the ml-matrix implementation and the one used in AMPCamp:

- I think the AMPCamp one uses JBLAS, which tends to ship native BLAS libraries along with it. In ml-matrix we switched to Breeze + netlib BLAS, which is faster but needs some setup [1] to pick up native libraries. If native libraries are not found, it falls back to a JVM implementation, so that might explain the slowdown.
- The other difference, if you are comparing the whole image pipeline, is that I think the AMPCamp version used NormalEquations, which is around 2-3x faster (just in terms of number of flops) compared to TSQR.

[1] https://github.com/fommil/netlib-java#machine-optimised-system-libraries

Thanks, Shivaram

On Tue, Mar 10, 2015 at 9:57 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: I'm trying to play with the implementation of the least squares solver (Ax = b) in mlmatrix.TSQR, where A is a 5*1024 matrix and b a 5*10 matrix. It works, but I notice that it's 8 times slower than the implementation given in the latest AMPCamp: http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html. As far as I know these two implementations come from the same basis. What is the difference between the two codes?

On Tue, Mar 3, 2015 at 8:02 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are a couple of solvers that I've written that are part of the AMPLab ml-matrix repo [1,2]. These aren't part of MLlib yet, though, and if you are interested in porting them I'd be happy to review it. Thanks, Shivaram [1] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/TSQR.scala [2] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/NormalEquations.scala

On Tue, Mar 3, 2015 at 9:01 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, is there a least squares solver based on DistributedMatrix that we can use out of the box in the current (or the master) version of Spark? It seems that the only least squares solver available in Spark is private to the recommender package. Cheers, Jao
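For reference, a small local Breeze sketch of the regularized normal equations x = (A^T A + lambda * n * I)^-1 A^T b behind the subject line; this is not the distributed TSQR/NormalEquations code from ml-matrix, and the matrix sizes are illustrative:

import breeze.linalg.{DenseMatrix, DenseVector}

val n = 100                                   // rows of A (samples)
val d = 10                                    // columns of A (features)
val A = DenseMatrix.rand[Double](n, d)
val b = DenseVector.rand[Double](n)
val lambda = 0.1

// Solve (A^T A + lambda * n * I) x = A^T b via the built-in dense solver.
val gram = A.t * A + DenseMatrix.eye[Double](d) * (lambda * n)
val x = gram \ (A.t * b)
println(x)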
Re: connecting spark application with SAP hana
SAP HANA can be integrated with Hadoop (http://saphanatutorial.com/sap-hana-and-hadoop/), so you will be able to read from and write to it using Spark's newAPIHadoopFile API by passing the correct Configuration, etc. Thanks Best Regards

On Thu, Mar 12, 2015 at 1:15 PM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi experts! Is there any way to connect to SAP HANA from a Spark application and read data from HANA tables? Thanks
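As an alternative to the Hadoop-connector route above, a sketch using Spark's generic JdbcRDD with HANA's JDBC driver; the driver class name, URL format, port, and table here are assumptions to verify against your HANA installation:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val hanaRDD = new JdbcRDD(
  sc,
  () => {
    Class.forName("com.sap.db.jdbc.Driver")  // assumed HANA JDBC driver class
    DriverManager.getConnection("jdbc:sap://hana-host:30015/", "user", "password")
  },
  // JdbcRDD requires two ? placeholders used for partition range bounds.
  "SELECT ID, NAME FROM MY_SCHEMA.MY_TABLE WHERE ID >= ? AND ID <= ?",
  1, 1000000, 10,                            // lower bound, upper bound, partitions
  (rs: ResultSet) => (rs.getLong(1), rs.getString(2))
)
hanaRDD.take(5).foreach(println)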
Re: Compilation error
A couple of points: you've got mismatched versions here -- 1.2.0 vs. 1.2.1. You should fix that, but it's not your problem. These are also supposed to be 'provided'-scope dependencies in Maven. You should get the Scala deps transitively and can import scala.* classes. However, it would be a little more correct to depend directly on the Scala library classes; in practice it's easiest not to in simple use cases. If you're still having trouble, look at the output of mvn dependency:tree.

On Tue, Mar 10, 2015 at 6:32 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am using Maven and my dependency looks like this, but it doesn't seem to be working:

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.1</version>
  </dependency>
</dependencies>

On Tue, Mar 10, 2015 at 11:06 AM, Tathagata Das t...@databricks.com wrote: If you are using tools like SBT/Maven/Gradle/etc., they figure out all the recursive dependencies and include them in the classpath. I haven't touched Eclipse in years, so I am not sure off the top of my head what's going on there. Just in case you only downloaded the spark-streaming_2.10.jar, that is indeed insufficient and you have to download all the recursive dependencies. Maybe you should create a Maven project inside Eclipse? TD

On Tue, Mar 10, 2015 at 11:00 AM, Mohit Anchlia mohitanch...@gmail.com wrote: How do I do that? I haven't used Scala before. Also, the linking page doesn't mention that: http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#linking

On Tue, Mar 10, 2015 at 10:57 AM, Sean Owen so...@cloudera.com wrote: It means you do not have the Scala library classes in your project classpath.

On Tue, Mar 10, 2015 at 5:54 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying out the streaming example as documented, and I am using Spark 1.2.1 streaming from Maven for Java. When I add this code I get a compilation error, and Eclipse is not able to recognize Tuple2. [original message with the code snippet trimmed; see above]
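A sketch of the aligned, 'provided'-scope dependencies Sean describes; the version numbers are illustrative, the point is that all Spark artifacts share one version:

<dependencies>
  <!-- Keep all Spark artifacts on the same version; 'provided' because the
       cluster supplies them at runtime. -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.2.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.1</version>
    <scope>provided</scope>
  </dependency>
</dependencies>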
Re: Using Neo4j with Apache Spark
I am not sure if you realized, but the code snippet is pretty mangled up in the email we received. It might be a good idea to put the code in a pastebin or gist; much, much easier for everyone to read.

On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com wrote: I'm trying to use Neo4j with Apache Spark Streaming, but serializability is proving to be an issue. Basically, I want Apache Spark to parse and bundle my data in real time; after the data has been bundled, it should be stored in the database, Neo4j. However, I am getting this error: org.apache.spark.SparkException: Task not serializable ... Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase [full stack trace and code in the original post]
Re: Using Neo4j with Apache Spark
Alright, I have also asked this question on StackOverflow: http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark The code there is pretty neat.

On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote: I am not sure if you realized, but the code snippet is pretty mangled up in the email we received. It might be a good idea to put the code in a pastebin or gist; much, much easier for everyone to read.

On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com wrote: I'm trying to use Neo4j with Apache Spark Streaming, but serializability is proving to be an issue. [...] However, I am getting this error: org.apache.spark.SparkException: Task not serializable ... Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase [full stack trace and code in the original post]
Re: Using Neo4j with Apache Spark
Well, the answers you got there are correct as well. Unfortunately I am not familiar enough with Neo4j to comment any further. Is the Neo4j graph database running externally (outside the Spark cluster), within the driver process, or on all the executors? Can you clarify that? TD

On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj gautam1...@gmail.com wrote: Alright, I have also asked this question on StackOverflow: http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark The code there is pretty neat.

On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote: I am not sure if you realized, but the code snippet is pretty mangled up in the email we received. [...]

On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com wrote: I'm trying to use Neo4j with Apache Spark Streaming, but serializability is proving to be an issue. [...] However, I am getting this error: org.apache.spark.SparkException: Task not serializable ... Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase [full stack trace and code in the original post]
Re: Joining data using Latitude, Longitude
Ted Dunning and Ellen Friedman's Time Series Databases has a section on this, with some approaches to geo-encoding: https://www.mapr.com/time-series-databases-new-ways-store-and-access-data http://info.mapr.com/rs/mapr/images/Time_Series_Databases.pdf

On Tue, Mar 10, 2015 at 3:53 PM, John Meehan jnmee...@gmail.com wrote: There are some techniques you can use if you geohash (http://en.wikipedia.org/wiki/Geohash) the lat-lngs: they will naturally be sorted by proximity (with some edge cases, so watch out). If you go the join route, either by trimming the lat-lngs or geohashing them, you're essentially grouping nearby locations into buckets -- but you have to consider the borders of the buckets, since the nearest location may actually be in an adjacent bucket. Here's a paper that discusses an implementation: http://www.gdeepak.com/thesisme/Finding%20Nearest%20Location%20with%20open%20box%20query.pdf

On Mar 9, 2015, at 11:42 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Are you using Spark SQL for the join? In that case I'm not quite sure you have a lot of options to join on the nearest coordinate. If you are using normal Spark code (by creating a key pair on lat,lon) you can apply certain logic like trimming the lat,lon, etc. If you want more precise computation then you are better off using the haversine formula: http://www.movable-type.co.uk/scripts/latlong.html
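For reference, a small Scala rendering of the haversine great-circle distance linked above (distance in kilometers; the sample coordinates are illustrative):

// Haversine distance between two (lat, lon) points in degrees, result in km.
def haversine(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
  val earthRadiusKm = 6371.0
  val dLat = math.toRadians(lat2 - lat1)
  val dLon = math.toRadians(lon2 - lon1)
  val a = math.pow(math.sin(dLat / 2), 2) +
    math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) * math.pow(math.sin(dLon / 2), 2)
  2 * earthRadiusKm * math.asin(math.sqrt(a))
}

// e.g. keep join candidates only when the distance is below some threshold
println(haversine(37.77, -122.42, 34.05, -118.24))  // SF to LA, roughly 559 km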
Re: Using Neo4j with Apache Spark
Neo4j is running externally; it has nothing to do with the Spark processes. Basically, the problem is that I'm unable to figure out a way to store the output of Spark in the database, as Spark Streaming requires the Neo4j Core Java API to be serializable as well. The answer points to using the REST API, but its performance is really poor compared to the Core Java API: http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/

On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das t...@databricks.com wrote: Well, the answers you got there are correct as well. Unfortunately I am not familiar enough with Neo4j to comment any further. Is the Neo4j graph database running externally (outside the Spark cluster), within the driver process, or on all the executors? Can you clarify that? TD [...]
Re: Unable to saveToCassandra while cassandraTable works fine
This: java.lang.NoSuchMethodError almost always indicates a version conflict somewhere. It looks like you are using Spark 1.1.1 with the cassandra-spark connector 1.2.0. Try aligning those: the metrics involved were introduced recently in the 1.2.0 branch of the Cassandra connector. Either upgrade your Spark to 1.2.0 or downgrade the connector to something compatible with Spark 1.1.1. -kr, Gerard

On Wed, Mar 11, 2015 at 1:42 PM, Tiwari, Tarun tarun.tiw...@kronos.com wrote: Hi, I have been stuck on this for 3 days now. I am using the spark-cassandra-connector with Spark, and I am able to make RDDs with the sc.cassandraTable function, which means Spark can communicate with Cassandra properly. But somehow saveToCassandra is not working. Below are the steps I am doing. Does it have something to do with my spark-env or spark-defaults? Am I missing something critical?

scala> import com.datastax.spark.connector._
scala> sc.addJar("/home/analytics/Installers/spark-1.1.1/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar")
scala> val myTable = sc.cassandraTable("test2", "words")
scala> myTable.collect()   --- this works perfectly fine.
scala> val data = sc.parallelize(Seq((81, "XXX"), (82, )))
scala> data.saveToCassandra("test2", "words", SomeColumns("word", "count"))   --- this fails

15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.192:9042 added
15/03/11 15:16:45 INFO LocalNodeFirstLoadBalancingPolicy: Added host 10.131.141.192 (datacenter1)
15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.193:9042 added
15/03/11 15:16:45 INFO LocalNodeFirstLoadBalancingPolicy: Added host 10.131.141.193 (datacenter1)
15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.200:9042 added
15/03/11 15:16:45 INFO CassandraConnector: Connected to Cassandra cluster: wfan_cluster_DB
15/03/11 15:16:45 INFO SparkContext: Starting job: runJob at RDDFunctions.scala:29
15/03/11 15:16:45 INFO DAGScheduler: Got job 1 (runJob at RDDFunctions.scala:29) with 2 output partitions (allowLocal=false)
15/03/11 15:16:45 INFO DAGScheduler: Final stage: Stage 1(runJob at RDDFunctions.scala:29)
15/03/11 15:16:45 INFO DAGScheduler: Parents of final stage: List()
15/03/11 15:16:45 INFO DAGScheduler: Missing parents: List()
15/03/11 15:16:45 INFO DAGScheduler: Submitting Stage 1 (ParallelCollectionRDD[1] at parallelize at console:20), which has no missing parents
15/03/11 15:16:45 INFO MemoryStore: ensureFreeSpace(7400) called with curMem=1792, maxMem=2778778828
15/03/11 15:16:45 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 7.2 KB, free 2.6 GB)
15/03/11 15:16:45 INFO MemoryStore: ensureFreeSpace(3602) called with curMem=9192, maxMem=2778778828
15/03/11 15:16:45 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.5 KB, free 2.6 GB)
15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.131.141.200:56502 (size: 3.5 KB, free: 2.6 GB)
15/03/11 15:16:45 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/11 15:16:45 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (ParallelCollectionRDD[1] at parallelize at console:20)
15/03/11 15:16:45 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/03/11 15:16:45 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 10.131.141.192, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:45 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 10.131.141.193, PROCESS_LOCAL, 1217 bytes)
15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.131.141.193:51660 (size: 3.5 KB, free: 267.3 MB)
15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.131.141.192:32875 (size: 3.5 KB, free: 267.3 MB)
15/03/11 15:16:45 INFO CassandraConnector: Disconnected from Cassandra cluster: wfan_cluster_DB
15/03/11 15:16:46 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 10.131.141.192): java.lang.NoSuchMethodError: org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;
com.datastax.spark.connector.metrics.OutputMetricsUpdater$.apply(OutputMetricsUpdater.scala:70)
com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:119)
com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
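A sketch of the version alignment Gerard suggests, in sbt form; the exact connector version is an assumption, the point is that Spark and the connector must come from compatible branches:

// build.sbt -- keep Spark and the Cassandra connector on matching branches
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "1.2.0" % "provided",
  // assumption: pick the connector release that matches your Spark minor version
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0"
)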
Re: Compilation error
It works after sync, thanks for the pointers.

On Tue, Mar 10, 2015 at 1:22 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I navigated to the Maven dependencies and found the Scala library. I also found Tuple2.class, and when I click on it in Eclipse I get "invalid LOC header (bad signature)": java.util.zip.ZipException: invalid LOC header (bad signature) at java.util.zip.ZipFile.read(Native Method) I am wondering if I should delete that file from the local repo and re-sync.

On Tue, Mar 10, 2015 at 1:08 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I ran the dependency command and see the following dependencies; I only see org.scala-lang:

[INFO] org.spark.test:spak-test:jar:0.0.1-SNAPSHOT
[INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.2.0:compile
[INFO] |  +- org.eclipse.jetty:jetty-server:jar:8.1.14.v20131031:compile
[INFO] |  |  +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile
[INFO] |  |  +- org.eclipse.jetty:jetty-continuation:jar:8.1.14.v20131031:compile
[INFO] |  |  \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:compile
[INFO] |  |     \- org.eclipse.jetty:jetty-io:jar:8.1.14.v20131031:compile
[INFO] |  +- org.scala-lang:scala-library:jar:2.10.4:compile
[INFO] |  \- org.spark-project.spark:unused:jar:1.0.0:compile
[INFO] \- org.apache.spark:spark-core_2.10:jar:1.2.1:compile
[INFO]    +- com.twitter:chill_2.10:jar:0.5.0:compile
[INFO]    |  \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
[INFO]    |     +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
[INFO]    |     +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO]    |     \- org.objenesis:objenesis:jar:1.2:compile
[INFO]    +- com.twitter:chill-java:jar:0.5.0:compile
[INFO]    +- org.apache.hadoop:hadoop-client:jar:2.2.0:compile
[INFO]    |  +- org.apache.hadoop:hadoop-common:jar:2.2.0:compile
[INFO]    |  |  +- commons-cli:commons-cli:jar:1.2:compile
[INFO]    |  |  +- org.apache.commons:commons-math:jar:2.1:compile
[INFO]    |  |  +- xmlenc:xmlenc:jar:0.52:compile
[INFO]    |  |  +- commons-io:commons-io:jar:2.1:compile
[INFO]    |  |  +- commons-logging:commons-logging:jar:1.1.1:compile
[INFO]    |  |  +- commons-lang:commons-lang:jar:2.5:compile
[INFO]    |  |  +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO]    |  |  |  +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO]    |  |  |  +- commons-digester:commons-digester:jar:1.8:compile
[INFO]    |  |  |  |  \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO]    |  |  |  \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO]    |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.8.8:compile
[INFO]    |  |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.8.8:compile
[INFO]    |  |  +- org.apache.avro:avro:jar:1.7.4:compile
[INFO]    |  |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO]    |  |  +- org.apache.hadoop:hadoop-auth:jar:2.2.0:compile
[INFO]    |  |  \- org.apache.commons:commons-compress:jar:1.4.1:compile
[INFO]    |  |     \- org.tukaani:xz:jar:1.0:compile
[INFO]    |  +- org.apache.hadoop:hadoop-hdfs:jar:2.2.0:compile
[INFO]    |  |  \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
[INFO]    |  +- org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.2.0:compile
[INFO]    |  |  +- org.apache.hadoop:hadoop-mapreduce-client-common:jar:2.2.0:compile
[INFO]    |  |  |  +- org.apache.hadoop:hadoop-yarn-client:jar:2.2.0:compile
[INFO]    |  |  |  |  +- com.google.inject:guice:jar:3.0:compile
[INFO]    |  |  |  |  |  +- javax.inject:javax.inject:jar:1:compile
[INFO]    |  |  |  |  |  \- aopalliance:aopalliance:jar:1.0:compile
[INFO]    |  |  |  |  +- com.sun.jersey.jersey-test-framework:jersey-test-framework-grizzly2:jar:1.9:compile
[INFO]    |  |  |  |  |  +- com.sun.jersey.jersey-test-framework:jersey-test-framework-core:jar:1.9:compile
[INFO]    |  |  |  |  |  |  +- javax.servlet:javax.servlet-api:jar:3.0.1:compile
[INFO]    |  |  |  |  |  |  \- com.sun.jersey:jersey-client:jar:1.9:compile
[INFO]    |  |  |  |  |  \- com.sun.jersey:jersey-grizzly2:jar:1.9:compile
[INFO]    |  |  |  |  |     +- org.glassfish.grizzly:grizzly-http:jar:2.1.2:compile
[INFO]    |  |  |  |  |     |  \- org.glassfish.grizzly:grizzly-framework:jar:2.1.2:compile
[INFO]    |  |  |  |  |     |     \- org.glassfish.gmbal:gmbal-api-only:jar:3.0.0-b023:compile
[INFO]    |  |  |  |  |     |        \- org.glassfish.external:management-api:jar:3.0.0-b012:compile
[INFO]    |  |  |  |  |     +- org.glassfish.grizzly:grizzly-http-server:jar:2.1.2:compile
[INFO]    |  |  |  |  |     |  \- org.glassfish.grizzly:grizzly-rcm:jar:2.1.2:compile
[INFO]    |  |  |  |  |     +- org.glassfish.grizzly:grizzly-http-servlet:jar:2.1.2:compile
[INFO]    |  |  |  |  |     \- org.glassfish:javax.servlet:jar:3.1:compile
[INFO]    |  |  |  |  +- com.sun.jersey:jersey-server:jar:1.9:compile
[INFO]    |  |  |  |  |  +- asm:asm:jar:3.1:compile
[INFO]    |  |  |  |  |  \- com.sun.jersey:jersey-core:jar:1.9:compile
[INFO]    |  |  |  |  +- com.sun.jersey:jersey-json:jar:1.9:compile
[INFO]    |  |  |  |  |  +- org.codehaus.jettison:jettison:jar:1.1:compile
[INFO]    |  |  |  |  |  |  \- stax:stax-api:jar:1.0.1:compile
[INFO]    |  |
RE: sc.textFile() on windows cannot access UNC path
Thanks for the reference. Is the following procedure correct?

1. Copy the Hadoop source code org.apache.hadoop.mapreduce.lib.input.TextInputFormat.java into my own class, e.g. UncTextInputFormat.java
2. Modify UncTextInputFormat.java to handle UNC paths
3. Call sc.newAPIHadoopFile(...) with the new input format:

sc.newAPIHadoopFile[LongWritable, Text, UncTextInputFormat](raw"file:\\10.196.119.230\folder1\abc.txt", classOf[UncTextInputFormat], classOf[LongWritable], classOf[Text], conf)

Ningjun

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, March 11, 2015 2:40 AM Subject: Re: sc.textFile() on windows cannot access UNC path

I don't have a complete example for your use case, but you can see a lot of code showing how to use newAPIHadoopFile here: https://github.com/search?q=sc.newAPIHadoopFile&type=Code Thanks Best Regards

On Tue, Mar 10, 2015 at 7:37 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: This sounds like the right approach. Is there any sample code showing how to use sc.newAPIHadoopFile? I am new to Spark and don't know much about Hadoop. I just want to read a text file from a UNC path into an RDD. Thanks

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, March 10, 2015 9:14 AM Subject: Re: sc.textFile() on windows cannot access UNC path

You can create your own input reader (using java.nio.*) and pass it to sc.newAPIHadoopFile while reading. Thanks Best Regards

On Tue, Mar 10, 2015 at 6:28 PM, java8964 java8...@hotmail.com wrote: I think the workaround is clear: using JDK 7, implement your own saveAsRemoteWinText() using java.nio.Path. Yong

From: ningjun.w...@lexisnexis.com Subject: RE: sc.textFile() on windows cannot access UNC path Date: Tue, 10 Mar 2015 03:02:37 +0000

Hi Yong, thanks for the reply. Yes, it works with a local drive letter, but I really need to use a UNC path because the path is input at runtime. I cannot dynamically assign a drive letter to an arbitrary UNC path at runtime. Is there any workaround so that I can use a UNC path for sc.textFile(...)? Ningjun

From: java8964 [mailto:java8...@hotmail.com] Sent: Monday, March 09, 2015 5:33 PM Subject: RE: sc.textFile() on windows cannot access UNC path

This is a Java problem, not really Spark. From this page: http://stackoverflow.com/questions/18520972/converting-java-file-url-to-file-path-platform-independent-including-u you can see that using java.nio.* on JDK 7 will fix this issue, but the Path class in Hadoop uses java.io.* instead of java.nio. You need to manually mount your remote Windows share to a local drive, like Z:, and then it should work. Yong

From: ningjun.w...@lexisnexis.com Subject: sc.textFile() on windows cannot access UNC path Date: Mon, 9 Mar 2015 21:09:38 +0000

I am running Spark on Windows 2008 R2. I use sc.textFile() to load a text file using a UNC path, and it does not work:

sc.textFile(raw"file:\\10.196.119.230\folder1\abc.txt", 4).count()

Input path does not exist: file:/10.196.119.230/folder1/abc.txt
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/10.196.119.230/tar/Enron/enron-207-short.load
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:251)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at
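For small files, a sketch of the java.nio workaround discussed in this thread: read the UNC file on the driver with java.nio (which handles \\host\share paths on JDK 7+) and parallelize the lines; only suitable when the file fits in driver memory, and the path here is the one from the thread:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// java.nio resolves the UNC path directly; no drive letter needed.
val lines = Files.readAllLines(
  Paths.get("""\\10.196.119.230\folder1\abc.txt"""), StandardCharsets.UTF_8).asScala
val rdd = sc.parallelize(lines, 4)
println(rdd.count())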
RE: hbase sql query
Thanks Akhil. Additionally, if we want to run SQL queries, we need to create a JavaPairRDD, then a JavaRDD, then a JavaSchemaRDD, and then call sqlContext.sql("..."). Right? Thanks, Udbhav Agarwal

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: 12 March, 2015 11:43 AM To: Udbhav Agarwal Cc: user@spark.apache.org Subject: Re: hbase sql query

Like this?

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result]).cache()

Here's a complete example: https://www.mapr.com/developercentral/code/loading-hbase-tables-spark Thanks Best Regards

On Wed, Mar 11, 2015 at 4:46 PM, Udbhav Agarwal udbhav.agar...@syncoms.com wrote: Hi, how can we simply cache an HBase table and run SQL queries against it via the Java API in Spark? Thanks, Udbhav Agarwal
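A sketch of that flow using the Spark 1.2-era Scala API, with illustrative data standing in for rows mapped out of the HBase RDD (extracting columns from an HBase Result is schema-specific, so it is elided here):

import org.apache.spark.sql.SQLContext

case class Word(word: String, freq: Int)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD  // implicit RDD[Product] -> SchemaRDD (Spark 1.2)

// Illustrative rows; in practice these would be mapped out of hBaseRDD.
val words = sc.parallelize(Seq(Word("spark", 3), Word("hbase", 1)))
words.registerTempTable("words")   // via the implicit conversion above
sqlContext.sql("SELECT word, freq FROM words WHERE freq > 1").collect().foreach(println)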
Re: Spark Streaming recover from Checkpoint with Spark SQL
Thanks, the new guide did help -- instantiating the SQLContext inside foreachRDD did the trick for me, but the SQLContext singleton works as well. Now the only problem left is that spark.driver.port is not retained after starting from a checkpoint, so my Actor receivers are running on a random port...

On 12.03.2015, at 02:35, Tathagata Das t...@databricks.com wrote: Can you show us the code that you are using? This might help. This is the updated streaming programming guide for 1.3, soon to be up; here is a quick preview: http://people.apache.org/~tdas/spark-1.3.0-temp-docs/streaming-programming-guide.html#dataframe-and-sql-operations TD

On Wed, Mar 11, 2015 at 12:20 PM, Marius Soutier mps@gmail.com wrote: Forgot to mention, it works when using .foreachRDD(_.saveAsTextFile("")).

On 11.03.2015, at 18:35, Marius Soutier mps@gmail.com wrote: Hi, I've written a Spark Streaming job that inserts into a Parquet table, using stream.foreachRDD(_.insertInto("table", overwrite = true)). Now I've added checkpointing; everything works fine when starting from scratch. When starting from a checkpoint, however, the job doesn't work and produces the following exception in the foreachRDD:

ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 142609383 ms.2
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
at org.apache.spark.rdd.RDD.sc(RDD.scala:90)
at org.apache.spark.rdd.RDD.<init>(RDD.scala:143)
at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:114)
at MyStreamingJob$$anonfun$createComputation$3.apply(MyStreamingJob:167)
at MyStreamingJob$$anonfun$createComputation$3.apply(MyStreamingJob:167)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:535)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:535)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)

Cheers - Marius
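For reference, a minimal version of the lazily instantiated SQLContext singleton referred to above (along the lines of the pattern in the 1.3 guide linked below), so that recovery from a checkpoint never has to serialize the context:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Nothing SQL-related is captured in the checkpointed DStream graph;
// the context is recreated on demand after recovery.
object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
    if (instance == null) instance = new SQLContext(sparkContext)
    instance
  }
}

// Usage inside the stream:
// stream.foreachRDD { rdd =>
//   val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
//   // build a SchemaRDD from rdd and insert/query here
// }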
RE: SQL with Spark Streaming
@Tobias, According to my understanding, your approach is to register a series of tables by using transformWith, right? And then, you can get a new DStream (i.e., SchemaDStream), which consists of lots of SchemaRDDs. Please correct me if my understanding is wrong. Thank you Best Regards, Grace (Huang Jie) From: Jason Dai [mailto:jason@gmail.com] Sent: Wednesday, March 11, 2015 10:45 PM To: Irfan Ahmad Cc: Tobias Pfeiffer; Cheng, Hao; Mohit Anchlia; user@spark.apache.org; Shao, Saisai; Dai, Jason; Huang, Jie Subject: Re: SQL with Spark Streaming Sorry, typo; it should be https://github.com/intel-spark/stream-sql Thanks, -Jason On Wed, Mar 11, 2015 at 10:19 PM, Irfan Ahmad ir...@cloudphysics.com wrote: Got a 404 on that link: https://github.com/Intel-bigdata/spark-streamsql Irfan Ahmad CTO | Co-Founder | CloudPhysics http://www.cloudphysics.com Best of VMworld Finalist Best Cloud Management Award NetworkWorld 10 Startups to Watch EMA Most Notable Vendor On Wed, Mar 11, 2015 at 6:41 AM, Jason Dai jason@gmail.com wrote: Yes, a previous prototype is available at https://github.com/Intel-bigdata/spark-streamsql, and a talk was given at last year's Spark Summit (http://spark-summit.org/2014/talk/streamsql-on-spark-manipulating-streams-by-sql-using-spark) We are currently porting the prototype to use the latest DataFrame API, and will provide a stable version for people to try soon. Thanks, -Jason On Wed, Mar 11, 2015 at 9:12 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Mar 11, 2015 at 9:33 AM, Cheng, Hao hao.ch...@intel.com wrote: Intel has a prototype for doing this; SaiSai and Jason are the authors. Probably you can ask them for some materials. The github repository is here: https://github.com/intel-spark/stream-sql Also, what I did is writing a wrapper class SchemaDStream that internally holds a DStream[Row] and a DStream[StructType] (the latter having just one element in every RDD) and then allows to do - operations SchemaRDD => SchemaRDD using `rowStream.transformWith(schemaStream, ...)` - in particular you can register this stream's data as a table this way - and via a companion object with a method `fromSQL(sql: String): SchemaDStream` you can get a new stream from previously registered tables. However, you are limited to batch-internal operations, i.e., you can't aggregate across batches. I am not able to share the code at the moment, but will within the next months. It is not very advanced code, though, and should be easy to replicate. Also, I have no idea about the performance of transformWith. Tobias
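A minimal sketch of such a wrapper, assuming the 1.2-era SchemaRDD API (applySchema, registerTempTable) and a schema stream carrying one StructType per batch; the class and method shapes here are illustrative, not Tobias's actual code:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, SchemaRDD, StructType}
import org.apache.spark.streaming.dstream.DStream

class SchemaDStream(sqlc: SQLContext,
                    rowStream: DStream[Row],
                    schemaStream: DStream[StructType]) extends Serializable {

  // Register each batch of rows, batch-internally, under the given name.
  // transformWith functions run on the driver once per batch, so we can
  // apply the schema and register the temp table there.
  def registerTempTable(tableName: String): Unit = {
    rowStream.transformWith(schemaStream,
      (rows: RDD[Row], schemas: RDD[StructType]) => {
        val schemaRdd: SchemaRDD = sqlc.applySchema(rows, schemas.first())
        schemaRdd.registerTempTable(tableName)
        schemaRdd // SchemaRDD is an RDD[Row]
      }).foreachRDD(_ => ()) // no-op action to force evaluation every batch
  }
}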
Re: S3 SubFolder Write Issues
I use s3n://BucketName/SomeFolder/OutputFolder and it works for my app. On Wed, Mar 11, 2015 at 12:14 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Does it write anything in BUCKET/SUB_FOLDER/output? Thanks Best Regards On Wed, Mar 11, 2015 at 10:15 AM, cpalm3 cpa...@gmail.com wrote: Hi All, I am hoping someone has seen this issue before with S3, as I haven't been able to find a solution for this problem. When I try to save as text file to S3 into a subfolder, it only ever writes out to the bucket-level folder and produces block-level generated file names, not my output folder as I specified. Below is the sample code in Scala; I have also seen this behavior in the Java code. val out = inputRdd.map { ir => mapFunction(ir) }.groupByKey().mapValues { x => mapValuesFunction(x) }.saveAsTextFile("s3://BUCKET/SUB_FOLDER/output") Any ideas on how to get saveAsTextFile to write to an S3 subfolder? Thanks, Chris -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/S3-SubFolder-Write-Issues-tp21997.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Harshvardhan Chauhan | Software Engineer GumGum http://www.gumgum.com/ | Ads that stick 310-260-9666 | ha...@gumgum.com
Re: Unable to read files In Yarn Mode of Spark Streaming ?
Streaming takes only new files into consideration. Add the file after starting the job. On Thu, Mar 12, 2015 at 2:26 PM, CH.KMVPRASAD [via Apache Spark User List] ml-node+s1001560n2201...@n3.nabble.com wrote: yes! for testing purposes I defined a single file in the specified directory .. -- If you reply to this email, your message will be added to the discussion below: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-read-files-In-Yarn-Mode-of-Spark-Streaming-tp22008p22013.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-read-files-In-Yarn-Mode-of-Spark-Streaming-tp22008p22015.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
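Concretely, a file stream only picks up files that appear in the watched directory after the context starts; a small sketch, with a hypothetical HDFS path:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
// Files already present when the job launches are ignored; only files
// created in the directory after ssc.start() are processed.
val lines = ssc.textFileStream("hdfs:///data/incoming") // hypothetical path
lines.count().print()
ssc.start()
// After this point, move or rename new files into hdfs:///data/incoming
// (an atomic rename, not a slow in-place copy) so each shows up as new.
ssc.awaitTermination()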
RE: hbase sql query
Thanks Todd. But this link is also based on Scala; I was looking for some help with the Java APIs. Thanks, Udbhav Agarwal From: Todd Nist [mailto:tsind...@gmail.com] Sent: 12 March, 2015 5:28 PM To: Udbhav Agarwal Cc: Akhil Das; user@spark.apache.org Subject: Re: hbase sql query Have you considered using the spark-hbase-connector for this: https://github.com/nerdammer/spark-hbase-connector On Thu, Mar 12, 2015 at 5:19 AM, Udbhav Agarwal udbhav.agar...@syncoms.com wrote: Thanks Akhil. Additionally, if we want to do a SQL query we need to create a JavaPairRdd, then a JavaRdd, then a JavaSchemaRdd, and then call sqlContext.sql(sql query). Right? Thanks, Udbhav Agarwal From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: 12 March, 2015 11:43 AM To: Udbhav Agarwal Cc: user@spark.apache.org Subject: Re: hbase sql query Like this? val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]).cache() Here's a complete example: https://www.mapr.com/developercentral/code/loading-hbase-tables-spark#.VQEtqFR515Q Thanks Best Regards On Wed, Mar 11, 2015 at 4:46 PM, Udbhav Agarwal udbhav.agar...@syncoms.com wrote: Hi, How can we simply cache an hbase table and do a SQL query via the Java API in Spark? Thanks, Udbhav Agarwal
Re: hbase sql query
Ah, missed that Java was a requirement. What distribution of Hadoop are you using? Here is an example that may help, along with a few links to the JavaHBaseContext and a basic example. https://github.com/tmalaska/SparkOnHBase https://github.com/tmalaska/SparkOnHBase/blob/master/src/main/java/com/cloudera/spark/hbase/example/JavaHBaseMapGetPutExample.java https://github.com/tmalaska/SparkOnHBase/blob/master/src/main/scala/com/cloudera/spark/hbase/JavaHBaseContext.scala On Thu, Mar 12, 2015 at 8:34 AM, Udbhav Agarwal udbhav.agar...@syncoms.com wrote: Thanks Todd. But this link is also based on Scala; I was looking for some help with the Java APIs. Thanks, Udbhav Agarwal From: Todd Nist [mailto:tsind...@gmail.com] Sent: 12 March, 2015 5:28 PM To: Udbhav Agarwal Cc: Akhil Das; user@spark.apache.org Subject: Re: hbase sql query Have you considered using the spark-hbase-connector for this: https://github.com/nerdammer/spark-hbase-connector On Thu, Mar 12, 2015 at 5:19 AM, Udbhav Agarwal udbhav.agar...@syncoms.com wrote: Thanks Akhil. Additionally, if we want to do a SQL query we need to create a JavaPairRdd, then a JavaRdd, then a JavaSchemaRdd, and then call sqlContext.sql(sql query). Right? Thanks, Udbhav Agarwal From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: 12 March, 2015 11:43 AM To: Udbhav Agarwal Cc: user@spark.apache.org Subject: Re: hbase sql query Like this? val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]).cache() Here's a complete example: https://www.mapr.com/developercentral/code/loading-hbase-tables-spark#.VQEtqFR515Q Thanks Best Regards On Wed, Mar 11, 2015 at 4:46 PM, Udbhav Agarwal udbhav.agar...@syncoms.com wrote: Hi, How can we simply cache an hbase table and do a SQL query via the Java API in Spark? Thanks, Udbhav Agarwal
Re: Read parquet folders recursively
With fileStream you are free to plug in any InputFormat; in your case, you can easily plug in ParquetInputFormat. Here are some parquet hadoop examples: https://github.com/Parquet/parquet-mr/tree/master/parquet-hadoop/src/main/java/parquet/hadoop/example Thanks Best Regards On Thu, Mar 12, 2015 at 5:51 PM, Masf masfwo...@gmail.com wrote: Hi. Thanks for your answers, but to read parquet files it is necessary to use the parquetFile method in org.apache.spark.sql.SQLContext, is that true? How can I combine your solution with a call to this method? Thanks!! Regards On Thu, Mar 12, 2015 at 8:34 AM, Yijie Shen henry.yijies...@gmail.com wrote: org.apache.spark.deploy.SparkHadoopUtil has a method: /** * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the * given path points to a file, return a single-element collection containing [[FileStatus]] of * that file. */ def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = { def recurse(path: Path) = { val (directories, leaves) = fs.listStatus(path).partition(_.isDir) leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath)) } val baseStatus = fs.getFileStatus(basePath) if (baseStatus.isDir) recurse(basePath) else Array(baseStatus) } — Best Regards! Yijie Shen On March 12, 2015 at 2:35:49 PM, Akhil Das (ak...@sigmoidanalytics.com) wrote: Hi We have a custom build to read directories recursively. Currently we use it with fileStream like: val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/datadumps/", (t: Path) => true, true, true) Making the 4th argument true to read recursively. You could give it a try: https://s3.amazonaws.com/sigmoidanalytics-builds/spark-1.2.0-bin-spark-1.2.0-hadoop2.4.0.tgz Thanks Best Regards On Wed, Mar 11, 2015 at 9:45 PM, Masf masfwo...@gmail.com wrote: Hi all Is it possible to read folders recursively to read parquet files? Thanks. -- Saludos. Miguel Ángel -- Saludos. Miguel Ángel
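If the custom build isn't an option, one way to combine the recursive listing with SQLContext.parquetFile is to collect the leaf paths yourself and union the per-file SchemaRDDs. A sketch, with a hypothetical base directory, assuming all files share the same schema:

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val fs = FileSystem.get(sc.hadoopConfiguration)

// Same recursion as listLeafStatuses above: keep only leaf files
def leafFiles(path: Path): Seq[FileStatus] = {
  val (dirs, files) = fs.listStatus(path).partition(_.isDir)
  files ++ dirs.flatMap(d => leafFiles(d.getPath))
}

val paths = leafFiles(new Path("/data/parquet")) // hypothetical base dir
  .map(_.getPath.toString)
  .filter(_.endsWith(".parquet"))

// Load each file and union them; unionAll requires identical schemas
val all = paths.map(sqlContext.parquetFile).reduce(_ unionAll _)
all.registerTempTable("myParquet")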
spark sql performance
Hi, What is the query time for a join query on HBase with Spark SQL? Say the tables in HBase have 0.5 million records each. I am expecting a query time (latency) in milliseconds with Spark SQL. Is this possible? Thanks, Udbhav Agarwal
Re: Timed out while stopping the job generator plus subsequent failures
I don't think that's the same issue I was seeing, but you can have a look at https://issues.apache.org/jira/browse/SPARK-4545 for more detail on my issue. On Thu, Mar 12, 2015 at 12:51 AM, Tobias Pfeiffer t...@preferred.jp wrote: Sean, On Wed, Mar 11, 2015 at 7:43 PM, Tobias Pfeiffer t...@preferred.jp wrote: it seems like I am unable to shut down my StreamingContext properly, both in local[n] and yarn-cluster mode. In addition, (only) in yarn-cluster mode, subsequent use of a new StreamingContext will raise an InvalidActorNameException. I was wondering if this is related to your question on spark-dev (http://tinyurl.com/q5cd5px). Did you get any additional insight into this issue? In my case the processing of the first batch completes, but I don't know if there is anything wrong with the checkpoints. When I look at the corresponding checkpoint directory in HDFS, it doesn't seem like all state RDDs are persisted there, just a subset. Any ideas? Thanks Tobias - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Building Spark 1.3 for Scala 2.11 using Maven
Just FYI: what @Marcelo said fixed the issue for me. On Fri, Mar 6, 2015 at 7:11 AM, Sean Owen so...@cloudera.com wrote: -Pscala-2.11 and -Dscala-2.11 will happen to do the same thing for this profile. Why are you running install package and not just install? Probably doesn't matter. This sounds like you are trying to build only core without building everything else, which you can't do in general unless you have already built and installed these snapshot artifacts locally. On Fri, Mar 6, 2015 at 12:46 AM, Night Wolf nightwolf...@gmail.com wrote: Hey guys, Trying to build Spark 1.3 for Scala 2.11. I'm running with the following Maven command: -DskipTests -Dscala-2.11 clean install package Exception: [ERROR] Failed to execute goal on project spark-core_2.10: Could not resolve dependencies for project org.apache.spark:spark-core_2.10:jar:1.3.0-SNAPSHOT: The following artifacts could not be resolved: org.apache.spark:spark-network-common_2.11:jar:1.3.0-SNAPSHOT, org.apache.spark:spark-network-shuffle_2.11:jar:1.3.0-SNAPSHOT: Failure to find org.apache.spark:spark-network-common_2.11:jar:1.3.0-SNAPSHOT in http://repository.apache.org/snapshots was cached in the local repository, resolution will not be reattempted until the update interval of apache.snapshots has elapsed or updates are forced - [Help 1] I see these warnings in the log before this error: [INFO] [INFO] [INFO] Building Spark Project Core 1.3.0-SNAPSHOT [INFO] [WARNING] The POM for org.apache.spark:spark-network-common_2.11:jar:1.3.0-SNAPSHOT is missing, no dependency information available [WARNING] The POM for org.apache.spark:spark-network-shuffle_2.11:jar:1.3.0-SNAPSHOT is missing, no dependency information available Any ideas? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: hbase sql query
Thanks Todd. But this particular code is missing the JavaHBaseContext.java file; the code only has JavaHBaseContext.scala. Moreover, I specifically want to perform join queries on tables with millions of records and expect millisecond latency. For doing a SQL query of that type I guess we need JavaSchemaRdds and sqlContext.sql only. Isn't it? Thanks, Udbhav Agarwal From: Todd Nist [mailto:tsind...@gmail.com] Sent: 12 March, 2015 6:19 PM To: Udbhav Agarwal Cc: Akhil Das; user@spark.apache.org Subject: Re: hbase sql query Ah, missed that Java was a requirement. What distribution of Hadoop are you using? Here is an example that may help, along with a few links to the JavaHBaseContext and a basic example. https://github.com/tmalaska/SparkOnHBase https://github.com/tmalaska/SparkOnHBase/blob/master/src/main/java/com/cloudera/spark/hbase/example/JavaHBaseMapGetPutExample.java https://github.com/tmalaska/SparkOnHBase/blob/master/src/main/scala/com/cloudera/spark/hbase/JavaHBaseContext.scala On Thu, Mar 12, 2015 at 8:34 AM, Udbhav Agarwal udbhav.agar...@syncoms.com wrote: Thanks Todd. But this link is also based on Scala; I was looking for some help with the Java APIs. Thanks, Udbhav Agarwal From: Todd Nist [mailto:tsind...@gmail.com] Sent: 12 March, 2015 5:28 PM To: Udbhav Agarwal Cc: Akhil Das; user@spark.apache.org Subject: Re: hbase sql query Have you considered using the spark-hbase-connector for this: https://github.com/nerdammer/spark-hbase-connector On Thu, Mar 12, 2015 at 5:19 AM, Udbhav Agarwal udbhav.agar...@syncoms.com wrote: Thanks Akhil. Additionally, if we want to do a SQL query we need to create a JavaPairRdd, then a JavaRdd, then a JavaSchemaRdd, and then call sqlContext.sql(sql query). Right? Thanks, Udbhav Agarwal From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: 12 March, 2015 11:43 AM To: Udbhav Agarwal Cc: user@spark.apache.org Subject: Re: hbase sql query Like this? val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]).cache() Here's a complete example: https://www.mapr.com/developercentral/code/loading-hbase-tables-spark#.VQEtqFR515Q Thanks Best Regards On Wed, Mar 11, 2015 at 4:46 PM, Udbhav Agarwal udbhav.agar...@syncoms.com wrote: Hi, How can we simply cache an hbase table and do a SQL query via the Java API in Spark? Thanks, Udbhav Agarwal
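For the join itself, the Spark SQL side is just two registered tables and one query. A sketch in Scala (the Java chain is analogous via JavaSQLContext.applySchema and registerTempTable); customersRdd, ordersRdd, and the column names are hypothetical:

import org.apache.spark.sql.SQLContext

case class Customer(id: Int, name: String)
case class Order(customerId: Int, amount: Double)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD

// customersRdd: RDD[Customer], ordersRdd: RDD[Order], e.g. mapped
// out of cached HBase scans as in the earlier messages
customersRdd.registerTempTable("customers")
ordersRdd.registerTempTable("orders")

val joined = sqlContext.sql(
  "SELECT c.name, SUM(o.amount) FROM customers c " +
  "JOIN orders o ON c.id = o.customerId GROUP BY c.name")
joined.collect()

Note that each sql() call still launches a distributed job, so millisecond latency over millions of rows is optimistic unless both inputs are cached and small.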
Re: Unable to stop Worker in standalone mode by sbin/stop-all.sh
Interesting. Short term, maybe create the following file with pid 24922? /tmp/spark-taoewang-org.apache.spark.deploy.worker.Worker-1.pid Cheers On Thu, Mar 12, 2015 at 6:51 PM, sequoiadb mailing-list-r...@sequoiadb.com wrote: Nope, I can see the master file exists but not the worker: $ ls bitrock_installer.log hsperfdata_root hsperfdata_taoewang omatmp sbt2435921113715137753.log spark-taoewang-org.apache.spark.deploy.master.Master-1.pid On Mar 13, 2015, at 9:34 AM, Ted Yu yuzhih...@gmail.com wrote: Does the machine have a cron job that periodically cleans up the /tmp dir? Cheers On Thu, Mar 12, 2015 at 6:18 PM, sequoiadb mailing-list-r...@sequoiadb.com wrote: Checking the script, it seems spark-daemon.sh is unable to stop the worker: $ ./spark-daemon.sh stop org.apache.spark.deploy.worker.Worker 1 no org.apache.spark.deploy.worker.Worker to stop $ ps -elf | grep spark 0 S taoewang 24922 1 0 80 0 - 733878 futex_ Mar12 ? 00:08:54 java -cp /data/sequoiadb-driver-1.10.jar,/data/spark-sequoiadb-0.0.1-SNAPSHOT.jar::/data/spark/conf:/data/spark/assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.4.0.jar -XX:MaxPermSize=128m -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=centos-151:2181,centos-152:2181,centos-153:2181 -Dspark.deploy.zookeeper.dir=/data/zookeeper -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://centos-151:7077,centos-152:7077,centos-153:7077 In the spark-daemon script it tries to find $pid in /tmp/: pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid In my case the pid file is supposed to be: /tmp/spark-taoewang-org.apache.spark.deploy.worker.Worker-1.pid However, when I go through the files in the /tmp directory I don't find such a file. I got 777 on /tmp and also tried to touch a file with my current account and succeeded, so it shouldn't be a permission issue. $ ls -la / | grep tmp drwxrwxrwx. 6 root root 4096 Mar 13 08:19 tmp Anyone have any idea why the pid file didn't show up? Thanks TW - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Handling worker batch processing during driver shutdown
Thanks for the reply! Theoretically I should be able to do as you suggest as I follow the pool design pattern from the documentation, but I don't seem to be able to run any code after .stop() is called. override def main(args: Array[String]) { // setup val ssc = new StreamingContext(sparkConf, Seconds(streamTime)) val inputStreams = (1 to numReceivers).map(i => ssc.receiverStream(custom receiver)) val messages = ssc.union(inputStreams) messages.foreachRDD { rdd => rdd.foreachPartition { p => val indexer = Indexer.getInstance() p.foreach(x => Indexer.process(x) match { case Some(entry) => indexer.index(entry) case None => }) Indexer.returnInstance(indexer) } } messages.print() sys.ShutdownHookThread { logInfo("** Shutdown hook triggered **") ssc.stop(false, true) logInfo("** Shutdown finished **") ssc.stop(true) } ssc.start() ssc.awaitTermination() } The first shutdown log message is always displayed, but the second message never is. I've tried multiple permutations of the stop function calls and even used try/catch around them. I'm running in yarn-cluster mode using Spark 1.2 on CDH 5.3. I stop the application with yarn application -kill appID. From: Tathagata Das [mailto:t...@databricks.com] Sent: Thursday, March 12, 2015 1:29 PM To: Jose Fernandez Cc: user@spark.apache.org Subject: Re: Handling worker batch processing during driver shutdown Can you access the batcher directly? Like, is there a handle to get access to the batchers on the executors by running a task on that executor? If so, after the streamingContext has been stopped (not the SparkContext), then you can use `sc.makeRDD()` to run a dummy task like this: sc.makeRDD(1 to 1000, 1000).foreach { x => Batcher.get().flush() } With a large number of tasks and no other jobs running in the system, at least one task will run in each executor and therefore will flush the batcher. TD On Thu, Mar 12, 2015 at 12:27 PM, Jose Fernandez jfernan...@sdl.com wrote: Hi folks, I have a shutdown hook in my driver which stops the streaming context cleanly. This is great as workers can finish their current processing unit before shutting down. Unfortunately each worker contains a batch processor which only flushes every X entries. We're indexing to different indices in elasticsearch and using the bulk index request for performance. As far as Spark is concerned, once data is added to the batcher it is considered processed, so our workers are being shut down with data still in the batcher. Is there any way to coordinate the shutdown with the workers? I haven't had any luck searching for a solution online. I would appreciate any suggestions you may have. Thanks :)
Re: Using Neo4j with Apache Spark
Well, that's why I had also suggested using a pool of the GraphDatabaseService objects :) Also present in the programming guide link I had given. TD On Thu, Mar 12, 2015 at 7:38 PM, Gautam Bajaj gautam1...@gmail.com wrote: Thanks a ton! That worked. However, this may have a performance issue, as for each partition I'd need to restart the server; that was the basic reason I was creating the graphDb object outside this loop. On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das t...@databricks.com wrote: (Putting user@spark back in the to list) In the gist, you are creating the graphDB object way outside the RDD.foreachPartition. As I said last time, create the graphDB object inside the RDD.foreachPartition. You are creating it outside DStream.foreachRDD, and then using it from inside the rdd.foreachPartition. That is bringing the graphDB object into the task closure, and hence the system is trying to serialize the graphDB object when it's serializing the closure. If you create the graphDB object inside the RDD.foreachPartition, then the closure will not refer to any prior graphDB object and therefore not serialize anything. On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj gautam1...@gmail.com wrote: Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab I'll add the flag and send you the stack trace, I have meetings now. On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das t...@databricks.com wrote: Could you show us that version of the code? It also helps to turn on the Java flag for extended debug info. That will show the lineage of objects leading to the nonserializable one. On Mar 12, 2015 1:32 AM, Gautam Bajaj gautam1...@gmail.com wrote: I tried that too. It results in the same serializability issue. The GraphDatabaseService that I'm using is GraphDatabaseFactory(): http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das t...@databricks.com wrote: What is the GraphDatabaseService object that you are using? Instead of creating them on the driver (outside foreachRDD), can you create them inside the RDD.foreach? In general, the right pattern for doing this is in the programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd So you should be doing (sorry for writing in Scala): dstream.foreachRDD ((rdd: RDD, time: Time) => { rdd.foreachPartition(iterator => // Create GraphDatabaseService object, or fetch it from a pool of GraphDatabaseService objects // Use it to send the whole partition to Neo4j // Destroy the object or release it to the pool }) On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj gautam1...@gmail.com wrote: Neo4j is running externally. It has nothing to do with Spark processes. Basically, the problem is, I'm unable to figure out a way to store the output of Spark in the database, as Spark Streaming requires the Neo4j Core Java API to be serializable as well. The answer points to using the REST API, but its performance is really poor when compared to the Core Java API: http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/ On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das t...@databricks.com wrote: Well, the answers you got there are correct as well. Unfortunately I am not familiar enough with Neo4j to comment any more. Is the Neo4j graph database running externally (outside the Spark cluster), or within the driver process, or on all the executors? Can you clarify that?
TD On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj gautam1...@gmail.com wrote: Alright, I have also asked this question on StackOverflow: http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark The code there is pretty neat. On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote: I am not sure if you realized, but the code snippet is pretty mangled up in the email we received. It might be a good idea to put the code in pastebin or gist, much much easier for everyone to read. On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com wrote: I'm trying to use Neo4j with Apache Spark Streaming but I am finding serializability to be an issue. Basically, I want Apache Spark to parse and bundle my data in real time. After the data has been bundled it should be stored in the database, Neo4j. However, I am getting this error: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1264) at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297) at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45) at
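The per-partition pattern with a pool, spelled out as a sketch; GraphDbPool stands in for whatever pool implementation you use (e.g. one built on commons-pool) and is hypothetical:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Borrowed on the executor, so no Neo4j object is captured in the
    // task closure and nothing Neo4j-related has to be serializable.
    val graphDb = GraphDbPool.borrow() // hypothetical pool of GraphDatabaseService
    try {
      partition.foreach { record =>
        // write record to Neo4j through the Core Java API
      }
    } finally {
      // Return to the pool instead of shutting down, so the next batch
      // on this executor reuses the instance (avoids restart cost)
      GraphDbPool.release(graphDb)
    }
  }
}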
Re: Using Neo4j with Apache Spark
Thanks a ton! That worked. However, this may have a performance issue, as for each partition I'd need to restart the server; that was the basic reason I was creating the graphDb object outside this loop. On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das t...@databricks.com wrote: (Putting user@spark back in the to list) In the gist, you are creating the graphDB object way outside the RDD.foreachPartition. As I said last time, create the graphDB object inside the RDD.foreachPartition. You are creating it outside DStream.foreachRDD, and then using it from inside the rdd.foreachPartition. That is bringing the graphDB object into the task closure, and hence the system is trying to serialize the graphDB object when it's serializing the closure. If you create the graphDB object inside the RDD.foreachPartition, then the closure will not refer to any prior graphDB object and therefore not serialize anything. On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj gautam1...@gmail.com wrote: Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab I'll add the flag and send you the stack trace, I have meetings now. On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das t...@databricks.com wrote: Could you show us that version of the code? It also helps to turn on the Java flag for extended debug info. That will show the lineage of objects leading to the nonserializable one. On Mar 12, 2015 1:32 AM, Gautam Bajaj gautam1...@gmail.com wrote: I tried that too. It results in the same serializability issue. The GraphDatabaseService that I'm using is GraphDatabaseFactory(): http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das t...@databricks.com wrote: What is the GraphDatabaseService object that you are using? Instead of creating them on the driver (outside foreachRDD), can you create them inside the RDD.foreach? In general, the right pattern for doing this is in the programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd So you should be doing (sorry for writing in Scala): dstream.foreachRDD ((rdd: RDD, time: Time) => { rdd.foreachPartition(iterator => // Create GraphDatabaseService object, or fetch it from a pool of GraphDatabaseService objects // Use it to send the whole partition to Neo4j // Destroy the object or release it to the pool }) On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj gautam1...@gmail.com wrote: Neo4j is running externally. It has nothing to do with Spark processes. Basically, the problem is, I'm unable to figure out a way to store the output of Spark in the database, as Spark Streaming requires the Neo4j Core Java API to be serializable as well. The answer points to using the REST API, but its performance is really poor when compared to the Core Java API: http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/ On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das t...@databricks.com wrote: Well, the answers you got there are correct as well. Unfortunately I am not familiar enough with Neo4j to comment any more. Is the Neo4j graph database running externally (outside the Spark cluster), or within the driver process, or on all the executors? Can you clarify that? TD On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj gautam1...@gmail.com wrote: Alright, I have also asked this question on StackOverflow: http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark The code there is pretty neat.
On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote: I am not sure if you realized, but the code snippet is pretty mangled up in the email we received. It might be a good idea to put the code in pastebin or gist, much much easier for everyone to read. On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r gautam1...@gmail.com wrote: I'm trying to use Neo4j with Apache Spark Streaming but I am finding serializability to be an issue. Basically, I want Apache Spark to parse and bundle my data in real time. After the data has been bundled it should be stored in the database, Neo4j. However, I am getting this error: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1264) at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297) at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45) at twoGrams.Main$4.call(Main.java:102) at twoGrams.Main$4.call(Main.java:1) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282) at
Re: Support for skewed joins in Spark
I sent a PR to add skewed join last year: https://github.com/apache/spark/pull/3505 However, it does not split a key across multiple partitions. Instead, if a key has too many values to fit in memory, it stores the values on disk temporarily and uses the disk files to do the join. Best Regards, Shixiong Zhu 2015-03-13 9:37 GMT+08:00 Soila Pertet Kavulya skavu...@gmail.com: Does Spark support skewed joins similar to Pig, which distributes large keys over multiple partitions? I tried using the RangePartitioner but I am still experiencing failures because some keys are too large to fit in a single partition. I cannot use broadcast variables to work around this because both RDDs are too large to fit in driver memory. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Handling worker batch processing during driver shutdown
What version of Spark are you using? You may be hitting a known but since-solved bug where the receivers would not get the stop signal, and (stopGracefully = true) would wait indefinitely for the receivers to stop. Try setting stopGracefully to false and see if it works. This bug should have been solved in Spark 1.2.1: https://issues.apache.org/jira/browse/SPARK-5035 TD On Thu, Mar 12, 2015 at 7:48 PM, Jose Fernandez jfernan...@sdl.com wrote: Thanks for the reply! Theoretically I should be able to do as you suggest as I follow the pool design pattern from the documentation, but I don't seem to be able to run any code after .stop() is called. override def main(args: Array[String]) { // setup val ssc = new StreamingContext(sparkConf, Seconds(streamTime)) val inputStreams = (1 to numReceivers).map(i => ssc.receiverStream(custom receiver)) val messages = ssc.union(inputStreams) messages.foreachRDD { rdd => rdd.foreachPartition { p => val indexer = Indexer.getInstance() p.foreach(x => Indexer.process(x) match { case Some(entry) => indexer.index(entry) case None => }) Indexer.returnInstance(indexer) } } messages.print() sys.ShutdownHookThread { logInfo("** Shutdown hook triggered **") ssc.stop(false, true) logInfo("** Shutdown finished **") ssc.stop(true) } ssc.start() ssc.awaitTermination() } The first shutdown log message is always displayed, but the second message never is. I've tried multiple permutations of the stop function calls and even used try/catch around them. I'm running in yarn-cluster mode using Spark 1.2 on CDH 5.3. I stop the application with yarn application -kill appID. From: Tathagata Das [mailto:t...@databricks.com] Sent: Thursday, March 12, 2015 1:29 PM To: Jose Fernandez Cc: user@spark.apache.org Subject: Re: Handling worker batch processing during driver shutdown Can you access the batcher directly? Like, is there a handle to get access to the batchers on the executors by running a task on that executor? If so, after the streamingContext has been stopped (not the SparkContext), then you can use `sc.makeRDD()` to run a dummy task like this: sc.makeRDD(1 to 1000, 1000).foreach { x => Batcher.get().flush() } With a large number of tasks and no other jobs running in the system, at least one task will run in each executor and therefore will flush the batcher. TD On Thu, Mar 12, 2015 at 12:27 PM, Jose Fernandez jfernan...@sdl.com wrote: Hi folks, I have a shutdown hook in my driver which stops the streaming context cleanly. This is great as workers can finish their current processing unit before shutting down. Unfortunately each worker contains a batch processor which only flushes every X entries. We're indexing to different indices in elasticsearch and using the bulk index request for performance. As far as Spark is concerned, once data is added to the batcher it is considered processed, so our workers are being shut down with data still in the batcher. Is there any way to coordinate the shutdown with the workers? I haven't had any luck searching for a solution online. I would appreciate any suggestions you may have. Thanks :)
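A variant of the shutdown hook above with the graceful flag turned off, sketching what TD suggests trying:

sys.ShutdownHookThread {
  logInfo("** Shutdown hook triggered **")
  // stopSparkContext = false, stopGracefully = false: on builds affected
  // by SPARK-5035 a graceful stop can block forever waiting on receivers,
  // so test with a non-graceful stop first
  ssc.stop(false, false)
  logInfo("** Shutdown finished **")
  ssc.stop(true) // finally stop the SparkContext as well
}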
Re: spark sql writing in avro
Dale, I basically have the same Maven dependency above, but my code will not compile since it cannot reference AvroSaver, though the saveAsAvro reference compiles fine, which is weird. Even though saveAsAvro compiles for me, it errors out when running the Spark job because it is not implemented (the job quits and says non-implemented method or something along those lines). I will try the spark-shell route and pass in the jar built from GitHub since I haven't tried that quite yet. On Thu, Mar 12, 2015 at 6:44 PM, M. Dale medal...@yahoo.com wrote: Short answer: if you downloaded spark-avro from the repo.maven.apache.org repo you might be using an old version (pre-November 14, 2014) - see timestamps at http://repo.maven.apache.org/maven2/com/databricks/spark-avro_2.10/0.1/ Lots of changes at https://github.com/databricks/spark-avro since then. Databricks, thank you for sharing the Avro code!!! Could you please push out the latest version or update the version number and republish to repo.maven.apache.org (I have no idea how jars get there). Or is there a different repository that users should point to for this artifact? Workaround: Download from https://github.com/databricks/spark-avro and build with the latest functionality (still version 0.1) and add it to your local Maven or Ivy repo. Long version: I used a default Maven build and declared my dependency on: <dependency> <groupId>com.databricks</groupId> <artifactId>spark-avro_2.10</artifactId> <version>0.1</version> </dependency> Maven downloaded the 0.1 version from http://repo.maven.apache.org/maven2/com/databricks/spark-avro_2.10/0.1/ and included it in my app code jar. From spark-shell: import com.databricks.spark.avro._ import org.apache.spark.sql.SQLContext val sqlContext = new SQLContext(sc) # This schema includes LONG for time in millis (https://github.com/medale/spark-mail/blob/master/mailrecord/src/main/avro/com/uebercomputing/mailrecord/MailRecord.avdl) val recordsSchema = sqlContext.avroFile("/opt/rpm1/enron/enron-tiny.avro") java.lang.RuntimeException: Unsupported type LONG However, checking out the spark-avro code from its GitHub repo and adding a test case against the MailRecord avro, everything ran fine. So I built the databricks spark-avro locally on my box and then put it in my local Maven repo - everything worked from spark-shell when adding that jar as a dependency. Hope this helps for the save case as well. In the pre-14NOV version, avro.scala says: // TODO: Implement me. implicit class AvroSchemaRDD(schemaRDD: SchemaRDD) { def saveAsAvroFile(path: String): Unit = ??? } Markus On 03/12/2015 07:05 PM, kpeng1 wrote: Hi All, I am currently trying to write out a schema RDD to avro. I noticed that there is a databricks spark-avro library and I have included it in my dependencies, but it looks like I am not able to access the AvroSaver object. On compilation of the job I get this: error: not found: value AvroSaver [ERROR] AvroSaver.save(resultRDD, args(4)) I also tried calling saveAsAvro on the resultRDD (the actual rdd with the results) and that passes compilation, but when I run the code I get an error that says saveAsAvro is not implemented. I am using version 0.1 of spark-avro_2.10 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-writing-in-avro-tp22021.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
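Once a jar built from the current GitHub master is on the classpath, the round trip looks like this as a sketch; the paths are hypothetical, and saveAsAvroFile assumes the post-November code rather than the stubbed-out 0.1 artifact on Maven Central:

import com.databricks.spark.avro._
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Read avro into a SchemaRDD (fails with "Unsupported type LONG" on
// the old artifact; works on a jar built from GitHub master)
val records = sqlContext.avroFile("/path/to/input.avro")
records.registerTempTable("records")

// Query, then write the result back out as avro
val result = sqlContext.sql("SELECT * FROM records")
result.saveAsAvroFile("/path/to/output")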
Re: Support for skewed joins in Spark
Thanks Shixiong, I'll try out your PR. Do you know what the status of the PR is? Are there any plans to incorporate this change into the DataFrames/SchemaRDDs in Spark 1.3? Soila On Thu, Mar 12, 2015 at 7:52 PM, Shixiong Zhu zsxw...@gmail.com wrote: I sent a PR to add skewed join last year: https://github.com/apache/spark/pull/3505 However, it does not split a key across multiple partitions. Instead, if a key has too many values to fit in memory, it stores the values on disk temporarily and uses the disk files to do the join. Best Regards, Shixiong Zhu 2015-03-13 9:37 GMT+08:00 Soila Pertet Kavulya skavu...@gmail.com: Does Spark support skewed joins similar to Pig, which distributes large keys over multiple partitions? I tried using the RangePartitioner but I am still experiencing failures because some keys are too large to fit in a single partition. I cannot use broadcast variables to work around this because both RDDs are too large to fit in driver memory. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
run spark standalone mode
Hi guys, I have a stupid question, but I am not sure how to get out of it. I deployed spark 1.2.1 on a cluster of 30 nodes. Looking at master:8088 I can see all the workers I have created so far. (I start the cluster with sbin/start-all.sh) However, when running a Spark SQL query or even spark-shell, I cannot see any job executing in the master webUI, but the jobs are able to finish. I suspect they are executing locally on the master, but I don't understand why/how, and why not on the slave machines. My conf/spark-env.sh is as follows: export SPARK_MASTER_IP=ms0220 export SPARK_CLASSPATH=$SPARK_CLASSPATH:/users/rgrandl/software/spark-1.2.1-bin-hadoop2.4/lib/snappy-java-1.0.4.1.jar export SPARK_LOCAL_DIRS=/users/rgrandl/software/data/spark/local export SPARK_WORKER_MEMORY=52000M export SPARK_WORKER_INSTANCES=2 export SPARK_WORKER_CORES=2 export SPARK_WORKER_DIR=/users/rgrandl/software/data/spark/worker export SPARK_DAEMON_MEMORY=5200M #export SPARK_DAEMON_JAVA_OPTS=4800M While conf/slaves is populated with the list of machines used for workers. I have to mention that the spark-env.sh and slaves files are deployed on all machines. Thank you, Robert
SPARKQL Join partitioner
Hello, I am wondering how join works in SparkQL. Does it co-partition the two tables, or does it do it through a wide dependency? I have two big tables to join; the query creates more than 150 GB of temporary data, so the query stops because I have no space left on my disk. I guess I could use a HashPartitioner in order to join with co-partitioned inputs, like this: 1/ Read my two tables into two SchemaRDDs 2/ Transform the two SchemaRDDs into two RDD[(Key, Value)]s 3/ Repartition my two RDDs with my partitioner: rdd.partitionBy(new HashPartitioner(100)) 4/ Join my two RDDs 5/ Transform my result into a SchemaRDD 6/ Reconstruct my hive table. Is there an easy way via SparkQL (HiveContext)? Thanks for your help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARKQL-Join-partitioner-tp22016.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
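Steps 2-4 of that plan, sketched in Scala; kvA and kvB stand in for the key/value RDDs extracted from the two SchemaRDDs and are hypothetical:

import org.apache.spark.HashPartitioner

// One shared partitioner, so both sides hash the same keys to the
// same partitions
val partitioner = new HashPartitioner(100)

// kvA, kvB: RDD[(Key, Value)] from steps 1-2
val a = kvA.partitionBy(partitioner).persist()
val b = kvB.partitionBy(partitioner).persist()

// With both inputs co-partitioned, the join is a narrow dependency:
// matching keys are already on the same node, so the join itself
// does not reshuffle the two full tables
val joined = a.join(b)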
Re: run spark standalone mode
Sorry guys for this. It seems that I needed to start the thrift server with the --master spark://ms0220:7077 option, and now I can see applications running in my web UI. Thanks, Robert On Thursday, March 12, 2015 10:57 AM, Grandl Robert rgra...@yahoo.com.INVALID wrote: I figured it out for spark-shell by passing the --master option. However, I am still troubleshooting how to launch sql queries. My current command is like this: ./bin/beeline -u jdbc:hive2://ms0220:1 -n `whoami` -p ignored -f tpch_query10.sql On Thursday, March 12, 2015 10:37 AM, Grandl Robert rgra...@yahoo.com.INVALID wrote: Hi guys, I have a stupid question, but I am not sure how to get out of it. I deployed spark 1.2.1 on a cluster of 30 nodes. Looking at master:8088 I can see all the workers I have created so far. (I start the cluster with sbin/start-all.sh) However, when running a Spark SQL query or even spark-shell, I cannot see any job executing in the master webUI, but the jobs are able to finish. I suspect they are executing locally on the master, but I don't understand why/how, and why not on the slave machines. My conf/spark-env.sh is as follows: export SPARK_MASTER_IP=ms0220 export SPARK_CLASSPATH=$SPARK_CLASSPATH:/users/rgrandl/software/spark-1.2.1-bin-hadoop2.4/lib/snappy-java-1.0.4.1.jar export SPARK_LOCAL_DIRS=/users/rgrandl/software/data/spark/local export SPARK_WORKER_MEMORY=52000M export SPARK_WORKER_INSTANCES=2 export SPARK_WORKER_CORES=2 export SPARK_WORKER_DIR=/users/rgrandl/software/data/spark/worker export SPARK_DAEMON_MEMORY=5200M #export SPARK_DAEMON_JAVA_OPTS=4800M While conf/slaves is populated with the list of machines used for workers. I have to mention that the spark-env.sh and slaves files are deployed on all machines. Thank you, Robert
How to consider HTML files in Spark
Hi. I am very much fascinated by the Spark framework. I am trying to use PySpark + BeautifulSoup to parse HTML files, and I am facing problems loading an HTML file into BeautifulSoup. Example: filepath = "file:///path/to/html/directory" def readhtml(inputhtml): soup = BeautifulSoup(inputhtml) # load html content loaddata = sc.textFile(filepath).map(readhtml) The problem is that Spark treats the loaded file as a text file and processes it line by line. I want to load the entire HTML content into BeautifulSoup for further processing. Does anyone have an idea how to take the whole HTML file as input instead of line-by-line processing? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-consider-HTML-files-in-Spark-tp22017.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Jackson-core-asl conflict with Spark
Hello Guys, I'm running into the error below: Exception in thread main java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass I have created an uber jar with jackson-core-asl 1.9.13 and passed it with the --jars configuration, but I am still getting errors. I searched on the net and found a few suggestions, such as disabling USE_ANNOTATIONS, still no joy. I tried disabling the SQL module, recompiled Spark, and installed the custom library, yet no joy. I'm running out of ideas, could you please assist me with this issue? Many thanks. Uthay.
Re: Jackson-core-asl conflict with Spark
Looking at the dependency tree: [INFO] +- org.spark-project.hive:hive-exec:jar:0.13.1a:compile ... [INFO] | +- org.codehaus.jackson:jackson-core-asl:jar:1.9.2:compile In the root pom.xml: <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> <version>${codehaus.jackson.version}</version> <scope>${hadoop.deps.scope}</scope> </dependency> You can try passing -Dcodehaus.jackson.version=1.9.13 on the command line. Cheers On Thu, Mar 12, 2015 at 9:58 AM, Uthayan Suthakar uthayan.sutha...@gmail.com wrote: Hello Guys, I'm running into the error below: Exception in thread main java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass I have created an uber jar with jackson-core-asl 1.9.13 and passed it with the --jars configuration, but I am still getting errors. I searched on the net and found a few suggestions, such as disabling USE_ANNOTATIONS, still no joy. I tried disabling the SQL module, recompiled Spark, and installed the custom library, yet no joy. I'm running out of ideas, could you please assist me with this issue? Many thanks. Uthay.
Re: Writing to a single file from multiple executors
If you use DStream.saveAsHadoopFiles (or the equivalent RDD ops) with the appropriate output format (for Avro), then each partition of the RDDs will be written to a different file. However, there is probably going to be a large number of small files and you may have to run a separate compaction phase to coalesce them into larger files. On Mar 12, 2015 9:47 AM, Maiti, Samya samya.ma...@philips.com wrote: Hi TD, I want to append my records to an AVRO file which will be later used for querying. Having a single file is not mandatory for us, but then how can we make the executors append the AVRO data to multiple files? Thanks, Sam On Mar 12, 2015, at 4:09 AM, Tathagata Das t...@databricks.com wrote: Why do you have to write a single file? On Wed, Mar 11, 2015 at 1:00 PM, SamyaMaiti samya.maiti2...@gmail.com wrote: Hi Experts, I have a scenario where I want to write to an avro file from a streaming job that reads data from kafka. But the issue is, as there are multiple executors, when all try to write to a given file I get a concurrent exception. A way to mitigate the issue is to repartition and have a single writer task, but as my data is huge that is not a feasible option. Any suggestions welcomed. Regards, Sam -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-a-single-file-from-multiple-executors-tp22003.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
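A sketch of the per-partition Avro write via saveAsNewAPIHadoopFiles, assuming the avro-mapred (hadoop2 flavor) jar is on the classpath; pairs and schemaJson are placeholders for your DStream of records and your record schema:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.streaming.StreamingContext._

val schema: Schema = new Schema.Parser().parse(schemaJson) // your record schema
val job = Job.getInstance()
AvroJob.setOutputKeySchema(job, schema)

// pairs: DStream[(AvroKey[GenericRecord], NullWritable)] built from your
// records; every batch writes one directory, one avro file per partition
pairs.saveAsNewAPIHadoopFiles(
  "hdfs:///data/out/records", "avro", // prefix and suffix of per-batch dirs
  classOf[AvroKey[GenericRecord]], classOf[NullWritable],
  classOf[AvroKeyOutputFormat[GenericRecord]],
  job.getConfiguration)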
Re: How to consider HTML files in Spark
sc.wholeTextFiles() is what you need. http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.wholeTextFiles On Thu, Mar 12, 2015 at 9:26 AM, yh18190 yh18...@gmail.com wrote: Hi. I am very much fascinated by the Spark framework. I am trying to use PySpark + BeautifulSoup to parse HTML files, and I am facing problems loading an HTML file into BeautifulSoup. Example: filepath = "file:///path/to/html/directory" def readhtml(inputhtml): soup = BeautifulSoup(inputhtml) # load html content loaddata = sc.textFile(filepath).map(readhtml) The problem is that Spark treats the loaded file as a text file and processes it line by line. I want to load the entire HTML content into BeautifulSoup for further processing. Does anyone have an idea how to take the whole HTML file as input instead of line-by-line processing? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-consider-HTML-files-in-Spark-tp22017.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Workaround for spark 1.2.X roaringbitmap kryo problem?
I'm using a pre-built Spark; I'm not trying to compile Spark. The compile error appears when I try to register HighlyCompressedMapStatus in my program: kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) If I don't register it, I get a runtime error saying that it needs to be registered (the error is only when I turn on kryo). However the code is running smoothly with kryo turned off. On Wed, Mar 11, 2015 at 5:38 PM, Imran Rashid iras...@cloudera.com wrote: I'm not sure what you mean. Are you asking how you can recompile all of spark and deploy it, instead of using one of the pre-built versions? https://spark.apache.org/docs/latest/building-spark.html Or are you seeing compile problems specifically w/ HighlyCompressedMapStatus? The code compiles fine, so I'm not sure what problem you are running into -- we'd need a lot more info to help On Tue, Mar 10, 2015 at 6:54 PM, Arun Luthra arun.lut...@gmail.com wrote: Does anyone know how to get the HighlyCompressedMapStatus to compile? I will try turning off kryo in 1.2.0 and hope things don't break. I want to benefit from the MapOutputTracker fix in 1.2.0. On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com wrote: the scala syntax for arrays is Array[T], not T[], so you want to use something: kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]]) kryo.register(classOf[Array[Short]]) nonetheless, the spark should take care of this itself. I'll look into it later today. On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com wrote: I think this is a Java vs scala syntax issue. Will check. On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com wrote: Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949 I tried this as a workaround: import org.apache.spark.scheduler._ import org.roaringbitmap._ ... kryo.register(classOf[org.roaringbitmap.RoaringBitmap]) kryo.register(classOf[org.roaringbitmap.RoaringArray]) kryo.register(classOf[org.roaringbitmap.ArrayContainer]) kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) kryo.register(classOf[short[]]) in build file: libraryDependencies += org.roaringbitmap % RoaringBitmap % 0.4.8 This fails to compile: ...:53: identifier expected but ']' found. [error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) also: :54: identifier expected but ']' found. [error] kryo.register(classOf[short[]]) also: :51: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler [error] kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) Suggestions? Arun
KafkaUtils and specifying a specific partition
Hi, How do you use KafkaUtils to specify a specific partition? I'm writing customer Marathon jobs where a customer is given 1 partition in a topic in Kafka. The job will get the partition from our database for that customer and use that to get the messages for that customer. I misinterpreted KafkaUtils when creating the stream and didn't know that it was the number of partitions per topic in the map. If KafkaUtils doesn't support this, is there another Spark API call for Kafka that supports this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KafkaUtils-and-specifying-a-specific-partition-tp22018.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
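For what it's worth, the Map[String, Int] in the receiver-based KafkaUtils.createStream is topic -> number of consumer threads, not a partition selector. If you can use the experimental direct Kafka API added in Spark 1.3, you can pin the stream to exactly one partition by seeding fromOffsets with just that TopicAndPartition. A sketch; the topic name, partition id, offset, and broker list are placeholders you would look up per customer:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val partitionId = 3   // this customer's partition, from your database
val startOffset = 0L  // last processed offset for that customer
val fromOffsets = Map(TopicAndPartition("customer-topic", partitionId) -> startOffset)
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// Only messages from the single partition in fromOffsets are consumed
val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))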
Re: Jackson-core-asl conflict with Spark
What version of Hadoop are you using, and are you setting the right Hadoop profile? Because the Hadoop profiles already set the Jackson version to 1.9.13 IIRC, so maybe that's the issue. On Thu, Mar 12, 2015 at 5:15 PM, Ted Yu yuzhih...@gmail.com wrote: Looking at the dependency tree: [INFO] +- org.spark-project.hive:hive-exec:jar:0.13.1a:compile ... [INFO] | +- org.codehaus.jackson:jackson-core-asl:jar:1.9.2:compile In the root pom.xml: <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> <version>${codehaus.jackson.version}</version> <scope>${hadoop.deps.scope}</scope> </dependency> You can try passing -Dcodehaus.jackson.version=1.9.13 on the command line. Cheers On Thu, Mar 12, 2015 at 9:58 AM, Uthayan Suthakar uthayan.sutha...@gmail.com wrote: Hello Guys, I'm running into the error below: Exception in thread main java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass I have created an uber jar with jackson-core-asl 1.9.13 and passed it with the --jars configuration, but I am still getting errors. I searched on the net and found a few suggestions, such as disabling USE_ANNOTATIONS, still no joy. I tried disabling the SQL module, recompiled Spark, and installed the custom library, yet no joy. I'm running out of ideas, could you please assist me with this issue? Many thanks. Uthay. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Writing to a single file from multiple executors
Hi TD, I want to append my records to an Avro file which will later be used for querying. Having a single file is not mandatory for us, but then how can we make the executors append the Avro data to multiple files? Thanks, Sam On Mar 12, 2015, at 4:09 AM, Tathagata Das t...@databricks.com wrote: Why do you have to write a single file? On Wed, Mar 11, 2015 at 1:00 PM, SamyaMaiti samya.maiti2...@gmail.com wrote: Hi Experts, I have a scenario wherein I want to write to an Avro file from a streaming job that reads data from Kafka. The issue is that there are multiple executors, and when all of them try to write to a given file I get a concurrency exception. One way to mitigate the issue is to repartition and have a single writer task, but as my data is huge that is not a feasible option. Any suggestions welcomed. Regards, Sam -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-a-single-file-from-multiple-executors-tp22003.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
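[Editor's note: a minimal sketch of the multiple-files approach Sam asks about, under assumptions — a DStream of Avro GenericRecords and a hypothetical HDFS output directory. Each partition of each batch writes its own uniquely named file, so no two executors ever touch the same file; queries later read the whole directory.]

import java.util.UUID
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.dstream.DStream

def writeAvroPerPartition(stream: DStream[GenericRecord],
                          schemaJson: String, outputDir: String): Unit = {
  stream.foreachRDD { (rdd, time) =>
    rdd.foreachPartition { records =>
      if (records.hasNext) {
        // Avro's Schema isn't serializable, so parse it on the executor from its JSON form
        val schema = new Schema.Parser().parse(schemaJson)
        val fs = FileSystem.get(new Configuration())
        // one uniquely named file per partition per batch: no concurrent writers
        val path = new Path(s"$outputDir/batch-${time.milliseconds}-${UUID.randomUUID()}.avro")
        val writer = new DataFileWriter(new GenericDatumWriter[GenericRecord](schema))
        writer.create(schema, fs.create(path))
        try records.foreach(r => writer.append(r)) finally writer.close()
      }
    }
  }
}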
Re: Jackson-core-asl conflict with Spark
So... one solution would be to use a non-Jurassic version of Jackson. 2.6 will drop before too long, and 3.0 is in longer-term planning. The 1.x series is long deprecated. If you're genuinely stuck with something ancient, then you need to include the JAR that contains the class, and 1.9.13 does not. Why do you think you need that particular version? — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/ On Thu, Mar 12, 2015 at 9:58 AM, Uthayan Suthakar uthayan.sutha...@gmail.com wrote: Hello Guys, I'm running into the below error: Exception in thread "main" java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass I have created an uber jar with jackson-core-asl 1.9.13 and passed it with the --jars configuration, but I'm still getting errors. I searched on the net and found a few suggestions, such as disabling USE_ANNOTATIONS, but still no joy. I tried disabling the SQL module, recompiled Spark and installed the custom library, yet no joy. I'm running out of ideas, could you please assist me with this issue? Many thanks. Uthay.
Re: Workaround for spark 1.2.X roaringbitmap kryo problem?
Giving a bit more detail on the error would make it a lot easier for others to help you out. E.g., in this case, it would have helped if you'd included your actual compile error. In any case, I'm assuming your issue is b/c that class is private to Spark. You can sneak around that by using Class.forName("className") instead: scala> classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus] <console>:8: error: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus] ^ scala> Class.forName("org.apache.spark.scheduler.HighlyCompressedMapStatus") res1: Class[_] = class org.apache.spark.scheduler.HighlyCompressedMapStatus hope this helps, Imran On Thu, Mar 12, 2015 at 12:47 PM, Arun Luthra arun.lut...@gmail.com wrote: I'm using a pre-built Spark; I'm not trying to compile Spark. The compile error appears when I try to register HighlyCompressedMapStatus in my program: kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) If I don't register it, I get a runtime error saying that it needs to be registered (the error occurs only when I turn on kryo). However, the code runs smoothly with kryo turned off. On Wed, Mar 11, 2015 at 5:38 PM, Imran Rashid iras...@cloudera.com wrote: I'm not sure what you mean. Are you asking how you can recompile all of Spark and deploy it, instead of using one of the pre-built versions? https://spark.apache.org/docs/latest/building-spark.html Or are you seeing compile problems specifically w/ HighlyCompressedMapStatus? The code compiles fine, so I'm not sure what problem you are running into -- we'd need a lot more info to help On Tue, Mar 10, 2015 at 6:54 PM, Arun Luthra arun.lut...@gmail.com wrote: Does anyone know how to get the HighlyCompressedMapStatus to compile? I will try turning off kryo in 1.2.0 and hope things don't break. I want to benefit from the MapOutputTracker fix in 1.2.0. On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com wrote: the scala syntax for arrays is Array[T], not T[], so you want to use something like: kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]]) kryo.register(classOf[Array[Short]]) nonetheless, Spark should take care of this itself. I'll look into it later today. On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com wrote: I think this is a Java vs Scala syntax issue. Will check. On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com wrote: Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949 I tried this as a workaround: import org.apache.spark.scheduler._ import org.roaringbitmap._ ... kryo.register(classOf[org.roaringbitmap.RoaringBitmap]) kryo.register(classOf[org.roaringbitmap.RoaringArray]) kryo.register(classOf[org.roaringbitmap.ArrayContainer]) kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) kryo.register(classOf[short[]]) in build file: libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.4.8" This fails to compile: ...:53: identifier expected but ']' found. [error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) also: :54: identifier expected but ']' found. [error] kryo.register(classOf[short[]]) also: :51: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler [error] kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) Suggestions? Arun
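[Editor's note: putting Imran's Class.forName trick together, a minimal sketch of a registrator class — the class name and wiring are assumptions, and the array registration uses the JVM's "[L...;" array-class naming instead of the invalid T[] syntax.]

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class RoaringRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
    kryo.register(classOf[org.roaringbitmap.RoaringArray])
    kryo.register(classOf[org.roaringbitmap.ArrayContainer])
    kryo.register(classOf[Array[Short]])
    // non-public classes: Class.forName sidesteps the "cannot be accessed" compile error
    kryo.register(Class.forName("org.apache.spark.scheduler.HighlyCompressedMapStatus"))
    kryo.register(Class.forName("org.roaringbitmap.RoaringArray$Element"))
    // "[L...;" is the JVM name for an array of Element
    kryo.register(Class.forName("[Lorg.roaringbitmap.RoaringArray$Element;"))
  }
}

It would then be enabled by setting spark.serializer to org.apache.spark.serializer.KryoSerializer and spark.kryo.registrator to the registrator's fully qualified class name.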
Re: KafkaUtils and specifying a specific partition
Thanks! :) Colin McQueen *Software Developer* On Thu, Mar 12, 2015 at 3:05 PM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi Colin, my understanding is that this is currently not possible with KafkaUtils. You would have to write a custom receiver using Kafka's SimpleConsumer API. https://spark.apache.org/docs/1.2.0/streaming-custom-receivers.html https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example Regards, Jeff 2015-03-12 17:58 GMT+01:00 ColinMc colin.mcqu...@shiftenergy.com: Hi, How do you use KafkaUtils to specify a specific partition? I'm writing customer Marathon jobs where a customer is given 1 partition in a topic in Kafka. The job will get the partition from our database for that customer and use that to get the messages for that customer. I misinterpreted KafkaUtils when creating the stream and didn't know that it was the number of partitions per topic in the map. If KafkaUtils doesn't support this, is there another Spark API call for Kafka that supports this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KafkaUtils-and-specifying-a-specific-partition-tp22018.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: run spark standalone mode
I figured it out for spark-shell by passing the --master option. However, I'm still troubleshooting how to launch SQL queries. My current command is like this: ./bin/beeline -u jdbc:hive2://ms0220:1 -n `whoami` -p ignored -f tpch_query10.sql On Thursday, March 12, 2015 10:37 AM, Grandl Robert rgra...@yahoo.com.INVALID wrote: Hi guys, I have a stupid question, but I am not sure how to get out of it. I deployed spark 1.2.1 on a cluster of 30 nodes. Looking at master:8088 I can see all the workers I have created so far. (I start the cluster with sbin/start-all.sh) However, when running a Spark SQL query or even spark-shell, I cannot see any job executing in the master webUI, but the jobs are able to finish. I suspect they are executing locally on the master, but I don't understand why/how, and why not on the slave machines. My conf/spark-env.sh is as follows: export SPARK_MASTER_IP=ms0220 export SPARK_CLASSPATH=$SPARK_CLASSPATH:/users/rgrandl/software/spark-1.2.1-bin-hadoop2.4/lib/snappy-java-1.0.4.1.jar export SPARK_LOCAL_DIRS=/users/rgrandl/software/data/spark/local export SPARK_WORKER_MEMORY=52000M export SPARK_WORKER_INSTANCES=2 export SPARK_WORKER_CORES=2 export SPARK_WORKER_DIR=/users/rgrandl/software/data/spark/worker export SPARK_DAEMON_MEMORY=5200M #export SPARK_DAEMON_JAVA_OPTS=4800M While conf/slaves is populated with the list of machines used for workers. I have to mention that the spark-env.sh and slaves files are deployed on all machines. Thank you, Robert
Re: KafkaUtils and specifying a specific partition
KafkaUtils.createDirectStream, added in spark 1.3, will let you specify a particular topic and partition On Thu, Mar 12, 2015 at 1:07 PM, Colin McQueen colin.mcqu...@shiftenergy.com wrote: Thanks! :) Colin McQueen *Software Developer* On Thu, Mar 12, 2015 at 3:05 PM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi Colin, my understanding is that this is currently not possible with KafkaUtils. You would have to write a custom receiver using Kafka's SimpleConsumer API. https://spark.apache.org/docs/1.2.0/streaming-custom-receivers.html https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example Regards, Jeff 2015-03-12 17:58 GMT+01:00 ColinMc colin.mcqu...@shiftenergy.com: Hi, How do you use KafkaUtils to specify a specific partition? I'm writing customer Marathon jobs where a customer is given 1 partition in a topic in Kafka. The job will get the partition from our database for that customer and use that to get the messages for that customer. I misinterpreted KafkaUtils when creating the stream and didn't know that it was the number of partitions per topic in the map. If KafkaUtils doesn't support this, is there another Spark API call for Kafka that supports this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KafkaUtils-and-specifying-a-specific-partition-tp22018.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
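[Editor's note: for reference, a minimal sketch of that Spark 1.3 API for Colin's case — reading one assigned partition. The broker address, topic name, partition number, starting offset, and the ssc StreamingContext are placeholders; in his setup the partition would come from the customer database.]

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
// exactly one (topic, partition) for this customer, starting at offset 0
val fromOffsets = Map(TopicAndPartition("customer-topic", 3) -> 0L)

val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))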
Re: Handling worker batch processing during driver shutdown
Can you access the batcher directly? Like, is there a handle to get access to the batchers on the executors by running a task on those executors? If so, after the streamingContext has been stopped (not the SparkContext), then you can use `sc.makeRDD()` to run a dummy task like this: sc.makeRDD(1 to 1000, 1000).foreach { x => Batcher.get().flush() } With a large number of tasks and no other jobs running in the system, at least one task will run in each executor and therefore will flush the batcher. TD On Thu, Mar 12, 2015 at 12:27 PM, Jose Fernandez jfernan...@sdl.com wrote: Hi folks, I have a shutdown hook in my driver which stops the streaming context cleanly. This is great as workers can finish their current processing unit before shutting down. Unfortunately each worker contains a batch processor which only flushes every X entries. We're indexing to different indices in elasticsearch and using the bulk index request for performance. As far as Spark is concerned, once data is added to the batcher it is considered processed, so our workers are being shut down with data still in the batcher. Is there any way to coordinate the shutdown with the workers? I haven't had any luck searching for a solution online. I would appreciate any suggestions you may have. Thanks :)
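[Editor's note: a minimal sketch of TD's suggestion folded into the poster's shutdown hook; Batcher is Jose's per-executor batching singleton (assumed), and the task count is arbitrary but should comfortably exceed the executor count.]

sys.addShutdownHook {
  // stop streaming gracefully but keep the SparkContext alive for the flush job
  ssc.stop(stopSparkContext = false, stopGracefully = true)
  // with far more tasks than executors, every executor runs at least one task
  sc.makeRDD(1 to 1000, 1000).foreach { _ => Batcher.get().flush() }
  sc.stop()
}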
Re: Jackson-core-asl conflict with Spark
Thank you all. I tried -Dcodehaus.jackson.version=1.9.13 as suggested by Ted Yu and it resolved my issue. Many thanks. Cheers, Uthay. On 12 March 2015 at 17:22, Ted Yu yuzhih...@gmail.com wrote: Uthay: You can run the mvn dependency:tree command to find out the actual jackson-core-asl your project needs. FYI On Thu, Mar 12, 2015 at 10:17 AM, Paul Brown p...@mult.ifario.us wrote: So... one solution would be to use a non-Jurassic version of Jackson. 2.6 will drop before too long, and 3.0 is in longer-term planning. The 1.x series is long deprecated. If you're genuinely stuck with something ancient, then you need to include the JAR that contains the class, and 1.9.13 does not. Why do you think you need that particular version? — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/ On Thu, Mar 12, 2015 at 9:58 AM, Uthayan Suthakar uthayan.sutha...@gmail.com wrote: Hello Guys, I'm running into the below error: Exception in thread "main" java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass I have created an uber jar with jackson-core-asl 1.9.13 and passed it with the --jars configuration, but I'm still getting errors. I searched on the net and found a few suggestions, such as disabling USE_ANNOTATIONS, but still no joy. I tried disabling the SQL module, recompiled Spark and installed the custom library, yet no joy. I'm running out of ideas, could you please assist me with this issue? Many thanks. Uthay.
Re: Efficient Top count in each window
Why are you repartitioning to 1 partition? That is obviously slow; you are converting a distributed operation into a single-node operation. Also consider using RDD.top(). If you define the ordering right (based on the count), then you will get the top K across them without doing a shuffle for sortByKey. Much cheaper. TD On Thu, Mar 12, 2015 at 11:06 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, I have a streaming application where I'm doing a top-10 count in each window, which seems slow. Is there an efficient way to do this? val counts = keyAndValues.map(x => math.round(x._3.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4)) val topCounts = counts.repartition(1).map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap).mapPartitions(rdd => rdd.take(10)) Regards, Laeeq
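[Editor's note: a minimal sketch of TD's suggestion against the same windowed counts; the ordering on the count field is the only assumption. transform runs on the driver each batch, and RDD.top computes a per-partition top-k that is merged on the driver, so no repartition(1) or full sort is needed.]

val topCounts = counts.transform { rdd =>
  // order (value, count) pairs by their count
  implicit val byCount: Ordering[(Long, Long)] = Ordering.by(_._2)
  rdd.sparkContext.makeRDD(rdd.top(10), numSlices = 1)
}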
Re: Which is more efficient : first join three RDDs and then do filtering or vice versa?
Join causes a shuffle (sending data across the network). I expect it will be better to filter before you join, so you reduce the amount of data sent across the network. Note this would be true for *any* transformation which causes a shuffle. It would not be true if you're combining RDDs with union, since that doesn't cause a shuffle. On Thu, Mar 12, 2015 at 11:04 AM, shahab shahab.mok...@gmail.com wrote: Hi, Probably this question was already answered some time ago on the mailing list, but I couldn't find it. Sorry for posting this again. I need to join and apply filtering on three different RDDs, and I just wonder which of the following alternatives is more efficient: 1- first join all three RDDs and then do the filtering on the resulting joint RDD, or 2- apply the filtering on each individual RDD and then join the resulting RDDs. Or is there perhaps no difference due to lazy evaluation and Spark's underlying optimisation? best, /Shahab
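[Editor's note: to make Sean's point concrete, a small sketch with hypothetical pair RDDs keyed by an id — the element types and predicates are stand-ins. Each filter is a narrow, shuffle-free transformation, so applying it first shrinks what the joins have to move across the network.]

import org.apache.spark.SparkContext._   // pair-RDD functions (needed pre-1.3)
import org.apache.spark.rdd.RDD

def filterThenJoin(a: RDD[(Int, String)],
                   b: RDD[(Int, Double)],
                   c: RDD[(Int, Long)]): RDD[(Int, ((String, Double), Long))] = {
  // filters run where the data already lives; only the survivors get shuffled
  a.filter { case (_, v) => v.nonEmpty }
    .join(b.filter { case (_, v) => v > 0.0 })
    .join(c.filter { case (_, v) => v < 100L })
}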
AWS SDK HttpClient version conflict (spark.files.userClassPathFirst not working)
I'm trying to use the AWS SDK (v1.9.23) to connect to DynamoDB from within a Spark application. Spark 1.2.1 is assembled with HttpClient 4.2.6, but the AWS SDK depends on HttpClient 4.3.4 for its communication with DynamoDB. The end result is an error when the app tries to connect to DynamoDB and gets Spark's version instead: java.lang.NoClassDefFoundError: org/apache/http/client/methods/HttpPatch at com.amazonaws.http.AmazonHttpClient.<clinit>(AmazonHttpClient.java:129) at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:120) at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.<init>(AmazonDynamoDBClient.java:359) Caused by: java.lang.ClassNotFoundException: org.apache.http.client.methods.HttpPatch Including HttpClient 4.3.4 as user jars doesn't improve the situation much: java.lang.NoSuchMethodError: org.apache.http.params.HttpConnectionParams.setSoKeepalive(Lorg/apache/http/params/HttpParams;Z)V at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:95) I've seen the documentation regarding the 'spark.files.userClassPathFirst' flag and have tried to use it thinking it would resolve this issue. However, when that flag is used I get a NoClassDefFoundError on 'scala.Serializable': java.lang.NoClassDefFoundError: scala/Serializable ... at org.apache.spark.executor.ChildExecutorURLClassLoader$userClassLoader$.findClass(ExecutorURLClassLoader.scala:46) ... Caused by: java.lang.ClassNotFoundException: scala.Serializable This seems odd to me, since scala.Serializable is included in the spark assembly. I thought perhaps my app was compiled against a different scala version than spark uses, but eliminated that possibility by using the scala compiler directly out of the spark assembly jar, with identical results. Has anyone else seen this issue, had any success with the spark.files.userClassPathFirst flag, or been able to use the AWS SDK? I was going to submit this as a Spark JIRA issue, but thought I would check here first. Thanks, Adam Lewandowski
Re: PairRDD serialization exception
I have the same exact error. Am running a pyspark job in yarn-client mode. Works well in standalone but I need to run it in yarn-client mode. Other people reported the same problem when bundling jars and extra dependencies. I'm pointing the pyspark to use a specific python executable bundled with external dependencies. However since the job runs on standalone, I see no reason why it should give me this error whilst saving to s3 on yarn-client. Thanks. Any help or direction would be appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PairRDD-serialization-exception-tp21999p22019.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
BTW, I was running the tests from SBT when I got the errors. One test turns a Seq of case classes into a DataFrame. I also tried running similar code in the console, but it failed with the same error. I tested both Spark 1.3.0-rc2 and 1.2.1 with Scala 2.11.6 and 2.10.4. Any idea? Jianshi On Fri, Mar 13, 2015 at 2:23 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Same issue here. But the classloader in my exception is somehow different. scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with java.net.URLClassLoader@53298398 of type class java.net.URLClassLoader with classpath Jianshi On Sun, Mar 1, 2015 at 9:32 AM, Michael Armbrust mich...@databricks.com wrote: I think it's possible that the problem is that the scala compiler is not being loaded by the primordial classloader (but instead by some child classloader) and thus the scala reflection mirror is failing to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts. On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Also, can the scala version play any role here? I am using scala 2.11.5 but all spark packages have a dependency on scala 2.11.2. Just wanted to make sure that the scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to a SchemaRDD. But the createSchemaRDD function is throwing an exception: Exception in thread "main" scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - <scala.binary.version>2.11</scala.binary.version> <spark.version>1.2.1</spark.version> <scala.version>2.11.5</scala.version> Please let me know how I can resolve this problem. Thanks Ashish -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: KafkaUtils and specifying a specific partition
Hi Colin, my understanding is that this is currently not possible with KafkaUtils. You would have to write a custom receiver using Kafka's SimpleConsumer API. https://spark.apache.org/docs/1.2.0/streaming-custom-receivers.html https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example Regards, Jeff 2015-03-12 17:58 GMT+01:00 ColinMc colin.mcqu...@shiftenergy.com: Hi, How do you use KafkaUtils to specify a specific partition? I'm writing customer Marathon jobs where a customer is given 1 partition in a topic in Kafka. The job will get the partition from our database for that customer and use that to get the messages for that customer. I misinterpreted KafkaUtils when creating the stream and didn't know that it was the number of partitions per topic in the map. If KafkaUtils doesn't support this, is there another Spark API call for Kafka that supports this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KafkaUtils-and-specifying-a-specific-partition-tp22018.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Efficient Top count in each window
Hi, I have a streaming application where I'm doing a top-10 count in each window, which seems slow. Is there an efficient way to do this? val counts = keyAndValues.map(x => math.round(x._3.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4)) val topCounts = counts.repartition(1).map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap).mapPartitions(rdd => rdd.take(10)) Regards, Laeeq
Re: Workaround for spark 1.2.X roaringbitmap kryo problem?
The error is in the original post. Here's the recipe that worked for me: kryo.register(Class.forName("org.roaringbitmap.RoaringArray$Element")) kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]]) kryo.register(classOf[Array[Short]]) kryo.register(classOf[org.roaringbitmap.RoaringBitmap]) kryo.register(classOf[org.roaringbitmap.RoaringArray]) kryo.register(classOf[org.roaringbitmap.ArrayContainer]) kryo.register(Class.forName("org.apache.spark.scheduler.HighlyCompressedMapStatus")) So your Class.forName workaround worked, thanks! On Thu, Mar 12, 2015 at 10:56 AM, Imran Rashid iras...@cloudera.com wrote: Giving a bit more detail on the error would make it a lot easier for others to help you out. E.g., in this case, it would have helped if you'd included your actual compile error. In any case, I'm assuming your issue is b/c that class is private to Spark. You can sneak around that by using Class.forName("className") instead: scala> classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus] <console>:8: error: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus] ^ scala> Class.forName("org.apache.spark.scheduler.HighlyCompressedMapStatus") res1: Class[_] = class org.apache.spark.scheduler.HighlyCompressedMapStatus hope this helps, Imran On Thu, Mar 12, 2015 at 12:47 PM, Arun Luthra arun.lut...@gmail.com wrote: I'm using a pre-built Spark; I'm not trying to compile Spark. The compile error appears when I try to register HighlyCompressedMapStatus in my program: kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) If I don't register it, I get a runtime error saying that it needs to be registered (the error occurs only when I turn on kryo). However, the code runs smoothly with kryo turned off. On Wed, Mar 11, 2015 at 5:38 PM, Imran Rashid iras...@cloudera.com wrote: I'm not sure what you mean. Are you asking how you can recompile all of Spark and deploy it, instead of using one of the pre-built versions? https://spark.apache.org/docs/latest/building-spark.html Or are you seeing compile problems specifically w/ HighlyCompressedMapStatus? The code compiles fine, so I'm not sure what problem you are running into -- we'd need a lot more info to help On Tue, Mar 10, 2015 at 6:54 PM, Arun Luthra arun.lut...@gmail.com wrote: Does anyone know how to get the HighlyCompressedMapStatus to compile? I will try turning off kryo in 1.2.0 and hope things don't break. I want to benefit from the MapOutputTracker fix in 1.2.0. On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com wrote: the scala syntax for arrays is Array[T], not T[], so you want to use something like: kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]]) kryo.register(classOf[Array[Short]]) nonetheless, Spark should take care of this itself. I'll look into it later today. On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com wrote: I think this is a Java vs Scala syntax issue. Will check. On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com wrote: Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949 I tried this as a workaround: import org.apache.spark.scheduler._ import org.roaringbitmap._ ... kryo.register(classOf[org.roaringbitmap.RoaringBitmap]) kryo.register(classOf[org.roaringbitmap.RoaringArray]) kryo.register(classOf[org.roaringbitmap.ArrayContainer]) kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) kryo.register(classOf[short[]]) in build file: libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.4.8" This fails to compile: ...:53: identifier expected but ']' found. [error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) also: :54: identifier expected but ']' found. [error] kryo.register(classOf[short[]]) also: :51: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler [error] kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) Suggestions? Arun
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
Same issue here. But the classloader in my exception is somehow different. scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with java.net.URLClassLoader@53298398 of type class java.net.URLClassLoader with classpath Jianshi On Sun, Mar 1, 2015 at 9:32 AM, Michael Armbrust mich...@databricks.com wrote: I think it's possible that the problem is that the scala compiler is not being loaded by the primordial classloader (but instead by some child classloader) and thus the scala reflection mirror is failing to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts. On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Also, can the scala version play any role here? I am using scala 2.11.5 but all spark packages have a dependency on scala 2.11.2. Just wanted to make sure that the scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to a SchemaRDD. But the createSchemaRDD function is throwing an exception: Exception in thread "main" scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - <scala.binary.version>2.11</scala.binary.version> <spark.version>1.2.1</spark.version> <scala.version>2.11.5</scala.version> Please let me know how I can resolve this problem. Thanks Ashish -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: can not submit job to spark in windows
Seems the path is not set correctly. It's looking for C:\\bin\winutils.exe. You would need to set the path correctly. On Thu, Feb 26, 2015 at 7:59 PM, sergunok ser...@gmail.com wrote: Hi! I downloaded and extracted Spark to a local folder under Windows 7 and have successfully played with it in the pyspark interactive shell. BUT when I try to use spark-submit (for example: job-submit pi.py) I get: C:\spark-1.2.1-bin-hadoop2.4\bin>spark-submit.cmd pi.py Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/02/26 18:21:37 INFO SecurityManager: Changing view acls to: sergun 15/02/26 18:21:37 INFO SecurityManager: Changing modify acls to: sergun 15/02/26 18:21:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sergun); users with modify permissions: Set(user) 15/02/26 18:21:38 INFO Slf4jLogger: Slf4jLogger started 15/02/26 18:21:38 INFO Remoting: Starting remoting 15/02/26 18:21:39 INFO Remoting: Remoting started; listening on addresses: [akka.tcp://sparkDriver@mypc:56640] 15/02/26 18:21:39 INFO Utils: Successfully started service 'sparkDriver' on port 56640. 15/02/26 18:21:39 INFO SparkEnv: Registering MapOutputTracker 15/02/26 18:21:39 INFO SparkEnv: Registering BlockManagerMaster 15/02/26 18:21:39 INFO DiskBlockManager: Created local directory at C:\Users\sergun\AppData\Local\Temp\spark-adddeb0b-d6c8-4720-92e3-05255d46ea66\spark-c65cd406-28a4-486d-a1ad-92e4814df6fa 15/02/26 18:21:39 INFO MemoryStore: MemoryStore started with capacity 265.0 MB 15/02/26 18:21:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/02/26 18:21:40 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable C:\\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333) at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326) at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76) at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93) at org.apache.hadoop.security.Groups.<init>(Groups.java:77) at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255) at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283) at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:44) at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:214) at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala) at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1873) at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:105) at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:180) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:308) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159) at org.apache.spark.SparkContext.<init>(SparkContext.scala:240) at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) at java.lang.reflect.Constructor.newInstance(Unknown Source) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:214) at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79) at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Unknown Source) 15/02/26 18:21:41 INFO HttpFileServer: HTTP File server directory is C:\Users\sergun\AppData\Local\Temp\spark-79f2a924-4fff-432c-abc8-ac9c6c4ee0c7\spark-1f295e28-f0db-4daf-b877-2a47990b6e88 15/02/26 18:21:41 INFO HttpServer: Starting HTTP Server 15/02/26 18:21:41 INFO Utils: Successfully started service 'HTTP file server' on port 56641. 15/02/26 18:21:41 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/02/26 18:21:41 INFO SparkUI: Started SparkUI at http://mypc:4040 15/02/26 18:21:42 INFO Utils: Copying C:\spark-1.2.1-bin-hadoop2.4\bin\pi.py to C:\Users\sergun\AppData\Local\Temp\spark-76a21028-ccce-4308-9e70-09c3cfa76477\
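[Editor's note: for context, Hadoop's Shell class resolves winutils.exe as <home>\bin\winutils.exe, where <home> comes from the HADOOP_HOME environment variable or the hadoop.home.dir system property; the C:\\bin prefix in the error is that home resolving badly. A hedged illustration — the directory is an assumption, it just has to contain bin\winutils.exe.]

// Set before the SparkContext is created; expects C:\hadoop\bin\winutils.exe to exist.
// The equivalent environment variable is HADOOP_HOME=C:\hadoop.
System.setProperty("hadoop.home.dir", "C:\\hadoop")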
Re: KafkaUtils and specifying a specific partition
If you want to use another Kafka receiver instead of the current Spark Kafka receiver, you can see this: https://github.com/mykidong/spark-kafka-simple-consumer-receiver/blob/master/src/main/java/spark/streaming/receiver/kafka/KafkaReceiverUtils.java You can handle getting just the stream from the specified partition. - Kidong. -- Original Message -- From: ColinMc [via Apache Spark User List] ml-node+s1001560n22018...@n3.nabble.com To: mykidong mykid...@gmail.com Sent: 2015-03-13 1:58:08 AM Subject: KafkaUtils and specifying a specific partition Hi, How do you use KafkaUtils to specify a specific partition? I'm writing customer Marathon jobs where a customer is given 1 partition in a topic in Kafka. The job will get the partition from our database for that customer and use that to get the messages for that customer. I misinterpreted KafkaUtils when creating the stream and didn't know that it was the number of partitions per topic in the map. If KafkaUtils doesn't support this, is there another Spark API call for Kafka that supports this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KafkaUtils-and-specifying-a-specific-partition-tp22018p22023.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Logistic Regression displays ERRORs
I am running LogisticRegressionWithLBFGS. I got these lines on my console: 2015-03-12 17:38:03,897 ERROR breeze.optimize.StrongWolfeLineSearch | Encountered bad values in function evaluation. Decreasing step size to 0.5 2015-03-12 17:38:03,967 ERROR breeze.optimize.StrongWolfeLineSearch | Encountered bad values in function evaluation. Decreasing step size to 0.25 2015-03-12 17:38:04,036 ERROR breeze.optimize.StrongWolfeLineSearch | Encountered bad values in function evaluation. Decreasing step size to 0.125 2015-03-12 17:38:04,105 ERROR breeze.optimize.StrongWolfeLineSearch | Encountered bad values in function evaluation. Decreasing step size to 0.0625 2015-03-12 17:38:04,176 ERROR breeze.optimize.StrongWolfeLineSearch | Encountered bad values in function evaluation. Decreasing step size to 0.03125 2015-03-12 17:38:04,247 ERROR breeze.optimize.StrongWolfeLineSearch | Encountered bad values in function evaluation. Decreasing step size to 0.015625 2015-03-12 17:38:04,317 ERROR breeze.optimize.StrongWolfeLineSearch | Encountered bad values in function evaluation. Decreasing step size to 0.0078125 2015-03-12 17:38:04,461 ERROR breeze.optimize.StrongWolfeLineSearch | Encountered bad values in function evaluation. Decreasing step size to 0.005859375 2015-03-12 17:38:04,605 INFO breeze.optimize.StrongWolfeLineSearch | Line search t: NaN fval: NaN rhs: NaN cdd: NaN 2015-03-12 17:38:04,672 INFO breeze.optimize.StrongWolfeLineSearch | Line search t: NaN fval: NaN rhs: NaN cdd: NaN 2015-03-12 17:38:04,747 INFO breeze.optimize.StrongWolfeLineSearch | Line search t: NaN fval: NaN rhs: NaN cdd: NaN 2015-03-12 17:38:04,818 INFO breeze.optimize.StrongWolfeLineSearch | Line search t: NaN fval: NaN rhs: NaN cdd: NaN 2015-03-12 17:38:04,890 INFO breeze.optimize.StrongWolfeLineSearch | Line search t: NaN fval: NaN rhs: NaN cdd: NaN 2015-03-12 17:38:04,962 INFO breeze.optimize.StrongWolfeLineSearch | Line search t: NaN fval: NaN rhs: NaN cdd: NaN 2015-03-12 17:38:05,038 INFO breeze.optimize.StrongWolfeLineSearch | Line search t: NaN fval: NaN rhs: NaN cdd: NaN 2015-03-12 17:38:05,107 INFO breeze.optimize.StrongWolfeLineSearch | Line search t: NaN fval: NaN rhs: NaN cdd: NaN 2015-03-12 17:38:05,186 INFO breeze.optimize.StrongWolfeLineSearch | Line search t: NaN fval: NaN rhs: NaN cdd: NaN 2015-03-12 17:38:05,256 INFO breeze.optimize.StrongWolfeLineSearch | Line search t: NaN fval: NaN rhs: NaN cdd: NaN 2015-03-12 17:38:05,257 ERROR breeze.optimize.LBFGS | Failure! Resetting history: breeze.optimize.FirstOrderException: Line search zoom failed What causes them and how do I fix them? I checked my data and there seemed nothing out of the ordinary. The resulting prediction model seemed acceptable to me. So, are these ERRORs actually WARNINGs? Could we or should we tune the level of these messages down one notch? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Logistic-Regression-displays-ERRORs-tp22024.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Solve least square problem of the form min norm(A x - b)^2 + lambda * n * norm(x)^2 ?
In fact, by activating netlib with native libraries it goes faster. Thanks On Tue, Mar 10, 2015 at 7:03 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are a couple of differences between the ml-matrix implementation and the one used in AMPCamp - I think the AMPCamp one uses JBLAS, which tends to ship native BLAS libraries along with it. In ml-matrix we switched to using Breeze + Netlib BLAS, which is faster but needs some setup [1] to pick up native libraries. If native libraries are not found it falls back to a JVM implementation, so that might explain the slowdown. - The other difference, if you are comparing the whole image pipeline, is that I think the AMPCamp version used NormalEquations, which is around 2-3x faster (just in terms of number of flops) compared to TSQR. [1] https://github.com/fommil/netlib-java#machine-optimised-system-libraries Thanks Shivaram On Tue, Mar 10, 2015 at 9:57 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: I'm trying to play with the implementation of the least square solver (Ax = b) in mlmatrix.TSQR, where A is a 5*1024 matrix and b a 5*10 matrix. It works, but I notice that it's 8 times slower than the implementation given in the latest ampcamp: http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html . As far as I know these two implementations come from the same basis. What is the difference between these two codes? On Tue, Mar 3, 2015 at 8:02 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are a couple of solvers that I've written that are part of the AMPLab ml-matrix repo [1,2]. These aren't part of MLlib yet though, and if you are interested in porting them I'd be happy to review it. Thanks Shivaram [1] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/TSQR.scala [2] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/NormalEquations.scala On Tue, Mar 3, 2015 at 9:01 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Is there a least square solver based on DistributedMatrix that we can use out of the box in the current (or the master) version of spark? It seems that the only least square solver available in spark is private to the recommender package. Cheers, Jao
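[Editor's note: for reference, the objective in the subject line has the closed-form ridge normal equations (A^T A + lambda * n * I) x = A^T b. A local sketch in Breeze — names are placeholders; the distributed TSQR and NormalEquations solvers discussed above live in the amplab/ml-matrix repo.]

import breeze.linalg._

// Minimizes norm(A x - b)^2 + lambda * n * norm(x)^2 for one right-hand side.
def ridgeSolve(a: DenseMatrix[Double], b: DenseVector[Double],
               lambda: Double): DenseVector[Double] = {
  val n = a.rows.toDouble
  val gram = a.t * a + DenseMatrix.eye[Double](a.cols) * (lambda * n)
  gram \ (a.t * b)   // Breeze's \ solves the linear system
}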
Re: Solve least square problem of the form min norm(A x - b)^2 + lambda * n * norm(x)^2 ?
On Thu, Mar 12, 2015 at 3:05 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: In fact, by activating netlib with native libraries it goes faster. Glad you got it to work! Better performance was one of the reasons we made the switch. Thanks On Tue, Mar 10, 2015 at 7:03 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are a couple of differences between the ml-matrix implementation and the one used in AMPCamp - I think the AMPCamp one uses JBLAS, which tends to ship native BLAS libraries along with it. In ml-matrix we switched to using Breeze + Netlib BLAS, which is faster but needs some setup [1] to pick up native libraries. If native libraries are not found it falls back to a JVM implementation, so that might explain the slowdown. - The other difference, if you are comparing the whole image pipeline, is that I think the AMPCamp version used NormalEquations, which is around 2-3x faster (just in terms of number of flops) compared to TSQR. [1] https://github.com/fommil/netlib-java#machine-optimised-system-libraries Thanks Shivaram On Tue, Mar 10, 2015 at 9:57 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: I'm trying to play with the implementation of the least square solver (Ax = b) in mlmatrix.TSQR, where A is a 5*1024 matrix and b a 5*10 matrix. It works, but I notice that it's 8 times slower than the implementation given in the latest ampcamp: http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html . As far as I know these two implementations come from the same basis. What is the difference between these two codes? On Tue, Mar 3, 2015 at 8:02 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are a couple of solvers that I've written that are part of the AMPLab ml-matrix repo [1,2]. These aren't part of MLlib yet though, and if you are interested in porting them I'd be happy to review it. Thanks Shivaram [1] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/TSQR.scala [2] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/NormalEquations.scala On Tue, Mar 3, 2015 at 9:01 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Is there a least square solver based on DistributedMatrix that we can use out of the box in the current (or the master) version of spark? It seems that the only least square solver available in spark is private to the recommender package. Cheers, Jao
Re: Is there a limit to the number of RDDs in a Spark context?
Hi, It's been some time since my last message on the subject of using many RDDs in a Spark job, but I have just encountered the same problem again. The thing is that I have an RDD of time-tagged data that I want to 1) divide into windows according to a timestamp field; 2) compute KMeans for each time window; 3) collect the results in an RDD of pairs that contains the centroids for each time window. For 1) I generate an RDD of pairs where the key is an id for the time window, but for 2) I find the problem that KMeans from MLlib only accepts an RDD, and I cannot call it from aggregateByKey. I think this is a reusability problem for any algorithm in MLlib based on passing an RDD, if we want to apply the algorithm independently to several groups of data. So the only approaches I can imagine are: a) Generate an RDD per time window, which is easy to do but doesn't work, because it's easy to end up with thousands of windows, hence thousands of RDDs, which freezes the Spark scheduler, as seen in my previous messages. b) Collect the set of ids for the time windows in the driver, and traverse that set by generating an RDD per window, calling KMeans, and then storing the results with an export action. I will try that now and I think it could work, because only one RDD per window will be present at the same time. The point here is that we avoid creating an RDD with a lineage depending on a thousand RDDs, like in the collecting phase 3) of a). But that implies a sequential execution of the KMeans computations, which is a waste of resources: imagine I have a cluster with 200 machines and I can compute each call to KMeans on 5 machines in 10 minutes, and I have 1000 windows to compute, hence 1000 calls to KMeans; by sequencing the KMeans computations I would have 195 idle machines and a running time of 10 minutes * 1000 windows. Maybe this could be overcome by having not 1 RDD but m RDDs for some number m that doesn't freeze the Spark scheduler, but I think that's a not very clean workaround. Also, this makes it very difficult to reuse this computation of KMeans by window in a bigger program, because I'm not able to get an RDD with a key per window id and the centroids in the values. The only way I imagine I could do that is by storing the pairs in a database during the export actions, and then loading all the results into a single RDD, but I would prefer to do everything inside Spark if possible. Maybe I'm missing something here; any idea would be appreciated. Thanks in advance for your help, Greetings, Juan Rodriguez 2015-02-18 20:23 GMT+01:00 Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com: Hi Sean, Thanks a lot for your answer. That explains it, as I was creating thousands of RDDs, so I guess the communication overhead was the reason why the Spark job was freezing. After changing the code to use RDDs of pairs and aggregateByKey it works just fine, and quite fast. Again, thanks a lot for your help. Greetings, Juan 2015-02-18 15:35 GMT+01:00 Sean Owen so...@cloudera.com: At some level, enough RDDs creates hundreds of thousands of tiny partitions of data, each of which creates a task for each stage. The raw overhead of all the message passing can slow things down a lot. I would not design something to use an RDD per key. You would generally key by some value you want to divide and filter on, and then use a *ByKey to do your work. You don't work with individual RDDs this way, but usually that's good news.
You usually have a lot more flexibility operating just in pure Java / Scala to do whatever you need inside your function. On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi Paweł, Thanks a lot for your answer. I finally got the program to work by using aggregateByKey, but I was wondering why creating thousands of RDDs doesn't work. I think that could be interesting for using methods that work on RDDs, like for example JavaDoubleRDD.stats() ( http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29 ). If the groups are small then I can chain groupBy(), collect(), parallelize() and stats(), but that is quite inefficient because it implies moving data to and from the driver, and also doesn't scale to big groups; on the other hand, if I use aggregateByKey or a similar function then I cannot use stats(), so I have to reimplement it. In general I was looking for a way to reuse other functions that I have that work on RDDs, for using them on groups of data in an RDD, because I don't see how to directly apply them to each of the groups in a pair RDD. Again, thanks a lot for your answer, Greetings, Juan Rodriguez 2015-02-18 14:56 GMT+01:00 Paweł Szulc paul.sz...@gmail.com: Maybe you can omit using grouping all together with groupByKey? What is your next step after grouping elements by key? Are you trying to reduce
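[Editor's note: on the stats() reuse point — a minimal sketch of per-group statistics without one RDD per key, assuming a pair RDD of (groupId, value). StatCounter is the class RDD.stats() is built on, and it merges associatively, so it drops straight into aggregateByKey.]

import org.apache.spark.SparkContext._   // pair-RDD functions (needed pre-1.3)
import org.apache.spark.rdd.RDD
import org.apache.spark.util.StatCounter

def statsByKey(data: RDD[(String, Double)]): RDD[(String, StatCounter)] =
  data.aggregateByKey(new StatCounter())(
    (acc, v) => acc.merge(v),   // fold one value into a partition-local counter
    (a, b) => a.merge(b))       // merge counters from different partitions

Each resulting StatCounter then exposes count, mean, stdev, and so on per group — the same numbers stats() would give for a dedicated RDD.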
Re: Read parquet folders recursively
Hi. Thanks for your answers, but to read parquet files isn't it necessary to use the parquetFile method in org.apache.spark.sql.SQLContext? How can I combine your solution with a call to this method? Thanks!! Regards On Thu, Mar 12, 2015 at 8:34 AM, Yijie Shen henry.yijies...@gmail.com wrote: org.apache.spark.deploy.SparkHadoopUtil has a method: /** * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the * given path points to a file, return a single-element collection containing [[FileStatus]] of * that file. */ def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = { def recurse(path: Path) = { val (directories, leaves) = fs.listStatus(path).partition(_.isDir) leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath)) } val baseStatus = fs.getFileStatus(basePath) if (baseStatus.isDir) recurse(basePath) else Array(baseStatus) } — Best Regards! Yijie Shen On March 12, 2015 at 2:35:49 PM, Akhil Das (ak...@sigmoidanalytics.com) wrote: Hi We have a custom build to read directories recursively. Currently we use it with fileStream like: val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/datadumps/", (t: Path) => true, true, *true*) Making the 4th argument true to read recursively. You could give it a try https://s3.amazonaws.com/sigmoidanalytics-builds/spark-1.2.0-bin-spark-1.2.0-hadoop2.4.0.tgz Thanks Best Regards On Wed, Mar 11, 2015 at 9:45 PM, Masf masfwo...@gmail.com wrote: Hi all Is it possible to read folders recursively to read parquet files? Thanks. -- Regards. Miguel Ángel -- Regards. Miguel Ángel
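[Editor's note: one way to combine the two — a sketch assuming Spark 1.3, where parquetFile accepts several paths; if SparkHadoopUtil isn't accessible from your build, the recursive helper above is small enough to copy into your own code.]

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.deploy.SparkHadoopUtil

val fs = FileSystem.get(sc.hadoopConfiguration)
// collect every leaf file under the base directory, then keep the parquet ones
val leaves = SparkHadoopUtil.get.listLeafStatuses(fs, new Path("/data/parquet"))
val parquetPaths = leaves.map(_.getPath.toString).filter(_.endsWith(".parquet"))
val df = sqlContext.parquetFile(parquetPaths: _*)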
Re: hbase sql query
Have you considered using the spark-hbase-connector for this: https://github.com/nerdammer/spark-hbase-connector On Thu, Mar 12, 2015 at 5:19 AM, Udbhav Agarwal udbhav.agar...@syncoms.com wrote: Thanks Akhil. Additionally, if we want to do a SQL query we need to create a JavaPairRdd, then a JavaRdd, then a JavaSchemaRdd, and then sqlContext.sql(sql query). Right? *Thanks,* *Udbhav Agarwal* *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* 12 March, 2015 11:43 AM *To:* Udbhav Agarwal *Cc:* user@spark.apache.org *Subject:* Re: hbase sql query Like this? val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]).cache() Here's a complete example https://www.mapr.com/developercentral/code/loading-hbase-tables-spark#.VQEtqFR515Q . Thanks Best Regards On Wed, Mar 11, 2015 at 4:46 PM, Udbhav Agarwal udbhav.agar...@syncoms.com wrote: Hi, How can we simply cache an HBase table and do a SQL query via the Java API in Spark? *Thanks,* *Udbhav Agarwal*
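[Editor's note: in Scala, the chain Udbhav describes looks roughly like the sketch below (Spark 1.2-style SchemaRDD API; the column family, qualifier, and case-class shape are assumptions). hBaseRDD is the RDD from Akhil's snippet.]

import org.apache.hadoop.hbase.util.Bytes

case class HRow(rowKey: String, value: String)

val rows = hBaseRDD.map { case (key, result) =>
  HRow(Bytes.toString(key.get()),
       Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"))))
}

import sqlContext.createSchemaRDD   // implicit RDD -> SchemaRDD conversion
rows.registerTempTable("hbase_rows")
sqlContext.cacheTable("hbase_rows") // keep the converted table in memory
val matches = sqlContext.sql("SELECT rowKey, value FROM hbase_rows WHERE value = 'x'")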
Error running rdd.first on hadoop
Hi I was running with spark-1.3.0-snapshot: rdd = sc.textFile("hdfs://X.X.X.X/data") rdd.first() Then I got this error: Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/pyspark/rdd.py", line 1243, in first rs = self.take(1) File "/pyspark/rdd.py", line 1195, in take totalParts = self._jrdd.partitions().size() File "/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o28.partitions. : java.net.ConnectException: Call From etl1.gphd.local/X.X.X.X to etl1.gphd.local:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783) at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730) at org.apache.hadoop.ipc.Client.call(Client.java:1351) at org.apache.hadoop.ipc.Client.call(Client.java:1300) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) at com.sun.proxy.$Proxy13.getDelegationToken(Unknown Source) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy13.getDelegationToken(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:805) at org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:847) at org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1318) at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:526) at org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:504) at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:121) at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100) at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:202) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:56) at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:744) Caused by: java.net.ConnectException:
spark sql writing in avro
Hi All, I am currently trying to write out a schema RDD to Avro. I noticed that there is a databricks spark-avro library and I have included it in my dependencies, but it looks like I am not able to access the AvroSaver object. On compilation of the job I get this: error: not found: value AvroSaver [ERROR] AvroSaver.save(resultRDD, args(4)) I also tried calling saveAsAvro on the resultRDD (the actual RDD with the results) and that passes compilation, but when I run the code I get an error that says saveAsAvro is not implemented. I am using version 0.1 of spark-avro_2.10 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-writing-in-avro-tp22021.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
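[Editor's note: a guess at the compile error, for what it's worth — "not found: value AvroSaver" usually just means the object isn't imported. spark-avro's saver lives in the com.databricks.spark.avro package; whether the 0.1 release ships it is an assumption worth checking against that tag of the repo.]

// Hypothetical fix for the unresolved symbol: bring AvroSaver into scope first.
import com.databricks.spark.avro.AvroSaver

AvroSaver.save(resultRDD, args(4))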
Limit # of parallel parquet decompresses
My jobs frequently run out of memory if the # of cores on an executor is too high, because each core launches a new parquet decompressor thread, which allocates memory off-heap to decompress. Consequently, even with say 12 cores on an executor, depending on the memory, I can only use 2-3 of them to avoid OOMs when reading parquet files. Ideally I would want to use all 12 cores but limit the # of parquet decompressions to 2-3 per executor. Is there some way to do this? Thanks, Ankit -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Limit-of-parallel-parquet-decompresses-tp22022.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
repartitionAndSortWithinPartitions and mapPartitions and sort order
I am using repartitionAndSortWithinPartitions to partition my content and then sort within each partition. I've created a custom partitioner to use with repartitionAndSortWithinPartitions, because my key consists of something like 'groupid|timestamp' and I only want to partition on the group id, while sorting the records within each partition by the entire key (group id and timestamp). My question: when I use mapPartitions to process the records in each partition, is the order within each partition guaranteed (from the sort) as I iterate through the records? While processing the current record I need to look at the previous record and the next record in the partition, so I need the records to arrive in sorted order. I tend to think so, but wanted to confirm. Thanks. Darin.
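For what it's worth, here is a minimal sketch of such a partitioner. The 'groupid|timestamp' string key format and the partition count are assumptions for illustration, not Darin's actual code:

    import org.apache.spark.Partitioner

    // Partition only on the "groupid" prefix of a "groupid|timestamp" key,
    // so every record of a group lands in the same partition, while
    // repartitionAndSortWithinPartitions still sorts by the full key.
    class GroupIdPartitioner(override val numPartitions: Int) extends Partitioner {
      override def getPartition(key: Any): Int = {
        val groupId = key.toString.split('|')(0)
        val h = groupId.hashCode % numPartitions
        if (h < 0) h + numPartitions else h  // keep the index non-negative
      }
    }

    // Usage sketch, assuming rdd: RDD[(String, V)] keyed by "groupid|timestamp":
    // val sorted = rdd.repartitionAndSortWithinPartitions(new GroupIdPartitioner(8))
    // sorted.mapPartitions { iter => iter /* arrives in full-key sorted order */ }

Note that the sort uses the String ordering of the full key, so with zero-padded timestamps the lexicographic order matches time order within each group.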
Re: AWS SDK HttpClient version conflict (spark.files.userClassPathFirst not working)
Hi Adam, could you try building Spark with the -Pkinesis-asl profile?

    mvn -Pkinesis-asl -DskipTests clean package

The 'Running the Example' section of https://spark.apache.org/docs/latest/streaming-kinesis-integration.html refers to this. In fact, I've seen the same issue and have been able to use the AWS SDK by doing the above. I've also tried the 'spark.files.userClassPathFirst' flag, and it doesn't work. Regards, Yuya Urano

On 2015/03/13 3:50, Adam Lewandowski wrote: I'm trying to use the AWS SDK (v1.9.23) to connect to DynamoDB from within a Spark application. Spark 1.2.1 is assembled with HttpClient 4.2.6, but the AWS SDK depends on HttpClient 4.3.4 for its communication with DynamoDB. The end result is an error when the app tries to connect to DynamoDB and gets Spark's version instead:

    java.lang.NoClassDefFoundError: org/apache/http/client/methods/HttpPatch
      at com.amazonaws.http.AmazonHttpClient.<clinit>(AmazonHttpClient.java:129)
      at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:120)
      at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.<init>(AmazonDynamoDBClient.java:359)
    Caused by: java.lang.ClassNotFoundException: org.apache.http.client.methods.HttpPatch

Including HttpClient 4.3.4 as user jars doesn't improve the situation much:

    java.lang.NoSuchMethodError: org.apache.http.params.HttpConnectionParams.setSoKeepalive(Lorg/apache/http/params/HttpParams;Z)V
      at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:95)

I've seen the documentation regarding the 'spark.files.userClassPathFirst' flag and have tried it, thinking it would resolve this issue. However, when that flag is used I get a NoClassDefFoundError on 'scala.Serializable':

    java.lang.NoClassDefFoundError: scala/Serializable
    ...
      at org.apache.spark.executor.ChildExecutorURLClassLoader$userClassLoader$.findClass(ExecutorURLClassLoader.scala:46)
    ...
    Caused by: java.lang.ClassNotFoundException: scala.Serializable

This seems odd to me, since scala.Serializable is included in the Spark assembly. I thought perhaps my app was compiled against a different Scala version than Spark uses, but I eliminated that possibility by using the Scala compiler directly out of the Spark assembly jar, with identical results. Has anyone else seen this issue, had any success with the spark.files.userClassPathFirst flag, or been able to use the AWS SDK? I was going to submit this as a Spark JIRA issue, but thought I would check here first. Thanks, Adam Lewandowski
Re: NegativeArraySizeException when doing joins on skewed data
Hi Tristan, did upgrading to Kryo 3 help? Thanks, Soila

On Sun, Mar 1, 2015 at 2:48 PM, Tristan Blakers tris...@blackfrog.org wrote: Yeah, I implemented the same solution. It seems to kick in around the 4B mark, but looking at the log I suspect it's probably a function of the number of unique objects more than anything. I definitely don't have more than 2B unique objects. Will try the same test on Kryo 3 and see if it goes away. T

On 27 February 2015 at 06:21, Soila Pertet Kavulya skavu...@gmail.com wrote: Thanks Tristan, I ran into a similar issue with broadcast variables. I worked around it by estimating the size of the object I want to broadcast, splitting it up into chunks that were each less than 2GB, and then doing multiple broadcasts. This approach worked pretty well for broadcast variables under 10GB on our system. However, for larger variables the spills to disk made progress painfully slow, so we need to do regular joins. Do you know if there are any efforts to get Kryo to support objects larger than a couple of GBs? Soila

On Wed, Feb 25, 2015 at 11:06 PM, Tristan Blakers tris...@blackfrog.org wrote: I get the same exception simply by doing a large broadcast of about 6GB. Note that I'm broadcasting a small number (~3m) of fat objects. There's plenty of free RAM. This and related Kryo exceptions seem to crop up whenever an object graph of more than a couple of GB gets passed around.

    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
    at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623)
    Caused by: java.lang.NegativeArraySizeException
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
    at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:539)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    ... 23 more

On 26 February 2015 at 03:49, soila skavu...@gmail.com wrote: I have been running into NegativeArraySizeExceptions when doing joins on data with very skewed key distributions in Spark ...
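As an aside, a minimal sketch of the chunked-broadcast workaround Soila describes above; the Map payload and the chunking by entry count are assumptions for illustration, not code from the thread:

    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    // Split one huge lookup map into pieces so that no single broadcast
    // payload approaches the ~2GB serialization limit, then broadcast
    // each piece separately.
    def broadcastInChunks[K, V](sc: SparkContext, big: Map[K, V],
                                numChunks: Int): Seq[Broadcast[Map[K, V]]] = {
      // Entry count is a crude proxy for bytes; a real version would
      // estimate the serialized size of the entries.
      val chunkSize = math.max(1, big.size / numChunks)
      big.grouped(chunkSize).map(chunk => sc.broadcast(chunk)).toSeq
    }

    // Executor-side lookup: probe each chunk until the key is found.
    def lookup[K, V](chunks: Seq[Broadcast[Map[K, V]]], key: K): Option[V] =
      chunks.view.flatMap(_.value.get(key)).headOption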
Unable to stop Worker in standalone mode by sbin/stop-all.sh
Checking the script, it seems spark-daemon.sh is unable to stop the worker:

    $ ./spark-daemon.sh stop org.apache.spark.deploy.worker.Worker 1
    no org.apache.spark.deploy.worker.Worker to stop
    $ ps -elf | grep spark
    0 S taoewang 24922 1 0 80 0 - 733878 futex_ Mar12 ? 00:08:54 java -cp /data/sequoiadb-driver-1.10.jar,/data/spark-sequoiadb-0.0.1-SNAPSHOT.jar::/data/spark/conf:/data/spark/assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.4.0.jar -XX:MaxPermSize=128m -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=centos-151:2181,centos-152:2181,centos-153:2181 -Dspark.deploy.zookeeper.dir=/data/zookeeper -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://centos-151:7077,centos-152:7077,centos-153:7077

In the spark-daemon script, it tries to find $pid in /tmp/:

    pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid

In my case the pid file is supposed to be /tmp/spark-taoewang-org.apache.spark.deploy.worker.Worker-1.pid. However, when I go through the files in the /tmp directory, I don't find such a file. I have 777 on /tmp, and I also tried touching a file there with my current account and succeeded, so it shouldn't be a permission issue:

    $ ls -la / | grep tmp
    drwxrwxrwx. 6 root root 4096 Mar 13 08:19 tmp

Does anyone have any idea why the pid file didn't show up? Thanks, TW
Re: Unable to stop Worker in standalone mode by sbin/stop-all.sh
Does the machine have a cron job that periodically cleans up the /tmp dir? Cheers

On Thu, Mar 12, 2015 at 6:18 PM, sequoiadb mailing-list-r...@sequoiadb.com wrote: Checking the script, it seems spark-daemon.sh is unable to stop the worker ...
Support for skewed joins in Spark
Does Spark support skewed joins similar to Pig, which distributes large keys over multiple partitions? I tried using the RangePartitioner, but I am still experiencing failures because some keys are too large to fit in a single partition. I cannot use broadcast variables to work around this because both RDDs are too large to fit in driver memory.
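Spark in this era has no built-in skewed-join mode like Pig's; the usual hand-rolled workaround is key salting. The sketch below is an illustrative assumption, not something from the thread: the skewed side gets a random salt appended to each key, the other side is replicated once per salt, and the salt is stripped after the join.

    import scala.util.Random
    import org.apache.spark.rdd.RDD

    // Key-salting sketch: spread each hot key over numSalts sub-keys.
    // left is the skewed side; right gets replicated once per salt.
    def saltedJoin[V, W](left: RDD[(String, V)],
                         right: RDD[(String, W)],
                         numSalts: Int): RDD[(String, (V, W))] = {
      val saltedLeft = left.map { case (k, v) =>
        (k + "#" + Random.nextInt(numSalts), v)         // scatter randomly
      }
      val saltedRight = right.flatMap { case (k, w) =>
        (0 until numSalts).map(i => (k + "#" + i, w))   // replicate to all salts
      }
      saltedLeft.join(saltedRight).map { case (saltedKey, vw) =>
        (saltedKey.substring(0, saltedKey.lastIndexOf('#')), vw)  // strip salt
      }
    }

The trade-off is a numSalts-fold replication of the non-skewed side, so this only pays off when a few keys dominate the distribution.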
Re: spark sql writing in avro
Short answer: if you downloaded spark-avro from the repo.maven.apache.org repo, you might be using an old version (pre-November 14, 2014) - see the timestamps at http://repo.maven.apache.org/maven2/com/databricks/spark-avro_2.10/0.1/. There have been lots of changes at https://github.com/databricks/spark-avro since then. Databricks, thank you for sharing the Avro code! Could you please push out the latest version, or update the version number and republish to repo.maven.apache.org (I have no idea how jars get there)? Or is there a different repository that users should point to for this artifact?

Workaround: download from https://github.com/databricks/spark-avro, build it to get the latest functionality (still version 0.1), and add it to your local Maven or Ivy repo.

Long version: I used a default Maven build and declared my dependency on:

    <dependency>
      <groupId>com.databricks</groupId>
      <artifactId>spark-avro_2.10</artifactId>
      <version>0.1</version>
    </dependency>

Maven downloaded the 0.1 version from http://repo.maven.apache.org/maven2/com/databricks/spark-avro_2.10/0.1/ and included it in my app code jar. From spark-shell:

    import com.databricks.spark.avro._
    import org.apache.spark.sql.SQLContext
    val sqlContext = new SQLContext(sc)
    // This schema includes LONG for time in millis
    // (https://github.com/medale/spark-mail/blob/master/mailrecord/src/main/avro/com/uebercomputing/mailrecord/MailRecord.avdl)
    val recordsSchema = sqlContext.avroFile("/opt/rpm1/enron/enron-tiny.avro")
    java.lang.RuntimeException: Unsupported type LONG

However, after checking out the spark-avro code from its GitHub repo and adding a test case against the MailRecord Avro, everything ran fine. So I built the Databricks spark-avro locally on my box and put it in my local Maven repo - then everything worked from spark-shell when adding that jar as a dependency. Hope this helps for the save case as well. In the pre-14NOV version, avro.scala says:

    // TODO: Implement me.
    implicit class AvroSchemaRDD(schemaRDD: SchemaRDD) {
      def saveAsAvroFile(path: String): Unit = ???
    }

Markus

On 03/12/2015 07:05 PM, kpeng1 wrote: Hi All, I am currently trying to write out a SchemaRDD to Avro. I noticed that there is a Databricks spark-avro library and I have included it in my dependencies, but it looks like I am not able to access the AvroSaver object. ...
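For completeness, with a post-November build on the classpath, the save side would presumably be invoked through the AvroSchemaRDD implicit quoted above. This is a sketch: resultRDD stands for the original poster's SchemaRDD and the output path is illustrative.

    import com.databricks.spark.avro._

    // saveAsAvroFile comes from the AvroSchemaRDD implicit; in the old
    // published 0.1 jar its body is ???, which is why it throws
    // "not implemented" at runtime.
    resultRDD.saveAsAvroFile("/path/to/output")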
Re: Unable to stop Worker in standalone mode by sbin/stop-all.sh
Nope. I can see the master pid file exists, but not the worker's:

    $ ls
    bitrock_installer.log  hsperfdata_root  hsperfdata_taoewang  omatmp  sbt2435921113715137753.log  spark-taoewang-org.apache.spark.deploy.master.Master-1.pid

On March 13, 2015, at 9:34 AM, Ted Yu yuzhih...@gmail.com wrote: Does the machine have a cron job that periodically cleans up the /tmp dir? ...
Re: sc.textFile() on windows cannot access UNC path
Sounds like the way of doing it. Could you try accessing a file from a UNC path with native Java NIO code, and make sure it is able to access it with the URI file:10.196.119.230/folder1/abc.txt?

Thanks
Best Regards

On Wed, Mar 11, 2015 at 7:45 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Thanks for the reference. Is the following procedure correct?

1. Copy the Hadoop source code org.apache.hadoop.mapreduce.lib.input.TextInputFormat.java as my own class, e.g. UncTextInputFormat.java
2. Modify UncTextInputFormat.java to handle UNC paths
3. Call sc.newAPIHadoopFile(...) with:

    sc.newAPIHadoopFile[LongWritable, Text, UncTextInputFormat]("file:10.196.119.230/folder1/abc.txt",
      classOf[UncTextInputFormat], classOf[LongWritable], classOf[Text], conf)

Ningjun

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Wednesday, March 11, 2015 2:40 AM
To: Wang, Ningjun (LNG-NPV)
Cc: java8964; user@spark.apache.org
Subject: Re: sc.textFile() on windows cannot access UNC path

I don't have a complete example for your use case, but you can see a lot of code showing how to use newAPIHadoopFile here: https://github.com/search?q=sc.newAPIHadoopFiletype=Codeutf8=%E2%9C%93

Thanks
Best Regards

On Tue, Mar 10, 2015 at 7:37 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: This sounds like the right approach. Is there any sample code showing how to use sc.newAPIHadoopFile? I am new to Spark and don't know much about Hadoop. I just want to read a text file from a UNC path into an RDD. Thanks

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Tuesday, March 10, 2015 9:14 AM
To: java8964
Cc: Wang, Ningjun (LNG-NPV); user@spark.apache.org
Subject: Re: sc.textFile() on windows cannot access UNC path

You can create your own input reader (using java.nio.*) and pass it to sc.newAPIHadoopFile while reading.

Thanks
Best Regards

On Tue, Mar 10, 2015 at 6:28 PM, java8964 java8...@hotmail.com wrote: I think the workaround is clear: use JDK 7 and implement your own saveAsRemoteWinText() using java.nio.Path. Yong

From: ningjun.w...@lexisnexis.com
To: java8...@hotmail.com; user@spark.apache.org
Subject: RE: sc.textFile() on windows cannot access UNC path
Date: Tue, 10 Mar 2015 03:02:37 +

Hi Yong, thanks for the reply. Yes, it works with a local drive letter, but I really need to use a UNC path because the path is input at runtime. I cannot dynamically assign a drive letter to an arbitrary UNC path at runtime. Is there any workaround so that I can use a UNC path for sc.textFile(...)? Ningjun

From: java8964 [mailto:java8...@hotmail.com]
Sent: Monday, March 09, 2015 5:33 PM
To: Wang, Ningjun (LNG-NPV); user@spark.apache.org
Subject: RE: sc.textFile() on windows cannot access UNC path

This is a Java problem, not really Spark. From this page: http://stackoverflow.com/questions/18520972/converting-java-file-url-to-file-path-platform-independent-including-u you can see that using java.nio.* on JDK 7 will fix this issue. But the Path class in Hadoop uses java.io.*, instead of java.nio. You need to manually mount your Windows remote share as a local drive, like Z:, then it should work. Yong

From: ningjun.w...@lexisnexis.com
To: user@spark.apache.org
Subject: sc.textFile() on windows cannot access UNC path
Date: Mon, 9 Mar 2015 21:09:38 +

I am running Spark on Windows 2008 R2. I use sc.textFile() to load a text file using a UNC path, and it does not work.
    sc.textFile("file:10.196.119.230/folder1/abc.txt", 4).count()

    Input path does not exist: file:/10.196.119.230/folder1/abc.txt
    org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/10.196.119.230/tar/Enron/enron-207-short.load
      at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:251)
      at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
      at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
      at scala.Option.getOrElse(Option.scala:120)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
      at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
      at scala.Option.getOrElse(Option.scala:120)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
      at ...
Re: Partitioning Dataset and Using Reduce in Apache Spark
Thank you very much for your detailed response; it was very informative and cleared up some of my misconceptions. After your explanation, I understand that the distribution of the data and the parallelism are all meant to be an abstraction to the developer. In your response you say "When you call reduce and similar methods, each partition can be reduced in parallel. Then the results of that can be transferred across the network and reduced to the final result". By similar methods, do you mean all actions within Spark? Does the transfer of data from worker nodes to the driver node happen only when an action is performed? I am assuming that in Spark you typically have a set of transformations followed by some sort of action. The RDD is partitioned and sent to different worker nodes (assuming this is a cluster setup), the transformations are applied to the RDD partitions at the various worker nodes, and then, when an action is performed, the action runs on the worker nodes, the partial results are aggregated at the driver, and a final reduction at the driver produces the overall result. I would also assume that deciding whether the action should be done on a worker node depends on the type of action: performing reduce at the worker node makes sense, while it doesn't make sense to save the file at the worker node. Does that sound correct, or am I misinterpreting something? Thanks, Raghav

From: Daniel Siegmann
Sent: Thursday, March 5, 2015 2:01 PM
To: Raghav Shankar
Cc: user@spark.apache.org

An RDD is a Resilient Distributed Dataset. The partitioning and distribution of the data happen in the background. You'll occasionally need to concern yourself with them (especially to get good performance), but from an API perspective they're mostly invisible (some methods do allow you to specify a number of partitions). When you call sc.textFile(myPath) or similar, you get an RDD. That RDD will be composed of a bunch of partitions, but you don't really need to worry about that. The partitioning will be based on how the data is stored. When you call a method that causes a shuffle (such as reduce), the data is repartitioned into a number of partitions based on your default parallelism setting (which, IIRC, is based on your number of cores if you haven't set it explicitly). When you call reduce and similar methods, each partition can be reduced in parallel. Then the results of that can be transferred across the network and reduced to the final result. You supply the function, and Spark handles the parallel execution of it. I hope this helps clear up your misconceptions. You might also want to familiarize yourself with the collections API in Java 8 (or Scala, or Python, or pretty much any other language with lambda expressions), since RDDs are meant to have an API that feels similar.

On Thu, Mar 5, 2015 at 9:45 AM, raggy raghav0110...@gmail.com wrote: I am trying to use Apache Spark to load up a file, distribute the file to several nodes in my cluster, and then aggregate the results and obtain them. I don't quite understand how to do this. From my understanding, the reduce action enables Spark to combine the results from different nodes and aggregate them together. Am I understanding this correctly? From a programming perspective, I don't understand how I would code this reduce function. How exactly do I partition the main dataset into N pieces and ask for them to be processed in parallel by using a list of transformations?
reduce is supposed to take in two elements and a function for combining them. Are these two elements supposed to be RDDs in the context of Spark, or can they be any type of element? Also, if you have N different partitions running in parallel, how would reduce aggregate all their results into one final result (since the reduce function aggregates only two elements)? Also, I don't understand this example. The example from the Spark website uses reduce, but I don't see the data being processed in parallel, so what is the point of the reduce? If I could get a detailed explanation of the loop in this example, I think that would clear up most of my questions.

    class ComputeGradient extends Function<DataPoint, Vector> {
      private Vector w;
      ComputeGradient(Vector w) { this.w = w; }
      public Vector call(DataPoint p) {
        return p.x.times(p.y * (1 / (1 + Math.exp(w.dot(p.x))) - 1));
      }
    }

    JavaRDD<DataPoint> points = spark.textFile(...).map(new ParsePoint()).cache();
    Vector w = Vector.random(D); // current separating plane
    for (int i = 0; i < ITERATIONS; i++) {
      Vector gradient = points.map(new ComputeGradient(w)).reduce(new AddVectors());
      w = w.subtract(gradient);
    }
    System.out.println("Final separating plane: " + w);

Also, I have been trying to find the source code for reduce on the Apache Spark GitHub, but the source is pretty huge and I haven't been able to pinpoint it. Could someone please direct me towards ...
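As an aside, the partition-parallel behaviour Daniel describes is easy to see with a toy example. This is a hedged sketch in Scala, not code from the thread:

    // Each of the 4 partitions is summed locally on its executor first;
    // only the 4 partial sums travel to the driver for the final combine.
    val rdd = sc.parallelize(1 to 1000, numSlices = 4)

    // reduce requires an associative (and here commutative) function,
    // precisely because partitions are combined in no guaranteed order.
    val total = rdd.reduce(_ + _) // 500500

    // mapPartitions makes the per-partition step explicit: one local sum
    // per partition, then a tiny final reduce over the partial results.
    val partials = rdd.mapPartitions(it => Iterator(it.sum))
    val sameTotal = partials.reduce(_ + _) // also 500500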
Re: hbase sql query
Like this?

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]).cache()

Here's a complete example: https://www.mapr.com/developercentral/code/loading-hbase-tables-spark#.VQEtqFR515Q

Thanks
Best Regards

On Wed, Mar 11, 2015 at 4:46 PM, Udbhav Agarwal udbhav.agar...@syncoms.com wrote: Hi, how can we simply cache an HBase table and run SQL queries on it via the Java API in Spark? Thanks, Udbhav Agarwal
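The original question asks for the Java API; for brevity, here is a Scala sketch of getting from that RDD to SQL using the Spark 1.2-era API. The case class, the column family "cf", and the qualifier "col" are assumptions for illustration:

    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.SQLContext

    case class HRow(key: String, value: String)

    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD  // implicit RDD-to-SchemaRDD conversion

    // Map each (rowkey, Result) pair from hBaseRDD to a plain case class.
    val rows = hBaseRDD.map { case (k, result) =>
      HRow(Bytes.toString(k.get),
           Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"))))
    }

    rows.registerTempTable("hbase_table")
    sqlContext.sql("SELECT key, value FROM hbase_table LIMIT 10").collect().foreach(println)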
Re: bad symbolic reference. A signature in SparkContext.class refers to term conf in value org.apache.hadoop which is not available
Spark 1.3.0 is not officially out yet, so I don't think sbt will download the Hadoop dependencies for your Spark by itself. You could try manually adding the Hadoop dependencies yourself (hadoop-core, hadoop-common, hadoop-client).

Thanks
Best Regards

On Wed, Mar 11, 2015 at 9:07 PM, Patcharee Thongtra patcharee.thong...@uni.no wrote: Hi, I have built Spark version 1.3 and tried to use it in my Spark Scala application. When I tried to compile and build the application with SBT, I got the error:

    bad symbolic reference. A signature in SparkContext.class refers to term conf in value org.apache.hadoop which is not available

It seems the Hadoop library is missing, but it should be resolved automatically by SBT, shouldn't it? The application is buildable against Spark version 1.2. Here is my build.sbt:

    name := "wind25t-v013"
    version := "0.1"
    scalaVersion := "2.10.4"
    unmanagedBase := baseDirectory.value / "lib"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0"
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.3.0"
    libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.3.0"

What should I do to fix it? BR, Patcharee
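If you go the manual route, the extra lines in build.sbt might look roughly like this. This is a sketch: the Hadoop version 2.4.0 is an assumption and should match your cluster.

    libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.4.0"
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.4.0"
    // hadoop-core only exists for the Hadoop 1.x line; use it instead of
    // the above when building against Hadoop 1.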
Re: StreamingListener
At the end of foreachRDD, I believe.

Thanks
Best Regards

On Thu, Mar 12, 2015 at 6:48 AM, Corey Nolet cjno...@gmail.com wrote: Given the following scenario:

    dstream.map(...).filter(...).window(...).foreachRDD(...)

When would onBatchCompleted fire?
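For anyone wanting to hook that event, here is a minimal sketch of registering the listener, assuming an existing StreamingContext named ssc:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Fires once per batch, after the batch's output operations
    // (including the foreachRDD above) have run.
    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
        val info = batch.batchInfo
        println(s"Batch ${info.batchTime} completed; " +
          s"processing delay = ${info.processingDelay.getOrElse(-1L)} ms")
      }
    })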
Re: Read parquet folders recursively
Hi, we have a custom build that reads directories recursively. Currently we use it with fileStream, like:

    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/datadumps/", (t: Path) => true, true, true)

Making the 4th argument true makes it read recursively. You could give it a try: https://s3.amazonaws.com/sigmoidanalytics-builds/spark-1.2.0-bin-spark-1.2.0-hadoop2.4.0.tgz

Thanks
Best Regards

On Wed, Mar 11, 2015 at 9:45 PM, Masf masfwo...@gmail.com wrote: Hi all, is it possible to read folders recursively to load Parquet files? Thanks. -- Regards. Miguel Ángel
Re: Unable to read files In Yarn Mode of Spark Streaming ?
Are the files already present in HDFS before you start your application?

On Thu, Mar 12, 2015 at 11:11 AM, CH.KMVPRASAD [via Apache Spark User List] wrote: Hi, I successfully executed the SparkPi example in yarn mode, but I am unable to read files from HDFS in my streaming application using Java. I tried the 'textFileStream' and 'fileStream' methods; with both methods I am getting null. Please help me. Thanks for your help.
Re: Define exception handling on lazy elements?
Handling exceptions this way means handling errors on the driver side, which may or may not be what you want. You can also write functions with exception handling inside, which could make more sense in some cases (for example, to ignore bad records, or count them, or something). If you want to handle errors at every step on the driver side, you have to force RDDs to materialize to see whether they work. You can do that with .count(), or .take(1).length > 0. But to avoid recomputing the RDD afterwards, it then needs to be cached, so there is a big, non-trivial overhead to this approach. If you go this way, consider materializing only a few key RDDs in your flow, not every one. The most natural thing is indeed to handle exceptions where the action occurs.

On Wed, Mar 11, 2015 at 1:51 PM, Michal Klos michal.klo...@gmail.com wrote: Hi Spark community, we would like to define exception-handling behavior on RDD instantiation/build. Since the RDD is lazily evaluated, it seems like we are forced to put all exception handling in the first action call? This is an example of something that would be nice:

    def myRDD = {
      Try {
        sc.textFile(...)
      } match {
        case Success(rdd) => rdd
        case Failure(e)   => // handle ...
      }
    }

    myRDD.reduceByKey(...) // don't need to worry about that exception here

The reason being that we want to avoid having to copy-paste exception-handling boilerplate on every first action. We would love to define this once somewhere for the RDD build code and just re-use it. Is there a best practice for this? Are we missing something here? Thanks, Michal
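To make the second suggestion concrete, a hedged sketch of pushing the error handling inside the functions, so bad records are skipped (and counted) on the executors rather than surfacing on the driver; the input path and parse logic are hypothetical:

    import scala.util.control.NonFatal

    // Count skipped lines for reporting back to the driver.
    val badRecords = sc.accumulator(0L)

    val parsed = sc.textFile("/some/input").flatMap { line =>
      try {
        Some(line.split(",")(1).toDouble)  // hypothetical parse logic
      } catch {
        case NonFatal(_) =>
          badRecords += 1L
          None                              // drop the bad record, keep the job alive
      }
    }

    // The parsing (and the counting) only runs when an action fires.
    // Caveat: accumulator updates inside transformations can be
    // double-counted if tasks are retried.
    println(s"sum = ${parsed.sum()}, skipped ${badRecords.value} bad lines")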
Re: Read parquet folders recursively
org.apache.spark.deploy.SparkHadoopUtil has a method:

    /**
     * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
     * given path points to a file, return a single-element collection containing [[FileStatus]] of
     * that file.
     */
    def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
      def recurse(path: Path) = {
        val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
        leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
      }
      val baseStatus = fs.getFileStatus(basePath)
      if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
    }

Best Regards!
Yijie Shen

On March 12, 2015 at 2:35:49 PM, Akhil Das (ak...@sigmoidanalytics.com) wrote: Hi, we have a custom build that reads directories recursively ...
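A hedged sketch of using that idea from user code to load Parquet folders recursively: the recursion is reimplemented here since SparkHadoopUtil's helper may not be accessible from applications, and the base path and ".parquet" suffix filter are illustrative assumptions.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

    // User-land version of the recursion above: collect every leaf file
    // under a base directory.
    def leafFiles(fs: FileSystem, base: Path): Seq[FileStatus] = {
      val status = fs.getFileStatus(base)
      if (status.isDir) fs.listStatus(base).toSeq.flatMap(s => leafFiles(fs, s.getPath))
      else Seq(status)
    }

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val fs = FileSystem.get(new Configuration())

    val parquetPaths = leafFiles(fs, new Path("/data/parquet-root"))
      .map(_.getPath.toString)
      .filter(_.endsWith(".parquet"))

    // Load each file and union the results (unionAll keeps the schema).
    val all = parquetPaths.map(sqlContext.parquetFile).reduce(_ unionAll _)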