:D very happy Helena I'll check tomorrow morning

A G

Il giorno 01/giu/2015, alle ore 19:45, Helena Edelson 
<helena.edel...@datastax.com> ha scritto:

> Hi Antonio,
> 
> It’s your lucky day ;) We just released Spark Cassandra Connector 1.3.0-M1 
> for Spark 1.3 and DataSources API
> Give it a little while to propagate to 
> http://search.maven.org/#search%7Cga%7C1%7Cspark-cassandra-connector 
> 
> 'spark-cassandra-connector-java_2.10’ just tells me you are using the version 
> that is compiled against scala 2.10, what is the actual connector version 
> itself?
> 
> Thanks,
> Helena
> @helenaedelson
> 
> 
>> On Jun 1, 2015, at 1:08 PM, Antonio Giambanco <antogia...@gmail.com> wrote:
>> 
>> Hi Helena,
>> thanks for answering me . . . 
>> I didn't realize it could be the connector version, unfortunately i didn't 
>> try yet.
>> I know scala is better but i'm using drools and i'm forced to use java
>> in my project i'm using spark-cassandra-connector-java_2.10
>> 
>> from cassandra I have only this log
>> 
>> INFO [ScheduledTasks:1] 2015-06-01 15:49:03,260 ColumnFamilyStore.java (line 
>> 795) Enqueuing flush of Memtable-sstable_activity@361028148(455/4550 
>> serialized/live bytes, 180 ops)
>>  INFO [FlushWriter:76] 2015-06-01 15:49:03,261 Memtable.java (line 362) 
>> Writing Memtable-sstable_activity@361028148(455/4550 serialized/live bytes, 
>> 180 ops)
>>  INFO [FlushWriter:76] 2015-06-01 15:49:03,273 Memtable.java (line 402) 
>> Completed flushing 
>> /var/lib/cassandra/data/system/sstable_activity/system-sstable_activity-jb-103-Data.db
>>  (248 bytes) for commitlog position ReplayPosition(segmentId=1432896540485, 
>> position=1217022)
>> 
>> 
>> 
>> also on spark I found this exception
>> 
>> 
>> 
>> 15/06/01 16:43:30 ERROR Executor: Exception in task 0.0 in stage 61.0 (TID 
>> 81)
>> java.io.IOException: Failed to prepare statement INSERT INTO 
>> "cassandrasink"."transaction" ("event_id", "isin", "security_type", 
>> "security_name", "date", "time", "price", "currency", "user_id", "quantity", 
>> "amount", "session_id") VALUES (:"event_id", :"isin", :"security_type", 
>> :"security_name", :"date", :"time", :"price", :"currency", :"user_id", 
>> :"quantity", :"amount", :"session_id"): All host(s) tried for query failed 
>> (no host was tried)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter.com$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:96)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:122)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>>         at 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>>         at 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>>         at 
>> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>>         at 
>> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>>         at 
>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>>         at 
>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>         at 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>         at java.lang.Thread.run(Thread.java:722)
>> Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All 
>> host(s) tried for query failed (no host was tried)
>>         at 
>> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
>>         at 
>> com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
>>         at 
>> com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
>>         at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
>>         at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:601)
>>         at 
>> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>>         at $Proxy17.prepare(Unknown Source)
>>         at 
>> com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
>>         at 
>> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
>>         at $Proxy17.prepare(Unknown Source)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter.com$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:92)
>>         ... 15 more
>> Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All 
>> host(s) tried for query failed (no host was tried)
>>  at 
>> com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
>>         at 
>> com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
>>         at 
>> com.datastax.driver.core.SessionManager.prepareAsync(SessionManager.java:124)
>>         at 
>> com.datastax.driver.core.AbstractSession.prepareAsync(AbstractSession.java:103)
>>         at 
>> com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:89)
>>         ... 24 more
>> 15/06/01 16:43:30 INFO TaskSetManager: Starting task 1.0 in stage 61.0 (TID 
>> 82, localhost, PROCESS_LOCAL, 1850 bytes)
>> 15/06/01 16:43:30 INFO Executor: Running task 1.0 in stage 61.0 (TID 82)
>> 15/06/01 16:43:30 WARN TaskSetManager: Lost task 0.0 in stage 61.0 (TID 81, 
>> localhost): java.io.IOException: Failed to prepare statement INSERT INTO 
>> "cassandrasink"."transaction" ("event_id", "isin", "security_type", 
>> "security_name", "date", "time", "price", "currency", "user_id", "quantity", 
>> "amount", "session_id") VALUES (:"event_id", :"isin", :"security_type", 
>> :"security_name", :"date", :"time", :"price", :"currency", :"user_id", 
>> :"quantity", :"amount", :"session_id"): All host(s) tried for query failed 
>> (no host was tried)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter.com$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:96)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:122)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>>         at 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>>         at 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>>         at 
>> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>>         at 
>> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>>         at 
>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>>         at 
>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>         at 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>         at java.lang.Thread.run(Thread.java:722)
>> Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All 
>> host(s) tried for query failed (no host was tried)
>>         at 
>> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
>>         at 
>> com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
>>         at 
>> com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
>>         at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
>>         at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:601)
>>         at 
>> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>>         at $Proxy17.prepare(Unknown Source)
>>         at 
>> com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
>>         at 
>> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
>>         at $Proxy17.prepare(Unknown Source)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter.com$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:92)
>>   ... 15 more
>> Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All 
>> host(s) tried for query failed (no host was tried)
>>         at 
>> com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
>>         at 
>> com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
>>         at 
>> com.datastax.driver.core.SessionManager.prepareAsync(SessionManager.java:124)
>>         at 
>> com.datastax.driver.core.AbstractSession.prepareAsync(AbstractSession.java:103)
>>         at 
>> com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:89)
>>         ... 24 more
>> 
>> 15/06/01 16:43:30 ERROR TaskSetManager: Task 0 in stage 61.0 failed 1 times; 
>> aborting job
>> 15/06/01 16:43:30 INFO TaskSchedulerImpl: Cancelling stage 61
>> 15/06/01 16:43:30 INFO Executor: Executor is trying to kill task 1.0 in 
>> stage 61.0 (TID 82)
>> 15/06/01 16:43:30 INFO TaskSchedulerImpl: Stage 61 was cancelled
>> 15/06/01 16:43:30 ERROR Executor: Exception in task 1.0 in stage 61.0 (TID 
>> 82)
>> org.apache.spark.TaskKilledException
>>         at 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>         at java.lang.Thread.run(Thread.java:722)
>> 15/06/01 16:43:30 INFO DAGScheduler: Stage 61 (foreachRDD at 
>> DWXNavigation.java:95) failed in 0.015 s
>> 15/06/01 16:43:30 INFO DAGScheduler: Job 61 failed: foreachRDD at 
>> DWXNavigation.java:95, took 0.027368 s
>> 15/06/01 16:43:30 ERROR JobScheduler: Error running job streaming job 
>> 1433169810000 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
>> stage 61.0 failed 1 times, most recent failure: Lost task 0.0 in stage 61.0 
>> (TID 81, localhost): java.io.IOException: Failed to prepare statement INSERT 
>> INTO "cassandrasink"."transaction" ("event_id", "isin", "security_type", 
>> "security_name", "date", "time", "price", "currency", "user_id", "quantity", 
>> "amount", "session_id") VALUES (:"event_id", :"isin", :"security_type", 
>> :"security_name", :"date", :"time", :"price", :"currency", :"user_id", 
>> :"quantity", :"amount", :"session_id"): All host(s) tried for query failed 
>> (no host was tried)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter.com$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:96)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:122)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>>         at 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>>         at 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>>         at 
>> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>>         at 
>> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>>         at 
>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>>         at 
>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>         at 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>  at java.lang.Thread.run(Thread.java:722)
>> Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All 
>> host(s) tried for query failed (no host was tried)
>>         at 
>> com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
>>         at 
>> com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
>>         at 
>> com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
>>         at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
>>         at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:601)
>>         at 
>> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>>         at $Proxy17.prepare(Unknown Source)
>>         at 
>> com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
>>         at 
>> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
>>         at $Proxy17.prepare(Unknown Source)
>>         at 
>> com.datastax.spark.connector.writer.TableWriter.com$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:92)
>>         ... 15 more
>> Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All 
>> host(s) tried for query failed (no host was tried)
>>         at 
>> com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
>>         at 
>> com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
>>         at 
>> com.datastax.driver.core.SessionManager.prepareAsync(SessionManager.java:124)
>>         at 
>> com.datastax.driver.core.AbstractSession.prepareAsync(AbstractSession.java:103)
>>         at 
>> com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:89)
>>         ... 24 more
>> 
>> Driver stacktrace:
>>         at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>         at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>         at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>         at 
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at 
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>         at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>         at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>         at scala.Option.foreach(Option.scala:236)
>>         at 
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>         at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>>         at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> 15/06/01 16:43:30 WARN TaskSetManager: Lost task 1.0 in stage 61.0 (TID 82, 
>> localhost): org.apache.spark.TaskKilledException
>>         at 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
>> 
>> A G
>> 
>> 2015-06-01 13:26 GMT+02:00 Helena Edelson <helena.edel...@datastax.com>:
>>> Hi Antonio,
>>>  
>>> First, what version of the Spark Cassandra Connector are you using? You are 
>>> using Spark 1.3.1, which the Cassandra connector today supports in builds 
>>> from the master branch only - the release with public artifacts supporting 
>>> Spark 1.3.1 is coming soon ;)
>>> Please see 
>>> https://github.com/datastax/spark-cassandra-connector#version-compatibility 
>>> Try the version change and LMK.
>>> 
>>> What does your cassandra log say?
>>> 
>>> Note that you can read from a Spark stream like Flume, for instance in your 
>>> flumeStreamNavig.map(..) code (in scala at least, with a lot less code - I 
>>> have not used java)
>>> (here it’s kafka) 
>>> https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L39
>>> And write inline to Cassandra 
>>> https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L45
>>>  
>>> https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L64
>>>  
>>> 
>>> Helena
>>> tw: @helenaedelson
>>> 
>>> 
>>>> On May 29, 2015, at 6:11 AM, Antonio Giambanco <antogia...@gmail.com> 
>>>> wrote:
>>>> 
>>>> Hi all,
>>>> I have in a single server installed spark 1.3.1 and cassandra 2.0.14
>>>> I'm coding a simple java class for Spark Streaming as follow:
>>>> reading header events from flume sink
>>>> based on header I write the event body on navigation or transaction table 
>>>> (cassandra)
>>>> unfortunatly I get NoHostAvailableException, if I comment the code for 
>>>> saving one of the two tables everything works
>>>> 
>>>> 
>>>> here the code
>>>> 
>>>>      public static void main(String[] args) {
>>>>          
>>>>         // Create a local StreamingContext with two working thread and 
>>>> batch interval of 1 second
>>>>          SparkConf conf = new 
>>>> SparkConf().setMaster("local[2]").setAppName("DWXNavigationApp");
>>>>          
>>>>          conf.set("spark.cassandra.connection.host", "127.0.0.1");
>>>>          conf.set("spark.cassandra.connection.native.port","9042");
>>>>          conf.set("spark.cassandra.output.batch.size.rows", "1");
>>>>          conf.set("spark.cassandra.output.concurrent.writes", "1");
>>>>          
>>>>          
>>>>          final JavaStreamingContext jssc = new JavaStreamingContext(conf, 
>>>> Durations.seconds(1));
>>>>          
>>>>          JavaReceiverInputDStream<SparkFlumeEvent> flumeStreamNavig = 
>>>> FlumeUtils.createPollingStream(jssc, "127.0.0.1", 8888);     
>>>>          
>>>>         
>>>>          JavaDStream<String> logRowsNavig = flumeStreamNavig.map(
>>>>                  new Function<SparkFlumeEvent,String>(){
>>>> 
>>>>                     @Override
>>>>                     public String call(SparkFlumeEvent arg0) throws 
>>>> Exception {
>>>>                         // TODO Auto-generated method stub0.
>>>>                         
>>>>                         Map<CharSequence,CharSequence> headers = 
>>>> arg0.event().getHeaders();
>>>>                                                 
>>>>                         ByteBuffer bytePayload = arg0.event().getBody(); 
>>>>                         String s = headers.get("source_log").toString() + 
>>>> "#" + new String(bytePayload.array());
>>>>                         System.out.println("RIGA: " + s);
>>>>                         return s;
>>>>                     }
>>>>                  });
>>>>          
>>>>         
>>>>          logRowsNavig.foreachRDD(
>>>>                  new Function<JavaRDD<String>,Void>(){
>>>>                     @Override
>>>>                     public Void call(JavaRDD<String> rows) throws 
>>>> Exception {
>>>>                                                 
>>>>                         if(!rows.isEmpty()){
>>>>                              
>>>>                              //String header = 
>>>> getHeaderFronRow(rows.collect());
>>>>                              
>>>>                              List<Navigation> listNavigation = new 
>>>> ArrayList<Navigation>();
>>>>                              List<Transaction> listTransaction = new 
>>>> ArrayList<Transaction>();
>>>>                              
>>>>                              for(String row : rows.collect()){
>>>>                                  
>>>>                                  String header = row.substring(0, 
>>>> row.indexOf("#"));
>>>>                                  
>>>>                                  if(header.contains("controller_log")){
>>>>                                      
>>>> listNavigation.add(createNavigation(row));
>>>>                                      System.out.println("Added Element in 
>>>> Navigation List");
>>>>                                  
>>>>                                  }else if(header.contains("business_log")){
>>>>                                      
>>>> listTransaction.add(createTransaction(row));
>>>>                                      System.out.println("Added Element in 
>>>> Transaction List");
>>>>                                  }     
>>>>                                      
>>>>                              }
>>>>                              
>>>>                     
>>>>                              if(!listNavigation.isEmpty()){
>>>>                                  JavaRDD<Navigation> navigationRows= 
>>>> jssc.sparkContext().parallelize(listNavigation);
>>>>                             
>>>>                                  
>>>> javaFunctions(navigationRows).writerBuilder("cassandrasink", "navigation", 
>>>> mapToRow(Navigation.class)).saveToCassandra();
>>>>                              }
>>>>                 
>>>>                                 
>>>>                              if(!listTransaction.isEmpty()){
>>>>                                  JavaRDD<Transaction> transactionRows= 
>>>> jssc.sparkContext().parallelize(listTransaction);
>>>>                                     
>>>>                                  
>>>> javaFunctions(transactionRows).writerBuilder("cassandrasink", 
>>>> "transaction", mapToRow(Transaction.class)).saveToCassandra();
>>>>                                  
>>>>                              }
>>>>                             
>>>>                         }
>>>>                         return null;
>>>>                         
>>>>                     }
>>>>              });
>>>>          
>>>>         jssc.start();              // Start the computation
>>>>         jssc.awaitTermination();   // Wait for the computation to 
>>>> terminate 
>>>>      }
>>>> 
>>>> 
>>>> here the exception
>>>> 
>>>> 
>>>> 15/05/29 11:19:29 ERROR QueryExecutor: Failed to execute:
>>>> 
>>>> com.datastax.spark.connector.writer.RichBatchStatement@ab76b83
>>>> 
>>>> com.datastax.driver.core.exceptions.NoHostAvailableException: All
>>>> 
>>>> host(s) tried for query failed (no host was tried)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:577)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:119)
>>>> 
>>>>          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> 
>>>>          at
>>>> 
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> 
>>>>          at
>>>> 
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> 
>>>>          at java.lang.reflect.Method.invoke(Method.java:601)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>>>> 
>>>>          at $Proxy17.executeAsync(Unknown Source)
>>>> 
>>>>          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> 
>>>>          at
>>>> 
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> 
>>>>          at
>>>> 
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> 
>>>>          at java.lang.reflect.Method.invoke(Method.java:601)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>>>> 
>>>>          at $Proxy17.executeAsync(Unknown Source)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:137)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:136)
>>>> 
>>>>          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:136)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>>>> 
>>>>          at
>>>> 
>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>> 
>>>>          at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>> 
>>>>          at
>>>> 
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>> 
>>>>          at
>>>> 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>> 
>>>>          at
>>>> 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>> 
>>>>          at java.lang.Thread.run(Thread.java:722)
>>>> 
>>>> 15/05/29 11:19:29 ERROR Executor: Exception in task 1.0 in stage 15.0 (TID 
>>>> 20)
>>>> 
>>>> java.io.IOException: Failed to write statements to 
>>>> cassandrasink.navigation.
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>>>> 
>>>>          at
>>>> 
>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>> 
>>>>          at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>> 
>>>>          at
>>>> 
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>> 
>>>>          at
>>>> 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>> 
>>>>          at
>>>> 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>> 
>>>>          at java.lang.Thread.run(Thread.java:722)
>>>> 
>>>> 15/05/29 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 15.0 (TID 
>>>> 20, localhost): java.io.IOException: Failed to write statements to 
>>>> cassandrasink.navigation.
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>>>> 
>>>>          at
>>>> 
>>>> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>>>> 
>>>>          at
>>>> 
>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>> 
>>>>          at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>> 
>>>>          at
>>>> 
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>> 
>>>>          at
>>>> 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>> 
>>>>          at
>>>> 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>> 
>>>>          at java.lang.Thread.run(Thread.java:722)
>>>> 
>>>> 
>>>> 
>>>> 
>>>> A G
> 

Reply via email to