Hi Marco,
Here is what my code looks like Config config = new Config("hello"); SparkConf sparkConf = config.buildSparkConfig(); sparkConf.setJars(JavaSparkContext.jarOfClass(Driver.class)); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(config.getSparkStremingBatchInterval())); ssc.sparkContext().setLogLevel("ERROR"); NSQReceiver sparkStreamingReceiver = new NSQReceiver(config, "input_test"); JavaReceiverInputDStream<String> jsonMessagesDStream = ssc.receiverStream(sparkStreamingReceiver); NSQProducer producer = new NSQProducer() .addAddress(config.getServerConfig().getProperty("NSQD_IP"), Integer.parseInt(config.getServerConfig().getProperty("NSQD_PORT"))) .start(); jsonMessagesDStream .mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { JsonParser parser = new JsonParser(); JsonObject jsonObj = parser.parse(s).getAsJsonObject(); if (jsonObj != null && jsonObj.has("var1") ) { JsonObject transactionObject = jsonObj.get("var1").getAsJsonObject(); if(transactionObject != null && transactionObject.has("var2")) { String key = transactionObject.get("var2").getAsString(); return new Tuple2<>(key, 1); } } return new Tuple2<>("", 0); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }).foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() { @Override public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception { Map<String, Integer> map = new HashMap<>(); Gson gson = new Gson(); stringIntegerJavaPairRDD .collect() .forEach((Tuple2<String, Integer> KV) -> { String status = KV._1(); Integer count = KV._2(); map.put(status, count); } ); NSQReceiver.send(producer, "output_777", gson.toJson(map).getBytes()); } }); Thanks, kant On Wed, Nov 30, 2016 at 2:11 PM, Marco Mistroni <mmistr...@gmail.com> wrote: > Could you paste reproducible snippet code? > Kr > > On 30 Nov 2016 9:08 pm, "kant kodali" <kanth...@gmail.com> wrote: > >> I have lot of these exceptions happening >> >> java.lang.Exception: Could not compute split, block input-0-1480539568000 >> not found >> >> >> Any ideas what this could be? >> >