NoSuchMethodError: writing spark-streaming data to cassandra

2014-12-09 Thread m.sarosh

Hi,

I intend to save streaming data from Kafka into Cassandra using
spark-streaming, but there seems to be a problem with the line

javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();

I am getting a NoSuchMethodError.
The code, the error log and the POM.xml dependencies are listed below.
Please help me find out why this is happening.


public class SparkStream {
    static int key=0;
    public static void main(String args[]) throws Exception
    {
        if(args.length != 3)
        {
            System.out.println("Usage: SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(3));
        }

        /* Connection to Spark */
        SparkConf conf = new SparkConf();
        JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));

        /* Receive Kafka streaming inputs */
        JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        /* Create DStream */
        JavaDStream<TestTable> data = messages.map(new Function<Tuple2<String,String>, TestTable>()
        {
            public TestTable call(Tuple2<String, String> message)
            {
                return new TestTable(new Integer(++key), message._2());
            }
        });

        /* Write to cassandra */
        javaFunctions(data, TestTable.class).saveToCassandra("testkeyspace", "test_table");
        // data.print(); // creates console output stream.

        jssc.start();
        jssc.awaitTermination();
    }
}

class TestTable implements Serializable
{
Integer key;
String value;

public TestTable() {}

public TestTable(Integer k, String v)
{
  key=k;
  value=v;
}

public Integer getKey(){
  return key;
}

public void setKey(Integer k){
  key=k;
}

public String getValue(){
  return value;
}

public void setValue(String v){
  value=v;
}

public String toString(){
  return MessageFormat.format("TestTable'{'key={0},value={1}'}'", key, value);
}
}

The output log is:
Exception in thread "main" java.lang.NoSuchMethodError: 
com.datastax.spark.connector.streaming.DStreamFunctions.<init>(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;)V
at 
com.datastax.spark.connector.DStreamJavaFunctions.<init>(DStreamJavaFunctions.java:17)
at 
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(CassandraJavaUtil.java:89)
at com.spark.SparkStream.main(SparkStream.java:83)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


And the POM dependencies are:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.1.0</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.1.0</version>
</dependency>

<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector-java_2.10</artifactId>
  <version>1.0.4</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.1.1</version>
</dependency>

<dependency>
  <groupId>com.msiops.footing</groupId>
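
[Editor's aside, not part of the original post: a NoSuchMethodError on the DStreamFunctions constructor is the classic symptom of mixing connector artifacts from different release lines. In the POM above, spark-cassandra-connector_2.10 is at 1.1.0 while spark-cassandra-connector-java_2.10 is at 1.0.4; the follow-up post in this thread aligns both on 1.1.0. A minimal sketch of the aligned pair:]

<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector-java_2.10</artifactId>
  <version>1.1.0</version>
</dependency>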
  

Re: NoSuchMethodError: writing spark-streaming data to cassandra

2014-12-09 Thread m.sarosh
Hi,

@Gerard - Thanks, I added one more dependency and set
conf.set("spark.cassandra.connection.host", "localhost").

Now I am able to create a connection, but the data is not getting inserted
into the Cassandra table, and the logs show it getting connected and then
disconnected the next second.
The full code, the logs and the dependencies are below:

public class SparkStream {
    static int key=0;
    public static void main(String args[]) throws Exception
    {
        if(args.length != 3)
        {
            System.out.println("parameters not given properly");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(3));
        }

        /* Connection to Spark */
        SparkConf conf = new SparkConf();
        conf.set("spark.cassandra.connection.host", "localhost");
        JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));

        /* connection to cassandra */
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        System.out.println("+++cassandra connector created");

        /* Receive Kafka streaming inputs */
        JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
        System.out.println("+streaming Connection done!+++");

        /* Create DStream */
        JavaDStream<TestTable> data = messages.map(new Function<Tuple2<String,String>, TestTable>()
        {
            public TestTable call(Tuple2<String, String> message)
            {
                return new TestTable(new Integer(++key), message._2());
            }
        });
        System.out.println("JavaDStream<TestTable> created");

        /* Write to cassandra */
        javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();

        jssc.start();
        jssc.awaitTermination();
    }
}

class TestTable implements Serializable
{
Integer key;
String value;

public TestTable() {}

public TestTable(Integer k, String v)
{
key=k;
value=v;
}

public Integer getKey(){
return key;
}

public void setKey(Integer k){
key=k;
}

public String getValue(){
return value;
}

public void setValue(String v){
value=v;
}

public String toString(){
return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);

}
}

The log is:
+++cassandra connector created
+streaming Connection done!+++
JavaDStreamTestTable created
14/12/09 12:07:33 INFO core.Cluster: New Cassandra host 
localhost/127.0.0.1:9042 added
14/12/09 12:07:33 INFO cql.CassandraConnector: Connected to Cassandra cluster: 
Test Cluster
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 
127.0.0.1 (datacenter1)
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 
127.0.0.1 (datacenter1)
14/12/09 12:07:34 INFO cql.CassandraConnector: Disconnected from Cassandra 
cluster: Test Cluster

14/12/09 12:07:45 INFO core.Cluster: New Cassandra host 
localhost/127.0.0.1:9042 added
14/12/09 12:07:45 INFO cql.CassandraConnector: Connected to Cassandra cluster: 
Test Cluster
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 
127.0.0.1 (datacenter1)
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 
127.0.0.1 (datacenter1)
14/12/09 12:07:46 INFO cql.CassandraConnector: Disconnected from Cassandra 
cluster: Test Cluster
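
[Editor's aside, not in the original message: given that the connector connects and then disconnects cleanly for every batch, two quick checks might narrow this down, namely whether the target keyspace/table actually exists before the stream starts, and whether any records are arriving from Kafka at all. A hedged sketch reusing the commented-out data.print() and the CREATE TABLE statement that appears later in this digest, assuming the Cassandra driver's Session is imported:]

/* Hedged sketch, not from the original post: create the schema up front through the
   connector's session, then print each micro-batch so an empty Kafka stream is visible. */
Session session = connector.openSession();
session.execute("CREATE KEYSPACE IF NOT EXISTS testkeyspace "
        + "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
session.execute("CREATE TABLE IF NOT EXISTS testkeyspace.test_table "
        + "(key INT PRIMARY KEY, value TEXT)");
session.close();

data.print();   // shows a sample of every batch on the console before the Cassandra write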

The POM.xml dependencies are:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.1.0</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.1.0</version>
</dependency>

<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector-java_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.1.1</version>
</dependency>

<dependency>
  <groupId>com.msiops.footing</groupId>
  <artifactId>footing-tuple</artifactId>
  <version>0.2</version>
</dependency>

<dependency>
  <groupId>com.datastax.cassandra</groupId>
  <artifactId>cassandra-driver-core</artifactId>
  <version>2.1.3</version>
</dependency>



Thanks and Regards,

Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #:  (+91) - 9836112841.

From: Gerard Maas gerard.m...@gmail.com
Sent: Tuesday, December 9, 2014 4:39 PM
To: Sarosh, M.
Cc: spark users
Subject: Re: NoSuchMethodError: writing spark-streaming data to 

Error: Spark-streaming to Cassandra

2014-12-08 Thread m.sarosh
Hi,

I intend to save streaming data from Kafka into Cassandra using
spark-streaming, but there seems to be a problem with the line

javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();

I am getting 2 errors.
The code, the error log and the POM.xml dependencies are listed below.
Please help me find out why this is happening.


public class SparkStream {
    static int key=0;
    public static void main(String args[]) throws Exception
    {
        if(args.length != 3)
        {
            System.out.println("Usage: SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(3));
        }

        /* Connection to Spark */
        SparkConf conf = new SparkConf();
        JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));

        /* connection to cassandra */
        /*conf.set("spark.cassandra.connection.host", "127.0.0.1:9042");
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        Session session = connector.openSession();
        session.execute("CREATE TABLE IF NOT EXISTS testkeyspace.test_table (key INT PRIMARY KEY, value TEXT)");
        */

        /* Receive Kafka streaming inputs */
        JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        /* Create DStream */
        JavaDStream<TestTable> data = messages.map(new Function<Tuple2<String,String>, TestTable>()
        {
            public TestTable call(Tuple2<String, String> message)
            {
                return new TestTable(new Integer(++key), message._2());
            }
        });

        /* Write to cassandra */
        javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();
        // data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

class TestTable implements Serializable
{
Integer key;
String value;

public TestTable() {}

public TestTable(Integer k, String v)
{
key=k;
value=v;
}

public Integer getKey(){
return key;
}

public void setKey(Integer k){
key=k;
}

public String getValue(){
return value;
}

public void setValue(String v){
value=v;
}

public String toString(){
return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);
}
}

The output log is:

[INFO] Compiling 1 source file to
/root/Documents/SparkStreamSample/target/classes
[INFO] 2 errors
[INFO] -
[ERROR] COMPILATION ERROR :
[INFO] -
[ERROR] 
/root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[76,81]
cannot find symbol
  symbol:   method mapToRow(java.lang.Class<com.spark.TestTable>)
  location: class com.spark.SparkStream
[ERROR] 
/root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[76,17]
no suitable method found for
javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable>)
method 
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<T>,java.lang.Class<T>)
is not applicable
  (cannot infer type-variable(s) T
(actual and formal argument lists differ in length))
method 
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.dstream.DStream<T>,java.lang.Class<T>)
is not applicable
  (cannot infer type-variable(s) T
(actual and formal argument lists differ in length))
method 
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.api.java.JavaRDD<T>,java.lang.Class<T>)
is not applicable
  (cannot infer type-variable(s) T
(actual and formal argument lists differ in length))
method 
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.rdd.RDD<T>,java.lang.Class<T>)
is not applicable
  (cannot infer type-variable(s) T
(actual and formal argument lists differ in length))
method 
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.api.java.JavaStreamingContext)
is not applicable
  (argument mismatch;
org.apache.spark.streaming.api.java.JavaDStream<com.spark.TestTable>
cannot be converted to
org.apache.spark.streaming.api.java.JavaStreamingContext)
method 
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.StreamingContext)
is not 
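
[Editor's aside, not part of the thread: the overloads listed in this error output belong to the older Java API of spark-cassandra-connector-java 1.0.x, where javaFunctions always takes the DStream or RDD plus a Class, and mapToRow/writerBuilder do not exist yet. A hedged sketch of the write call that matches those signatures; the writerBuilder style only compiles once the java artifact is on the 1.1.x line, as in the later posts of this digest:]

// matches the two-argument javaFunctions overloads listed in the compiler output
javaFunctions(data, TestTable.class).saveToCassandra("testkeyspace", "test_table");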

Re: Spark-Streaming: output to cassandra

2014-12-05 Thread m.sarosh
Hi Akhil, Vyas, Helena,

Thank you for your suggestions.

As Akhil suggested earlier, I have passed the batch Duration into the
JavaStreamingContext and used awaitTermination(Duration).
The approach Helena suggested is Scala oriented.

But the issue now is that I want to set Cassandra as my output.
I have created a table test_table in Cassandra, with columns key:text primary
key and value:text.
I have mapped the data successfully into JavaDStream<Tuple2<String,String>> data:

JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
JavaDStream<Tuple2<String,String>> data = messages.map(new Function<Tuple2<String,String>, Tuple2<String,String>>()
{
    public Tuple2<String,String> call(Tuple2<String, String> message)
    {
        return new Tuple2<String,String>(message._1(), message._2());
    }
});

Then I have created a List:
List<TestTable> list = new ArrayList<TestTable>();
where TestTable is my custom class having the same structure as my Cassandra 
table, with members key and val:

class TestTable
{
    String key;
    String val;

    public TestTable() {}

    public TestTable(String k, String v)
    {
        key=k;
        val=v;
    }
    public String getKey(){
        return key;
    }
    public void setKey(String k){
        key=k;
    }
    public String getVal(){
        return val;
    }
    public void setVal(String v){
        val=v;
    }
    public String toString(){
        return "Key:"+key+",Val:"+val;
    }
}

Please suggest a way to add the data from
JavaDStream<Tuple2<String,String>> data into the List<TestTable> list.
I am doing this so that I can subsequently use
JavaRDD<TestTable> rdd = sc.parallelize(list);
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");
to save the RDD data into Cassandra.

I had tried coding this way:

messages.foreachRDD(new Function<Tuple2<String,String>, String>()
{
    public List<TestTable> call(Tuple2<String,String> message)
    {
        String k = message._1();
        String v = message._2();
        TestTable tbl = new TestTable(k,v);
        list.put(tbl);
    }
}
);

but it seems some type mismatch is happening.
Please help.





Thanks and Regards,

Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #:  (+91) - 9836112841.
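
[Editor's aside, a hedged sketch rather than something from the thread: foreachRDD hands the program a whole RDD per batch, not a single tuple, which is where the type mismatch above comes from, and that RDD can be written to Cassandra directly without an intermediate List, using the two-argument javaFunctions form already quoted in this message (assuming the usual imports for JavaPairRDD, Function and CassandraJavaUtil.javaFunctions):]

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    public Void call(JavaPairRDD<String, String> rdd) {
        // map each (key, value) pair of this batch to a TestTable row
        JavaRDD<TestTable> rows = rdd.map(new Function<Tuple2<String, String>, TestTable>() {
            public TestTable call(Tuple2<String, String> t) {
                return new TestTable(t._1(), t._2());
            }
        });
        // save the whole batch instead of collecting it into a List first
        javaFunctions(rows, TestTable.class).saveToCassandra("testkeyspace", "test_table");
        return null;
    }
});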

From: Helena Edelson helena.edel...@datastax.com
Sent: Friday, December 5, 2014 6:26 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Spark-Streaming: output to cassandra

You can just do something like this; the Spark Cassandra Connector handles the
rest:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map(KafkaTopicRaw -> 10), StorageLevel.DISK_ONLY_2)
  .map { case (_, line) => line.split(",") }
  .map(RawWeatherData(_))
  .saveToCassandra(CassandraKeyspace, CassandraTableRaw)

- Helena
@helenaedelson

On Dec 4, 2014, at 9:51 AM, m.sar...@accenture.com wrote:


Hi,


I have written the code below, which streams data from Kafka and prints it
to the console.

I want to extend this, and want my data to go into a Cassandra table instead.


JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

System.out.println("Connection done!");
JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
{
    public String call(Tuple2<String, String> message)
    {
        return message._2();
    }
});
//data.print();   // -- output to console
data.foreachRDD(saveToCassandra("mykeyspace","mytable"));
jssc.start();
jssc.awaitTermination();



How should I implement the line:

data.foreachRDD(saveToCassandra("mykeyspace","mytable"));

so that data goes into Cassandra in each batch? And how do I specify a batch,
because if I do Ctrl+C on the console of the streaming job jar, nothing will be
entered into Cassandra since it is getting killed.


Please help.


Thanks and Regards,

Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #:  (+91) - 9836112841.






Re: Spark-Streaming: output to cassandra

2014-12-05 Thread m.sarosh
Thank you Helena,


But I would like to explain my problem space:


The output is supposed to be Cassandra. To achieve that, I have to use the
spark-cassandra-connector APIs.

So going in a bottom-up approach, to write to Cassandra, I have to use:

javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");

To use the above javaFunctions, I need to obtain the JavaRDD object,
using sc.parallelize() like this:

JavaRDD<TestTable> rdd = sc.parallelize(list);

The above sc.parallelize(list) accepts a List object as a parameter.

The above List object will contain the data obtained either from the JavaDStream or
the JavaPairReceiverInputDStream that holds the streaming data.


So the flow is:

I need:

1.  The JavaDStream data to get mapped into a List.

2.  The above List object to be passed to sc.parallelize(list) to obtain the
JavaRDD object.

3.  The above JavaRDD object to be passed to javaFunctions().saveToCassandra().


For all this I need code that maps my JavaDStream data into a List.

Once step 1 is done, steps 2 and 3 can easily be performed.


I need help for step 1.

I have written the code below to do it:

JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
System.out.println("Connection done!");

/* connection to cassandra */
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
Session session = connector.openSession();
session.execute("CREATE TABLE testkeyspace.test_table (key TEXT PRIMARY KEY, value TEXT)");

List<TestTable> list = new ArrayList<TestTable>();
messages.foreachRDD(new Function<Tuple2<String,String>, Void>()
{
    public Void call(Tuple2<String,String> message)
    {
        String k = message._1();
        String v = message._2();
        TestTable tbl = new TestTable(k,v);
        list.add(tbl);
        return null;
    }
});

jssc.start();
jssc.awaitTermination(new Duration(60 * 1000));


It would be a great help if a way is suggested to map the JavaDStream to a List.


Thanks and Regards,

Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #:  (+91) - 9836112841.
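
[Editor's aside, a hedged sketch for step 1 that is not from the thread: since foreachRDD receives the complete RDD of each batch, the records can be pulled back to the driver with collect() and appended to the list there. collect() brings every record of the batch into driver memory, so this is only sensible for small test volumes:]

final List<TestTable> list = new ArrayList<TestTable>();

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    public Void call(JavaPairRDD<String, String> rdd) {
        // collect() returns this batch's records as a local List on the driver
        for (Tuple2<String, String> t : rdd.collect()) {
            list.add(new TestTable(t._1(), t._2()));
        }
        return null;
    }
});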

From: Helena Edelson helena.edel...@datastax.com
Sent: Friday, December 5, 2014 9:12 PM
To: Sarosh, M.
Cc: user
Subject: Re: Spark-Streaming: output to cassandra

I think what you are looking for is something like:

JavaRDD<Double> pricesRDD = javaFunctions(sc).cassandraTable("ks", "tab", mapColumnTo(Double.class)).select("price");
JavaRDD<Person> rdd = javaFunctions(sc).cassandraTable("ks", "people", mapRowTo(Person.class));

noted here: 
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md

?

- Helena
@helenaedelson


Re: Spark-Streaming: output to cassandra

2014-12-04 Thread m.sarosh
Hi Gerard/Akhil,


By "how do I specify a batch" I was trying to ask when the data in
the JavaDStream gets flushed into the Cassandra table.

I read somewhere that the streaming data gets written to Cassandra in batches.
This batch can be of some particular time, or one particular run.

That is what I was trying to understand: how to set that batch in my
program. Because if a batch means one cycle run of my streaming app, then in
my app I'm hitting Ctrl+C to kill the program. So the program is
terminating, and would the data still get inserted successfully into my Cassandra
table?

For example:

In Terminal-A I'm running the Kafka producer to stream in messages.

In Terminal-B I'm running my streaming app. In my app there is a line
jssc.awaitTermination(); which will keep my app running till I kill it.

Eventually I hit Ctrl+C in my app terminal, i.e. Terminal-B, and kill
it. So it is a kind of ungraceful termination. In this case, will the data in
my app's DStream get written into Cassandra?




Thanks and Regards,

Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #:  (+91) - 9836112841.
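
[Editor's aside, not from the thread itself: the "batch" here is simply the micro-batch interval, i.e. the Duration passed to the JavaStreamingContext, so every interval the records received in that window become one RDD and are written out. A Ctrl+C kill can drop the batch that is currently in flight; Spark Streaming offers a graceful stop that processes the already-received data first. A minimal sketch, assuming the Spark 1.x stop(stopSparkContext, stopGracefully) overload:]

// the Duration given to the context is the micro-batch interval (here 3 seconds)
final JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));

// rather than relying on Ctrl+C alone, request a graceful stop so the
// in-flight batch is processed and written out before the app exits
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        jssc.stop(true, true);   // stopSparkContext = true, stopGracefully = true
    }
});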

From: Gerard Maas gerard.m...@gmail.com
Sent: Thursday, December 4, 2014 10:22 PM
To: Akhil Das
Cc: Sarosh, M.; user@spark.apache.org
Subject: Re: Spark-Streaming: output to cassandra

I guess he's already doing so, given the 'saveToCassandra' usage.
What I don't understand is the question "how do I specify a batch". That
doesn't make much sense to me. Could you explain further?

-kr, Gerard.

On Thu, Dec 4, 2014 at 5:36 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
You can use the datastax's Cassandra connector:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

Thanks
Best Regards

On Thu, Dec 4, 2014 at 8:21 PM, m.sar...@accenture.com wrote:

Hi,


I have written the code below, which streams data from Kafka and prints it
to the console.

I want to extend this, and want my data to go into a Cassandra table instead.


JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

System.out.println("Connection done!");
JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
{
    public String call(Tuple2<String, String> message)
    {
        return message._2();
    }
});
//data.print();   // -- output to console
data.foreachRDD(saveToCassandra("mykeyspace","mytable"));
jssc.start();
jssc.awaitTermination();



How should I implement the line:

data.foreachRDD(saveToCassandra("mykeyspace","mytable"));

so that data goes into Cassandra in each batch? And how do I specify a batch,
because if I do Ctrl+C on the console of the streaming job jar, nothing will be
entered into Cassandra since it is getting killed.


Please help.


Thanks and Regards,

Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #:  (+91) - 9836112841.







Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread m.sarosh
Hi,

I am integrating Kafka and Spark using spark-streaming. I have created a topic
as a Kafka producer:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

I am publishing messages to Kafka and trying to read them using spark-streaming
Java code and displaying them on screen.
The daemons are all up: Spark master, worker; zookeeper; kafka.
I am writing Java code for this, using KafkaUtils.createStream; the code is below:

package com.spark;

import scala.Tuple2;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("Usage: spark-submit --class com.spark.SparkStream target/SparkStream-with-dependencies.jar <zookeeper_ip> <group_name> <topic1,topic2,...>");
            System.exit(1);
        }

        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(1));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        System.out.println("Connection done");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
        {
            public String call(Tuple2<String, String> message)
            {
                System.out.println("NewMessage: " + message._2()); //for debugging
                return message._2();
            }
        });

        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}


I am running the job, and in another terminal I am running the Kafka producer to
publish messages:

#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Hi kafka
second message
another message

But the output logs at the spark-streaming console don't show the messages;
they show zero blocks received:


---
Time: 1417107363000 ms
---

14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 
1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 
1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for 
time 1417107363000 ms (execution: 0.000 s)
14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 
1417107363000 ms
14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD 
BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 
ms
14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks


Why isn't the data block getting received? I have tried the Kafka
producer and consumer on the console (bin/kafka-console-producer and
bin/kafka-console-consumer...) and they work perfectly, but why not the code above?
Please help me.


Regards,
Aiman Sarosh
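
[Editor's aside, not taken from the thread: the receiver-based KafkaUtils.createStream needs a dedicated executor core for the receiver itself; if the cluster (or local master) cannot schedule it, the receiver never runs and every batch reports 0 blocks received. When testing locally, that means using a master of at least local[2]; a minimal sketch:]

// local-mode test sketch: at least 2 threads, one for the Kafka receiver, one for processing
JavaStreamingContext jssc =
        new JavaStreamingContext("local[2]", "SparkStream", new Duration(3000));
JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);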





RE: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread m.sarosh
Hi,

The spark master is working, and I have given the same url in the code
(screenshot of the Spark master web UI omitted).

The warning is gone, and the new log is:
---
Time: 141742785 ms
---

INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Starting job streaming job 141742785 ms.0 
from job set of time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Finished job streaming job 141742785 ms.0 
from job set of time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 141742785 ms 
(execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Added jobs for time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD 
(Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 25
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD 
(Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 24
INFO  [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream 
(Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD 
at ReceiverInputDStream.scala:69 of time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker 
(Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
---
Time: 1417427853000 ms
---

INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0 
from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0 
from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000 ms 
(execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD 
(Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 27
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD 
(Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 26
INFO  [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream 
(Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[26] at BlockRDD 
at ReceiverInputDStream.scala:69 of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-6] scheduler.ReceiverTracker 
(Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

What should my approach be now?
I need urgent help.

Regards,
Aiman

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 3:56 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

It says:

 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory

A quick guess would be that you are giving the wrong master url
(spark://192.168.88.130:7077). Open the web UI running on port 8080 and use
the master url listed there in the top left corner of the page.

Thanks
Best Regards
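
[Editor's aside on top of that suggestion: hardcoding the master URL in the source is easy to get wrong; an alternative is to set only the application name in code and pass the master on the command line via spark-submit --master spark://<host>:7077 (or local[2] for testing). A minimal sketch of that construction:]

// master comes from spark-submit (--master); only the app name is set in code
SparkConf conf = new SparkConf().setAppName("SparkStream");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(3000));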
