Hi again,

Not sure if this is the right thing to do, but I pulled down the latest spark-cassandra-connector, built the jars, passed them to spark-submit with the --jars flag, and added them to all the workers.
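For reference, the submit invocation looked roughly like this (the jar paths and file names below are illustrative placeholders, not my exact command):

```shell
spark-submit \
  --class org.apache.spark.examples.streaming.CassandraWordCount \
  --master spark://ip-172-31-38-112:7077 \
  --jars /path/to/spark-cassandra-connector_2.10.jar \
  /path/to/simple-streaming_2.10-1.0.jar <hostname> <port>
```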
It looks like it moves a little further, but now I have the following error (with all file contents the same):

....
14/10/28 04:43:44 INFO AppClient$ClientActor: Connecting to master spark://ip-172-31-38-112:7077...
14/10/28 04:43:44 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
Exception in thread "main" java.lang.NoSuchMethodError: com.datastax.spark.connector.streaming.DStreamFunctions.saveToCassandra(Ljava/lang/String;Ljava/lang/String;Lcom/datastax/spark/connector/SomeColumns;Lcom/datastax/spark/connector/writer/RowWriterFactory;)V
        at org.apache.spark.examples.streaming.CassandraWordCount$.main(CassandraWordCount.scala:52)
        at org.apache.spark.examples.streaming.CassandraWordCount.main(CassandraWordCount.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

On Mon, Oct 27, 2014 at 9:22 PM, Harold Nguyen <har...@nexgate.com> wrote:
> Hi Spark friends,
>
> I'm trying to connect Spark Streaming to Cassandra by modifying the
> NetworkWordCount.scala streaming example, making as few changes as
> possible while having it insert data into Cassandra.
>
> Could you let me know if you see any errors?
>
> I'm using the spark-cassandra-connector, and I receive this error when I
> submit my Spark jar:
>
> ======
>
> Exception in thread "main" java.lang.NoClassDefFoundError: com/datastax/spark/connector/mapper/ColumnMapper
>         at org.apache.spark.examples.streaming.CassandraWordCount.main(CassandraWordCount.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.mapper.ColumnMapper
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         ... 8 more
>
> =======
>
> And here is the class I'm using:
>
> package org.apache.spark.examples.streaming
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.storage.StorageLevel
>
> import com.datastax.spark.connector._
> import com.datastax.spark.connector.streaming._
>
> object CassandraWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 2) {
>       System.err.println("Usage: CassandraWordCount <hostname> <port>")
>       System.exit(1)
>     }
>
>     val conf = new SparkConf(true)
>       .set("spark.cassandra.connection.host", "ec2-54-191-235-127.us-west-2.compute.amazonaws.com")
>       .setAppName("CassandraWordCount")
>     val sc = new SparkContext("spark://ip-172-31-38-112:7077", "test", conf)
>
>     StreamingExamples.setStreamingLogLevels()
>
>     // Create the context with a 1 second batch size
>     val ssc = new StreamingContext(sc, Seconds(1))
>
>     // Create a socket stream on the target ip:port and count the
>     // words in the input stream of \n-delimited text (e.g. generated by 'nc').
>     // Skipping duplication in the storage level is fine only when running
>     // locally; replication is necessary in a distributed scenario for fault
>     // tolerance.
>     val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
>     val words = lines.flatMap(_.split(" "))
>     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>
>     //-- Write it to Cassandra
>     wordCounts.saveToCassandra("test", "kv", SomeColumns("key", "value"))
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> =======
>
> Finally, here is my sbt build file:
>
> ======
>
> name := "Simple Streaming"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" % "spark-streaming_2.10" % "1.1.0",
>   "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha3" withSources() withJavadoc(),
>   "org.apache.spark" %% "spark-sql" % "1.1.0"
> )
>
> =====
>
> Any help would be appreciated! Thanks so much!
>
> Harold
>
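P.S. For clarity, the per-batch aggregation in the job above is just a word count; here is the same logic sketched with ordinary Scala collections (no Spark needed, and the object name is only for illustration):

```scala
// Same result as: lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
object WordCountSketch {
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))                          // tokenize each line on spaces
      .groupBy(identity)                              // group identical words together
      .map { case (word, occs) => (word, occs.size) } // count each group, like reduceByKey(_ + _)

  def main(args: Array[String]): Unit =
    println(count(Seq("hello world", "hello cassandra")))
}
```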