Re: Spark Cassandra Connector issue
Hi All,

I have tried the commands below, but it still errors:

dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars /home/missingmerch/postgresql-9.4-1201.jdbc41.jar,/home/missingmerch/dse.jar,/home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar /home/missingmerch/etl-0.0.1-SNAPSHOT.jar

dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars /home/missingmerch/postgresql-9.4-1201.jdbc41.jar,/home/missingmerch/dse.jar,/home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar,/home/missingmerch/etl-0.0.1-SNAPSHOT.jar

I understand the only problem is with the way I provide the list of jar files in the command. If anybody using DataStax Enterprise could provide their input to get this issue resolved, I would appreciate it.

Thanks for your support
Satish Chandra
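The difference between the two commands above is whether the application jar goes inside the comma-separated --jars value or stands alone after the options as the application jar. A small Java sketch of the comma-joined form --jars expects (paths taken from the commands above; purely illustrative):

```java
import java.util.Arrays;
import java.util.List;

public class JarsArg {
    public static void main(String[] args) {
        // The dependency jars from the command above.
        List<String> extraJars = Arrays.asList(
                "/home/missingmerch/postgresql-9.4-1201.jdbc41.jar",
                "/home/missingmerch/dse.jar",
                "/home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar");
        String appJar = "/home/missingmerch/etl-0.0.1-SNAPSHOT.jar";

        // --jars takes ONE value: the entries joined by commas, with no spaces.
        String jarsValue = String.join(",", extraJars);

        // The application jar is a separate positional argument after the options.
        System.out.println("--jars " + jarsValue + " " + appJar);
    }
}
```

A space anywhere in the joined value splits it into separate shell arguments, which is exactly what makes spark-submit misread the jar list.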
Spark Cassandra Connector issue
Hi All,

Please help me fix a Spark Cassandra Connector issue; the details are below.

Command:

dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar

Error:

WARN 2015-08-10 06:33:35 org.apache.spark.util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Exception in thread "main" java.lang.NoSuchMethodError: com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions;
        at HelloWorld$.main(HelloWorld.scala:29)
        at HelloWorld.main(HelloWorld.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.JdbcRDD
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.bdp.spark.DseSparkConfHelper._
import java.sql.{Connection, DriverManager, ResultSet, PreparedStatement, SQLException, Statement}

object HelloWorld {
  def main(args: Array[String]) {
    def createSparkContext() = {
      val myJar = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
      val conf = new SparkConf()
        .set("spark.cassandra.connection.host", "10.246.43.15")
        .setAppName("First Spark App")
        .setMaster("local")
        .setJars(Array(myJar))
        .set("cassandra.username", "username")
        .set("cassandra.password", "password")
        .forDse
      new SparkContext(conf)
    }
    val sc = createSparkContext()
    val user = "hkonak0"
    val pass = "Winter18"
    Class.forName("org.postgresql.Driver").newInstance
    val url = "jdbc:postgresql://gptester:5432/db_test"
    val myRDD27 = new JdbcRDD(sc,
      () => DriverManager.getConnection(url, user, pass),
      "select * from wmax_vmax.arm_typ_txt LIMIT ? OFFSET ?",
      5, 0, 1,
      (r: ResultSet) => (r.getInt("alarm_type_code"), r.getString("language_code"), r.getString("alrm_type_cd_desc")))
    myRDD27.saveToCassandra("keyspace", "arm_typ_txt", SomeColumns("alarm_type_code", "language_code", "alrm_type_cd_desc"))
    println(myRDD27.count())
    println(myRDD27.first)
    sc.stop()
    sys.exit()
  }
}

POM XML:

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>1.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.5</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>com.datastax.dse</groupId>
    <artifactId>dse</artifactId>
    <version>4.7.2</version>
    <scope>system</scope>
    <systemPath>C:\workspace\etl\lib\dse.jar</systemPath>
  </dependency>
  <dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.1.1</version>
  </dependency>
</dependencies>

Please let me know if any further details are required to analyze the issue.

Regards,
Satish Chandra
Re: Spark Cassandra Connector issue
Add the other Cassandra dependencies (dse.jar, spark-cassandra-connector-java_2.10) to your --jars argument on the command line.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com
Re: Spark Cassandra Connector issue
I don't know if DSE changed spark-submit, but you have to pass a comma-separated list of jars to --jars. It probably looked for HelloWorld in the second one, the dse.jar file. Do this:

dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars /home/missingmerch/postgresql-9.4-1201.jdbc41.jar,/home/missingmerch/dse.jar,/home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar /home/missingmerch/etl-0.0.1-SNAPSHOT.jar

I also removed the extra //. Or put file: in front of them so they are proper URLs. Note the snapshot jar isn't in the --jars list. I assume that's where HelloWorld is found. Confusing, yes it is...

dean
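The "put file: in front of them so they are proper URLs" suggestion can be done mechanically rather than by hand. A hedged Java sketch (the path is one of the jars from the commands above; `Paths.get(...).toUri()` is the standard-library way to build a well-formed file: URL on the local filesystem):

```java
import java.nio.file.Paths;

public class FileUrls {
    public static void main(String[] args) {
        // One of the local jar paths from the thread; swap in any path.
        String path = "/home/missingmerch/dse.jar";

        // toUri() yields a proper file: URL (file:///home/... on Unix),
        // which spark-submit accepts unambiguously as a local resource.
        String url = Paths.get(path).toUri().toString();
        System.out.println(url);
    }
}
```

This avoids the ambiguous bare `///home/...` form, which is a plain path with redundant slashes rather than a URL.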
Re: Spark Cassandra Connector issue
Hi,

Thanks for the quick input; now I am getting a class not found error.

Command:

dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/dse.jar ///home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar

Error:

java.lang.ClassNotFoundException: HelloWorld
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:270)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:342)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Previously I could fix the issue by changing the order of the arguments passed on the DSE command line, but now I am not sure why the issue is back. Please let me know if I am still missing anything in my command as mentioned above (as suggested, I have added dse.jar and spark-cassandra-connector-java_2.10-1.1.1.jar).

Thanks for support
Satish Chandra
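The ClassNotFoundException above is consistent with how spark-submit reads its arguments: --jars consumes only the single token after it, and the first remaining bare token becomes the application jar. A rough simulation of that parsing (simplified for illustration; this is not spark-submit's actual code) shows why HelloWorld was searched for in dse.jar:

```java
import java.util.ArrayList;
import java.util.List;

public class SubmitArgParse {
    public static void main(String[] args) {
        // The space-separated form from the failing command above.
        String[] argv = {
                "--class", "HelloWorld",
                "--jars", "///home/missingmerch/postgresql-9.4-1201.jdbc41.jar",
                "///home/missingmerch/dse.jar",
                "///home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar",
                "///home/missingmerch/etl-0.0.1-SNAPSHOT.jar"
        };

        String jars = null;
        List<String> positional = new ArrayList<>();
        for (int i = 0; i < argv.length; i++) {
            if (argv[i].equals("--jars")) {
                jars = argv[++i];        // --jars takes exactly one value
            } else if (argv[i].equals("--class")) {
                i++;                     // skip the class-name value
            } else {
                positional.add(argv[i]); // everything else is positional
            }
        }

        // Only the postgresql jar made it into --jars; the FIRST positional
        // argument (dse.jar, which does not contain HelloWorld) is taken
        // as the application jar.
        System.out.println("--jars value:    " + jars);
        System.out.println("application jar: " + positional.get(0));
    }
}
```

Joining the dependency jars with commas, as suggested earlier in the thread, keeps them all inside the single --jars value.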
Spark Cassandra connector issue
Hi,

I am creating a Cassandra Java RDD and transforming it using a where clause. It works fine when I run it outside mapValues, but when I put the code inside mapValues I get an error while creating the transformation. Below is my sample code:

CassandraJavaRDD<ReferenceData> cassandraRefTable = javaFunctions(sc)
    .cassandraTable("reference_data", "dept_reference_data", ReferenceData.class);
JavaPairRDD<String, Employee> joinedRdd = rdd.mapValues(new Function<IPLocation, IPLocation>() {
    public Employee call(Employee employee) throws Exception {
        ReferenceData data = null;
        if (employee.getDepartment() != null) {
            data = referenceTable.where("postal_plus=?", location.getPostalPlus()).first();
            System.out.println(data.toCSV());
        }
        if (data != null) {
            // call setters on employee
        }
        return employee;
    }
}

I get this error:

java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
        at com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:47)
        at com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:70)
        at com.datastax.spark.connector.rdd.CassandraRDD.where(CassandraRDD.scala:77)
        at com.datastax.spark.connector.rdd.CassandraJavaRDD.where(CassandraJavaRDD.java:54)

Thanks for help!!

Regards
Ankur
Re: Spark Cassandra connector issue
Is this because I am calling a transformation function on an RDD from inside another transformation function? Is it not allowed?

Thanks
Ankur

On Oct 21, 2014 1:59 PM, Ankur Srivastava ankur.srivast...@gmail.com wrote:

Hi Gerard, this is the code that may be helpful.

public class ReferenceDataJoin implements Serializable {

    private static final long serialVersionUID = 1039084794799135747L;
    JavaPairRDD<String, Employee> rdd;
    CassandraJavaRDD<ReferenceData> referenceTable;

    public PostalReferenceDataJoin(List<Employee> employees) {
        JavaSparkContext sc = SparkContextFactory.getSparkContextFactory().getSparkContext();
        this.rdd = sc.parallelizePairs(employees);
        this.referenceTable = javaFunctions(sc).cassandraTable("reference_data",
            "dept_reference_data", ReferenceData.class);
    }

    public JavaPairRDD<String, Employee> execute() {
        JavaPairRDD<String, Employee> joinedRdd = rdd
            .mapValues(new Function<Employee, Employee>() {
                private static final long serialVersionUID = -226016490083377260L;

                @Override
                public Employee call(Employee employee) throws Exception {
                    ReferenceData data = null;
                    if (employee.getDepartment() != null) {
                        data = referenceTable.where("dept=?", employee.getDepartment()).first();
                        System.out.println(employee.getDepartment() + " " + data);
                    }
                    if (data != null) {
                        // setters on employee
                    }
                    return employee;
                }
            });
        return joinedRdd;
    }
}

Thanks
Ankur

On Tue, Oct 21, 2014 at 11:11 AM, Gerard Maas gerard.m...@gmail.com wrote:

Looks like that code does not correspond to the problem you're facing. I doubt it would even compile. Could you post the actual code?

-kr, Gerard
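The NullPointerException above stems from calling an RDD transformation (where) inside a function that runs on the executors; RDD operations are only valid on the driver, so nesting them this way is indeed not allowed. A common workaround for a small reference table, sketched here with plain Java collections (the Spark calls implied in the comments, such as collect() and broadcast(), are assumptions about how this would map to real Spark code, and all names are hypothetical):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ReferenceJoinSketch {
    public static void main(String[] args) {
        // Driver side: materialize the small reference table into a plain map
        // (in real Spark: referenceTable.collect(), then sc.broadcast(map)).
        Map<String, String> deptRef = new HashMap<>();
        deptRef.put("ENG", "Engineering");
        deptRef.put("FIN", "Finance");

        // Stand-in for the per-employee values flowing through mapValues.
        List<String> departments = Arrays.asList("ENG", "FIN", "HR");

        // Executor side: the function only consults the local map -- no RDD
        // method like where() is ever invoked from inside the transformation.
        List<String> enriched = departments.stream()
                .map(d -> d + "=" + deptRef.getOrDefault(d, "UNKNOWN"))
                .collect(Collectors.toList());

        System.out.println(enriched);  // [ENG=Engineering, FIN=Finance, HR=UNKNOWN]
    }
}
```

The design point is that only driver-side code touches RDDs; functions shipped to executors should close over plain (broadcastable) data structures.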
Spark Cassandra Connector Issue and performance
Hey all,

I tried the Spark connector with Cassandra and ran into a problem that blocked me for a couple of weeks. I managed to find a solution, but I am not sure whether it was a bug in the connector/Spark or not.

I had three tables in Cassandra (running Cassandra on a 5-node cluster) and a large Spark cluster (5 worker nodes, each with 32 cores and 240G memory). When I ran my job, which extracts data from S3 and writes to 3 tables in Cassandra using around 1TB of memory and 160 cores, it would sometimes get stuck at the last few tasks of a stage... After playing around for a while I realised that reducing the number of cores to 2 per machine (10 total) made the job stable. I gradually increased the number of cores and it hung again once I had about 50 cores in total. I would like to know if anyone else has experienced this and whether it is explainable.

On another note, I would like to know if people are seeing good performance reading from Cassandra using Spark as opposed to reading data from HDFS. Kind of an open question, but I would like to see how others are using it.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cassandra-Connector-Issue-and-performance-tp15005.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark-cassandra-connector issue
Hello,

I'm trying to modify the Spark sample app to integrate with Cassandra, but I saw an exception when submitting the app. Does anyone know why it happens?

Exception in thread "main" java.lang.NoClassDefFoundError: com/datastax/spark/connector/rdd/reader/RowReaderFactory
        at SimpleApp.main(SimpleApp.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.reader.RowReaderFactory
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ...
8 more

Source code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "10.20.132.44")
      .setAppName("Simple Application")
    val logFile = "/home/gzhao/spark/spark-1.0.2-bin-hadoop1/README.md" // Should be some file on your system
    val sc = new SparkContext("spark://mcs-spark-slave1-staging:7077", "idfa_map", conf)
    val rdd = sc.cassandraTable("idfa_map", "bcookie_idfa")
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
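A NoClassDefFoundError like the one above means the connector class was on the compile-time classpath but is missing at runtime, typically because the connector jar was not shipped with the job (via --jars or a fat/assembly jar). A quick way to probe whether a given class is visible at runtime is reflection; a minimal sketch, using the class name taken from the stack trace above:

```java
public class ClasspathProbe {
    public static void main(String[] args) {
        // The class from the stack trace above; swap in any class to probe.
        String className = "com.datastax.spark.connector.rdd.reader.RowReaderFactory";
        try {
            Class.forName(className);
            System.out.println(className + " is on the classpath");
        } catch (ClassNotFoundException e) {
            // This is what the driver hit: present at compile time, absent at
            // runtime -- pass the connector jar via --jars or build a fat jar.
            System.out.println(className + " is NOT on the classpath");
        }
    }
}
```

Running this with the same classpath as the failing spark-submit invocation tells you immediately whether the connector jar is actually being picked up.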