Hi Antonio,
 
First, which version of the Spark Cassandra Connector are you using? You are
on Spark 1.3.1, which the connector today supports only in builds from the
master branch - the release with public artifacts supporting Spark 1.3.1 is
coming soon ;)
Please see
https://github.com/datastax/spark-cassandra-connector#version-compatibility
Try the version change and let me know.
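
If it helps, the wiring in an sbt build looks roughly like this (a sketch, not
authoritative: until the Spark 1.3.x compatible artifact is published you would
build master and publish it locally, so the connector versions below are
placeholders):

    // build.sbt - connector versions are placeholders until the Spark 1.3.x
    // compatible release ships (see the compatibility table above)
    libraryDependencies ++= Seq(
      "org.apache.spark"   %% "spark-streaming"                % "1.3.1" % "provided",
      "org.apache.spark"   %% "spark-streaming-flume"          % "1.3.1",
      "com.datastax.spark" %% "spark-cassandra-connector"      % "<locally-built-master>",
      // in the 1.x line the Java API (javaFunctions, mapToRow) is a separate artifact:
      "com.datastax.spark" %% "spark-cassandra-connector-java" % "<locally-built-master>"
    )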

What does your Cassandra log say?

Note that you can do the reading from a stream source like Flume right in your
flumeStreamNavig.map(..) code and write inline to Cassandra (in Scala, at
least, this takes a lot less code - I have not used the Java API). In
KillrWeather (where the source is Kafka) the stream is read here:
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L39

and written inline to Cassandra here:
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L45
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L64
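
Condensed, the shape of that linked code is roughly the following (a sketch:
ssc, kafkaParams, KafkaTopicRaw, RawWeatherData, CassandraKeyspace and
CassandraTableRaw are all KillrWeather's own definitions, not the connector's):

    import com.datastax.spark.connector.streaming._ // adds saveToCassandra to DStreams
    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // parse each Kafka message into a case class as it streams in...
    val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Map(KafkaTopicRaw -> 1), StorageLevel.DISK_ONLY_2)
      .map(_._2.split(","))
      .map(RawWeatherData(_))

    // ...and write it straight to Cassandra: no collect(), no re-parallelize,
    // no manual batching or session handling
    kafkaStream.saveToCassandra(CassandraKeyspace, CassandraTableRaw)

The same pattern would apply to your Flume DStream: map each SparkFlumeEvent to
a mapped case class and call saveToCassandra on the resulting stream.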
 

Helena
tw: @helenaedelson


> On May 29, 2015, at 6:11 AM, Antonio Giambanco <antogia...@gmail.com> wrote:
> 
> Hi all,
> I have Spark 1.3.1 and Cassandra 2.0.14 installed on a single server.
> I'm coding a simple Java class for Spark Streaming that works as follows:
> - read header events from a Flume sink
> - based on the header, write the event body to either the navigation table or
>   the transaction table (in Cassandra)
> Unfortunately I get a NoHostAvailableException; if I comment out the code that
> saves to one of the two tables, everything works.
> 
> Here is the code:
> 
>     public static void main(String[] args) {
> 
>         // Create a local StreamingContext with two worker threads and a
>         // batch interval of 1 second
>         SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DWXNavigationApp");
> 
>         conf.set("spark.cassandra.connection.host", "127.0.0.1");
>         conf.set("spark.cassandra.connection.native.port", "9042");
>         conf.set("spark.cassandra.output.batch.size.rows", "1");
>         conf.set("spark.cassandra.output.concurrent.writes", "1");
> 
>         final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
> 
>         // Poll the Flume sink on 127.0.0.1:8888
>         JavaReceiverInputDStream<SparkFlumeEvent> flumeStreamNavig =
>                 FlumeUtils.createPollingStream(jssc, "127.0.0.1", 8888);
> 
>         // Turn each Flume event into a "header#body" string
>         JavaDStream<String> logRowsNavig = flumeStreamNavig.map(
>                 new Function<SparkFlumeEvent, String>() {
>                     @Override
>                     public String call(SparkFlumeEvent arg0) throws Exception {
>                         Map<CharSequence, CharSequence> headers = arg0.event().getHeaders();
>                         ByteBuffer bytePayload = arg0.event().getBody();
>                         String s = headers.get("source_log").toString() + "#"
>                                 + new String(bytePayload.array());
>                         System.out.println("RIGA: " + s);
>                         return s;
>                     }
>                 });
> 
>         logRowsNavig.foreachRDD(
>                 new Function<JavaRDD<String>, Void>() {
>                     @Override
>                     public Void call(JavaRDD<String> rows) throws Exception {
>                         if (!rows.isEmpty()) {
>                             // String header = getHeaderFronRow(rows.collect());
>                             List<Navigation> listNavigation = new ArrayList<Navigation>();
>                             List<Transaction> listTransaction = new ArrayList<Transaction>();
> 
>                             // Route each row to the right list based on its header
>                             for (String row : rows.collect()) {
>                                 String header = row.substring(0, row.indexOf("#"));
>                                 if (header.contains("controller_log")) {
>                                     listNavigation.add(createNavigation(row));
>                                     System.out.println("Added Element in Navigation List");
>                                 } else if (header.contains("business_log")) {
>                                     listTransaction.add(createTransaction(row));
>                                     System.out.println("Added Element in Transaction List");
>                                 }
>                             }
> 
>                             // javaFunctions / mapToRow are static imports from
>                             // com.datastax.spark.connector.japi.CassandraJavaUtil
>                             if (!listNavigation.isEmpty()) {
>                                 JavaRDD<Navigation> navigationRows =
>                                         jssc.sparkContext().parallelize(listNavigation);
>                                 javaFunctions(navigationRows)
>                                         .writerBuilder("cassandrasink", "navigation", mapToRow(Navigation.class))
>                                         .saveToCassandra();
>                             }
> 
>                             if (!listTransaction.isEmpty()) {
>                                 JavaRDD<Transaction> transactionRows =
>                                         jssc.sparkContext().parallelize(listTransaction);
>                                 javaFunctions(transactionRows)
>                                         .writerBuilder("cassandrasink", "transaction", mapToRow(Transaction.class))
>                                         .saveToCassandra();
>                             }
>                         }
>                         return null;
>                     }
>                 });
> 
>         jssc.start();              // Start the computation
>         jssc.awaitTermination();   // Wait for the computation to terminate
>     }
> 
> 
> Here is the exception:
> 
> 
> 15/05/29 11:19:29 ERROR QueryExecutor: Failed to execute:
> com.datastax.spark.connector.writer.RichBatchStatement@ab76b83
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
>         at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
>         at com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
>         at com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:577)
>         at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:119)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>         at $Proxy17.executeAsync(Unknown Source)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>         at $Proxy17.executeAsync(Unknown Source)
>         at com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)
>         at com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)
>         at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)
>         at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:137)
>         at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:136)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
>         at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:136)
>         at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>         at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>         at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>         at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>         at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>         at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>         at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>         at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> 
> 15/05/29 11:19:29 ERROR Executor: Exception in task 1.0 in stage 15.0 (TID 20)
> java.io.IOException: Failed to write statements to cassandrasink.navigation.
>         at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)
>         at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>         at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>         at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>         at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>         at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>         at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>         at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>         at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> 
> 15/05/29 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 15.0 (TID 20, localhost): java.io.IOException: Failed to write statements to cassandrasink.navigation.
>         at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)
>         at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>         at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>         at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>         at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>         at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>         at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>         at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>         at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> 
> 
> 
> 
> A G
> 
