Hi, I am trying a Spark Streaming job with a Text File Stream on HDFS with Spark 0.9.0 from cloudera. I am saving the RDD (100 seconds is streaming frequency) to HDFS in a different directory. Every 100 seconds, it is creating a new directory in HDFS with _Success(stream-Random/_Success). But, it is not adding any data/output to it. I verified that I am adding new files to the correct HDFS directory. Another change I have noticed is that, Spark picks up the initial files present in the directory used for streaming. The behaviour seems like Batch processing. Although, at specified interval, it does create a new folder in HDFS with _Success. So, the major issue is that it is not able to recognize new files created in HDFS.
Code used : val ssc = new StreamingContext(ClusterConfig.sparkMaster, "Hybrid", Duration(100000), ClusterConfig.sparkHome, ClusterConfig.jars) val data = ssc.textFileStream(ClusterConfig.hdfsNN + "correct/path/to/data") data.foreachRDD(rdd => rdd.saveAsObjectFile(ClusterConfig.hdfsNN + "/user<path/to/file>" + Random.nextInt)) ssc.start It is creating these directories with only _Success : stream562343230 stream1228731977 stream318151149 stream603511115 This is the error stack I get : 14/02/17 14:08:20 INFO FileInputDStream: Finding new files took 549 ms 14/02/17 14:08:20 INFO FileInputDStream: New files at time 1392626300000 ms: 14/02/17 14:08:20 INFO JobScheduler: Added jobs for time 1392626300000 ms 14/02/17 14:08:20 INFO JobScheduler: Starting job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms 14/02/17 14:08:20 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable) 14/02/17 14:08:20 WARN Configuration: mapred.job.id is deprecated. Instead, use mapreduce.job.id 14/02/17 14:08:20 WARN Configuration: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 14/02/17 14:08:20 WARN Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 14/02/17 14:08:20 WARN Configuration: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 14/02/17 14:08:20 WARN Configuration: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition 14/02/17 14:08:20 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29 14/02/17 14:08:20 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 0.001934866 s 14/02/17 14:08:20 INFO JobScheduler: Finished job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms 14/02/17 14:08:20 INFO JobScheduler: Total delay: 0.741 s for time 1392626300000 ms (execution: 0.167 s) 14/02/17 14:08:20 INFO FileInputDStream: Cleared 0 old files that were older than 1392626200000 ms: 14/02/17 14:10:00 INFO FileInputDStream: Finding new files took 6 ms 14/02/17 14:10:00 INFO FileInputDStream: New files at time 1392626400000 ms: 14/02/17 14:10:00 INFO JobScheduler: Added jobs for time 1392626400000 ms 14/02/17 14:10:00 INFO JobScheduler: Starting job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms 14/02/17 14:10:00 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable) 14/02/17 14:10:00 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29 14/02/17 14:10:00 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.9016E-5 s 14/02/17 14:10:00 INFO JobScheduler: Finished job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms 14/02/17 14:10:00 INFO JobScheduler: Total delay: 0.085 s for time 1392626400000 ms (execution: 0.077 s) 14/02/17 14:10:00 INFO FileInputDStream: Cleared 0 old files that were older than 1392626300000 ms: 14/02/17 14:11:40 INFO FileInputDStream: Finding new files took 5 ms 14/02/17 14:11:40 INFO FileInputDStream: New files at time 1392626500000 ms: 14/02/17 14:11:40 INFO JobScheduler: Added jobs for time 1392626500000 ms 14/02/17 14:11:40 INFO JobScheduler: Starting job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms 14/02/17 14:11:40 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable) 14/02/17 14:11:40 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29 14/02/17 14:11:40 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.8111E-5 s 14/02/17 14:11:40 INFO JobScheduler: Finished job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms 14/02/17 14:11:40 INFO FileInputDStream: Cleared 1 old files that were older than 1392626400000 ms: 1392626300000 ms 14/02/17 14:11:40 INFO JobScheduler: Total delay: 0.110 s for time 1392626500000 ms (execution: 0.102 s) Thanks and Regards, Suraj Sheth