Are you using DSE Spark? If so, are you pointing spark-jobserver at DSE's
bundled Spark rather than a standalone installation?
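
If you are on DSE, the job server has to be launched against the Spark
that ships with DSE, not a separately downloaded one. With the stock
spark-jobserver deploy scripts that is driven by the <environment>.sh
config read by server_deploy.sh and server_start.sh. A rough sketch (the
DSE path is only an example for a typical package install, and the exact
variable set differs a little between jobserver versions):

# <environment>.sh for spark-jobserver's deploy scripts
DEPLOY_HOSTS="your-jobserver-host"
APP_USER=spark
APP_GROUP=spark
INSTALL_DIR=/home/spark/job-server
LOG_DIR=/var/log/job-server
SPARK_HOME=/usr/share/dse/spark   # DSE's bundled Spark, not a separate install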

Thanks and Regards
Noorul

Anand <anand.vi...@monotype.com> writes:

> I am new to the Spark world and to the job server.
>
> My code:
>
> package spark.jobserver
>
> import java.nio.ByteBuffer
>
> import scala.collection.JavaConversions._
> import scala.collection.mutable.ListBuffer
> import scala.collection.immutable.Map
>
> import org.apache.cassandra.hadoop.ConfigHelper
> import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
> import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
> import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
> import org.apache.cassandra.utils.ByteBufferUtil
> import org.apache.hadoop.mapreduce.Job
>
> import com.typesafe.config.{Config, ConfigFactory}
> import org.apache.spark._
> import org.apache.spark.SparkContext._
> import scala.util.Try
>
> object CassandraCQLTest extends SparkJob {
>
>   def main(args: Array[String]) {
>     val sc = new SparkContext("local[4]", "CassandraCQLTest")
>     sc.addJar("/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar")
>     val config = ConfigFactory.parseString("")
>     val results = runJob(sc, config)
>     println("Result is " + results)
>   }
>   
>   override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
>     Try(config.getString("input.string"))
>       .map(x => SparkJobValid)
>       .getOrElse(SparkJobInvalid("No input.string config param"))
>   }
>   
>   override def runJob(sc: SparkContext, config: Config): Any = {
>     val cHost: String = "localhost"
>     val cPort: String = "9160"
>     val KeySpace = "retail"
>     val InputColumnFamily = "ordercf"
>     val OutputColumnFamily = "salecount"
>
>     val job = new Job()
>     job.setInputFormatClass(classOf[CqlPagingInputFormat])
>     ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
>     ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
>     ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
>     ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
>     CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")
>
>     /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */
>
>     /** An UPDATE writes one or more columns to a record in a Cassandra column family */
>     val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
>     CqlConfigHelper.setOutputCql(job.getConfiguration(), query)
>
>     job.setOutputFormatClass(classOf[CqlOutputFormat])
>     ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
>     ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
>     ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
>     ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
>
>     val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
>       classOf[CqlPagingInputFormat],
>       classOf[java.util.Map[String,ByteBuffer]],
>       classOf[java.util.Map[String,ByteBuffer]])
>
>     
>     val productSaleRDD = casRdd.map {
>       case (key, value) =>
>         (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity")))
>     }
>     val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
>     aggregatedRDD.collect().foreach {
>       case (productId, saleCount) => println(productId + ":" + saleCount)
>     }
>
>     val casoutputCF = aggregatedRDD.map {
>       case (productId, saleCount) =>
>         val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
>         val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
>         val outColFamVal = new ListBuffer[ByteBuffer]
>         outColFamVal += ByteBufferUtil.bytes(saleCount)
>         val outVal: java.util.List[ByteBuffer] = outColFamVal
>         (outKey, outVal)
>     }
>
>     casoutputCF.saveAsNewAPIHadoopFile(
>         KeySpace,
>         classOf[java.util.Map[String, ByteBuffer]],
>         classOf[java.util.List[ByteBuffer]],
>         classOf[CqlOutputFormat],
>         job.getConfiguration()
>       )
>     casRdd.count
>   }
> }
>
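> For reference, I push the jar and start the job with spark-jobserver's
> REST API, along these lines (jar path, host, and app name are
> placeholders for my actual values):
>
> curl --data-binary @target/scala-2.10/my-job-assembly.jar localhost:8090/jars/cassandra-test
> curl -d "input.string = a" 'localhost:8090/jobs?appName=cassandra-test&classPath=spark.jobserver.CassandraCQLTest'
>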
> When I push the jar with spark-jobserver and execute the job, I get this
> on the spark-jobserver terminal:
> job-server[ERROR] Exception in thread "pool-1-thread-1" java.lang.NoClassDefFoundError: org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat
> job-server[ERROR]     at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46)
> job-server[ERROR]     at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21)
> job-server[ERROR]     at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235)
> job-server[ERROR]     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> job-server[ERROR]     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> job-server[ERROR]     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> job-server[ERROR]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> job-server[ERROR]     at java.lang.Thread.run(Thread.java:745)
> job-server[ERROR] Caused by: java.lang.ClassNotFoundException: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
> job-server[ERROR]     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> job-server[ERROR]     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> job-server[ERROR]     at java.security.AccessController.doPrivileged(Native Method)
> job-server[ERROR]     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> job-server[ERROR]     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> job-server[ERROR]     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> job-server[ERROR]     ... 8 more
>
> I have already pointed the $EXTRA_JAR variable at my
> spark-cassandra-connector assembly.
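>
> Concretely, the setting is along these lines (same assembly path as in
> the code above; the exact variable name is from my start script):
>
> EXTRA_JAR=/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar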
>
> Regards,
> Anand
>
