Spark Streaming into Cassandra - NoClass ColumnMapper

2014-10-27 Thread Harold Nguyen
Hi Spark friends,

I'm trying to connect Spark Streaming to Cassandra by modifying the
NetworkWordCount.scala streaming example, making as few changes as
possible while having it insert data into Cassandra.

Could you let me know if you see any errors?

I'm using the spark-cassandra-connector, and I receive this error when I
submit my spark jar:

==

Exception in thread "main" java.lang.NoClassDefFoundError:
com/datastax/spark/connector/mapper/ColumnMapper
at
org.apache.spark.examples.streaming.CassandraWordCount.main(CassandraWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException:
com.datastax.spark.connector.mapper.ColumnMapper
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 8 more


===

And here is the class I'm using:

package org.apache.spark.examples.streaming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

object CassandraWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host",
        "ec2-54-191-235-127.us-west-2.compute.amazonaws.com")
      .setAppName("CassandraWordCount")
    val sc = new SparkContext("spark://ip-172-31-38-112:7077", "test", conf)

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt,
      StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    //-- Writing it to Cassandra
    wordCounts.saveToCassandra("test", "kv", SomeColumns("key", "value"))

    ssc.start()
    ssc.awaitTermination()
  }
}

===

Finally, here is my sbt build file:

==

name := "Simple Streaming"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.1.0",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha3" withSources() withJavadoc(),
  "org.apache.spark" %% "spark-sql" % "1.1.0"
)


=
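In case it's useful: a common way to avoid NoClassDefFoundError for connector classes is to package the dependencies into a single fat jar with sbt-assembly instead of relying on the plain `package` task, and to mark Spark itself as "provided" so only the connector and its transitive dependencies end up in the jar. A minimal sketch — the plugin version and the "provided" scoping are my assumptions, not part of the original build:

```scala
// project/plugins.sbt (plugin version assumed for the sbt 0.13 era)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// build.sbt: Spark is on the cluster already, so scope it "provided";
// the connector gets bundled into the assembly jar
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.1.0" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha3"
)
```

Then `sbt assembly` produces one jar you can hand to spark-submit without any extra --jars flags.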

Any help would be appreciated! Thanks so much!

Harold


Re: Spark Streaming into Cassandra - NoClass ColumnMapper

2014-10-27 Thread Harold Nguyen
Hi again,

Not sure if this is the right thing to do, but I pulled down the latest
spark-cassandra-connector, built the jars, and passed them to spark-submit
with the --jars flag (and copied them to all workers).

It looks like it gets a little further, but now I have the following error
(with all file contents the same):


14/10/28 04:43:44 INFO AppClient$ClientActor: Connecting to master
spark://ip-172-31-38-112:7077...
14/10/28 04:43:44 INFO SparkDeploySchedulerBackend: SchedulerBackend is
ready for scheduling beginning after reached minRegisteredResourcesRatio:
0.0
Exception in thread "main" java.lang.NoSuchMethodError:
com.datastax.spark.connector.streaming.DStreamFunctions.saveToCassandra(Ljava/lang/String;Ljava/lang/String;Lcom/datastax/spark/connector/SomeColumns;Lcom/datastax/spark/connector/writer/RowWriterFactory;)V
at
org.apache.spark.examples.streaming.CassandraWordCount$.main(CassandraWordCount.scala:52)
at
org.apache.spark.examples.streaming.CassandraWordCount.main(CassandraWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
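A NoSuchMethodError at a connector call usually means the connector jar on the runtime classpath was built from a different version than the one the application was compiled against (here, a freshly built master vs. the 1.1.0-alpha3 from sbt). One way to keep them in sync is to submit the exact jar that sbt resolved. A sketch of the invocation — jar names, paths, and the port argument are illustrative, not from the original thread:

```shell
# Submit with the same connector jar the app was compiled against;
# file names and paths below are illustrative
spark-submit \
  --class org.apache.spark.examples.streaming.CassandraWordCount \
  --master spark://ip-172-31-38-112:7077 \
  --jars spark-cassandra-connector_2.10-1.1.0-alpha3.jar \
  target/scala-2.10/simple-streaming_2.10-1.0.jar \
  ec2-54-191-235-127.us-west-2.compute.amazonaws.com 9999
```

If the jar passed with --jars matches the version in build.sbt, the compiled call site and the runtime method signature should line up again.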
