Check that you get the data from kafka producer

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {

                        @Override
                        public Void call(JavaRDD<String> rdd) throws 
Exception {
                                List<String> collect = rdd.collect();
                                for (String data : collect) {
                                        try {
                                                // save data in the log.txt 
file
                                                Path filePath = Paths
                                                                .get(rdd 
save file);
                                                if (!Files.exists(filePath)) 
{
                                                        
Files.createFile(filePath);
                                                }
                                                String temp = "Text to be 
added" + " data is " + data;
                                                Files.write(filePath, 
temp.getBytes(),
                                                                
StandardOpenOption.APPEND);
                                        } catch (IOException e) {
                                                e.printStackTrace();
                                        }
                                }
                                return null;
                        }
                });

Reply via email to