Re: Python Spark Streaming example with textFileStream does not work. Why?
Can you give absolute paths, just to be sure?

On Mon, Sep 7, 2015 at 12:59 AM, Kamil Khadiev wrote:
> I think the problem also depends on the file system. I use a Mac, and my
> program found the file, but only when I created a new one, not when I
> renamed or moved one.
>
> And in the logs:
> 15/09/07 10:44:52 INFO FileInputDStream: New files at time 1441611892000 ms:
> I found my file
>
> But I don't see any processing of the file in the logs.
>
> 2015-09-07 8:44 GMT+03:00 Kamil Khadiev :
>
>> Thank you.
>>
>> But it still does not work.
>>
>> I also made another mistake: I gave the name of a file, not a directory.
>>
>> I fixed it:
>> conf = (SparkConf()
>>         .setMaster("local")
>>         .setAppName("My app")
>>         .set("spark.executor.memory", "1g"))
>> sc = SparkContext(conf=conf)
>> ssc = StreamingContext(sc, 1)
>> lines = ssc.textFileStream('../inputs/streaminginputs')
>> counts = lines.flatMap(lambda line: line.split(" "))\
>>               .map(lambda x: (x, 1))\
>>               .reduceByKey(lambda a, b: a + b)
>> counts.pprint()
>> ssc.start()
>> ssc.awaitTermination()
>>
>> I add a file to the '../inputs/streaminginputs' directory, then rename it,
>> and I also tried copying in a new one. But it does not help. I see the same
>> thing in the console.
>> I also get logs like this every second (but not the log lines I expected
>> about a new file):
>>
>> [per-batch FileInputDStream/JobScheduler/DAGScheduler log output trimmed;
>> the same excerpt is quoted in full later in the thread]
>>
>> 2015-09-04 20:14 GMT+03:00 Davies Liu :
>>
>>> Spark Streaming only process the
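The suggestion above (use absolute paths) can be sketched as a tiny helper; this is an illustration added for clarity, not code from the thread, and the `monitored_dir` name is hypothetical:

```python
import os

def monitored_dir(relative_dir):
    """Resolve the directory handed to textFileStream to an absolute path,
    so the stream does not silently watch a different location depending on
    the working directory the job happens to be launched from."""
    return os.path.abspath(relative_dir)

# e.g. ssc.textFileStream(monitored_dir('../inputs/streaminginputs'))
```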
Re: Python Spark Streaming example with textFileStream does not work. Why?
I think the problem also depends on the file system. I use a Mac, and my program found the file, but only when I created a new one, not when I renamed or moved one.

And in the logs:
15/09/07 10:44:52 INFO FileInputDStream: New files at time 1441611892000 ms:
I found my file

But I don't see any processing of the file in the logs.

2015-09-07 8:44 GMT+03:00 Kamil Khadiev:
> Thank you.
>
> But it still does not work.
>
> I also made another mistake: I gave the name of a file, not a directory.
>
> I fixed it:
> conf = (SparkConf()
>         .setMaster("local")
>         .setAppName("My app")
>         .set("spark.executor.memory", "1g"))
> sc = SparkContext(conf=conf)
> ssc = StreamingContext(sc, 1)
> lines = ssc.textFileStream('../inputs/streaminginputs')
> counts = lines.flatMap(lambda line: line.split(" "))\
>               .map(lambda x: (x, 1))\
>               .reduceByKey(lambda a, b: a + b)
> counts.pprint()
> ssc.start()
> ssc.awaitTermination()
>
> I add a file to the '../inputs/streaminginputs' directory, then rename it,
> and I also tried copying in a new one. But it does not help. I see the same
> thing in the console.
> I also get logs like this every second (but not the log lines I expected
> about a new file):
>
> ---
> Time: 2015-09-07 08:39:29
> ---
>
> 15/09/07 08:39:30 INFO FileInputDStream: Finding new files took 0 ms
> 15/09/07 08:39:30 INFO FileInputDStream: New files at time 144160437 ms:
>
> 15/09/07 08:39:30 INFO JobScheduler: Added jobs for time 144160437 ms
> 15/09/07 08:39:30 INFO JobScheduler: Starting job streaming job 144160437 ms.0 from job set of time 144160437 ms
> 15/09/07 08:39:30 INFO SparkContext: Starting job: runJob at PythonRDD.scala:362
> 15/09/07 08:39:30 INFO DAGScheduler: Registering RDD 163 (call at /Library/Python/2.7/site-packages/py4j/java_gateway.py:1206)
> 15/09/07 08:39:30 INFO DAGScheduler: Got job 20 (runJob at PythonRDD.scala:362) with 1 output partitions (allowLocal=true)
> 15/09/07 08:39:30 INFO DAGScheduler: Final stage: Stage 41(runJob at PythonRDD.scala:362)
> 15/09/07 08:39:30 INFO DAGScheduler: Parents of final stage: List(Stage 40)
> 15/09/07 08:39:30 INFO DAGScheduler: Missing parents: List()
> 15/09/07 08:39:30 INFO DAGScheduler: Submitting Stage 41 (PythonRDD[167] at RDD at PythonRDD.scala:43), which has no missing parents
> 15/09/07 08:39:30 INFO MemoryStore: ensureFreeSpace(5952) called with curMem=31088, maxMem=278019440
> 15/09/07 08:39:30 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 5.8 KB, free 265.1 MB)
> 15/09/07 08:39:30 INFO MemoryStore: ensureFreeSpace(4413) called with curMem=37040, maxMem=278019440
> 15/09/07 08:39:30 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 4.3 KB, free 265.1 MB)
> 15/09/07 08:39:30 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on localhost:57739 (size: 4.3 KB, free: 265.1 MB)
> 15/09/07 08:39:30 INFO BlockManagerMaster: Updated info of block broadcast_20_piece0
> 15/09/07 08:39:30 INFO SparkContext: Created broadcast 20 from broadcast at DAGScheduler.scala:839
> 15/09/07 08:39:30 INFO DAGScheduler: Submitting 1 missing tasks from Stage 41 (PythonRDD[167] at RDD at PythonRDD.scala:43)
> 15/09/07 08:39:30 INFO TaskSchedulerImpl: Adding task set 41.0 with 1 tasks
> 15/09/07 08:39:30 INFO TaskSetManager: Starting task 0.0 in stage 41.0 (TID 20, localhost, PROCESS_LOCAL, 1056 bytes)
> 15/09/07 08:39:30 INFO Executor: Running task 0.0 in stage 41.0 (TID 20)
> 15/09/07 08:39:30 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
> 15/09/07 08:39:30 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
> 15/09/07 08:39:30 INFO PythonRDD: Times: total = 1, boot = -1005, init = 1006, finish = 0
> 15/09/07 08:39:30 INFO PythonRDD: Times: total = 2, boot = -1004, init = 1006, finish = 0
> 15/09/07 08:39:30 INFO Executor: Finished task 0.0 in stage 41.0 (TID 20). 932 bytes result sent to driver
> 15/09/07 08:39:30 INFO TaskSetManager: Finished task 0.0 in stage 41.0 (TID 20) in 7 ms on localhost (1/1)
> 15/09/07 08:39:30 INFO TaskSchedulerImpl: Removed TaskSet 41.0, whose tasks have all completed, from pool
> 15/09/07 08:39:30 INFO DAGScheduler: Stage 41 (runJob at PythonRDD.scala:362) finished in 0.008 s
> 15/09/07 08:39:30 INFO DAGScheduler: Job 20 finished: runJob at PythonRDD.scala:362, took 0.015576 s
> 15/09/07 08:39:30 INFO JobScheduler: Finished job streaming job 144160437 ms.0 from job set of time 144160437 ms
>
> 2015-09-04 20:14 GMT+03:00 Davies Liu :
>
>> Spark Streaming only processes NEW files that appear after it has
>> started, so you should point it at a directory and copy the file into
>> it after the job has started.
>>
>> On Fri, Sep 4, 2015 at 5:15 AM, Kamilbek wrote:
>> > I use spark 1.3.1 and Python 2.7
>> >
>> > It is
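The advice above (drop files into the monitored directory only after the job has started) and Kamil's observation that renamed/moved files were not picked up can be combined into a small helper. This is a sketch added for illustration, not code from the thread: a plausible explanation (an assumption here, not confirmed by the posters) is that a renamed file keeps its old modification time, while the file stream only selects files whose mtime falls in the current batch window, so the helper refreshes the mtime after moving the file into place.

```python
import os
import shutil
import tempfile

def drop_into_watched_dir(src_path, watched_dir):
    """Make src_path appear in watched_dir as a NEW file (Python 3 sketch).

    Copies into a sibling staging directory first, then renames into place
    (atomic when staging and target are on the same filesystem) and
    refreshes the modification time, since a renamed file keeps its old
    mtime and may be skipped by the stream (assumption, see above).
    """
    os.makedirs(watched_dir, exist_ok=True)
    staging_dir = tempfile.mkdtemp(dir=os.path.dirname(os.path.abspath(watched_dir)))
    tmp_path = os.path.join(staging_dir, os.path.basename(src_path))
    shutil.copyfile(src_path, tmp_path)
    final_path = os.path.join(watched_dir, os.path.basename(src_path))
    os.rename(tmp_path, final_path)   # atomic on the same filesystem
    os.utime(final_path, None)        # set mtime to "now" so the stream sees it
    os.rmdir(staging_dir)             # staging dir is empty after the rename
    return final_path
```

Call this only after `ssc.start()` has run, so the file counts as new.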
Re: Python Spark Streaming example with textFileStream does not work. Why?
Spark Streaming only processes NEW files that appear after it has started, so you should point it at a directory and copy the file into it after the job has started.

On Fri, Sep 4, 2015 at 5:15 AM, Kamilbek wrote:
> I use spark 1.3.1 and Python 2.7
>
> It is my first experience with Spark Streaming.
>
> I tried an example of code which reads data from a file using Spark
> Streaming.
>
> This is the link to the example:
> https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py
>
> My code is the following:
>
> conf = (SparkConf()
>         .setMaster("local")
>         .setAppName("My app")
>         .set("spark.executor.memory", "1g"))
> sc = SparkContext(conf=conf)
> ssc = StreamingContext(sc, 1)
> lines = ssc.textFileStream('../inputs/2.txt')
> counts = lines.flatMap(lambda line: line.split(" "))\
>               .map(lambda x: (x, 1))\
>               .reduceByKey(lambda a, b: a + b)
> counts.pprint()
> ssc.start()
> ssc.awaitTermination()
>
> The content of the 2.txt file is the following:
>
> a1 b1 c1 d1 e1 f1 g1
> a2 b2 c2 d2 e2 f2 g2
> a3 b3 c3 d3 e3 f3 g3
>
> I expect that something related to the file content will appear in the
> console, but there is nothing. Nothing except text like this each second:
>
> ---
> Time: 2015-09-03 15:08:18
> ---
>
> and Spark's logs.
>
> Am I doing something wrong? If not, why does it not work?
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-Streaming-example-with-textFileStream-does-not-work-Why-tp24579.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
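Putting the advice in this thread together, here is a sketch of a corrected version of the job. The directory path is the one used in the thread; the `word_counts` helper is not part of the original code, it just mirrors the flatMap/map/reduceByKey pipeline in pure Python so the logic can be checked without a Spark installation, and the pyspark imports are deferred into `main()` for the same reason.

```python
def word_counts(lines):
    """Pure-Python mirror of the flatMap/map/reduceByKey pipeline below."""
    counts = {}
    for line in lines:
        for word in line.split(" "):
            counts[word] = counts.get(word, 0) + 1
    return counts

def main():
    # Deferred so the helper above stays importable without pyspark installed.
    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext

    conf = (SparkConf()
            .setMaster("local[2]")      # more than one local core is safer for streaming
            .setAppName("StreamingWordCount")
            .set("spark.executor.memory", "1g"))
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)       # 1-second batches

    # Point at a DIRECTORY, not a file, and drop new files into it
    # only AFTER ssc.start() has run.
    lines = ssc.textFileStream('../inputs/streaminginputs')
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda x: (x, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
```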