Hi,

I am intending to save the streaming data from kafka into Cassandra, using 
spark-streaming:
But there seems to be problem with line
javaFunctions(data).writerBuilder("testkeyspace", "test_table", 
mapToRow(TestTable.class)).saveToCassandra();
I am getting NoSuchMethodError.
The code, the error-log and POM.xml dependencies are listed below:
Please help me find the reason as to why is this happening.


public class SparkStream {
        static int key=0;
        public static void main(String args[]) throws Exception
        {
                  if(args.length != 3)
                  {
                        System.out.println("SparkStream <zookeeper_ip> 
<group_nm> <topic1,topic2,...>");
                        System.exit(1);
                  }

                  Logger.getLogger("org").setLevel(Level.OFF);
                  Logger.getLogger("akka").setLevel(Level.OFF);
                  Map<String,Integer> topicMap = new HashMap<String,Integer>();
                  String[] topic = args[2].split(",");
                  for(String t: topic)
                  {
                        topicMap.put(t, new Integer(3));
                  }

                  /* Connection to Spark */
                  SparkConf conf = new SparkConf();
                  JavaSparkContext sc = new JavaSparkContext("local[4]", 
"SparkStream",conf);
                  JavaStreamingContext jssc = new JavaStreamingContext(sc, new 
Duration(3000));


                  /* Receive Kafka streaming inputs */
                  JavaPairReceiverInputDStream<String, String> messages = 
KafkaUtils.createStream(jssc, args[0], args[1], topicMap );


                  /* Create DStream */
                  JavaDStream<TestTable> data = messages.map(new 
Function<Tuple2<String,String>, TestTable >()
                  {
                        public TestTable call(Tuple2<String, String> message)
                        {
                                return new TestTable(new Integer(++key), 
message._2() );
                        }
                  }
                  );

                  /* Write to cassandra */
                
javaFunctions(data,TestTable.class).saveToCassandra("testkeyspace","test_table");
        //      data.print(); //creates console output stream.


                  jssc.start();
                  jssc.awaitTermination();

        }
}

class TestTable implements Serializable
{
        Integer key;
        String value;

        public TestTable() {}

        public TestTable(Integer k, String v)
        {
                  key=k;
                  value=v;
        }

        public Integer getKey(){
                  return key;
        }

        public void setKey(Integer k){
                  key=k;
        }

        public String getValue(){
                  return value;
        }

        public void setValue(String v){
                  value=v;
        }

        public String toString(){
                  return 
MessageFormat.format("TestTable'{'key={0},value={1}'}'", key, value);
        }
}

The output log is:
Exception in thread "main" java.lang.NoSuchMethodError: 
com.datastax.spark.connector.streaming.DStreamFunctions.<init>(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;)V
        at 
com.datastax.spark.connector.DStreamJavaFunctions.<init>(DStreamJavaFunctions.java:17)
        at 
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(CassandraJavaUtil.java:89)
        at com.spark.SparkStream.main(SparkStream.java:83)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


And the POM dependencies are:

        <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-streaming-kafka_2.10</artifactId>
                  <version>1.1.0</version>
        </dependency>

        <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-streaming_2.10</artifactId>
                  <version>1.1.0</version>
        </dependency>

<dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0</version>
</dependency>
<dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.0.4</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.1.1</version>
</dependency>


        <dependency>
                  <groupId>com.msiops.footing</groupId>
                  <artifactId>footing-tuple</artifactId>
                  <version>0.2</version>
        </dependency>

        <dependency>
                  <groupId>com.datastax.cassandra</groupId>
                  <artifactId>cassandra-driver-core</artifactId>
                  <version>1.0.8</version>
        </dependency>


Thanks,
Aiman



________________________________

This message is for the designated recipient only and may contain privileged, 
proprietary, or otherwise confidential information. If you have received it in 
error, please notify the sender immediately and delete the original. Any other 
use of the e-mail by you is prohibited. Where allowed by local law, electronic 
communications with Accenture and its affiliates, including e-mail and instant 
messaging (including content), may be scanned by our systems for the purposes 
of information security and assessment of internal compliance with Accenture 
policy.
______________________________________________________________________________________

www.accenture.com

Reply via email to