Hi Antonio,

First, what version of the Spark Cassandra Connector are you using? You are using Spark 1.3.1, which the Cassandra connector today supports in builds from the master branch only; the release with public artifacts supporting Spark 1.3.1 is coming soon ;) Please see https://github.com/datastax/spark-cassandra-connector#version-compatibility

Try the version change and let me know.
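If it helps, this is roughly what pinning the connector looks like in sbt. The artifact coordinates are the connector's published ones, but the version string below is a placeholder; take the exact value from the compatibility table linked above.

```scala
// build.sbt (illustrative): match the connector version to your Spark version
// per the compatibility table; "x.y.z" below is a placeholder, not a real release.
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-streaming"           % "1.3.1" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "x.y.z"
)
```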
What does your Cassandra log say?

Note that you can read from a Spark stream, Flume for instance, in your flumeStreamNavig.map(..) code with a lot less code (in Scala at least; I have not used the Java API). Here it's Kafka:
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L39

And write inline to Cassandra:
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L45
https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L64

Helena
tw: @helenaedelson

> On May 29, 2015, at 6:11 AM, Antonio Giambanco <antogia...@gmail.com> wrote:
>
> Hi all,
> I have Spark 1.3.1 and Cassandra 2.0.14 installed on a single server.
> I'm coding a simple Java class for Spark Streaming that does the following:
> - reads header events from a Flume sink
> - based on the header, writes the event body to the navigation or transaction table (Cassandra)
> Unfortunately I get a NoHostAvailableException; if I comment out the code that saves to one of the two tables, everything works.
>
> Here is the code:
>
>     public static void main(String[] args) {
>
>         // Create a local StreamingContext with two working threads and a batch interval of 1 second
>         SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DWXNavigationApp");
>
>         conf.set("spark.cassandra.connection.host", "127.0.0.1");
>         conf.set("spark.cassandra.connection.native.port", "9042");
>         conf.set("spark.cassandra.output.batch.size.rows", "1");
>         conf.set("spark.cassandra.output.concurrent.writes", "1");
>
>         final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
>
>         JavaReceiverInputDStream<SparkFlumeEvent> flumeStreamNavig =
>             FlumeUtils.createPollingStream(jssc, "127.0.0.1", 8888);
>
>         JavaDStream<String> logRowsNavig = flumeStreamNavig.map(
>             new Function<SparkFlumeEvent, String>() {
>                 @Override
>                 public String call(SparkFlumeEvent arg0) throws Exception {
>                     Map<CharSequence, CharSequence> headers = arg0.event().getHeaders();
>                     ByteBuffer bytePayload = arg0.event().getBody();
>                     String s = headers.get("source_log").toString() + "#"
>                         + new String(bytePayload.array());
>                     System.out.println("RIGA: " + s);
>                     return s;
>                 }
>             });
>
>         logRowsNavig.foreachRDD(
>             new Function<JavaRDD<String>, Void>() {
>                 @Override
>                 public Void call(JavaRDD<String> rows) throws Exception {
>                     if (!rows.isEmpty()) {
>                         //String header = getHeaderFronRow(rows.collect());
>                         List<Navigation> listNavigation = new ArrayList<Navigation>();
>                         List<Transaction> listTransaction = new ArrayList<Transaction>();
>
>                         for (String row : rows.collect()) {
>                             String header = row.substring(0, row.indexOf("#"));
>
>                             if (header.contains("controller_log")) {
>                                 listNavigation.add(createNavigation(row));
>                                 System.out.println("Added Element in Navigation List");
>                             } else if (header.contains("business_log")) {
>                                 listTransaction.add(createTransaction(row));
>                                 System.out.println("Added Element in Transaction List");
>                             }
>                         }
>
>                         if (!listNavigation.isEmpty()) {
>                             JavaRDD<Navigation> navigationRows =
>                                 jssc.sparkContext().parallelize(listNavigation);
>                             javaFunctions(navigationRows)
>                                 .writerBuilder("cassandrasink", "navigation", mapToRow(Navigation.class))
>                                 .saveToCassandra();
>                         }
>
>                         if (!listTransaction.isEmpty()) {
>                             JavaRDD<Transaction>
>                                 transactionRows = jssc.sparkContext().parallelize(listTransaction);
>                             javaFunctions(transactionRows)
>                                 .writerBuilder("cassandrasink", "transaction", mapToRow(Transaction.class))
>                                 .saveToCassandra();
>                         }
>                     }
>                     return null;
>                 }
>             });
>
>         jssc.start();            // Start the computation
>         jssc.awaitTermination(); // Wait for the computation to terminate
>     }
>
> Here is the exception:
>
> 15/05/29 11:19:29 ERROR QueryExecutor: Failed to execute:
> com.datastax.spark.connector.writer.RichBatchStatement@ab76b83
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
>     at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
>     at com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
>     at com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:577)
>     at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:119)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:601)
>     at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>     at $Proxy17.executeAsync(Unknown Source)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:601)
>     at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
>     at $Proxy17.executeAsync(Unknown Source)
>     at com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)
>     at com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)
>     at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)
>     at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:137)
>     at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:136)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
>     at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:136)
>     at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>     at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>     at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>     at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>     at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>     at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:722)
>
> 15/05/29 11:19:29 ERROR Executor: Exception in task 1.0 in stage 15.0 (TID 20)
> java.io.IOException: Failed to write statements to cassandrasink.navigation.
>     at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)
>     at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>     at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>     at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>     at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>     at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>     at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:722)
>
> 15/05/29 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 15.0 (TID 20, localhost): java.io.IOException: Failed to write statements to cassandrasink.navigation.
>     at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)
>     at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
>     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
>     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
>     at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
>     at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
>     at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
>     at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>     at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:722)
>
> A G
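One more note on the quoted foreachRDD: pulling every row back to the driver with rows.collect() and re-parallelizing is a separate issue from the NoHostAvailableException, but the header routing it performs is a pure function that can be factored out and tested on its own. A minimal Scala sketch of that routing (all names here are illustrative, not from the connector API; each row is "<header>#<body>" as built in the map above):

```scala
// Illustrative sketch of the header#body routing used in the quoted
// foreachRDD: the header prefix decides which table the row belongs to.
object RowRouter {
  sealed trait Route
  case object NavigationTable  extends Route // rows from controller_log
  case object TransactionTable extends Route // rows from business_log
  case object Unrouted         extends Route // anything else

  // Split a "header#body" row into its two parts.
  def split(row: String): (String, String) = {
    val i = row.indexOf("#")
    (row.substring(0, i), row.substring(i + 1))
  }

  // Decide the destination table from the header alone.
  def route(row: String): Route = {
    val (header, _) = split(row)
    if (header.contains("controller_log")) NavigationTable
    else if (header.contains("business_log")) TransactionTable
    else Unrouted
  }
}
```

In the streaming job, a routing function like this could then be applied with filter on the DStream itself and each filtered stream saved to its table directly, rather than collecting to the driver and parallelizing again.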