Hello Subacini,

Until someone more knowledgeable suggests a better, more straightforward,
and simpler approach with a working code snippet, I suggest the following
workaround / hack:

 inputStream.foreachRDD { rdd =>
   // toDebugString describes the RDD lineage, including the input file paths
   val myStr = rdd.toDebugString
   // process myStr string value, e.g. using regular expressions
 }

For example, if you print myStr, you will see something similar to the
following in your log / console output:

15/02/24 15:14:56 INFO FileInputFormat: Total input paths to process : 1
15/02/24 15:14:56 INFO JobScheduler: Added jobs for time 1424787295000 ms
15/02/24 15:14:56 INFO JobScheduler: Starting job streaming job
1424787295000 ms.0 from job set of time 1424787295000 ms
(20) MappedRDD[27] at textFileStream at kmeans.scala:17 []
 |   UnionRDD[26] at textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL14.json NewHadoopRDD[6] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL11.json NewHadoopRDD[7] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL10.json NewHadoopRDD[8] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL6.json NewHadoopRDD[9] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL8.json NewHadoopRDD[10] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL5.json NewHadoopRDD[11] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL1.json NewHadoopRDD[12] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL9.json NewHadoopRDD[13] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL2.json NewHadoopRDD[14] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL16.json NewHadoopRDD[15] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL20.json NewHadoopRDD[16] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL12.json NewHadoopRDD[17] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL4.json NewHadoopRDD[18] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL19.json NewHadoopRDD[19] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL7.json NewHadoopRDD[20] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL17.json NewHadoopRDD[21] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL18.json NewHadoopRDD[22] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL3.json NewHadoopRDD[23] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL13.json NewHadoopRDD[24] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL15.json NewHadoopRDD[25] at
textFileStream at kmeans.scala:17 []
15/02/24 15:14:56 INFO JobScheduler: Finished job streaming job
1424787295000 ms.0 from job set of time 1424787295000 ms
15/02/24 15:14:56 INFO JobScheduler: Total delay: 1.420 s for time
1424787295000 ms (execution: 0.051 s)
15/02/24 15:14:56 INFO MappedRDD: Removing RDD 5 from persistence list
15/02/24 15:14:56 INFO BlockManager: Removing RDD 5
15/02/24 15:14:56 INFO FileInputDStream: Cleared 0 old files that were
older than 1424787235000 ms:
15/02/24 15:14:56 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

You can process the string to retrieve each section that starts with file:
and ends with a space. Then for each such string you can get your timestamp
from the file name.
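
As a rough sketch of that processing step (plain Scala, so you can try it
without Spark): the object name DebugStringFileNames, the helper names
filePaths and timestampOf, and the two regular expressions are my own
illustrative choices, not anything provided by Spark. I am also assuming
file names shaped like the ABC_1421893256000.txt you mentioned, and paths
that contain no spaces. Keep in mind that the toDebugString output is meant
for humans, so its layout may differ between Spark versions.

```scala
import scala.util.matching.Regex

// Hypothetical helper object; the names below are illustrative only.
object DebugStringFileNames {
  // A section that starts with "file:" and runs up to the next whitespace.
  val FilePath: Regex = """file:(\S+)""".r

  // Assumed naming scheme: <prefix>_<digits>.<extension> at the end of the path.
  val Timestamp: Regex = """_(\d+)\.[^./]+$""".r

  // All paths mentioned in the debug string, without the "file:" prefix.
  def filePaths(debugString: String): List[String] =
    FilePath.findAllMatchIn(debugString).map(_.group(1)).toList

  // The numeric timestamp embedded in the file name, if the name has one.
  def timestampOf(path: String): Option[Long] =
    Timestamp.findFirstMatchIn(path).map(_.group(1).toLong)

  def main(args: Array[String]): Unit = {
    // Made-up sample in the same shape as the lineage lines above.
    val sample =
      """ |   file:/data/in/ABC_1421893256000.txt NewHadoopRDD[6] at
        |textFileStream at kmeans.scala:17 []""".stripMargin
    filePaths(sample).foreach { path =>
      println(s"$path -> ${timestampOf(path)}")
    }
  }
}
```

Inside foreachRDD you would then call
DebugStringFileNames.filePaths(rdd.toDebugString) and map timestampOf over
the result. Returning an Option means file names that do not follow the
assumed pattern are reported as None instead of throwing an exception.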

--
Emre Sevinç
http://www.bigindustries.be/





On Fri, Feb 6, 2015 at 9:33 PM, Subacini B <subac...@gmail.com> wrote:

> Thank you Emre, this helps; I am able to get the file name.
>
> But I am not sure how to fit this into a DStream RDD.
>
> val inputStream = ssc.textFileStream("/hdfs Path/")
>
> inputStream is a DStream, and in foreachRDD I am doing my processing:
>
>  inputStream.foreachRDD(rdd => {
>    // how to get the file name here?
> })
>
>
> Can you please help.
>
>
> On Thu, Feb 5, 2015 at 11:15 PM, Emre Sevinc <emre.sev...@gmail.com>
> wrote:
>
>> Hello,
>>
>> Did you check the following?
>>
>>
>> http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-td6551.html
>>
>> --
>> Emre Sevinç
>>
>>
>> On Fri, Feb 6, 2015 at 2:16 AM, Subacini B <subac...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> We have file names with a timestamp, say ABC_1421893256000.txt, and the
>>> timestamp needs to be extracted from the file name for further processing.
>>> Is there a way to get the input file name picked up by a Spark Streaming job?
>>>
>>> Thanks in advance
>>>
>>> Subacini
>>>
>>
>>
>>
>> --
>> Emre Sevinc
>>
>
>


