Re: Get filename in Spark Streaming

2015-02-24 Thread Emre Sevinc
Hello Subacini,

Until someone more knowledgeable suggests a better, more straightforward,
and simpler approach with a working code snippet, I suggest the following
workaround / hack:

 inputStream.foreachRDD(rdd => {
   val myStr = rdd.toDebugString
   // process the myStr string value, e.g. using regular expressions
 })

For example, if you print myStr, you will see something similar to the
following in your log / console output:

15/02/24 15:14:56 INFO FileInputFormat: Total input paths to process : 1
15/02/24 15:14:56 INFO JobScheduler: Added jobs for time 1424787295000 ms
15/02/24 15:14:56 INFO JobScheduler: Starting job streaming job
1424787295000 ms.0 from job set of time 1424787295000 ms
(20) MappedRDD[27] at textFileStream at kmeans.scala:17 []
 |   UnionRDD[26] at textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL14.json NewHadoopRDD[6] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL11.json NewHadoopRDD[7] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL10.json NewHadoopRDD[8] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL6.json NewHadoopRDD[9] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL8.json NewHadoopRDD[10] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL5.json NewHadoopRDD[11] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL1.json NewHadoopRDD[12] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL9.json NewHadoopRDD[13] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL2.json NewHadoopRDD[14] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL16.json NewHadoopRDD[15] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL20.json NewHadoopRDD[16] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL12.json NewHadoopRDD[17] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL4.json NewHadoopRDD[18] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL19.json NewHadoopRDD[19] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL7.json NewHadoopRDD[20] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL17.json NewHadoopRDD[21] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL18.json NewHadoopRDD[22] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL3.json NewHadoopRDD[23] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL13.json NewHadoopRDD[24] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL15.json NewHadoopRDD[25] at
textFileStream at kmeans.scala:17 []
15/02/24 15:14:56 INFO JobScheduler: Finished job streaming job
1424787295000 ms.0 from job set of time 1424787295000 ms
15/02/24 15:14:56 INFO JobScheduler: Total delay: 1.420 s for time
1424787295000 ms (execution: 0.051 s)
15/02/24 15:14:56 INFO MappedRDD: Removing RDD 5 from persistence list
15/02/24 15:14:56 INFO BlockManager: Removing RDD 5
15/02/24 15:14:56 INFO FileInputDStream: Cleared 0 old files that were
older than 1424787235000 ms:
15/02/24 15:14:56 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

You can process the string to retrieve each section that starts with "file:"
and ends with a space. Then, for each such path, you can extract the
timestamp from the file name.
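To make that concrete, here is a small, self-contained sketch of the string processing. The helper object name and the exact regular expressions are my own; only the "file:" prefix in the debug output and the ABC_1421893256000.txt naming scheme come from this thread:

```scala
// Hypothetical helper for mining the output of rdd.toDebugString.
object DebugStringFiles {
  // A "file:" URI runs up to the next whitespace in the debug string.
  private val PathPattern = """file:(\S+)""".r
  // File names like ABC_1421893256000.txt carry an epoch-millis timestamp
  // between the last underscore and the extension.
  private val TimestampPattern = """.*_(\d+)\.\w+$""".r

  // All input file paths mentioned in a debug string.
  def extractPaths(debugString: String): Seq[String] =
    PathPattern.findAllMatchIn(debugString).map(_.group(1)).toList

  // The embedded timestamp, if the file name follows the naming scheme.
  def extractTimestamp(path: String): Option[Long] = path match {
    case TimestampPattern(millis) => Some(millis.toLong)
    case _                        => None
  }
}
```

For the debug output above, extractPaths should return the twenty /home/emre/data/train/... paths, and extractTimestamp then recovers the millis value from a name like ABC_1421893256000.txt (returning None for names without one).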

--
Emre Sevinç
http://www.bigindustries.be/







Re: Get filename in Spark Streaming

2015-02-06 Thread Subacini B
Thank you Emre, this helps; I am able to get the filename.

But I am not sure how to fit this into the DStream's RDDs.

val inputStream = ssc.textFileStream("/hdfs Path/")

inputStream is a DStream of RDDs, and in foreachRDD I am doing my processing:

 inputStream.foreachRDD(rdd => {
   // how to get the filename here??
 })


Can you please help.





Re: Get filename in Spark Streaming

2015-02-05 Thread Emre Sevinc
Hello,

Did you check the following?


http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/

http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-td6551.html
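For what it's worth, the first link boils down to roughly the following batch-mode sketch. This is my own untested reading of that post (the function name linesWithFileName is made up); it uses the Spark 1.x / Hadoop new-API names, and the cast relies on newAPIHadoopFile returning a NewHadoopRDD at runtime:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{NewHadoopRDD, RDD}

// Sketch: drop down to NewHadoopRDD so each partition's InputSplit,
// and hence its file path, is visible to the mapping function.
def linesWithFileName(sc: SparkContext, inputPath: String): RDD[(String, String)] =
  sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](inputPath)
    .asInstanceOf[NewHadoopRDD[LongWritable, Text]]
    .mapPartitionsWithInputSplit { (split: InputSplit, iter: Iterator[(LongWritable, Text)]) =>
      // Every record in this partition came from the split's file.
      val file = split.asInstanceOf[FileSplit].getPath.toString
      iter.map { case (_, line) => (file, line.toString) }
    }
```

Note that in streaming, the RDD handed to foreachRDD by textFileStream is a union of per-file RDDs (visible in the toDebugString output earlier in this thread), so this cast does not apply to it directly.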

--
Emre Sevinç




Get filename in Spark Streaming

2015-02-05 Thread Subacini B
Hi All,

We have filenames with a timestamp, say ABC_1421893256000.txt, and the
timestamp needs to be extracted from the file name for further processing. Is
there a way to get the input file name picked up by the Spark Streaming job?

Thanks in advance

Subacini