Spark Streaming job not stopping
I am not able to stop a Spark Streaming job. Let me explain briefly. The job:
* gets data from a Kafka topic
* splits the data to create a JavaRDD
* maps the JavaRDD to a JavaPairRDD and applies a reduceByKey transformation
* writes the JavaPairRDD into the Cassandra DB // something going wrong here

The messages in the Kafka topic are exhausted, but the program keeps running and stages keep executing even though there is no more data from Kafka. When I tried to kill the program manually, there was no output in the Cassandra database.
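A minimal sketch of the usual shutdown pattern, assuming an already-configured JavaStreamingContext named `ssc`; a streaming context runs until it is explicitly stopped, so an empty Kafka topic alone never ends the job:

```java
// Sketch only: `ssc` is assumed to be a configured JavaStreamingContext.
ssc.start();

// Option 1: wait up to a fixed time, then stop. The arguments (true, true)
// mean: also stop the underlying SparkContext, and stop gracefully so
// in-flight batches (e.g. pending Cassandra writes) finish first.
if (!ssc.awaitTerminationOrTimeout(60_000)) {
    ssc.stop(true, true);
}

// Option 2: set spark.streaming.stopGracefullyOnShutdown=true on the
// SparkConf before creating the context, so a kill signal triggers a
// graceful stop instead of dropping in-flight batches.
```

Killing the driver without a graceful stop is consistent with seeing no output in the database, since batches in flight are discarded.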
[no subject]
I am using Spark Streaming for a basic streaming movie-count program. First I map the year and movie name into a JavaPairRDD, and then I use reduceByKey for counting the movies year-wise. I am using Cassandra for output. The Spark Streaming application is not stopping, and Cassandra is not showing any output either. I think the data is still staging; if so, how do I stop my Spark Streaming application?

Regards,
Sathya
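The year-wise grouping itself can be checked locally without Spark. A small sketch, assuming records shaped like the later posts in this thread suggest (`%`-separated fields with the title at index 2 and the year at index 3); the merge function plays the role of reduceByKey:

```java
import java.util.*;
import java.util.stream.*;

// Local stand-in for the (year -> movies) reduceByKey step, assuming
// records of the form "id%genre%title%year" as in the later posts.
public class MovieCountSketch {
    static Map<Integer, String> moviesByYear(List<String> records) {
        return records.stream()
                .map(r -> r.split("%"))
                .collect(Collectors.toMap(
                        t -> Integer.parseInt(t[3]),  // key: year
                        t -> t[2],                    // value: movie title
                        (a, b) -> a + "," + b));      // reduce: concatenate titles
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
                "1%g%Heat%1995", "2%g%Casino%1995", "3%g%Fargo%1996");
        System.out.println(moviesByYear(records));
    }
}
```

If this local version produces the expected map, the remaining suspects are the Kafka input and the Cassandra write, not the reduce logic.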
[no subject]
code:

directKafkaStream.foreachRDD(rdd -> {
    rdd.foreach(record -> {
        messages1.add(record._2);
    });
    JavaRDD<String> lines = sc.parallelize(messages1);
    JavaPairRDD<Integer, String> data = lines.mapToPair(new PairFunction<String, Integer, String>() {
        @Override
        public Tuple2<Integer, String> call(String a) {
            String[] tokens = StringUtil.split(a, '%');
            return new Tuple2<>(Integer.getInteger(tokens[3]), tokens[2]);
        }
    });
    Function2<String, String, String> reduceSumFunc = (accum, n) -> (accum.concat(n));
    JavaPairRDD<Integer, String> yearCount = data.reduceByKey(reduceSumFunc);
    javaFunctions(yearCount)
        .writerBuilder("movie_keyspace", "movie_count", mapTupleToRow(Integer.class, String.class))
        .withColumnSelector(someColumns("year", "list_of_movies"))
        .saveToCassandra(); // this is the error line
});

error:

com.datastax.spark.connector.writer.NullKeyColumnException: Invalid null value for key column year
    at com.datastax.spark.connector.writer.RoutingKeyGenerator$$anonfun$fillRoutingKey$1.apply$mcVI$sp(RoutingKeyGenerator.scala:49)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.RoutingKeyGenerator.fillRoutingKey(RoutingKeyGenerator.scala:47)
    at com.datastax.spark.connector.writer.RoutingKeyGenerator.apply(RoutingKeyGenerator.scala:56)
    at com.datastax.spark.connector.writer.TableWriter.batchRoutingKey(TableWriter.scala:126)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:107)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

--
I am trying to connect Kafka and Cassandra using Spark. I am able to store a JavaRDD, but not a JavaPairRDD, into Cassandra. I have marked the error line with a comment.
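One likely source of the null key, offered as a guess rather than a list reply: `Integer.getInteger` does not parse a string. It looks up a *system property* of that name and returns null when none exists, which would make every "year" key null. A minimal demonstration:

```java
// Integer.getInteger(s) reads the system property named s; it never parses s.
// Integer.parseInt(s) is the method that actually converts digits to an int.
public class GetIntegerPitfall {
    public static void main(String[] args) {
        String token = "1995"; // a year field split out of a Kafka record
        System.out.println(Integer.getInteger(token)); // null: no system property "1995"
        System.out.println(Integer.parseInt(token));   // 1995
    }
}
```

A null key is exactly what NullKeyColumnException reports for the partition-key column "year".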
java.lang.NoClassDefFoundError: org/apache/spark/streaming/api/java/JavaStreamingContext
Error in the highlighted line (marked with a comment below). Code, error and pom.xml included below.

code:

final Session session = connector.openSession();
final PreparedStatement prepared = session.prepare("INSERT INTO spark_test5.messages JSON?");
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); // highlighted line: error here
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");

error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/api/java/JavaStreamingContext
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.api.java.JavaStreamingContext
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

pom:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>SparkPoc</groupId>
  <artifactId>Spark-Poc</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.0.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.datastax.spark</groupId>
      <artifactId>spark-cassandra-connector_2.11</artifactId>
      <version>2.0.0-M3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.0.1</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4.1</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.nwf.Consumer</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
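A guess at the cause, not a confirmed fix: spark-streaming is the only dependency marked `provided`, so its classes are absent at runtime when the assembled jar is launched with plain `java` instead of spark-submit. One option is to drop the scope (the other is to launch via spark-submit, under which `provided` is correct):

```xml
<!-- Sketch of the spark-streaming dependency without the provided scope,
     for the case where the jar is launched with plain `java`. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.0.0</version>
  <!-- <scope>provided</scope> is right under spark-submit, which supplies
       the streaming classes at runtime; omit it otherwise. -->
</dependency>
```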
java.lang.NoSuchMethodError: scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef
Hi,

I got the error below when I executed my program:

Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef;

error in detail:

Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef;
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:82)
    at com.nwf.Consumer.main(Consumer.java:63)

code:

Consumer consumer = new Consumer();
SparkConf conf = new SparkConf().setAppName("kafka-sandbox").setMaster("local[2]");
conf.set("spark.cassandra.connection.host", "localhost"); // connection for the Cassandra database
JavaSparkContext sc = new JavaSparkContext(conf);
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
final Session session = connector.openSession(); // the error occurs on this line (Consumer.java:63)
final PreparedStatement prepared = session.prepare("INSERT INTO spark_test5.messages JSON?");

Thank you.
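For what it's worth: scala.runtime.ObjectRef.zero() only exists in the Scala 2.11+ runtime, so this NoSuchMethodError usually means a Scala 2.10 scala-library (or some `_2.10` artifact) ended up on the classpath next to the `_2.11` Spark and connector jars. One way to check, assuming a Maven build:

```shell
# List every scala-library version the build pulls in; all Spark and
# connector artifacts should agree on one Scala binary version (here 2.11).
mvn dependency:tree -Dincludes=org.scala-lang:scala-library
```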
How to integrate Apache Kafka with Spark?
How do I take input from Apache Kafka into Apache Spark Streaming for stream processing?

-sathya
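As a starting point rather than a list reply: with the spark-streaming-kafka-0-8 artifact that appears earlier in this thread, the usual pattern is a direct stream. A minimal sketch, with the broker address and topic name as placeholder values:

```java
import java.util.*;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public class KafkaInputSketch {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("kafka-input").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092"); // placeholder broker
        Set<String> topics = Collections.singleton("movies");      // placeholder topic

        // Each record arrives as a (key, value) pair; the value is the message text.
        JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
                ssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);

        stream.map(Tuple2::_2).print(); // transform/inspect the message values here

        ssc.start();
        ssc.awaitTermination();
    }
}
```

The returned DStream can then be transformed with the same mapToPair/reduceByKey operations discussed in the earlier posts.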