Kant, We need to narrow it down to a reproducible code. You are using streaming What is the content of ur streamed data. If u provide that I can run a streaming programming that reads from a local dir and narrow down the problem I have seen similar error for doing something completely different. As u say there might be problem with ur transformation coming from the structure of the data. Send me a sample of the data you are streaming and I write a small test case....kr
On 1 Dec 2016 9:44 am, "kant kodali" <kanth...@gmail.com> wrote: > sorry for multiple emails. I just think more info is needed every time to > address this problem > > My Spark Client program runs in a client mode and it runs on a node that > has 2 vCPU's and 8GB RAM (m4.large) > I have 2 Spark worker nodes and each have 4 vCPU's and 16GB RAM > (m3.xlarge for each spark worker instance) > > > > On Thu, Dec 1, 2016 at 12:55 AM, kant kodali <kanth...@gmail.com> wrote: > >> My batch interval is 1s >> slide interval is 1s >> window interval is 1 minute >> >> I am using a standalone alone cluster. I don't have any storage layer >> like HDFS. so I dont know what is a connection between RDD and blocks (I >> know that for every batch one RDD is produced)? what is a block in this >> context? is it a disk block ? if so, what is it default size? and Finally, >> why does the following error happens so often? >> >> java.lang.Exception: Could not compute split, block input-0-1480539568000 >> not found >> >> >> >> On Thu, Dec 1, 2016 at 12:42 AM, kant kodali <kanth...@gmail.com> wrote: >> >>> I also use this super(StorageLevel.MEMORY_AND_DISK_2()); >>> >>> inside my receiver >>> >>> On Wed, Nov 30, 2016 at 10:44 PM, kant kodali <kanth...@gmail.com> >>> wrote: >>> >>>> Here is another transformation that might cause the error but it has to >>>> be one of these two since I only have two transformations >>>> >>>> jsonMessagesDStream >>>> .window(new Duration(60000), new Duration(1000)) >>>> .mapToPair(new PairFunction<String, String, Long>() { >>>> @Override >>>> public Tuple2<String, Long> call(String s) throws Exception { >>>> //System.out.println(s + " *****************************"); >>>> JsonParser parser = new JsonParser(); >>>> JsonObject jsonObj = parser.parse(s).getAsJsonObject(); >>>> >>>> if (jsonObj != null && jsonObj.has("var1")) { >>>> JsonObject jsonObject = >>>> jsonObj.get("var1").getAsJsonObject(); >>>> if (jsonObject != null && jsonObject.has("var2") && >>>> jsonObject.get("var2").getAsBoolean() && jsonObject.has("var3") ) { >>>> long num = jsonObject.get("var3").getAsLong(); >>>> >>>> return new Tuple2<String, Long>("var3", num); >>>> } >>>> } >>>> >>>> return new Tuple2<String, Long>("var3", 0L); >>>> } >>>> }).reduceByKey(new Function2<Long, Long, Long>() { >>>> @Override >>>> public Long call(Long v1, Long v2) throws Exception { >>>> return v1+v2; >>>> } >>>> }).foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() { >>>> @Override >>>> public void call(JavaPairRDD<String, Long> >>>> stringIntegerJavaPairRDD) throws Exception { >>>> Map<String, Long> map = new HashMap<>(); >>>> Gson gson = new Gson(); >>>> stringIntegerJavaPairRDD >>>> .collect() >>>> .forEach((Tuple2<String, Long> KV) -> { >>>> String status = KV._1(); >>>> Long count = KV._2(); >>>> map.put(status, count); >>>> } >>>> ); >>>> NSQReceiver.send(producer, "dashboard", >>>> gson.toJson(map).getBytes()); >>>> } >>>> }); >>>> >>>> >>>> On Wed, Nov 30, 2016 at 10:40 PM, kant kodali <kanth...@gmail.com> >>>> wrote: >>>> >>>>> Hi Marco, >>>>> >>>>> >>>>> Here is what my code looks like >>>>> >>>>> Config config = new Config("hello"); >>>>> SparkConf sparkConf = config.buildSparkConfig(); >>>>> sparkConf.setJars(JavaSparkContext.jarOfClass(Driver.class)); >>>>> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new >>>>> Duration(config.getSparkStremingBatchInterval())); >>>>> ssc.sparkContext().setLogLevel("ERROR"); >>>>> >>>>> >>>>> NSQReceiver sparkStreamingReceiver = new NSQReceiver(config, >>>>> "input_test"); >>>>> JavaReceiverInputDStream<String> jsonMessagesDStream = >>>>> ssc.receiverStream(sparkStreamingReceiver); >>>>> >>>>> >>>>> NSQProducer producer = new NSQProducer() >>>>> .addAddress(config.getServerConfig().getProperty("NSQD_IP"), >>>>> Integer.parseInt(config.getServerConfig().getProperty("NSQD_PORT"))) >>>>> .start(); >>>>> >>>>> jsonMessagesDStream >>>>> .mapToPair(new PairFunction<String, String, Integer>() { >>>>> @Override >>>>> public Tuple2<String, Integer> call(String s) throws >>>>> Exception { >>>>> JsonParser parser = new JsonParser(); >>>>> JsonObject jsonObj = parser.parse(s).getAsJsonObject(); >>>>> if (jsonObj != null && jsonObj.has("var1") ) { >>>>> JsonObject transactionObject = >>>>> jsonObj.get("var1").getAsJsonObject(); >>>>> if(transactionObject != null && >>>>> transactionObject.has("var2")) { >>>>> String key = >>>>> transactionObject.get("var2").getAsString(); >>>>> return new Tuple2<>(key, 1); >>>>> } >>>>> } >>>>> return new Tuple2<>("", 0); >>>>> } >>>>> }).reduceByKey(new Function2<Integer, Integer, Integer>() { >>>>> @Override >>>>> public Integer call(Integer v1, Integer v2) throws >>>>> Exception { >>>>> return v1+v2; >>>>> } >>>>> }).foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() { >>>>> @Override >>>>> public void call(JavaPairRDD<String, Integer> >>>>> stringIntegerJavaPairRDD) throws Exception { >>>>> Map<String, Integer> map = new HashMap<>(); >>>>> Gson gson = new Gson(); >>>>> stringIntegerJavaPairRDD >>>>> .collect() >>>>> .forEach((Tuple2<String, Integer> KV) -> { >>>>> String status = KV._1(); >>>>> Integer count = KV._2(); >>>>> map.put(status, count); >>>>> } >>>>> ); >>>>> NSQReceiver.send(producer, "output_777", >>>>> gson.toJson(map).getBytes()); >>>>> } >>>>> }); >>>>> >>>>> >>>>> Thanks, >>>>> >>>>> kant >>>>> >>>>> >>>>> On Wed, Nov 30, 2016 at 2:11 PM, Marco Mistroni <mmistr...@gmail.com> >>>>> wrote: >>>>> >>>>>> Could you paste reproducible snippet code? >>>>>> Kr >>>>>> >>>>>> On 30 Nov 2016 9:08 pm, "kant kodali" <kanth...@gmail.com> wrote: >>>>>> >>>>>>> I have lot of these exceptions happening >>>>>>> >>>>>>> java.lang.Exception: Could not compute split, block >>>>>>> input-0-1480539568000 not found >>>>>>> >>>>>>> >>>>>>> Any ideas what this could be? >>>>>>> >>>>>> >>>>> >>>> >>> >> >