Hi Spark friends,

I'm trying to connect Spark Streaming to Cassandra by modifying the
NetworkWordCount.scala streaming example, making as few changes as
possible while having it insert the word counts into Cassandra.

Could you let me know if you see any errors?

I'm using the spark-cassandra-connector, and I receive this error when I
submit my spark jar:

======

Exception in thread "main" java.lang.NoClassDefFoundError: com/datastax/spark/connector/mapper/ColumnMapper
    at org.apache.spark.examples.streaming.CassandraWordCount.main(CassandraWordCount.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.mapper.ColumnMapper
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 8 more


=======
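I'm guessing the connector classes simply aren't inside the jar I'm submitting, since sbt's plain `package` task doesn't bundle dependencies. A way to check would be to list the jar's contents (the path below is my guess at sbt's default output for this build):

```shell
# List the application jar's contents and look for the missing class.
# If this prints nothing, the connector classes aren't bundled in the jar.
jar tf target/scala-2.10/simple-streaming_2.10-1.0.jar | grep ColumnMapper
```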

And here is the class I'm using:

package org.apache.spark.examples.streaming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

object CassandraWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: CassandraWordCount <hostname> <port>")
      System.exit(1)
    }

    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "ec2-54-191-235-127.us-west-2.compute.amazonaws.com")
      .setAppName("CassandraWordCount")
    val sc = new SparkContext("spark://ip-172-31-38-112:7077", "test", conf)

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sc, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note: replication in the storage level can be skipped only when running
    // locally; in a distributed setup it is needed for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt,
      StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    //-- Writing it to Cassandra
    wordCounts.saveToCassandra("test", "kv", SomeColumns("key", "value"))

    ssc.start()
    ssc.awaitTermination()
  }
}

=======

Finally, here is my sbt build file:

======

name := "Simple Streaming"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.10" % "1.1.0",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha3"
withSources() withJavadoc(),
  "org.apache.spark" %% "spark-sql" % "1.1.0"
)


=====
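In case it matters, here is roughly how I think I could get the connector onto the classpath at submit time, by passing the connector jar explicitly alongside my application jar. The paths below are guesses based on my local Ivy cache and sbt's default output directory, so adjust as needed:

```shell
# Pass the connector jar along with the application jar.
# In client mode, --jars should also put it on the driver's classpath.
spark-submit \
  --class org.apache.spark.examples.streaming.CassandraWordCount \
  --master spark://ip-172-31-38-112:7077 \
  --jars ~/.ivy2/cache/com.datastax.spark/spark-cassandra-connector_2.10/jars/spark-cassandra-connector_2.10-1.1.0-alpha3.jar \
  target/scala-2.10/simple-streaming_2.10-1.0.jar <hostname> <port>
```

The alternative, I suppose, would be building a single fat jar with the sbt-assembly plugin so the connector and its transitive dependencies are bundled in.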

Any help would be appreciated! Thanks so much!

Harold
