Hi, I am trying to write data produced by the Kafka command-line producer for a topic into HBase. I am running into an error and am unable to proceed. Below is my code, which I package into a jar and run with spark-submit. Am I doing something wrong inside foreachRDD()? What is wrong with the SparkKafkaDemo$2.call(SparkKafkaDemo.java:63) line in the error message below?
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import scala.Tuple2;

    SparkConf sparkConf = new SparkConf()
            .setAppName("JavaKafkaDemo")
            .setMaster("local")
            .setSparkHome("/Users/kvk/softwares/spark-1.3.1-bin-hadoop2.4");

    // Create the context with a 5 second batch size
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(5000));

    int numThreads = 2;
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    // topicMap.put("viewTopic", numThreads);
    topicMap.put("nonview", numThreads);

    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jsc, "localhost", "ViewConsumer", topicMap);

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
            JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = stringJavaRDD.mapToPair(
                    new PairFunction<String, ImmutableBytesWritable, Put>() {
                        @Override
                        public Tuple2<ImmutableBytesWritable, Put> call(String line) throws Exception {
                            Put put = new Put(Bytes.toBytes("Rowkey" + Math.random()));
                            put.addColumn(Bytes.toBytes("firstFamily"),
                                    Bytes.toBytes("firstColumn"),
                                    Bytes.toBytes(line + "fc"));
                            return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
                        }
                    });

            // save to HBase - Spark built-in API method
            hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
            return null;
        }
    });

    jsc.start();
    jsc.awaitTermination();
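For context, newAPIJobConfiguration1 is a Hadoop Job that I set up before the streaming code, roughly along these lines (a simplified sketch; the table name "nonview" here is a placeholder for my actual table):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    // Simplified sketch of the HBase output job config
    // (inside a method that can throw IOException)
    Configuration hbaseConf = HBaseConfiguration.create();
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "nonview"); // placeholder table name

    Job newAPIJobConfiguration1 = Job.getInstance(hbaseConf);
    newAPIJobConfiguration1.setOutputFormatClass(TableOutputFormat.class);
    newAPIJobConfiguration1.setOutputKeyClass(ImmutableBytesWritable.class);
    newAPIJobConfiguration1.setOutputValueClass(Put.class);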
When I submit the jar with spark-submit, I see the error below:

    ./bin/spark-submit --class "SparkKafkaDemo" --master local /Users/kvk/IntelliJWorkspace/HbaseDemo/HbaseDemo.jar

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
        at org.apache.spark.rdd.RDD.map(RDD.scala:286)
        at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)
        at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)
        at SparkKafkaDemo$2.call(SparkKafkaDemo.java:63)
        at SparkKafkaDemo$2.call(SparkKafkaDemo.java:60)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
        at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
        at org.apache.hadoop.mapreduce.Job.toString(Job.java:452)
        at java.lang.String.valueOf(String.java:2847)
        at java.lang.StringBuilder.append(StringBuilder.java:128)
        at scala.StringContext.standardInterpolator(StringContext.scala:122)
        at scala.StringContext.s(StringContext.scala:90)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:103)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
        at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 24 more

Thanks
-kvk