Hi,
I am integrating Kafka -> Spark Streaming -> Cassandra, i.e. reading
streaming messages from Kafka and saving them into a Cassandra table,
with Spark Streaming as the intermediary. So far I am able to read the
streaming data from Kafka into Spark and display it on the console.
My objective now is to write the JavaDStream data to a Cassandra table
instead of the console.
I have written the code below, but it fails to compile.
package com.spark;

import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import com.datastax.spark.connector.*;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import static com.datastax.spark.connector.CassandraJavaUtil.*;

public class SparkStream {
    public static void main(String args[]) throws Exception {
        if (args.length != 3) {
            System.out.println("Usage: SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);

        // Consume each topic with 3 receiver threads
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String t : topics) {
            topicMap.put(t, 3);
        }

        SparkConf conf = new SparkConf();
        JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
        System.out.println("Connection done!");

        // Keep only the message value from each (key, value) pair
        JavaDStream<String> data = messages.map(
                new Function<Tuple2<String, String>, String>() {
                    public String call(Tuple2<String, String> message) {
                        return message._2();
                    }
                });
        // data.print();

        /* Creating a table in Cassandra to store the Kafka-streamed messages */
        conf.set("spark.cassandra.connection.host", "127.0.0.1");
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        Session session = connector.openSession();
        session.execute("CREATE TABLE testkeyspace.test_table (value TEXT PRIMARY KEY)");

        /* Writing to Cassandra */
        javaFunctions(data)
                .writerBuilder("testkeyspace", "test_table", mapToRow(String.class))
                .saveToCassandra();

        jssc.start();
        jssc.awaitTermination(new Duration(60 * 1000));
    }
}
This is the error I am getting:
[INFO] 3 errors
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.225 s
[INFO] Finished at: 2014-12-06T05:29:19-05:00
[INFO] Final Memory: 25M/247M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile
(default-compile) on project SparkStreamSample: Compilation failure:
Compilation failure:
[ERROR]
/root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[72,81]
cannot find symbol
[ERROR] symbol: method mapToRow(java.lang.Class<java.lang.String>)
[ERROR] location: class com.spark.SparkStream
[ERROR]
/root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[72,17]
no suitable method found for
javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<java.lang.String>)
[ERROR] method
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<T>,java.lang.Class<T>)
is not applicable
[ERROR] (cannot infer type-variable(s) T
[ERROR] (actual and formal argument lists differ in length))
[ERROR] method
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.dstream.DStream<T>,java.lang.Class<T>)
is not applicable
[ERROR] (cannot infer type-variable(s) T
[ERROR] (actual and formal argument lists differ in length))
[ERROR] method
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.api.java.JavaRDD<T>,java.lang.Class<T>)
is not applicable
[ERROR] (cannot infer type-variable(s) T
[ERROR] (actual and formal argument lists differ in length))
[ERROR] method
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.rdd.RDD<T>,java.lang.Class<T>)
is not applicable
[ERROR] (cannot infer type-variable(s) T
[ERROR] (actual and formal argument lists differ in length))
[ERROR] method
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.api.java.JavaStreamingContext)
is not applicable
[ERROR] (argument mismatch;
org.apache.spark.streaming.api.java.JavaDStream<java.lang.String>
cannot be converted to
org.apache.spark.streaming.api.java.JavaStreamingContext)
[ERROR] method
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.StreamingContext)
is not applicable
[ERROR] (argument mismatch;
org.apache.spark.streaming.api.java.JavaDStream<java.lang.String>
cannot be converted to org.apache.spark.streaming.StreamingContext)
[ERROR] method
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.api.java.JavaSparkContext)
is not applicable
[ERROR] (argument mismatch;
org.apache.spark.streaming.api.java.JavaDStream<java.lang.String>
cannot be converted to org.apache.spark.api.java.JavaSparkContext)
[ERROR] method
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.SparkContext)
is not applicable
[ERROR] (argument mismatch;
org.apache.spark.streaming.api.java.JavaDStream<java.lang.String>
cannot be converted to org.apache.spark.SparkContext)
[ERROR]
/root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[75,39]
incompatible types: org.apache.spark.streaming.Duration cannot be
converted to long
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with
the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
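From the list of `javaFunctions` overloads in the error, my guess (untested) is that my static import pulls in the old `com.datastax.spark.connector.CassandraJavaUtil`, while `writerBuilder` and `mapToRow` belong to the newer `com.datastax.spark.connector.japi.CassandraJavaUtil` (connector 1.2+), and that `awaitTermination` wants a `long` rather than a `Duration`. A sketch of what I suspect the corrected lines would look like, if that guess is right:

```java
// Replace the old static import with the japi one (connector 1.2+),
// which is where writerBuilder/mapToRow are defined:
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

// ... rest of the class unchanged ...

// Write the stream using the japi builder. I am not sure a bare String
// maps onto the "value" column; a small JavaBean with a `value` field
// may additionally be needed for the row mapping.
javaFunctions(data)
        .writerBuilder("testkeyspace", "test_table", mapToRow(String.class))
        .saveToCassandra();

// awaitTermination takes a timeout in milliseconds (long), not a Duration,
// which should clear the "incompatible types" error at [75,39]:
jssc.awaitTermination(60 * 1000);
```

I have not been able to verify this, so please correct me if the package or the signatures are wrong.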
Please help me out.
I have been struggling with this for a long time.
Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.