Instead of print you could do jsonIn.count().print(). A straightforward
approach is to use foreachRDD :)
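
For example, a rough sketch of both suggestions (assuming jsonIn is the
JavaDStream<String> from the thread below, and that JavaRDD and
org.apache.spark.api.java.function.Function are imported):

    // Option 1: print the per-batch element count, which still registers an
    // output operation and triggers the job for every batch:
    jsonIn.count().print();

    // Option 2: use foreachRDD as the output operation and run an action yourself:
    jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        rdd.count(); // any action works; count() avoids pulling all data to the driver
        return null;
      }
    });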

Thanks
Best Regards

On Mon, Feb 16, 2015 at 6:48 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:

> Hello Sean,
>
> I did not understand your question very well, but what I do is check
> the output directory (and I have various logger outputs at various stages
> showing the contents of an input file being processed, the response from
> the web service, etc.).
>
> By the way, I've already solved my problem by using foreachRDD instead of
> print (see my second message in this thread). Apparently forcing Spark to
> materialize the DAG via print() is not the way to go. (My interpretation
> might be wrong, but this is what I've just observed in my case.)
>
> --
> Emre
>
>
>
>
> On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> How are you deciding whether files are processed or not? It doesn't seem
>> possible from this code. Maybe it just seems so.
>> On Feb 16, 2015 12:51 PM, "Emre Sevinc" <emre.sev...@gmail.com> wrote:
>>
>>> I've managed to solve this, but I still don't know exactly why my
>>> solution works:
>>>
>>> In my code I was trying to force Spark to produce output via:
>>>
>>>   jsonIn.print();
>>>
>>> jsonIn being a JavaDStream<String>.
>>>
>>> When I removed the code above and added the code below to force the
>>> output operation, and hence the execution:
>>>
>>>     // Register an output operation that forces each batch RDD to be computed:
>>>     jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>       @Override
>>>       public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
>>>         stringJavaRDD.collect(); // an action; collect() pulls the results to the driver
>>>         return null;
>>>       }
>>>     });
>>>
>>> It works as I expect, processing all 20 of the files I give it instead of
>>> stopping at 16.
>>>
>>> --
>>> Emre
>>>
>>>
>>> On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc <emre.sev...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have an application in Java that uses Spark Streaming 1.2.1 in the
>>>> following manner:
>>>>
>>>>  - Listen to the input directory.
>>>>  - If a new file is copied to that input directory process it.
>>>>  - Process: contact a RESTful web service (also running locally, and
>>>> responsive), send the contents of the file, receive the response from the
>>>> web service, and write the results as a new file into the output directory
>>>>  - batch interval : 30 seconds
>>>>  - checkpoint interval: 150 seconds
>>>>
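>>>> A rough sketch of that setup (the directory paths and the callWebService
>>>> helper below are placeholders and not the actual code; the intervals are
>>>> the ones listed above):
>>>>
>>>>     import org.apache.spark.SparkConf;
>>>>     import org.apache.spark.api.java.function.Function;
>>>>     import org.apache.spark.streaming.Duration;
>>>>     import org.apache.spark.streaming.api.java.JavaDStream;
>>>>     import org.apache.spark.streaming.api.java.JavaStreamingContext;
>>>>
>>>>     SparkConf conf = new SparkConf().setAppName("myClass");
>>>>     JavaStreamingContext ssc =
>>>>         new JavaStreamingContext(conf, new Duration(30 * 1000)); // 30 s batches
>>>>     ssc.checkpoint("file:///tmp/checkpoint");
>>>>
>>>>     // Watch the input directory; newly copied files show up in the next batch
>>>>     JavaDStream<String> jsonIn = ssc.textFileStream("/in");
>>>>     jsonIn.checkpoint(new Duration(150 * 1000)); // checkpoint every 150 s
>>>>
>>>>     // Send the contents to the web service and keep the response
>>>>     JavaDStream<String> responses = jsonIn.map(new Function<String, String>() {
>>>>       @Override
>>>>       public String call(String content) throws Exception {
>>>>         return callWebService(content); // hypothetical helper wrapping the REST call
>>>>       }
>>>>     });
>>>>
>>>>     // Write the results as new files into the output directory
>>>>     responses.dstream().saveAsTextFiles("file:///out/result", "json");
>>>>
>>>>     ssc.start();
>>>>     ssc.awaitTermination();
>>>>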
>>>> When I test the application locally with 1 or 2 files, it works
>>>> perfectly fine as expected. I run it like:
>>>>
>>>>         spark-submit --class myClass --verbose --master local[4]
>>>> --deploy-mode client myApp.jar /in file:///out
>>>>
>>>> But then I noticed something strange when I copied 20 files to the
>>>> input directory: Spark Streaming detects all of the files, but it ends up
>>>> processing *only 16 files*. The remaining 4 are not processed at all.
>>>>
>>>> I've tried it with 19, 18, and then 17 files. Same result, only 16
>>>> files end up in the output directory.
>>>>
>>>> Then I tried copying 16 files at once to the input directory, and it
>>>> processed all 16 of them. That's why I call 16 the magic number.
>>>>
>>>> When I say it detects all of the files, I mean that in the logs I see
>>>> the following lines when I copy 17 files:
>>>>
>>>>
>>>> ===============================================================================================================================
>>>> 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory:
>>>> "1G"
>>>> 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu
>>>> resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on
>>>> interface eth0)
>>>> 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
>>>> bind to another address
>>>> 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
>>>> 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
>>>> native-hadoop library for your platform... using builtin-java classes where
>>>> applicable
>>>> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
>>>> ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
>>>> file:/tmp/receivedBlockMetadata
>>>> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
>>>> ReceivedBlockHandlerMaster:59 - Reading from the logs:
>>>> file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
>>>> file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
>>>> -------------------------------------------
>>>> Time: 1424086260000 ms
>>>> -------------------------------------------
>>>>
>>>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>>>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>>>> file:/tmp/receivedBlockMetadata older than 1424085960000:
>>>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>>>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>>>> file:/tmp/receivedBlockMetadata older than 1424085960000
>>>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>>>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>>>> file:/tmp/receivedBlockMetadata older than 1424085960000:
>>>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>>>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>>>> file:/tmp/receivedBlockMetadata older than 1424085960000
>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>>>> process : 1
>>>> 2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
>>>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>>>> file:/tmp/receivedBlockMetadata older than 1424085990000:
>>>> 2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
>>>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>>>> file:/tmp/receivedBlockMetadata older than 1424085990000
>>>>
>>>> -------------------------------------------
>>>>
>>>> Time: 1424086290000 ms
>>>> -------------------------------------------
>>>>
>>>> ===============================================================================================================================
>>>>
>>>> In other words, it says "Total input paths to process : 1" 17 times.
>>>> And when I copy 20 files, it says that 20 times.
>>>>
>>>> But it always ends up processing only 16 files at once, and the
>>>> remaining ones are not processed at all.
>>>>
>>>> However, if I first copy 16 files to the input directory, wait for the
>>>> Spark Streaming application to process them (by checking the output
>>>> directory and seeing that 16 files have been created properly), and then
>>>> copy 4 more files, those 4 files are also processed!
>>>>
>>>> So now I'm in the weird situation where I have to copy at most 16 files
>>>> at once, wait for them to be processed, and only after that copy at most
>>>> 16 more, ... otherwise I lose the extra files, in the sense that they are
>>>> never processed. This is not acceptable for my use case.
>>>>
>>>> I've also checked the parameter
>>>>
>>>>    spark.streaming.receiver.maxRate
>>>>
>>>>
>>>> and it is infinite by default. I've tried setting it to 10, for example,
>>>> and nothing changed.
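>>>>
>>>> For reference, a sketch of one way such a setting is usually passed (the
>>>> value 10 is just the example from above; and if I understand correctly,
>>>> this property only throttles receiver-based streams, so it may not apply
>>>> to a directory-based file stream, which has no receiver, at all):
>>>>
>>>>     SparkConf conf = new SparkConf()
>>>>       .setAppName("myClass")
>>>>       .set("spark.streaming.receiver.maxRate", "10");
>>>>
>>>> or, equivalently, on the command line:
>>>>
>>>>     spark-submit --conf spark.streaming.receiver.maxRate=10 ...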
>>>>
>>>> Any ideas what might be causing this situation, with 16 files at once
>>>> apparently being a magic number?
>>>>
>>>>
>>>> --
>>>> Emre Sevinç
>>>>
>>>
>>>
>>>
>>> --
>>> Emre Sevinc
>>>
>>
>
>
> --
> Emre Sevinc
>
