Hi again,

Not sure if this is the right thing to do, but I pulled down the latest
spark-cassandra-connector, built the jars, and passed them to spark-submit
with the --jars flag (and copied them to all workers).
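For reference, the submit command looked roughly like this (the jar names and
paths are placeholders here, not the exact ones I used):

```shell
spark-submit \
  --class org.apache.spark.examples.streaming.CassandraWordCount \
  --master spark://ip-172-31-38-112:7077 \
  --jars /path/to/spark-cassandra-connector_2.10-1.1.0-alpha3.jar \
  /path/to/simple-streaming_2.10-1.0.jar \
  <hostname> <port>
```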

It looks like it gets a little further, but now I hit the following error
(with all file contents the same):

....
14/10/28 04:43:44 INFO AppClient$ClientActor: Connecting to master spark://ip-172-31-38-112:7077...
14/10/28 04:43:44 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
Exception in thread "main" java.lang.NoSuchMethodError: com.datastax.spark.connector.streaming.DStreamFunctions.saveToCassandra(Ljava/lang/String;Ljava/lang/String;Lcom/datastax/spark/connector/SomeColumns;Lcom/datastax/spark/connector/writer/RowWriterFactory;)V
    at org.apache.spark.examples.streaming.CassandraWordCount$.main(CassandraWordCount.scala:52)
    at org.apache.spark.examples.streaming.CassandraWordCount.main(CassandraWordCount.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

On Mon, Oct 27, 2014 at 9:22 PM, Harold Nguyen <har...@nexgate.com> wrote:

> Hi Spark friends,
>
> I'm trying to connect Spark Streaming to Cassandra by modifying the
> NetworkWordCount.scala streaming example, making as few changes as
> possible while having it insert data into Cassandra.
>
> Could you let me know if you see any errors?
>
> I'm using the spark-cassandra-connector, and I receive this error when I
> submit my spark jar:
>
> ======
>
> Exception in thread "main" java.lang.NoClassDefFoundError: com/datastax/spark/connector/mapper/ColumnMapper
>     at org.apache.spark.examples.streaming.CassandraWordCount.main(CassandraWordCount.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.mapper.ColumnMapper
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>     ... 8 more
>
>
> =======
>
> And here is the class I'm using:
>
> package org.apache.spark.examples.streaming
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.storage.StorageLevel
>
> import com.datastax.spark.connector._
> import com.datastax.spark.connector.streaming._
>
> object CassandraWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 2) {
>       System.err.println("Usage: CassandraWordCount <hostname> <port>")
>       System.exit(1)
>     }
>
>     val conf = new SparkConf(true)
>       .set("spark.cassandra.connection.host",
>         "ec2-54-191-235-127.us-west-2.compute.amazonaws.com")
>       .setAppName("CassandraWordCount")
>     val sc = new SparkContext("spark://ip-172-31-38-112:7077", "test", conf)
>
>     StreamingExamples.setStreamingLogLevels()
>
>     // Create the context with a 1 second batch size
>     val ssc = new StreamingContext(sc, Seconds(1))
>
>     // Create a socket stream on target ip:port and count the
>     // words in the input stream of \n-delimited text (e.g. generated by 'nc').
>     // Note that a storage level without replication suffices only when
>     // running locally; replication is necessary in a distributed
>     // scenario for fault tolerance.
>     val lines = ssc.socketTextStream(args(0), args(1).toInt,
>       StorageLevel.MEMORY_AND_DISK_SER)
>     val words = lines.flatMap(_.split(" "))
>     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>
>     //-- Writing it to Cassandra
>     wordCounts.saveToCassandra("test", "kv", SomeColumns("key", "value"))
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> =======
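> For context, here is the shape of the keyspace and table the job writes to.
> The keyspace, table, and column names come from the saveToCassandra call
> above; the replication settings are just an example:
>
> ```sql
> CREATE KEYSPACE test
>   WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
>
> CREATE TABLE test.kv (
>   key text PRIMARY KEY,
>   value int
> );
> ```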
>
> Finally, here is my sbt build file:
>
> ======
>
> name := "Simple Streaming"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" % "spark-streaming_2.10" % "1.1.0",
>   "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha3" withSources() withJavadoc(),
>   "org.apache.spark" %% "spark-sql" % "1.1.0"
> )
>
>
> =====
>
> Any help would be appreciated! Thanks so much!
>
> Harold
>
