Hi Marco,

Here is what my code looks like:

Config config = new Config("hello");
SparkConf sparkConf = config.buildSparkConfig();
sparkConf.setJars(JavaSparkContext.jarOfClass(Driver.class));
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
        new Duration(config.getSparkStremingBatchInterval()));
ssc.sparkContext().setLogLevel("ERROR");


NSQReceiver sparkStreamingReceiver = new NSQReceiver(config, "input_test");
JavaReceiverInputDStream<String> jsonMessagesDStream =
ssc.receiverStream(sparkStreamingReceiver);


NSQProducer producer = new NSQProducer()
        .addAddress(config.getServerConfig().getProperty("NSQD_IP"),
                Integer.parseInt(config.getServerConfig().getProperty("NSQD_PORT")))
        .start();

jsonMessagesDStream
        .mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                JsonParser parser = new JsonParser();
                JsonObject jsonObj = parser.parse(s).getAsJsonObject();
                if (jsonObj != null && jsonObj.has("var1")) {
                    JsonObject transactionObject = jsonObj.get("var1").getAsJsonObject();
                    if (transactionObject != null && transactionObject.has("var2")) {
                        String key = transactionObject.get("var2").getAsString();
                        return new Tuple2<>(key, 1);
                    }
                }
                return new Tuple2<>("", 0);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1+v2;
                }
        }).foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
                @Override
                public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {
                    Map<String, Integer> map = new HashMap<>();
                    Gson gson = new Gson();
                    stringIntegerJavaPairRDD
                            .collect()
                            .forEach((Tuple2<String, Integer> KV) -> {
                                String status = KV._1();
                                Integer count = KV._2();
                                map.put(status, count);
                            });
                    NSQReceiver.send(producer, "output_777", gson.toJson(map).getBytes());
                }
        });
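
In case it helps to check the counting logic without Spark or NSQ, the per-batch aggregation above amounts to roughly the following plain-Java sketch. The class name BatchCountSketch and the method countKeys are hypothetical and not part of the job. The sketch assumes the var2 keys have already been extracted, with null standing for a message that has no var1.var2 (which the job maps to the pair ("", 0)).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchCountSketch {

    // Hypothetical helper: mirrors mapToPair + reduceByKey for a single batch.
    // A null key stands for a message without var1.var2; the streaming job
    // emits ("", 0) for those, so they add an "" entry without raising its count.
    public static Map<String, Integer> countKeys(List<String> keys) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : keys) {
            if (key == null) {
                counts.merge("", 0, Integer::sum);
            } else {
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample keys, for illustration only.
        List<String> batch = Arrays.asList("ok", "failed", "ok", null);
        System.out.println(countKeys(batch));
    }
}
```

This only covers the aggregation step; it says nothing about the receiver or block storage, which is where the exception is reported.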


Thanks,

kant


On Wed, Nov 30, 2016 at 2:11 PM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Could you paste a reproducible code snippet?
> Kr
>
> On 30 Nov 2016 9:08 pm, "kant kodali" <kanth...@gmail.com> wrote:
>
>> I have a lot of these exceptions happening:
>>
>> java.lang.Exception: Could not compute split, block input-0-1480539568000 not found
>>
>>
>> Any ideas what this could be?
>>
>
