Hi Emre,

there shouldn't be any difference in which files get processed w/ print()
vs. foreachRDD().  In fact, if you look at the definition of print(), it is
just calling foreachRDD() underneath.  So there is something else going on
here.

We need a little more information to figure out exactly what is going on.
 (I think Sean was getting at the same thing ...)

(a) how do you know that when you use foreachRDD, all 20 files get
processed?

(b) How do you know that only 16 files get processed when you print()? Do
you know the other files are being skipped, or maybe they are just "stuck"
somewhere?  eg., suppose you start w/ 20 files, and you see 16 get
processed ... what happens after you add a few more files to the
directory?  Are they processed immediately, or are they never processed
either?

(c) Can you share any more code of what you are doing to the dstreams
*before* the print() / foreachRDD()?  That might give us more details about
what the difference is.

I can't see how .count.println() would be different than just println(),
but maybe I am missing something also.

Imran

On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc <emre.sev...@gmail.com> wrote:

> Sean,
>
> In this case, I've been testing the code on my local machine and using
> Spark locally, so I all the log output was available on my terminal. And
> I've used the .print() method to have an output operation, just to force
> Spark execute.
>
> And I was not using foreachRDD, I was only using print() method on a
> JavaDStream object, and it was working fine for a few files, up to 16 (and
> without print() it did not do anything because there were no output
> operations).
>
> To sum it up, in my case:
>
>  - Initially, use .print() and no foreachRDD: processes up to 16 files and
> does not do anything for the remaining 4.
>  - Remove .print() and use foreachRDD: processes all of the 20 files.
>
> Maybe, as in Akhil Das's suggestion, using .count.print() might also have
> fixed my problem, but I'm satisfied with foreachRDD approach for now.
> (Though it is still a mystery to me why using .print() had a difference,
> maybe my mental model of Spark is wrong, I thought no matter what output
> operation I used, the number of files processed by Spark would be
> independent of that because the processing is done in a different method,
> .print() is only used to force Spark execute that processing, am I wrong?).
>
> --
> Emre
>
>
> On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> Materialization shouldn't be relevant. The collect by itself doesn't let
>> you detect whether it happened. Print should print some results to the
>> console but on different machines, so may not be a reliable way to see what
>> happened.
>>
>> Yes I understand your real process uses foreachRDD and that's what you
>> should use. It sounds like that works. But you must always have been using
>> that right? What do you mean that you changed to use it?
>>
>> Basically I'm not clear on what the real code does and what about the
>> output of that code tells you only 16 files were processed.
>> On Feb 16, 2015 1:18 PM, "Emre Sevinc" <emre.sev...@gmail.com> wrote:
>>
>>> Hello Sean,
>>>
>>> I did not understand your question very well, but what I do is checking
>>> the output directory (and I have various logger outputs at various stages
>>> showing the contents of an input file being processed, the response from
>>> the web service, etc.).
>>>
>>> By the way, I've already solved my problem by using foreachRDD instead
>>> of print (see my second message in this thread). Apparently forcing Spark
>>> to materialize DAG via print() is not the way to go. (My interpretation
>>> might be wrong, but this is what I've just seen in my case).
>>>
>>> --
>>> Emre
>>>
>>>
>>>
>>>
>>> On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen <so...@cloudera.com> wrote:
>>>
>>>> How are you deciding whether files are processed or not? It doesn't
>>>> seem possible from this code. Maybe it just seems so.
>>>> On Feb 16, 2015 12:51 PM, "Emre Sevinc" <emre.sev...@gmail.com> wrote:
>>>>
>>>>> I've managed to solve this, but I still don't know exactly why my
>>>>> solution works:
>>>>>
>>>>> In my code I was trying to force the Spark to output via:
>>>>>
>>>>>   jsonIn.print();
>>>>>
>>>>> jsonIn being a JavaDStream<String>.
>>>>>
>>>>> When removed the code above, and added the code below to force the
>>>>> output operation, hence the execution:
>>>>>
>>>>>     jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>       @Override
>>>>>       public Void call(JavaRDD<String> stringJavaRDD) throws Exception
>>>>> {
>>>>>         stringJavaRDD.collect();
>>>>>         return null;
>>>>>       }
>>>>>     });
>>>>>
>>>>> It works as I expect, processing all of the 20 files I give to it,
>>>>> instead of stopping at 16.
>>>>>
>>>>> --
>>>>> Emre
>>>>>
>>>>>
>>>>> On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc <emre.sev...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have an application in Java that uses Spark Streaming 1.2.1 in the
>>>>>> following manner:
>>>>>>
>>>>>>  - Listen to the input directory.
>>>>>>  - If a new file is copied to that input directory process it.
>>>>>>  - Process: contact a RESTful web service (running also locally and
>>>>>> responsive), send the contents of the file, receive the response from the
>>>>>> web service, write the results as a new file into the output directory
>>>>>>  - batch interval : 30 seconds
>>>>>>  - checkpoint interval: 150 seconds
>>>>>>
>>>>>> When I test the application locally with 1 or 2 files, it works
>>>>>> perfectly fine as expected. I run it like:
>>>>>>
>>>>>>         spark-submit --class myClass --verbose --master local[4]
>>>>>> --deploy-mode client myApp.jar /in file:///out
>>>>>>
>>>>>> But then I've realized something strange when I copied 20 files to
>>>>>> the INPUT directory: Spark Streaming detects all of the files, but it 
>>>>>> ends
>>>>>> up processing *only 16 files*. And the remaining 4 are not processed at 
>>>>>> all.
>>>>>>
>>>>>> I've tried it with 19, 18, and then 17 files. Same result, only 16
>>>>>> files end up in the output directory.
>>>>>>
>>>>>> Then I've tried it by copying 16 files at once to the input
>>>>>> directory, and it can process all of the 16 files. That's why I call it
>>>>>> magic number 16.
>>>>>>
>>>>>> When I mean it detects all of the files, I mean that in the logs I
>>>>>> see the following lines when I copy 17 files:
>>>>>>
>>>>>>
>>>>>> ===============================================================================================================================
>>>>>> 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory:
>>>>>> "1G"
>>>>>> 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu
>>>>>> resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on
>>>>>> interface eth0)
>>>>>> 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need
>>>>>> to bind to another address
>>>>>> 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
>>>>>> 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
>>>>>> native-hadoop library for your platform... using builtin-java classes 
>>>>>> where
>>>>>> applicable
>>>>>> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
>>>>>> ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
>>>>>> file:/tmp/receivedBlockMetadata
>>>>>> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
>>>>>> ReceivedBlockHandlerMaster:59 - Reading from the logs:
>>>>>> file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
>>>>>> file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
>>>>>> -------------------------------------------
>>>>>> Time: 1424086260000 ms
>>>>>> -------------------------------------------
>>>>>>
>>>>>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>>>>>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>>>>>> file:/tmp/receivedBlockMetadata older than 1424085960000:
>>>>>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>>>>>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>>>>>> file:/tmp/receivedBlockMetadata older than 1424085960000
>>>>>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>>>>>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>>>>>> file:/tmp/receivedBlockMetadata older than 1424085960000:
>>>>>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>>>>>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>>>>>> file:/tmp/receivedBlockMetadata older than 1424085960000
>>>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>>>>>> process : 1
>>>>>> 2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
>>>>>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>>>>>> file:/tmp/receivedBlockMetadata older than 1424085990000:
>>>>>> 2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
>>>>>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>>>>>> file:/tmp/receivedBlockMetadata older than 1424085990000
>>>>>>
>>>>>> -------------------------------------------
>>>>>>
>>>>>> Time: 1424086290000 ms
>>>>>> -------------------------------------------
>>>>>>
>>>>>> ===============================================================================================================================
>>>>>>
>>>>>> In other words it says "Total input paths to process :1" for 17
>>>>>> times. And when I copy 20 files, it says that 20 times.
>>>>>>
>>>>>> But it always ends up processing 16 files at once and the remaining
>>>>>> ones are not processed at all.
>>>>>>
>>>>>> However, if I first copy 16 files to the input directory, wait for
>>>>>> Spark Streaming application to process them (by checking the output
>>>>>> directory and seeing that 16 files have been created properly), and then
>>>>>> copy the 4 more files, those 4 files are also processed!
>>>>>>
>>>>>> So now I'm in a weird situation that I have to copy 16 files at
>>>>>> maximum at once, wait them to be processed, and only after that copy 
>>>>>> again
>>>>>> 16 files at max, ... otherwise I lose the extra files, in the sense that
>>>>>> they are not processed. This is not acceptable in my use-case.
>>>>>>
>>>>>> I've also checked the parameter
>>>>>>
>>>>>>    spark.streaming.receiver.maxRate
>>>>>>
>>>>>>
>>>>>> and it is INFINITE by default, I've tried setting it to 10 for
>>>>>> example, and nothing has changed.
>>>>>>
>>>>>> Any ideas what might be causing this situation, having a magic number
>>>>>> of 16 files at once?
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Emre Sevinç
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Emre Sevinc
>>>>>
>>>>
>>>
>>>
>>> --
>>> Emre Sevinc
>>>
>>
>
>
> --
> Emre Sevinc
>

Reply via email to