I also use this super(StorageLevel.MEMORY_AND_DISK_2()); inside my receiver
On Wed, Nov 30, 2016 at 10:44 PM, kant kodali <kanth...@gmail.com> wrote: > Here is another transformation that might cause the error but it has to be > one of these two since I only have two transformations > > jsonMessagesDStream > .window(new Duration(60000), new Duration(1000)) > .mapToPair(new PairFunction<String, String, Long>() { > @Override > public Tuple2<String, Long> call(String s) throws Exception { > //System.out.println(s + " *****************************"); > JsonParser parser = new JsonParser(); > JsonObject jsonObj = parser.parse(s).getAsJsonObject(); > > if (jsonObj != null && jsonObj.has("var1")) { > JsonObject jsonObject = > jsonObj.get("var1").getAsJsonObject(); > if (jsonObject != null && jsonObject.has("var2") && > jsonObject.get("var2").getAsBoolean() && jsonObject.has("var3") ) { > long num = jsonObject.get("var3").getAsLong(); > > return new Tuple2<String, Long>("var3", num); > } > } > > return new Tuple2<String, Long>("var3", 0L); > } > }).reduceByKey(new Function2<Long, Long, Long>() { > @Override > public Long call(Long v1, Long v2) throws Exception { > return v1+v2; > } > }).foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() { > @Override > public void call(JavaPairRDD<String, Long> > stringIntegerJavaPairRDD) throws Exception { > Map<String, Long> map = new HashMap<>(); > Gson gson = new Gson(); > stringIntegerJavaPairRDD > .collect() > .forEach((Tuple2<String, Long> KV) -> { > String status = KV._1(); > Long count = KV._2(); > map.put(status, count); > } > ); > NSQReceiver.send(producer, "dashboard", > gson.toJson(map).getBytes()); > } > }); > > > On Wed, Nov 30, 2016 at 10:40 PM, kant kodali <kanth...@gmail.com> wrote: > >> Hi Marco, >> >> >> Here is what my code looks like >> >> Config config = new Config("hello"); >> SparkConf sparkConf = config.buildSparkConfig(); >> sparkConf.setJars(JavaSparkContext.jarOfClass(Driver.class)); >> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new >> Duration(config.getSparkStremingBatchInterval())); >> ssc.sparkContext().setLogLevel("ERROR"); >> >> >> NSQReceiver sparkStreamingReceiver = new NSQReceiver(config, "input_test"); >> JavaReceiverInputDStream<String> jsonMessagesDStream = >> ssc.receiverStream(sparkStreamingReceiver); >> >> >> NSQProducer producer = new NSQProducer() >> .addAddress(config.getServerConfig().getProperty("NSQD_IP"), >> Integer.parseInt(config.getServerConfig().getProperty("NSQD_PORT"))) >> .start(); >> >> jsonMessagesDStream >> .mapToPair(new PairFunction<String, String, Integer>() { >> @Override >> public Tuple2<String, Integer> call(String s) throws Exception { >> JsonParser parser = new JsonParser(); >> JsonObject jsonObj = parser.parse(s).getAsJsonObject(); >> if (jsonObj != null && jsonObj.has("var1") ) { >> JsonObject transactionObject = >> jsonObj.get("var1").getAsJsonObject(); >> if(transactionObject != null && >> transactionObject.has("var2")) { >> String key = >> transactionObject.get("var2").getAsString(); >> return new Tuple2<>(key, 1); >> } >> } >> return new Tuple2<>("", 0); >> } >> }).reduceByKey(new Function2<Integer, Integer, Integer>() { >> @Override >> public Integer call(Integer v1, Integer v2) throws Exception >> { >> return v1+v2; >> } >> }).foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() { >> @Override >> public void call(JavaPairRDD<String, Integer> >> stringIntegerJavaPairRDD) throws Exception { >> Map<String, Integer> map = new HashMap<>(); >> Gson gson = new Gson(); >> stringIntegerJavaPairRDD >> .collect() >> .forEach((Tuple2<String, Integer> KV) -> { >> String status = KV._1(); >> Integer count = KV._2(); >> map.put(status, count); >> } >> ); >> NSQReceiver.send(producer, "output_777", >> gson.toJson(map).getBytes()); >> } >> }); >> >> >> Thanks, >> >> kant >> >> >> On Wed, Nov 30, 2016 at 2:11 PM, Marco Mistroni <mmistr...@gmail.com> >> wrote: >> >>> Could you paste reproducible snippet code? >>> Kr >>> >>> On 30 Nov 2016 9:08 pm, "kant kodali" <kanth...@gmail.com> wrote: >>> >>>> I have lot of these exceptions happening >>>> >>>> java.lang.Exception: Could not compute split, block >>>> input-0-1480539568000 not found >>>> >>>> >>>> Any ideas what this could be? >>>> >>> >> >