Here is the other transformation that might be causing the error. It has to be
one of these two, since I only have two transformations:

jsonMessagesDStream
        .window(new Duration(60000), new Duration(1000))
        .mapToPair(new PairFunction<String, String, Long>() {
            @Override
            public Tuple2<String, Long> call(String s) throws Exception {
                //System.out.println(s + " *****************************");
                JsonParser parser = new JsonParser();
                JsonObject jsonObj = parser.parse(s).getAsJsonObject();

                if (jsonObj != null && jsonObj.has("var1")) {
                    JsonObject jsonObject = jsonObj.get("var1").getAsJsonObject();
                    if (jsonObject != null && jsonObject.has("var2")
                            && jsonObject.get("var2").getAsBoolean()
                            && jsonObject.has("var3")) {
                        long num = jsonObject.get("var3").getAsLong();

                        return new Tuple2<String, Long>("var3", num);
                    }
                }

                return new Tuple2<String, Long>("var3", 0L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1+v2;
            }
        }).foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
            @Override
            public void call(JavaPairRDD<String, Long> stringIntegerJavaPairRDD)
                    throws Exception {
                Map<String, Long> map = new HashMap<>();
                Gson gson = new Gson();
                stringIntegerJavaPairRDD
                    .collect()
                    .forEach((Tuple2<String, Long> KV) -> {
                            String status = KV._1();
                            Long count = KV._2();
                            map.put(status, count);
                        }
                    );
                NSQReceiver.send(producer, "dashboard",
                        gson.toJson(map).getBytes());
            }
        });
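
For reference, a rough sketch of what window(new Duration(60000), new
Duration(1000)) implies in terms of batches. It assumes a batch interval of
1000 ms (the real value comes from config.getSparkStremingBatchInterval(),
which is not shown here) and a window length that is a whole multiple of the
slide interval. WindowRetention and its methods are made-up names for
illustration, not Spark APIs; this is plain arithmetic only.

```java
// Hypothetical helper, not part of Spark: arithmetic on the durations
// passed to window(windowLength, slideInterval).
final class WindowRetention {

    // Number of batches that are combined into one window.
    static long batchesPerWindow(long windowMs, long batchMs) {
        requireMultiple(windowMs, batchMs);
        return windowMs / batchMs;
    }

    // Number of successive windows that include any given batch,
    // assuming the window length is a whole multiple of the slide interval.
    static long windowsPerBatch(long windowMs, long slideMs) {
        requireMultiple(windowMs, slideMs);
        return windowMs / slideMs;
    }

    // Rejects non-positive durations and durations that do not divide evenly.
    private static void requireMultiple(long valueMs, long unitMs) {
        if (valueMs <= 0 || unitMs <= 0 || valueMs % unitMs != 0) {
            throw new IllegalArgumentException(
                    valueMs + " ms is not a positive multiple of " + unitMs + " ms");
        }
    }

    public static void main(String[] args) {
        long windowMs = 60000; // window length used above
        long slideMs = 1000;   // slide interval used above
        long batchMs = 1000;   // ASSUMED batch interval, not taken from the real config

        System.out.println("batches per window: " + batchesPerWindow(windowMs, batchMs));
        System.out.println("windows per batch:  " + windowsPerBatch(windowMs, slideMs));
    }
}
```

Under those assumptions every batch is read by 60 consecutive windows, so the
blocks stored by the receiver have to stay available for roughly the full 60
seconds after they arrive. Whether that is related to the "block ... not
found" exception is only a guess and has not been verified.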


On Wed, Nov 30, 2016 at 10:40 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Marco,
>
>
> Here is what my code looks like
>
> Config config = new Config("hello");
> SparkConf sparkConf = config.buildSparkConfig();
> sparkConf.setJars(JavaSparkContext.jarOfClass(Driver.class));
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>         new Duration(config.getSparkStremingBatchInterval()));
> ssc.sparkContext().setLogLevel("ERROR");
>
>
> NSQReceiver sparkStreamingReceiver = new NSQReceiver(config, "input_test");
> JavaReceiverInputDStream<String> jsonMessagesDStream =
>         ssc.receiverStream(sparkStreamingReceiver);
>
>
> NSQProducer producer = new NSQProducer()
>         .addAddress(config.getServerConfig().getProperty("NSQD_IP"),
>                 Integer.parseInt(config.getServerConfig().getProperty("NSQD_PORT")))
>         .start();
>
> jsonMessagesDStream
>         .mapToPair(new PairFunction<String, String, Integer>() {
>             @Override
>             public Tuple2<String, Integer> call(String s) throws Exception {
>                 JsonParser parser = new JsonParser();
>                 JsonObject jsonObj = parser.parse(s).getAsJsonObject();
>                 if (jsonObj != null && jsonObj.has("var1") ) {
>                     JsonObject transactionObject =
>                             jsonObj.get("var1").getAsJsonObject();
>                     if (transactionObject != null
>                             && transactionObject.has("var2")) {
>                         String key = transactionObject.get("var2").getAsString();
>                         return new Tuple2<>(key, 1);
>                     }
>                 }
>                 return new Tuple2<>("", 0);
>             }
>         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>                 @Override
>                 public Integer call(Integer v1, Integer v2) throws Exception {
>                     return v1+v2;
>                 }
>         }).foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
>                 @Override
>                 public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD)
>                         throws Exception {
>                     Map<String, Integer> map = new HashMap<>();
>                     Gson gson = new Gson();
>                     stringIntegerJavaPairRDD
>                             .collect()
>                             .forEach((Tuple2<String, Integer> KV) -> {
>                                 String status = KV._1();
>                                 Integer count = KV._2();
>                                 map.put(status, count);
>                             }
>                     );
>                     NSQReceiver.send(producer, "output_777",
>                             gson.toJson(map).getBytes());
>                 }
>         });
>
>
> Thanks,
>
> kant
>
>
> On Wed, Nov 30, 2016 at 2:11 PM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
>
>> Could you paste a reproducible code snippet?
>> Kr
>>
>> On 30 Nov 2016 9:08 pm, "kant kodali" <kanth...@gmail.com> wrote:
>>
>>> I have a lot of these exceptions happening:
>>>
>>> java.lang.Exception: Could not compute split, block input-0-1480539568000 not found
>>>
>>>
>>> Any ideas what this could be?
>>>
>>
>
