Are you using DSE Spark? If so, are you pointing Spark Job Server to use DSE Spark?
Thanks and Regards Noorul Anand <anand.vi...@monotype.com> writes: > *I am new to Spark world and Job Server > > My Code :* > > package spark.jobserver > > import java.nio.ByteBuffer > > import scala.collection.JavaConversions._ > import scala.collection.mutable.ListBuffer > import scala.collection.immutable.Map > > import org.apache.cassandra.hadoop.ConfigHelper > import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat > import org.apache.cassandra.hadoop.cql3.CqlConfigHelper > import org.apache.cassandra.hadoop.cql3.CqlOutputFormat > import org.apache.cassandra.utils.ByteBufferUtil > import org.apache.hadoop.mapreduce.Job > > import com.typesafe.config.{Config, ConfigFactory} > import org.apache.spark._ > import org.apache.spark.SparkContext._ > import scala.util.Try > > object CassandraCQLTest extends SparkJob{ > > def main(args: Array[String]) { > val sc = new SparkContext("local[4]", "CassandraCQLTest") > > sc.addJar("/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar"); > val config = ConfigFactory.parseString("") > val results = runJob(sc, config) > println("Result is " + "test") > } > > override def validate(sc: SparkContext, config: Config): > SparkJobValidation = { > Try(config.getString("input.string")) > .map(x => SparkJobValid) > .getOrElse(SparkJobInvalid("No input.string config param")) > } > > override def runJob(sc: SparkContext, config: Config): Any = { > val cHost: String = "localhost" > val cPort: String = "9160" > val KeySpace = "retail" > val InputColumnFamily = "ordercf" > val OutputColumnFamily = "salecount" > > val job = new Job() > job.setInputFormatClass(classOf[CqlPagingInputFormat]) > ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost) > ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort) > ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, > InputColumnFamily) > 
ConfigHelper.setInputPartitioner(job.getConfiguration(), > "Murmur3Partitioner") > CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3") > > /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), > "user_id='bob'") */ > > /** An UPDATE writes one or more columns to a record in a Cassandra > column family */ > val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET > sale_count = ? " > CqlConfigHelper.setOutputCql(job.getConfiguration(), query) > > job.setOutputFormatClass(classOf[CqlOutputFormat]) > ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, > OutputColumnFamily) > ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost) > ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort) > ConfigHelper.setOutputPartitioner(job.getConfiguration(), > "Murmur3Partitioner") > > val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), > classOf[CqlPagingInputFormat], > classOf[java.util.Map[String,ByteBuffer]], > classOf[java.util.Map[String,ByteBuffer]]) > > > val productSaleRDD = casRdd.map { > case (key, value) => { > (ByteBufferUtil.string(value.get("prod_id")), > ByteBufferUtil.toInt(value.get("quantity"))) > } > } > val aggregatedRDD = productSaleRDD.reduceByKey(_ + _) > aggregatedRDD.collect().foreach { > case (productId, saleCount) => println(productId + ":" + saleCount) > } > > val casoutputCF = aggregatedRDD.map { > case (productId, saleCount) => { > val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId)) > val outKey: java.util.Map[String, ByteBuffer] = outColFamKey > var outColFamVal = new ListBuffer[ByteBuffer] > outColFamVal += ByteBufferUtil.bytes(saleCount) > val outVal: java.util.List[ByteBuffer] = outColFamVal > (outKey, outVal) > } > } > > casoutputCF.saveAsNewAPIHadoopFile( > KeySpace, > classOf[java.util.Map[String, ByteBuffer]], > classOf[java.util.List[ByteBuffer]], > classOf[CqlOutputFormat], > job.getConfiguration() > ) > casRdd.count > } > } > > *When I push the Jar 
using spark-jobServer and execute it I get this on > spark-jobserver terminal > * > job-server[ERROR] Exception in thread "pool-1-thread-1" > java.lang.NoClassDefFoundError: > org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat > job-server[ERROR] at > spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46) > job-server[ERROR] at > spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21) > job-server[ERROR] at > spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235) > job-server[ERROR] at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > job-server[ERROR] at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > job-server[ERROR] at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > job-server[ERROR] at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > job-server[ERROR] at java.lang.Thread.run(Thread.java:745) > job-server[ERROR] Caused by: java.lang.ClassNotFoundException: > org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat > job-server[ERROR] at > java.net.URLClassLoader$1.run(URLClassLoader.java:366) > job-server[ERROR] at > java.net.URLClassLoader$1.run(URLClassLoader.java:355) > job-server[ERROR] at java.security.AccessController.doPrivileged(Native > Method) > job-server[ERROR] at > java.net.URLClassLoader.findClass(URLClassLoader.java:354) > job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:425) > job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:358) > job-server[ERROR] ... 8 more > > *I have already added the $EXTRA_JAR variable to my > cassandra-spark-connector-assembly. 
> > Regards, > Anand* > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-Connection-Issue-with-Spark-jobserver-tp22587.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org