Can you give absolute paths just to be sure?

On Mon, Sep 7, 2015 at 12:59 AM, Kamil Khadiev <kamilh...@gmail.com> wrote:

> I think that problem also depends on file system,
> I use mac and My program found file, but only when I created new, but not
> rename or move
>
> And in logs
> 15/09/07 10:44:52 INFO FileInputDStream: New files at time 1441611892000
> ms:
> I found my file
>
> But I don't see any processing of file in logs
>
> 2015-09-07 8:44 GMT+03:00 Kamil Khadiev <kamilh...@gmail.com>:
>
>> Thank you.
>>
>> But it still does not work.
>>
>> Also I did another mistake: I wrote name of file, but not directory.
>>
>> I fix it:
>>   conf = (SparkConf()
>>          .setMaster("local")
>>          .setAppName("My app")
>>          .set("spark.executor.memory", "1g"))
>>     sc = SparkContext(conf = conf)
>>     ssc = StreamingContext(sc, 1)
>>     lines = ssc.textFileStream('../inputs/streaminginputs')
>>     counts = lines.flatMap(lambda line: line.split(" "))\
>>               .map(lambda x: (x, 1))\
>>               .reduceByKey(lambda a, b: a+b)
>>     counts.pprint()
>>     ssc.start()
>>     ssc.awaitTermination()
>>
>> I add file to '../inputs/streaminginputs' directory, then rename it,
>> also try to copy new.
>> But it does not help.  I have same situation in console.
>> Also I have logs like this every second (But I haven't expected logs
>> about new file):
>>
>> -------------------------------------------
>> Time: 2015-09-07 08:39:29
>> -------------------------------------------
>>
>> 15/09/07 08:39:30 INFO FileInputDStream: Finding new files took 0 ms
>> 15/09/07 08:39:30 INFO FileInputDStream: New files at time 1441604370000
>> ms:
>>
>> 15/09/07 08:39:30 INFO JobScheduler: Added jobs for time 1441604370000 ms
>> 15/09/07 08:39:30 INFO JobScheduler: Starting job streaming job
>> 1441604370000 ms.0 from job set of time 1441604370000 ms
>> 15/09/07 08:39:30 INFO SparkContext: Starting job: runJob at
>> PythonRDD.scala:362
>> 15/09/07 08:39:30 INFO DAGScheduler: Registering RDD 163 (call at
>> /Library/Python/2.7/site-packages/py4j/java_gateway.py:1206)
>> 15/09/07 08:39:30 INFO DAGScheduler: Got job 20 (runJob at
>> PythonRDD.scala:362) with 1 output partitions (allowLocal=true)
>> 15/09/07 08:39:30 INFO DAGScheduler: Final stage: Stage 41(runJob at
>> PythonRDD.scala:362)
>> 15/09/07 08:39:30 INFO DAGScheduler: Parents of final stage: List(Stage
>> 40)
>> 15/09/07 08:39:30 INFO DAGScheduler: Missing parents: List()
>> 15/09/07 08:39:30 INFO DAGScheduler: Submitting Stage 41 (PythonRDD[167]
>> at RDD at PythonRDD.scala:43), which has no missing parents
>> 15/09/07 08:39:30 INFO MemoryStore: ensureFreeSpace(5952) called with
>> curMem=31088, maxMem=278019440
>> 15/09/07 08:39:30 INFO MemoryStore: Block broadcast_20 stored as values
>> in memory (estimated size 5.8 KB, free 265.1 MB)
>> 15/09/07 08:39:30 INFO MemoryStore: ensureFreeSpace(4413) called with
>> curMem=37040, maxMem=278019440
>> 15/09/07 08:39:30 INFO MemoryStore: Block broadcast_20_piece0 stored as
>> bytes in memory (estimated size 4.3 KB, free 265.1 MB)
>> 15/09/07 08:39:30 INFO BlockManagerInfo: Added broadcast_20_piece0 in
>> memory on localhost:57739 (size: 4.3 KB, free: 265.1 MB)
>> 15/09/07 08:39:30 INFO BlockManagerMaster: Updated info of block
>> broadcast_20_piece0
>> 15/09/07 08:39:30 INFO SparkContext: Created broadcast 20 from broadcast
>> at DAGScheduler.scala:839
>> 15/09/07 08:39:30 INFO DAGScheduler: Submitting 1 missing tasks from
>> Stage 41 (PythonRDD[167] at RDD at PythonRDD.scala:43)
>> 15/09/07 08:39:30 INFO TaskSchedulerImpl: Adding task set 41.0 with 1
>> tasks
>> 15/09/07 08:39:30 INFO TaskSetManager: Starting task 0.0 in stage 41.0
>> (TID 20, localhost, PROCESS_LOCAL, 1056 bytes)
>> 15/09/07 08:39:30 INFO Executor: Running task 0.0 in stage 41.0 (TID 20)
>> 15/09/07 08:39:30 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty
>> blocks out of 0 blocks
>> 15/09/07 08:39:30 INFO ShuffleBlockFetcherIterator: Started 0 remote
>> fetches in 0 ms
>> 15/09/07 08:39:30 INFO PythonRDD: Times: total = 1, boot = -1005, init =
>> 1006, finish = 0
>> 15/09/07 08:39:30 INFO PythonRDD: Times: total = 2, boot = -1004, init =
>> 1006, finish = 0
>> 15/09/07 08:39:30 INFO Executor: Finished task 0.0 in stage 41.0 (TID
>> 20). 932 bytes result sent to driver
>> 15/09/07 08:39:30 INFO TaskSetManager: Finished task 0.0 in stage 41.0
>> (TID 20) in 7 ms on localhost (1/1)
>> 15/09/07 08:39:30 INFO TaskSchedulerImpl: Removed TaskSet 41.0, whose
>> tasks have all completed, from pool
>> 15/09/07 08:39:30 INFO DAGScheduler: Stage 41 (runJob at
>> PythonRDD.scala:362) finished in 0.008 s
>> 15/09/07 08:39:30 INFO DAGScheduler: Job 20 finished: runJob at
>> PythonRDD.scala:362, took 0.015576 s
>> 15/09/07 08:39:30 INFO JobScheduler: Finished job streaming job
>> 1441604370000 ms.0 from job set of time 1441604370000 ms
>>
>> 2015-09-04 20:14 GMT+03:00 Davies Liu <dav...@databricks.com>:
>>
>>> Spark Streaming only process the NEW files after it started, so you
>>> should point it to a directory, and copy the file into it after
>>> started.
>>>
>>> On Fri, Sep 4, 2015 at 5:15 AM, Kamilbek <kamilh...@gmail.com> wrote:
>>> > I use spark 1.3.1 and Python 2.7
>>> >
>>> > It is my first experience with Spark Streaming.
>>> >
>>> > I try example of code, which reads data from file using spark
>>> streaming.
>>> >
>>> > This is link to example:
>>> >
>>> https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py
>>> >
>>> > My code is the following:
>>> >
>>> >     conf = (SparkConf()
>>> >          .setMaster("local")
>>> >          .setAppName("My app")
>>> >          .set("spark.executor.memory", "1g"))
>>> >     sc = SparkContext(conf = conf)
>>> >     ssc = StreamingContext(sc, 1)
>>> >     lines = ssc.textFileStream('../inputs/2.txt')
>>> >     counts = lines.flatMap(lambda line: line.split(" "))\
>>> >               .map(lambda x: (x, 1))\
>>> >               .reduceByKey(lambda a, b: a+b)
>>> >     counts.pprint()
>>> >     ssc.start()
>>> >     ssc.awaitTermination()
>>> >
>>> >
>>> > content of 2.txt file is following:
>>> >
>>> > a1 b1 c1 d1 e1 f1 g1
>>> > a2 b2 c2 d2 e2 f2 g2
>>> > a3 b3 c3 d3 e3 f3 g3
>>> >
>>> >
>>> > I expect that something related to file content will be in console, but
>>> > there are nothing. Nothing except text like this each second:
>>> >
>>> > -------------------------------------------
>>> > Time: 2015-09-03 15:08:18
>>> > -------------------------------------------
>>> >
>>> > and Spark's logs.
>>> >
>>> > Do I do some thing wrong? Otherwise why it does not work?
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-Streaming-example-with-textFileStream-does-not-work-Why-tp24579.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>> > ---------------------------------------------------------------------
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>
>>
>

Reply via email to