I've managed to solve this, but I still don't know exactly why my solution
works:

In my code, I was trying to force Spark to produce output via:

  jsonIn.print();

jsonIn being a JavaDStream<String>.

When I removed the code above and added the code below to force the
output operation, and hence the execution:

    jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        // collect() touches every partition, forcing the whole batch
        // (and therefore every file in it) to be processed.
        stringJavaRDD.collect();
        return null;
      }
    });

It works as I expect, processing all 20 of the files I give it instead
of stopping at 16.
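
My working theory (I haven't verified this against the Spark 1.2.1
sources) is that print() only take()s the first 10 or so elements of
each batch's RDD, so Spark computes only as many partitions (and hence
reads only as many files) as it needs to satisfy that, whereas collect()
forces every partition. If you don't actually need the data on the
driver, something like count() should force full evaluation more
cheaply; a minimal sketch, using the same jsonIn as above:

    jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        // count() also touches every partition, but does not ship the
        // whole batch back to the driver the way collect() does.
        stringJavaRDD.count();
        return null;
      }
    });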

--
Emre


On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:

> Hello,
>
> I have an application in Java that uses Spark Streaming 1.2.1 in the
> following manner:
>
>  - Listen to the input directory.
>  - If a new file is copied to that input directory, process it.
>  - Processing means: contact a RESTful web service (also running
> locally, and responsive), send it the contents of the file, receive the
> response, and write the results as a new file into the output directory
> (sketched below).
>  - batch interval: 30 seconds
>  - checkpoint interval: 150 seconds
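>
> In case it helps, the setup looks roughly like the sketch below
> (simplified; callWebService() is a stand-in for the actual RESTful
> call, and the real code also writes the responses into the output
> directory):
>
>     import org.apache.spark.SparkConf;
>     import org.apache.spark.api.java.function.Function;
>     import org.apache.spark.streaming.Duration;
>     import org.apache.spark.streaming.api.java.*;
>
>     SparkConf conf = new SparkConf().setAppName("myClass");
>     // batch interval: 30 seconds
>     JavaStreamingContext jssc =
>         new JavaStreamingContext(conf, new Duration(30 * 1000));
>     jssc.checkpoint("/tmp/checkpoint"); // illustrative checkpoint dir
>
>     // Each file copied into the input directory shows up in the
>     // next batch of this stream.
>     JavaDStream<String> jsonIn = jssc.textFileStream("/in");
>     // checkpoint interval: 150 seconds
>     jsonIn.checkpoint(new Duration(150 * 1000));
>
>     JavaDStream<String> jsonOut =
>         jsonIn.map(new Function<String, String>() {
>           @Override
>           public String call(String content) throws Exception {
>             return callWebService(content); // hypothetical helper
>           }
>         });
>     // ... jsonOut is then written out as files in the output directory ...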
>
> When I test the application locally with 1 or 2 files, it works
> perfectly, as expected. I run it like:
>
>         spark-submit --class myClass --verbose --master local[4]
> --deploy-mode client myApp.jar /in file:///out
>
> But then I noticed something strange when I copied 20 files to the input
> directory: Spark Streaming detects all of the files, but it ends up
> processing *only 16 files*. The remaining 4 are not processed at all.
>
> I've tried it with 19, 18, and then 17 files. Same result: only 16 files
> end up in the output directory.
>
> Then I tried copying 16 files at once to the input directory, and it
> processed all 16 of them. That's why I call 16 the magic number.
>
> When I say it detects all of the files, I mean that when I copy 17
> files, I see the following lines in the logs:
>
>
> ===============================================================================================================================
> 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory: "1G"
> 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves
> to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface
> eth0)
> 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
> bind to another address
> 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
> 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
> native-hadoop library for your platform... using builtin-java classes where
> applicable
> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
> file:/tmp/receivedBlockMetadata
> 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Reading from the logs:
> file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
> file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
> -------------------------------------------
> Time: 1424086260000 ms
> -------------------------------------------
>
> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
> file:/tmp/receivedBlockMetadata older than 1424085960000:
> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Cleared log files in
> file:/tmp/receivedBlockMetadata older than 1424085960000
> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
> file:/tmp/receivedBlockMetadata older than 1424085960000:
> 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Cleared log files in
> file:/tmp/receivedBlockMetadata older than 1424085960000
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
> file:/tmp/receivedBlockMetadata older than 1424085990000:
> 2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
> ReceivedBlockHandlerMaster:59 - Cleared log files in
> file:/tmp/receivedBlockMetadata older than 1424085990000
>
> -------------------------------------------
>
> Time: 1424086290000 ms
> -------------------------------------------
>
> ===============================================================================================================================
>
> In other words, it prints "Total input paths to process : 1" 17 times.
> And when I copy 20 files, it prints that 20 times.
>
> But it always ends up processing only 16 files at once; the remaining
> ones are not processed at all.
>
> However, if I first copy 16 files to the input directory, wait for the
> Spark Streaming application to process them (by checking the output
> directory and seeing that 16 files have been created properly), and then
> copy 4 more files, those 4 files are also processed!
>
> So now I'm in a weird situation where I have to copy at most 16 files at
> once, wait for them to be processed, and only then copy the next 16 at
> most, and so on; otherwise I lose the extra files, in the sense that
> they are never processed. This is not acceptable for my use case.
>
> I've also checked the parameter
>
>    spark.streaming.receiver.maxRate
>
> which is infinite by default. I've tried setting it to 10, for example,
> and nothing changed.
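>
> For reference, I set it roughly like this (the value 10 is just an
> example):
>
>     SparkConf conf = new SparkConf()
>       .setAppName("myClass")
>       // spark.streaming.receiver.maxRate: max records per second for
>       // each receiver; unlimited when not set
>       .set("spark.streaming.receiver.maxRate", "10");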
>
> Any ideas about what might be causing this situation, i.e. this magic
> number of 16 files at once?
>
>
> --
> Emre Sevinç
>



-- 
Emre Sevinc
