Hi Antonio,

It’s your lucky day ;) We just released Spark Cassandra Connector 1.3.0-M1 for 
Spark 1.3 and DataSources API
Give it a little while to propagate to 
http://search.maven.org/#search%7Cga%7C1%7Cspark-cassandra-connector 
<http://search.maven.org/#search|ga|1|spark-cassandra-connector> 

'spark-cassandra-connector-java_2.10’ just tells me you are using the version 
that is compiled against scala 2.10, what is the actual connector version 
itself?

Thanks,
Helena
@helenaedelson


> On Jun 1, 2015, at 1:08 PM, Antonio Giambanco <antogia...@gmail.com> wrote:
> 
> Hi Helena,
> thanks for answering me . . . 
> I didn't realize it could be the connector version, unfortunately i didn't 
> try yet.
> I know scala is better but i'm using drools and i'm forced to use java
> in my project i'm using spark-cassandra-connector-java_2.10
> 
> from cassandra I have only this log
> 
> INFO [ScheduledTasks:1] 2015-06-01 15:49:03,260 ColumnFamilyStore.java (line 
> 795) Enqueuing flush of Memtable-sstable_activity@361028148(455/4550 
> serialized/live bytes, 180 ops)
>  INFO [FlushWriter:76] 2015-06-01 15:49:03,261 Memtable.java (line 362) 
> Writing Memtable-sstable_activity@361028148(455/4550 serialized/live bytes, 
> 180 ops)
>  INFO [FlushWriter:76] 2015-06-01 15:49:03,273 Memtable.java (line 402) 
> Completed flushing 
> /var/lib/cassandra/data/system/sstable_activity/system-sstable_activity-jb-103-Data.db
>  (248 bytes) for commitlog position ReplayPosition(segmentId=1432896540485, 
> position=1217022)
> 
> 
> 
> also on spark I found this exception
> 
> 
> 
> 15/06/01 16:43:30 ERROR Executor: Exception in task 0.0 in stage 61.0 (TID 81)
> java.io.IOException: Failed to prepare statement INSERT INTO 
> "cassandrasink"."transaction" ("event_id", "isin", "security_type", 
> "security_name", "date", "time", "price", "currency", "user_id", "quantity", 
> "amount", "session_id") VALUES (:"event_id", :"isin", :"security_type", 
> :"security_name", :"date", :"time", :"price", :"currency", :"user_id", 
> :"quantity", :"amount", :"session_id"): All host(s) tried for query failed 
> (no host was tried)
>         at com.datastax.spark.connector.writer.TableWriter.com 
> <http://com.datastax.spark.connector.writer.tablewriter.com/>$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:96)
>         at 
> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:122)
>         at 
> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>         at 
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>         at 
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>         at 
> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>         at 
> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>         at 
> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>         at 
> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>         at 
> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All 
> host(s) tried for query failed (no host was tried)
>         at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
>         at 
> com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
>         at 
> com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
>         at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at 
> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>         at $Proxy17.prepare(Unknown Source)
>         at 
> com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
>         at 
> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
>         at $Proxy17.prepare(Unknown Source)
>         at com.datastax.spark.connector.writer.TableWriter.com 
> <http://com.datastax.spark.connector.writer.tablewriter.com/>$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:92)
>         ... 15 more
> Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All 
> host(s) tried for query failed (no host was tried)
>  at 
> com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
>         at 
> com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
>         at 
> com.datastax.driver.core.SessionManager.prepareAsync(SessionManager.java:124)
>         at 
> com.datastax.driver.core.AbstractSession.prepareAsync(AbstractSession.java:103)
>         at 
> com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:89)
>         ... 24 more
> 15/06/01 16:43:30 INFO TaskSetManager: Starting task 1.0 in stage 61.0 (TID 
> 82, localhost, PROCESS_LOCAL, 1850 bytes)
> 15/06/01 16:43:30 INFO Executor: Running task 1.0 in stage 61.0 (TID 82)
> 15/06/01 16:43:30 WARN TaskSetManager: Lost task 0.0 in stage 61.0 (TID 81, 
> localhost): java.io.IOException: Failed to prepare statement INSERT INTO 
> "cassandrasink"."transaction" ("event_id", "isin", "security_type", 
> "security_name", "date", "time", "price", "currency", "user_id", "quantity", 
> "amount", "session_id") VALUES (:"event_id", :"isin", :"security_type", 
> :"security_name", :"date", :"time", :"price", :"currency", :"user_id", 
> :"quantity", :"amount", :"session_id"): All host(s) tried for query failed 
> (no host was tried)
>         at com.datastax.spark.connector.writer.TableWriter.com 
> <http://com.datastax.spark.connector.writer.tablewriter.com/>$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:96)
>         at 
> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:122)
>         at 
> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>         at 
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>         at 
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>         at 
> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>         at 
> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>         at 
> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>         at 
> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>         at 
> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All 
> host(s) tried for query failed (no host was tried)
>         at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
>         at 
> com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
>         at 
> com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
>         at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at 
> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>         at $Proxy17.prepare(Unknown Source)
>         at 
> com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
>         at 
> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
>         at $Proxy17.prepare(Unknown Source)
>         at com.datastax.spark.connector.writer.TableWriter.com 
> <http://com.datastax.spark.connector.writer.tablewriter.com/>$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:92)
>   ... 15 more
> Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All 
> host(s) tried for query failed (no host was tried)
>         at 
> com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
>         at 
> com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
>         at 
> com.datastax.driver.core.SessionManager.prepareAsync(SessionManager.java:124)
>         at 
> com.datastax.driver.core.AbstractSession.prepareAsync(AbstractSession.java:103)
>         at 
> com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:89)
>         ... 24 more
> 
> 15/06/01 16:43:30 ERROR TaskSetManager: Task 0 in stage 61.0 failed 1 times; 
> aborting job
> 15/06/01 16:43:30 INFO TaskSchedulerImpl: Cancelling stage 61
> 15/06/01 16:43:30 INFO Executor: Executor is trying to kill task 1.0 in stage 
> 61.0 (TID 82)
> 15/06/01 16:43:30 INFO TaskSchedulerImpl: Stage 61 was cancelled
> 15/06/01 16:43:30 ERROR Executor: Exception in task 1.0 in stage 61.0 (TID 82)
> org.apache.spark.TaskKilledException
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> 15/06/01 16:43:30 INFO DAGScheduler: Stage 61 (foreachRDD at 
> DWXNavigation.java:95) failed in 0.015 s
> 15/06/01 16:43:30 INFO DAGScheduler: Job 61 failed: foreachRDD at 
> DWXNavigation.java:95, took 0.027368 s
> 15/06/01 16:43:30 ERROR JobScheduler: Error running job streaming job 
> 1433169810000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 61.0 failed 1 times, most recent failure: Lost task 0.0 in stage 61.0 
> (TID 81, localhost): java.io.IOException: Failed to prepare statement INSERT 
> INTO "cassandrasink"."transaction" ("event_id", "isin", "security_type", 
> "security_name", "date", "time", "price", "currency", "user_id", "quantity", 
> "amount", "session_id") VALUES (:"event_id", :"isin", :"security_type", 
> :"security_name", :"date", :"time", :"price", :"currency", :"user_id", 
> :"quantity", :"amount", :"session_id"): All host(s) tried for query failed 
> (no host was tried)
>         at com.datastax.spark.connector.writer.TableWriter.com 
> <http://com.datastax.spark.connector.writer.tablewriter.com/>$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:96)
>         at 
> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:122)
>         at 
> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>         at 
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>         at 
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>         at 
> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>         at 
> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>         at 
> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>         at 
> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>         at 
> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>  at java.lang.Thread.run(Thread.java:722)
> Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All 
> host(s) tried for query failed (no host was tried)
>         at 
> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
>         at 
> com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
>         at 
> com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
>         at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at 
> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>         at $Proxy17.prepare(Unknown Source)
>         at 
> com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
>         at 
> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
>         at $Proxy17.prepare(Unknown Source)
>         at com.datastax.spark.connector.writer.TableWriter.com 
> <http://com.datastax.spark.connector.writer.tablewriter.com/>$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:92)
>         ... 15 more
> Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All 
> host(s) tried for query failed (no host was tried)
>         at 
> com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
>         at 
> com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
>         at 
> com.datastax.driver.core.SessionManager.prepareAsync(SessionManager.java:124)
>         at 
> com.datastax.driver.core.AbstractSession.prepareAsync(AbstractSession.java:103)
>         at 
> com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:89)
>         ... 24 more
> 
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org 
> <http://org.apache.spark.scheduler.dagscheduler.org/>$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at scala.Option.foreach(Option.scala:236)
>         at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 15/06/01 16:43:30 WARN TaskSetManager: Lost task 1.0 in stage 61.0 (TID 82, 
> localhost): org.apache.spark.TaskKilledException
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
> 
> A G
> 
> 2015-06-01 13:26 GMT+02:00 Helena Edelson <helena.edel...@datastax.com 
> <mailto:helena.edel...@datastax.com>>:
> Hi Antonio,
>  
> First, what version of the Spark Cassandra Connector are you using? You are 
> using Spark 1.3.1, which the Cassandra connector today supports in builds 
> from the master branch only - the release with public artifacts supporting 
> Spark 1.3.1 is coming soon ;)
> Please see 
> https://github.com/datastax/spark-cassandra-connector#version-compatibility 
> <https://github.com/datastax/spark-cassandra-connector#version-compatibility> 
> Try the version change and LMK.
> 
> What does your cassandra log say?
> 
> Note that you can read from a Spark stream like Flume, for instance in your 
> flumeStreamNavig.map(..) code (in scala at least, with a lot less code - I 
> have not used java)
> (here it’s kafka) 
> https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L39
>  
> <https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L39>
> And write inline to Cassandra 
> https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L45
>  
> <https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L45>
>  
> https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L64
>  
> <https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L64>
>  
> 
> Helena
> tw: @helenaedelson
> 
> 
>> On May 29, 2015, at 6:11 AM, Antonio Giambanco <antogia...@gmail.com 
>> <mailto:antogia...@gmail.com>> wrote:
>> 
>> Hi all,
>> I have in a single server installed spark 1.3.1 and cassandra 2.0.14
>> I'm coding a simple java class for Spark Streaming as follow:
>> reading header events from flume sink
>> based on header I write the event body on navigation or transaction table 
>> (cassandra)
>> unfortunatly I get NoHostAvailableException, if I comment the code for 
>> saving one of the two tables everything works
>> 
>> 
>> here the code
>> 
>>      public static void main(String[] args) {
>>          
>>         // Create a local StreamingContext with two working thread and batch 
>> interval of 1 second
>>          SparkConf conf = new 
>> SparkConf().setMaster("local[2]").setAppName("DWXNavigationApp");
>>          
>>          conf.set("spark.cassandra.connection.host", "127.0.0.1");
>>          conf.set("spark.cassandra.connection.native.port","9042");
>>          conf.set("spark.cassandra.output.batch.size.rows", "1");
>>          conf.set("spark.cassandra.output.concurrent.writes", "1");
>>          
>>          
>>          final JavaStreamingContext jssc = new JavaStreamingContext(conf, 
>> Durations.seconds(1));
>>          
>>          JavaReceiverInputDStream<SparkFlumeEvent> flumeStreamNavig = 
>> FlumeUtils.createPollingStream(jssc, "127.0.0.1", 8888);     
>>          
>>         
>>          JavaDStream<String> logRowsNavig = flumeStreamNavig.map(
>>                  new Function<SparkFlumeEvent,String>(){
>> 
>>                     @Override
>>                     public String call(SparkFlumeEvent arg0) throws 
>> Exception {
>>                         // TODO Auto-generated method stub0.
>>                         
>>                         Map<CharSequence,CharSequence> headers = 
>> arg0.event().getHeaders();
>>                                                 
>>                         ByteBuffer bytePayload = arg0.event().getBody(); 
>>                         String s = headers.get("source_log").toString() + 
>> "#" + new String(bytePayload.array());
>>                         System.out.println("RIGA: " + s);
>>                         return s;
>>                     }
>>                  });
>>          
>>         
>>          logRowsNavig.foreachRDD(
>>                  new Function<JavaRDD<String>,Void>(){
>>                     @Override
>>                     public Void call(JavaRDD<String> rows) throws Exception {
>>                                                 
>>                         if(!rows.isEmpty()){
>>                              
>>                              //String header = 
>> getHeaderFronRow(rows.collect());
>>                              
>>                              List<Navigation> listNavigation = new 
>> ArrayList<Navigation>();
>>                              List<Transaction> listTransaction = new 
>> ArrayList<Transaction>();
>>                              
>>                              for(String row : rows.collect()){
>>                                  
>>                                  String header = row.substring(0, 
>> row.indexOf("#"));
>>                                  
>>                                  if(header.contains("controller_log")){
>>                                      
>> listNavigation.add(createNavigation(row));
>>                                      System.out.println("Added Element in 
>> Navigation List");
>>                                  
>>                                  }else if(header.contains("business_log")){
>>                                      
>> listTransaction.add(createTransaction(row));
>>                                      System.out.println("Added Element in 
>> Transaction List");
>>                                  }     
>>                                      
>>                              }
>>                              
>>                     
>>                              if(!listNavigation.isEmpty()){
>>                                  JavaRDD<Navigation> navigationRows= 
>> jssc.sparkContext().parallelize(listNavigation);
>>                             
>>                                  
>> javaFunctions(navigationRows).writerBuilder("cassandrasink", "navigation", 
>> mapToRow(Navigation.class)).saveToCassandra();
>>                              }
>>                 
>>                                 
>>                              if(!listTransaction.isEmpty()){
>>                                  JavaRDD<Transaction> transactionRows= 
>> jssc.sparkContext().parallelize(listTransaction);
>>                                     
>>                                  
>> javaFunctions(transactionRows).writerBuilder("cassandrasink", "transaction", 
>> mapToRow(Transaction.class)).saveToCassandra();
>>                                  
>>                              }
>>                             
>>                         }
>>                         return null;
>>                         
>>                     }
>>              });
>>          
>>         jssc.start();              // Start the computation
>>         jssc.awaitTermination();   // Wait for the computation to terminate 
>>      }
>> 
>> 
>> here the exception
>> 
>> 
>> 15/05/29 11:19:29 ERROR QueryExecutor: Failed to execute:
>> 
>> com.datastax.spark.connector.writer.RichBatchStatement@ab76b83 
>> <mailto:com.datastax.spark.connector.writer.RichBatchStatement@ab76b83>
>> com.datastax.driver.core.exceptions.NoHostAvailableException: All
>> 
>> host(s) tried for query failed (no host was tried)
>> 
>>          at
>> 
>> com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
>> 
>>          at
>> 
>> com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
>> 
>>          at
>> 
>> com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:577)
>> 
>>          at
>> 
>> com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:119)
>> 
>>          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> 
>>          at
>> 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> 
>>          at
>> 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 
>>          at java.lang.reflect.Method.invoke(Method.java:601)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>> 
>>          at $Proxy17.executeAsync(Unknown Source)
>> 
>>          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> 
>>          at
>> 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> 
>>          at
>> 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 
>>          at java.lang.reflect.Method.invoke(Method.java:601)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>> 
>>          at $Proxy17.executeAsync(Unknown Source)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:137)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:136)
>> 
>>          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:136)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>> 
>>          at
>> 
>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>> 
>>          at
>> 
>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>> 
>>          at
>> 
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> 
>>          at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> 
>>          at
>> 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> 
>>          at
>> 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>> 
>>          at
>> 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>> 
>>          at java.lang.Thread.run(Thread.java:722)
>> 
>> 15/05/29 11:19:29 ERROR Executor: Exception in task 1.0 in stage 15.0 (TID 
>> 20)
>> 
>> java.io.IOException: Failed to write statements to cassandrasink.navigation.
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>> 
>>          at
>> 
>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>> 
>>          at
>> 
>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>> 
>>          at
>> 
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> 
>>          at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> 
>>          at
>> 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> 
>>          at
>> 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>> 
>>          at
>> 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>> 
>>          at java.lang.Thread.run(Thread.java:722)
>> 
>> 15/05/29 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 15.0 (TID 20, 
>> localhost): java.io.IOException: Failed to write statements to 
>> cassandrasink.navigation.
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>> 
>>          at
>> 
>> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>> 
>>          at
>> 
>> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>> 
>>          at
>> 
>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>> 
>>          at
>> 
>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>> 
>>          at
>> 
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> 
>>          at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> 
>>          at
>> 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> 
>>          at
>> 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>> 
>>          at
>> 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>> 
>>          at java.lang.Thread.run(Thread.java:722)
>> 
>> 
>> 
>> 
>> A G
>> 
> 
> 

Reply via email to