Hi,
I am trying to run a Spark Streaming job that uses a text file stream on HDFS,
with Spark 0.9.0 from Cloudera.
I save each RDD (the batch interval is 100 seconds) to a different directory in
HDFS. Every 100 seconds the job creates a new directory in HDFS containing a
_SUCCESS file (stream<random number>/_SUCCESS), but it does not write any
data to it.
I have verified that I am adding new files to the correct HDFS directory. I
have also noticed that Spark picks up the files initially present in the
directory used for streaming, so the behaviour looks like batch processing,
although it does create a new folder in HDFS with _SUCCESS at each interval.
So the main issue is that the job does not recognize new files created in
HDFS.

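Code used:
import scala.util.Random
import org.apache.spark.streaming.{Duration, StreamingContext}

// Batch interval of 100 seconds (100000 ms)
val ssc = new StreamingContext(ClusterConfig.sparkMaster, "Hybrid",
  Duration(100000), ClusterConfig.sparkHome, ClusterConfig.jars)

// Watch the HDFS directory for new text files
val data = ssc.textFileStream(ClusterConfig.hdfsNN + "correct/path/to/data")
// Save each batch to a new, randomly named HDFS directory
data.foreachRDD(rdd => rdd.saveAsObjectFile(ClusterConfig.hdfsNN +
  "/user<path/to/file>" + Random.nextInt))
ssc.start()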


It creates these directories, each containing only _SUCCESS:
stream562343230
stream1228731977
stream318151149
stream603511115
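
Because the directory names above come from Random.nextInt, they cannot be matched to the batch times that appear in the log. A minimal sketch of one way to make them traceable, assuming a hypothetical helper named BatchPaths.outputDir (not part of Spark) and a hypothetical variable `base` holding the output path prefix:

```scala
// Hypothetical helper (not part of Spark): builds an output directory name
// from the batch time, so each directory can be matched to a log line such as
// "New files at time 1392626300000 ms".
object BatchPaths {
  def outputDir(base: String, batchTimeMs: Long): String =
    s"$base-$batchTimeMs"
}

// Possible use with the two-argument form of foreachRDD, which also passes
// the batch Time (left as a comment because it needs a running streaming job;
// `base` is an assumed String holding the output path prefix):
//   data.foreachRDD((rdd, time) =>
//     rdd.saveAsObjectFile(BatchPaths.outputDir(base, time.milliseconds)))
```

With this naming, an empty directory can be tied directly to the batch whose "New files at time ..." line lists no files.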


This is the log output I get:
14/02/17 14:08:20 INFO FileInputDStream: Finding new files took 549 ms
14/02/17 14:08:20 INFO FileInputDStream: New files at time 1392626300000 ms:

14/02/17 14:08:20 INFO JobScheduler: Added jobs for time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Starting job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:08:20 WARN Configuration: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/02/17 14:08:20 WARN Configuration: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/02/17 14:08:20 WARN Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/02/17 14:08:20 WARN Configuration: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/02/17 14:08:20 WARN Configuration: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/02/17 14:08:20 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:08:20 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 0.001934866 s
14/02/17 14:08:20 INFO JobScheduler: Finished job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Total delay: 0.741 s for time 1392626300000 ms (execution: 0.167 s)
14/02/17 14:08:20 INFO FileInputDStream: Cleared 0 old files that were older than 1392626200000 ms:
14/02/17 14:10:00 INFO FileInputDStream: Finding new files took 6 ms
14/02/17 14:10:00 INFO FileInputDStream: New files at time 1392626400000 ms:

14/02/17 14:10:00 INFO JobScheduler: Added jobs for time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Starting job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:10:00 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:10:00 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.9016E-5 s
14/02/17 14:10:00 INFO JobScheduler: Finished job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Total delay: 0.085 s for time 1392626400000 ms (execution: 0.077 s)
14/02/17 14:10:00 INFO FileInputDStream: Cleared 0 old files that were older than 1392626300000 ms:
14/02/17 14:11:40 INFO FileInputDStream: Finding new files took 5 ms
14/02/17 14:11:40 INFO FileInputDStream: New files at time 1392626500000 ms:

14/02/17 14:11:40 INFO JobScheduler: Added jobs for time 1392626500000 ms
14/02/17 14:11:40 INFO JobScheduler: Starting job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:11:40 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:11:40 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.8111E-5 s
14/02/17 14:11:40 INFO JobScheduler: Finished job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO FileInputDStream: Cleared 1 old files that were older than 1392626400000 ms: 1392626300000 ms
14/02/17 14:11:40 INFO JobScheduler: Total delay: 0.110 s for time 1392626500000 ms (execution: 0.102 s)


Thanks and Regards,
Suraj Sheth
