bq. return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);

I don't think Put is serializable.

FYI
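
For what it's worth, the "Caused by: java.lang.IllegalStateException: Job in
state DEFINE instead of RUNNING" frames come from the SerializationDebugger
calling toString on a Hadoop Job it found while walking the closure, which
suggests the Job object (newAPIJobConfiguration1) is being pulled into the
function Spark tries to serialize. A common way around this kind of "Task not
serializable" error is to keep everything that is not java.io.Serializable out
of the closure: make the pair function a static class so it captures no outer
references, and build the Job on the driver inside foreachRDD, passing only its
Configuration to the save call. Below is a rough sketch along those lines, not
the original code; the class names (HBaseSink, LineToPut) and the table name
"demoTable" are placeholders, and it assumes hbase-site.xml is on the
classpath.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class HBaseSink {  // placeholder class name, not from the original post

    // Static and stateless, so it is serializable and drags no outer references
    // (enclosing anonymous classes, the Hadoop Job, etc.) into the Spark closure.
    static class LineToPut implements PairFunction<String, ImmutableBytesWritable, Put> {
        @Override
        public Tuple2<ImmutableBytesWritable, Put> call(String line) throws Exception {
            Put put = new Put(Bytes.toBytes("Rowkey" + Math.random()));
            put.addColumn(Bytes.toBytes("firstFamily"), Bytes.toBytes("firstColumn"),
                    Bytes.toBytes(line + "fc"));
            return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
        }
    }

    // Call this from inside foreachRDD. The Hadoop Job is built here on the driver
    // for each batch and only its Configuration is handed to the save call, so the
    // Job is never captured by a function that gets serialized.
    static void saveToHBase(JavaRDD<String> lines) throws IOException {
        Configuration conf = HBaseConfiguration.create();      // assumes hbase-site.xml on the classpath
        conf.set(TableOutputFormat.OUTPUT_TABLE, "demoTable");  // placeholder table name
        Job job = Job.getInstance(conf);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        JavaPairRDD<ImmutableBytesWritable, Put> puts = lines.mapToPair(new LineToPut());
        puts.saveAsNewAPIHadoopDataset(job.getConfiguration());
    }
}

Inside foreachRDD you would then just call HBaseSink.saveToHBase(stringJavaRDD);
the Job never leaves the driver, and the only things serialized with the task
are the stateless LineToPut instance and the strings in the RDD.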

On Fri, Jun 12, 2015 at 6:40 AM, Vamshi Krishna <vamshi2...@gmail.com>
wrote:

> Hi, I am trying to write data produced by the Kafka command-line producer
> for a topic into HBase. I am facing a problem and unable to proceed. Below
> is my code, which I package as a jar and run through spark-submit. Am I
> doing something wrong inside foreachRDD()? What is wrong with the
> SparkKafkaDemo$2.call(SparkKafkaDemo.java:63) line in the error message
> below?
>
>
>
> SparkConf sparkConf = new SparkConf()
>         .setAppName("JavaKafkaDemo")
>         .setMaster("local")
>         .setSparkHome("/Users/kvk/softwares/spark-1.3.1-bin-hadoop2.4");
> // Create the context with a 5 second batch interval
> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(5000));
>
> int numThreads = 2;
> Map<String, Integer> topicMap = new HashMap<String, Integer>();
> // topicMap.put("viewTopic", numThreads);
> topicMap.put("nonview", numThreads);
>
> JavaPairReceiverInputDStream<String, String> messages =
>         KafkaUtils.createStream(jsc, "localhost", "ViewConsumer", topicMap);
>
> JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
>     @Override
>     public String call(Tuple2<String, String> tuple2) {
>         return tuple2._2();
>     }
> });
>
> lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
>     @Override
>     public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
>         JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = stringJavaRDD.mapToPair(
>                 new PairFunction<String, ImmutableBytesWritable, Put>() {
>                     @Override
>                     public Tuple2<ImmutableBytesWritable, Put> call(String line) throws Exception {
>                         Put put = new Put(Bytes.toBytes("Rowkey" + Math.random()));
>                         put.addColumn(Bytes.toBytes("firstFamily"), Bytes.toBytes("firstColumn"),
>                                 Bytes.toBytes(line + "fc"));
>                         return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
>                     }
>                 });
>
>         // save to HBase - Spark built-in API method
>         hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
>         return null;
>     }
> });
> jsc.start();
> jsc.awaitTermination();
>
>
>
>
>
> I see the error below when I run:
>
> ./bin/spark-submit --class "SparkKafkaDemo" --master local /Users/kvk/IntelliJWorkspace/HbaseDemo/HbaseDemo.jar
>
> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
> at org.apache.spark.rdd.RDD.map(RDD.scala:286)
> at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)
> at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)
> at SparkKafkaDemo$2.call(SparkKafkaDemo.java:63)
> at SparkKafkaDemo$2.call(SparkKafkaDemo.java:60)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
> at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
> at org.apache.hadoop.mapreduce.Job.toString(Job.java:452)
> at java.lang.String.valueOf(String.java:2847)
> at java.lang.StringBuilder.append(StringBuilder.java:128)
> at scala.StringContext.standardInterpolator(StringContext.scala:122)
> at scala.StringContext.s(StringContext.scala:90)
> at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:103)
> at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
> at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
> at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
> at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
> at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
> at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
> at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
> at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
> ... 24 more
>
>
>
> Thanks
>
>
> -kvk
>
