NoSuchMethodError: writing spark-streaming data to cassandra
Hi,

I am intending to save the streaming data from Kafka into Cassandra, using spark-streaming, but there seems to be a problem with the line

    javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();

I am getting a NoSuchMethodError. The code, the error log and the POM.xml dependencies are listed below. Please help me find the reason why this is happening.

    public class SparkStream {
        static int key = 0;

        public static void main(String args[]) throws Exception {
            if (args.length != 3) {
                System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
                System.exit(1);
            }
            Logger.getLogger("org").setLevel(Level.OFF);
            Logger.getLogger("akka").setLevel(Level.OFF);

            Map<String, Integer> topicMap = new HashMap<String, Integer>();
            String[] topic = args[2].split(",");
            for (String t : topic) {
                topicMap.put(t, new Integer(3));
            }

            /* Connection to Spark */
            SparkConf conf = new SparkConf();
            JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
            JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));

            /* Receive Kafka streaming inputs */
            JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

            /* Create DStream */
            JavaDStream<TestTable> data = messages.map(new Function<Tuple2<String, String>, TestTable>() {
                public TestTable call(Tuple2<String, String> message) {
                    return new TestTable(new Integer(++key), message._2());
                }
            });

            /* Write to cassandra */
            javaFunctions(data, TestTable.class).saveToCassandra("testkeyspace", "test_table");
            // data.print(); // creates console output stream

            jssc.start();
            jssc.awaitTermination();
        }
    }

    class TestTable implements Serializable {
        Integer key;
        String value;

        public TestTable() {}
        public TestTable(Integer k, String v) { key = k; value = v; }
        public Integer getKey() { return key; }
        public void setKey(Integer k) { key = k; }
        public String getValue() { return value; }
        public void setValue(String v) { value = v; }
        public String toString() {
            return MessageFormat.format("TestTable'{'key={0},value={1}'}'", key, value);
        }
    }

The output log is:

    Exception in thread "main" java.lang.NoSuchMethodError: com.datastax.spark.connector.streaming.DStreamFunctions.<init>(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;)V
        at com.datastax.spark.connector.DStreamJavaFunctions.<init>(DStreamJavaFunctions.java:17)
        at com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(CassandraJavaUtil.java:89)
        at com.spark.SparkStream.main(SparkStream.java:83)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

And the POM dependencies are:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.0.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.msiops.footing</groupId>
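The stack trace above points at a binary incompatibility between the connector's Scala core and its Java wrapper: the POM mixes spark-cassandra-connector_2.10 1.1.0 with spark-cassandra-connector-java_2.10 1.0.4, so DStreamJavaFunctions calls a DStreamFunctions constructor that no longer exists in 1.1.0. A sketch of the likely fix, keeping both artifacts on the same version (versions shown are the ones already in this thread, not a general recommendation):

```xml
<!-- Keep the Scala core and the Java wrapper of the connector aligned;
     mixing 1.1.0 with 1.0.4 produces this NoSuchMethodError at runtime. -->
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.1.0</version> <!-- was 1.0.4 -->
</dependency>
```

The follow-up post in this thread shows both artifacts at 1.1.0, after which the NoSuchMethodError no longer appears.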
Re: NoSuchMethodError: writing spark-streaming data to cassandra
Hi @Gerard,

Thanks, I added one more dependency for conf.set("spark.cassandra.connection.host", "localhost"). Now I am able to create a connection, but the data is not getting inserted into the Cassandra table, and the logs show it is getting connected and the next second getting disconnected. The full code, the logs and the dependencies are below:

    public class SparkStream {
        static int key = 0;

        public static void main(String args[]) throws Exception {
            if (args.length != 3) {
                System.out.println("parameters not given properly");
                System.exit(1);
            }
            Logger.getLogger("org").setLevel(Level.OFF);
            Logger.getLogger("akka").setLevel(Level.OFF);

            Map<String, Integer> topicMap = new HashMap<String, Integer>();
            String[] topic = args[2].split(",");
            for (String t : topic) {
                topicMap.put(t, new Integer(3));
            }

            /* Connection to Spark */
            SparkConf conf = new SparkConf();
            conf.set("spark.cassandra.connection.host", "localhost");
            JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
            JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));

            /* Connection to cassandra */
            CassandraConnector connector = CassandraConnector.apply(sc.getConf());
            System.out.println("+++cassandra connector created");

            /* Receive Kafka streaming inputs */
            JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
            System.out.println("+streaming Connection done!+++");

            /* Create DStream */
            JavaDStream<TestTable> data = messages.map(new Function<Tuple2<String, String>, TestTable>() {
                public TestTable call(Tuple2<String, String> message) {
                    return new TestTable(new Integer(++key), message._2());
                }
            });
            System.out.println("JavaDStream<TestTable> created");

            /* Write to cassandra */
            javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();

            jssc.start();
            jssc.awaitTermination();
        }
    }

    class TestTable implements Serializable {
        Integer key;
        String value;

        public TestTable() {}
        public TestTable(Integer k, String v) { key = k; value = v; }
        public Integer getKey() { return key; }
        public void setKey(Integer k) { key = k; }
        public String getValue() { return value; }
        public void setValue(String v) { value = v; }
        public String toString() {
            return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);
        }
    }

The log is:

    +++cassandra connector created
    +streaming Connection done!+++
    JavaDStream<TestTable> created
    14/12/09 12:07:33 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
    14/12/09 12:07:33 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
    14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
    14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
    14/12/09 12:07:34 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
    14/12/09 12:07:45 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
    14/12/09 12:07:45 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
    14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
    14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
    14/12/09 12:07:46 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

The POM.xml dependencies are:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.msiops.footing</groupId>
        <artifactId>footing-tuple</artifactId>
        <version>0.2</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>2.1.3</version>
    </dependency>

Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.

From: Gerard Maas gerard.m...@gmail.com
Sent: Tuesday, December 9, 2014 4:39 PM
To: Sarosh, M.
Cc: spark users
Subject: Re: NoSuchMethodError: writing spark-streaming data to
Error: Spark-streaming to Cassandra
Hi,

I am intending to save the streaming data from Kafka into Cassandra, using spark-streaming, but there seems to be a problem with the line

    javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();

I am getting 2 errors. The code, the error log and the POM.xml dependencies are listed below. Please help me find the reason why this is happening.

    public class SparkStream {
        static int key = 0;

        public static void main(String args[]) throws Exception {
            if (args.length != 3) {
                System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
                System.exit(1);
            }
            Logger.getLogger("org").setLevel(Level.OFF);
            Logger.getLogger("akka").setLevel(Level.OFF);

            Map<String, Integer> topicMap = new HashMap<String, Integer>();
            String[] topic = args[2].split(",");
            for (String t : topic) {
                topicMap.put(t, new Integer(3));
            }

            /* Connection to Spark */
            SparkConf conf = new SparkConf();
            JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
            JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));

            /* Connection to cassandra */
            /*
            conf.set("spark.cassandra.connection.host", "127.0.0.1:9042");
            CassandraConnector connector = CassandraConnector.apply(sc.getConf());
            Session session = connector.openSession();
            session.execute("CREATE TABLE IF NOT EXISTS testkeyspace.test_table (key INT PRIMARY KEY, value TEXT)");
            */

            /* Receive Kafka streaming inputs */
            JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

            /* Create DStream */
            JavaDStream<TestTable> data = messages.map(new Function<Tuple2<String, String>, TestTable>() {
                public TestTable call(Tuple2<String, String> message) {
                    return new TestTable(new Integer(++key), message._2());
                }
            });

            /* Write to cassandra */
            javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();
            // data.print();

            jssc.start();
            jssc.awaitTermination();
        }
    }

    class TestTable implements Serializable {
        Integer key;
        String value;

        public TestTable() {}
        public TestTable(Integer k, String v) { key = k; value = v; }
        public Integer getKey() { return key; }
        public void setKey(Integer k) { key = k; }
        public String getValue() { return value; }
        public void setValue(String v) { value = v; }
        public String toString() {
            return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);
        }
    }

The output log is:

    [INFO] Compiling 1 source file to /root/Documents/SparkStreamSample/target/classes
    [INFO] 2 errors
    [ERROR] COMPILATION ERROR :
    [ERROR] /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[76,81] cannot find symbol
      symbol:   method mapToRow(java.lang.Class<com.spark.TestTable>)
      location: class com.spark.SparkStream
    [ERROR] /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[76,17] no suitable method found for javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable>)
        method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<T>, java.lang.Class<T>) is not applicable
          (cannot infer type-variable(s) T (actual and formal argument lists differ in length))
        method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.dstream.DStream<T>, java.lang.Class<T>) is not applicable
          (cannot infer type-variable(s) T (actual and formal argument lists differ in length))
        method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.api.java.JavaRDD<T>, java.lang.Class<T>) is not applicable
          (cannot infer type-variable(s) T (actual and formal argument lists differ in length))
        method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.rdd.RDD<T>, java.lang.Class<T>) is not applicable
          (cannot infer type-variable(s) T (actual and formal argument lists differ in length))
        method com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.api.java.JavaStreamingContext) is not applicable
          (argument mismatch; org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable> cannot be converted to org.apache.spark.streaming.api.java.JavaStreamingContext)
        method com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.StreamingContext) is not
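The "cannot find symbol: method mapToRow" error suggests the imports still point at the old com.datastax.spark.connector.CassandraJavaUtil, which has no mapToRow or writerBuilder. In connector-java 1.1.0 the builder-style Java API lives in the japi package (documented in the connector's 7_java_api.md linked later in this thread). A sketch of the corrected imports and call, with class names to verify against your connector version:

```java
// Assumption: spark-cassandra-connector-java_2.10 1.1.0, where the
// builder-style Java API is under com.datastax.spark.connector.japi.
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;

// ... inside main(), with JavaDStream<TestTable> data already built:
CassandraStreamingJavaUtil.javaFunctions(data)
    .writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class))
    .saveToCassandra();
```

This compiles only with the Spark and connector jars on the classpath; it is a sketch of the API shape, not a drop-in file.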
Re: Spark-Streaming: output to cassandra
Hi Akhil, Vyas, Helena,

Thank you for your suggestions. As Akhil suggested earlier, I have implemented the batch Duration into JavaStreamingContext and waitForTermination(Duration). The approach Helena suggested is Scala oriented.

But the issue now is that I want to set Cassandra as my output. I have created a table in Cassandra, test_table, with columns key:text primary key and value:text. I have mapped the data successfully into JavaDStream<Tuple2<String,String>> data:

    JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
    JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    JavaDStream<Tuple2<String, String>> data = messages.map(new Function<Tuple2<String, String>, Tuple2<String, String>>() {
        public Tuple2<String, String> call(Tuple2<String, String> message) {
            return new Tuple2<String, String>(message._1(), message._2());
        }
    });

Then I have created a List:

    List<TestTable> list = new ArrayList<TestTable>();

where TestTable is my custom class having the same structure as my Cassandra table, with members key and value:

    class TestTable {
        String key;
        String val;

        public TestTable() {}
        public TestTable(String k, String v) { key = k; val = v; }
        public String getKey() { return key; }
        public void setKey(String k) { key = k; }
        public String getVal() { return val; }
        public void setVal(String v) { val = v; }
        public String toString() { return "Key:" + key + ",Val:" + val; }
    }

Please suggest a way to add the data from JavaDStream<Tuple2<String,String>> data into the List<TestTable> list. I am doing this so that I can subsequently use

    JavaRDD<TestTable> rdd = sc.parallelize(list);
    javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");

to save the RDD data into Cassandra.

I had tried coding this way:

    messages.foreachRDD(new Function<Tuple2<String, String>, String>() {
        public List<TestTable> call(Tuple2<String, String> message) {
            String k = message._1();
            String v = message._2();
            TestTable tbl = new TestTable(k, v);
            list.put(tbl);
        }
    });

but it seems some type mismatch is happening. Please help.

Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.

From: Helena Edelson helena.edel...@datastax.com
Sent: Friday, December 5, 2014 6:26 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Spark-Streaming: output to cassandra

You can just do something like this, the Spark Cassandra Connector handles the rest:

    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Map(KafkaTopicRaw -> 10), StorageLevel.DISK_ONLY_2)
      .map { case (_, line) => line.split(",") }
      .map(RawWeatherData(_))
      .saveToCassandra(CassandraKeyspace, CassandraTableRaw)

- Helena
@helenaedelson

On Dec 4, 2014, at 9:51 AM, m.sar...@accenture.com wrote:

Hi,

I have written the code below which is streaming data from Kafka, and printing to the console. I want to extend this, and want my data to go into a Cassandra table instead.

    JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    System.out.println("Connection done!");
    JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> message) {
            return message._2();
        }
    });
    // data.print(); -- output to console
    data.foreachRDD(saveToCassandra("mykeyspace", "mytable"));
    jssc.start();
    jssc.awaitTermination();

How should I implement the line data.foreachRDD(saveToCassandra("mykeyspace","mytable")); so that data goes into Cassandra, in each batch?

And how do I specify a batch? Because if I do Ctrl+C on the console of the streaming job jar, nothing will be entered into Cassandra for sure, since it is getting killed. Please help.

Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.

This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Where allowed by local law, electronic communications with Accenture and its affiliates, including e-mail and instant messaging (including content), may be scanned by our systems for the purposes of information security and assessment of internal compliance with Accenture policy.
www.accenture.com
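The type mismatch in the foreachRDD attempt above comes from its signature: on a pair DStream, foreachRDD takes a function over the whole JavaPairRDD of each batch, not over individual tuples. A sketch of the corrected shape against the Spark 1.1-era Java API (the collect() into a driver-side list mirrors the approach in the post and is only suitable for small demo batches):

```java
// foreachRDD receives the entire batch as a JavaPairRDD, so iterate its
// collected tuples rather than passing a per-tuple function. collect()
// pulls the batch to the driver: fine for a demo, not for large volumes.
messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    public Void call(JavaPairRDD<String, String> rdd) {
        for (Tuple2<String, String> message : rdd.collect()) {
            list.add(new TestTable(message._1(), message._2()));
        }
        return null;
    }
});
```

This needs the Spark jars to compile and assumes the TestTable and list definitions from the post; treat it as an API-shape sketch rather than a complete program.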
Re: Spark-Streaming: output to cassandra
Thank you Helena. But I would like to explain my problem space:

The output is supposed to be Cassandra. To achieve that, I have to use the spark-cassandra-connector APIs. So, going in a bottom-up approach, to write to Cassandra I have to use:

    javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");

To use the above javaFunctions, I need to obtain the JavaRDD object, using sc.parallelize() like this:

    JavaRDD<TestTable> rdd = sc.parallelize(list);

The above sc.parallelize(list) accepts a List object as a parameter. That List object will contain the data obtained either from the JavaDStream or the JavaPairReceiverInputDStream that has the streaming data.

So the flow is, I need:

1. The JavaDStream data to get mapped into a List.
2. The above List object to be passed to sc.parallelize(list) to obtain a JavaRDD object.
3. The above JavaRDD object to be passed to javaFunctions().saveToCassandra().

For all this I need code that maps my JavaDStream data into a List. Once step 1 is done, steps 2 and 3 can easily be performed. I need help for step 1.

I have written the code below to do it:

    JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
    JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    System.out.println("Connection done!");

    /* Connection to cassandra */
    CassandraConnector connector = CassandraConnector.apply(sc.getConf());
    Session session = connector.openSession();
    session.execute("CREATE TABLE testkeyspace.test_table (key TEXT PRIMARY KEY, value TEXT)");

    List<TestTable> list = new ArrayList<TestTable>();
    messages.foreachRDD(new Function<Tuple2<String, String>, Void>() {
        public Void call(Tuple2<String, String> message) {
            String k = message._1();
            String v = message._2();
            TestTable tbl = new TestTable(k, v);
            list.add(tbl);
            return null;
        }
    });

    jssc.start();
    jssc.awaitTermination(new Duration(60 * 1000));

It would be great help if a way is suggested to map the JavaDStream to a List.

Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.

From: Helena Edelson helena.edel...@datastax.com
Sent: Friday, December 5, 2014 9:12 PM
To: Sarosh, M.
Cc: user
Subject: Re: Spark-Streaming: output to cassandra

I think what you are looking for is something like:

    JavaRDD<Double> pricesRDD = javaFunctions(sc).cassandraTable("ks", "tab", mapColumnTo(Double.class)).select("price");
    JavaRDD<Person> rdd = javaFunctions(sc).cassandraTable("ks", "people", mapRowTo(Person.class));

noted here: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md

- Helena
@helenaedelson

On Dec 5, 2014, at 10:15 AM, m.sar...@accenture.com wrote:
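Step 1 by itself is ordinary Java: once a batch's (key, value) pairs are in hand (for example after collecting them inside a foreachRDD call), turning them into a List<TestTable> is a plain loop. A Spark-free sketch of just that mapping, with hypothetical names (BatchMapper, toRows) and a minimal copy of the thread's TestTable POJO:

```java
import java.util.ArrayList;
import java.util.List;

// Spark-free illustration of "step 1": mapping the (key, value) pairs of one
// micro-batch into a List<TestTable>. In the streaming job the pairs would
// come from the batch's RDD; here they are plain String arrays.
public class BatchMapper {

    // Minimal copy of the TestTable POJO from the thread.
    public static class TestTable {
        final String key;
        final String val;
        public TestTable(String k, String v) { key = k; val = v; }
        public String getKey() { return key; }
        public String getVal() { return val; }
        public String toString() { return "Key:" + key + ",Val:" + val; }
    }

    // Map each (key, value) pair of a batch to a TestTable row.
    public static List<TestTable> toRows(List<String[]> batch) {
        List<TestTable> rows = new ArrayList<>();
        for (String[] pair : batch) {
            rows.add(new TestTable(pair[0], pair[1]));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String[]> batch = new ArrayList<>();
        batch.add(new String[]{"k1", "v1"});
        batch.add(new String[]{"k2", "v2"});
        for (TestTable row : toRows(batch)) {
            System.out.println(row); // prints Key:k1,Val:v1 then Key:k2,Val:v2
        }
    }
}
```

The resulting list is exactly what steps 2 and 3 consume via sc.parallelize(list) and javaFunctions(rdd, TestTable.class).saveToCassandra(...).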
Re: Spark-Streaming: output to cassandra
Hi Gerard/Akhil,

By "how do I specify a batch" I was trying to ask: when does the data in the JavaDStream get flushed into the Cassandra table? I read somewhere that the streaming data gets written into Cassandra in batches, and that a batch can be of some particular time, or one particular run. That is what I was trying to understand: how to set that batch in my program. Because if a batch means one cycle run of my streaming app, then in my app I'm hitting Ctrl+C to kill the program. So the program is terminating; would the data get inserted successfully into my Cassandra table?

For example, in Terminal-A I'm running the Kafka producer to stream in messages, and in Terminal-B I'm running my streaming app. In my app there is the line jssc.awaitTermination();, which will keep my app running until I kill it. Eventually I am hitting Ctrl+C in my app terminal, i.e. Terminal-B, and killing it, so it is a kind of ungraceful termination. In this case, will the data in my app's DStream get written into Cassandra?

Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.

From: Gerard Maas gerard.m...@gmail.com
Sent: Thursday, December 4, 2014 10:22 PM
To: Akhil Das
Cc: Sarosh, M.; user@spark.apache.org
Subject: Re: Spark-Streaming: output to cassandra

I guess he's already doing so, given the 'saveToCassandra' usage. What I don't understand is the question "how do I specify a batch". That doesn't make much sense to me. Could you explain further?

-kr, Gerard.

On Thu, Dec 4, 2014 at 5:36 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

You can use the datastax Cassandra connector: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

Thanks
Best Regards

On Thu, Dec 4, 2014 at 8:21 PM, m.sar...@accenture.com wrote:

Hi, I have written the code below which is streaming data from Kafka, and printing to the console.
Kafka+Spark-streaming issue: Stream 0 received 0 blocks
Hi,

I am integrating Kafka and Spark, using spark-streaming. I have created a topic as a Kafka producer:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

I am publishing messages in Kafka and trying to read them using spark-streaming Java code and displaying them on screen. The daemons are all up: Spark master and worker, zookeeper, Kafka. I am writing the Java code using KafkaUtils.createStream; the code is below:

    package com.spark;

    import scala.Tuple2;
    import kafka.serializer.Decoder;
    import kafka.serializer.Encoder;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.api.java.*;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.kafka.*;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import java.util.Map;
    import java.util.HashMap;

    public class SparkStream {
        public static void main(String args[]) {
            if (args.length != 3) {
                System.out.println("Usage: spark-submit --class com.spark.SparkStream target/SparkStream-with-dependencies.jar <zookeeper_ip> <group_name> <topic1,topic2,...>");
                System.exit(1);
            }

            Map<String, Integer> topicMap = new HashMap<String, Integer>();
            String[] topic = args[2].split(",");
            for (String t : topic) {
                topicMap.put(t, new Integer(1));
            }

            JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
            JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
            System.out.println("Connection done");

            JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> message) {
                    System.out.println("NewMessage: " + message._2()); // for debugging
                    return message._2();
                }
            });
            data.print();

            jssc.start();
            jssc.awaitTermination();
        }
    }

I am running the job, and at another terminal I am running the Kafka producer to publish messages:

    # bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    Hi kafka
    second message
    another message

But the output logs at the spark-streaming console don't show the messages; they show zero blocks received:

    -------------------------------------------
    Time: 1417107363000 ms
    -------------------------------------------
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for time 1417107363000 ms (execution: 0.000 s)
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 1417107363000 ms
    14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
    14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
    14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 ms
    14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
    14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks

Why isn't the data block getting received? I have tried the Kafka producer and consumer on the console (bin/kafka-console-producer and bin/kafka-console-consumer); they work perfectly, but why not the code above? Please help me.

Regards,
Aiman Sarosh
RE: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
Hi,

The Spark master is working, and I have given the same URL in the code:

[screenshot of the Spark master web UI]

The warning is gone, and the new log is:

    -------------------------------------------
    Time: 141742785 ms
    -------------------------------------------
    INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 141742785 ms.0 from job set of time 141742785 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 141742785 ms.0 from job set of time 141742785 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 141742785 ms (execution: 0.001 s)
    INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 141742785 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
    INFO [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 25
    INFO [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
    INFO [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 24
    INFO [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time 141742785 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
    -------------------------------------------
    Time: 1417427853000 ms
    -------------------------------------------
    INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000 ms (execution: 0.001 s)
    INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
    INFO [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 27
    INFO [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
    INFO [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 26
    INFO [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[26] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417427853000 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-6] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

What should be my approach now? Need urgent help.

Regards,
Aiman

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 3:56 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

It says:

    14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

A quick guess would be, you are giving the wrong master URL (spark://192.168.88.130:7077). Open the web UI running on port 8080 and use the master URL listed there on the top left corner of the page.

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM, m.sar...@accenture.com wrote:

Hi, I am integrating Kafka and Spark, using spark-streaming.