How are you deciding whether files are processed or not? It doesn't seem
possible from this code. Maybe it just seems so.
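
One thing that might explain the difference between the two output
operations in the quoted message below: per the Spark Streaming
documentation, print() only prints the first ten elements of each batch,
whereas collect() materializes every element of the RDD. Transformations
are lazy, so if the per-file work happens as a side effect inside a
transformation (an assumption on my part, since that code isn't shown),
only as many inputs as the output operation asks for would get processed.
Here is a minimal sketch of that effect using plain Java streams as an
analogy, not Spark itself; the class and method names are made up for
illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Analogy only: a lazy pipeline whose "processing" step is a side effect.
// LazyOutputDemo and its methods are hypothetical names, not Spark APIs.
final class LazyOutputDemo {

    // Counts how many inputs were processed when the consumer asks for
    // only the first `limit` results (similar in spirit to print()).
    static int processedWithLimit(int inputs, int limit) {
        AtomicInteger processed = new AtomicInteger();
        List<String> shown = IntStream.range(0, inputs)
                .mapToObj(i -> {
                    processed.incrementAndGet(); // stands in for the per-file work
                    return "result-" + i;
                })
                .limit(limit) // consumer stops pulling after `limit` results
                .collect(Collectors.toList());
        return processed.get();
    }

    // Counts how many inputs were processed when the consumer asks for
    // every result (similar in spirit to collect()).
    static int processedWithCollect(int inputs) {
        AtomicInteger processed = new AtomicInteger();
        List<String> all = IntStream.range(0, inputs)
                .mapToObj(i -> {
                    processed.incrementAndGet();
                    return "result-" + i;
                })
                .collect(Collectors.toList());
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("limited:   " + processedWithLimit(20, 10));
        System.out.println("collected: " + processedWithCollect(20));
    }
}
```

Asking for 10 of 20 results runs the processing step 10 times, while
collecting everything runs it 20 times. Spark fetches results partition by
partition, so the count it ends up evaluating can differ from the number of
elements requested; this sketch is not meant to reproduce the number 16.
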
On Feb 16, 2015 12:51 PM, "Emre Sevinc" <emre.sev...@gmail.com> wrote:

> I've managed to solve this, but I still don't know exactly why my solution
> works:
>
> In my code I was trying to force Spark to produce output via:
>
>   jsonIn.print();
>
> jsonIn being a JavaDStream<String>.
>
> When I removed the code above and added the code below to force the output
> operation, and hence the execution:
>
>     jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
>       @Override
>       public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
>         stringJavaRDD.collect();
>         return null;
>       }
>     });
>
> It works as I expect, processing all of the 20 files I give to it, instead
> of stopping at 16.
>
> --
> Emre
>
>
> On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc <emre.sev...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have an application in Java that uses Spark Streaming 1.2.1 in the
>> following manner:
>>
>>  - Listen to the input directory.
>>  - If a new file is copied to that input directory, process it.
>>  - Process: contact a RESTful web service (also running locally and
>> responsive), send the contents of the file, receive the response from the
>> web service, and write the results as a new file into the output directory.
>>  - batch interval : 30 seconds
>>  - checkpoint interval: 150 seconds
>>
>> When I test the application locally with 1 or 2 files, it works perfectly
>> fine as expected. I run it like:
>>
>>         spark-submit --class myClass --verbose --master local[4]
>> --deploy-mode client myApp.jar /in file:///out
>>
>> But then I noticed something strange when I copied 20 files to the
>> INPUT directory: Spark Streaming detects all of the files, but it ends up
>> processing *only 16 files*. The remaining 4 are not processed at all.
>>
>> I've tried it with 19, 18, and then 17 files. Same result: only 16 files
>> end up in the output directory.
>>
>> Then I tried copying 16 files at once to the input directory, and it
>> processed all 16 of them. That's why I call 16 the magic number.
>>
>> When I say it detects all of the files, I mean that in the logs I see
>> the following lines when I copy 17 files:
>>
>>
>> ===============================================================================================================================
>> 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory: "1G"
>> 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves
>> to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface
>> eth0)
>> 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
>> bind to another address
>> 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
>> 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
>> native-hadoop library for your platform... using builtin-java classes where
>> applicable
>> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
>> file:/tmp/receivedBlockMetadata
>> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Reading from the logs:
>> file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
>> file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
>> -------------------------------------------
>> Time: 1424086260000 ms
>> -------------------------------------------
>>
>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>> file:/tmp/receivedBlockMetadata older than 1424085960000:
>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>> file:/tmp/receivedBlockMetadata older than 1424085960000
>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>> file:/tmp/receivedBlockMetadata older than 1424085960000:
>> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>> file:/tmp/receivedBlockMetadata older than 1424085960000
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
>> file:/tmp/receivedBlockMetadata older than 1424085990000:
>> 2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
>> ReceivedBlockHandlerMaster:59 - Cleared log files in
>> file:/tmp/receivedBlockMetadata older than 1424085990000
>>
>> -------------------------------------------
>>
>> Time: 1424086290000 ms
>> -------------------------------------------
>>
>> ===============================================================================================================================
>>
>> In other words, it says "Total input paths to process : 1" 17 times.
>> And when I copy 20 files, it says that 20 times.
>>
>> But it always ends up processing 16 files at once and the remaining ones
>> are not processed at all.
>>
>> However, if I first copy 16 files to the input directory, wait for the
>> Spark Streaming application to process them (by checking the output
>> directory and seeing that 16 files have been created properly), and then
>> copy 4 more files, those 4 files are also processed!
>>
>> So now I'm in the weird situation that I have to copy at most 16 files
>> at once, wait for them to be processed, and only then copy the next batch
>> of at most 16 files, and so on; otherwise I lose the extra files, in the
>> sense that they are not processed. This is not acceptable in my use case.
>>
>> I've also checked the parameter
>>
>>    spark.streaming.receiver.maxRate
>>
>>
>> and it is INFINITE by default. I've tried setting it to 10, for example,
>> and nothing changed.
>>
>> Any ideas what might be causing this situation, having a magic number of
>> 16 files at once?
>>
>>
>> --
>> Emre Sevinç
>>
>
>
>
> --
> Emre Sevinc
>
