Hello,

I have an application in Java that uses Spark Streaming 1.2.1 in the
following manner (a simplified sketch of the driver follows the list):

 - Listen to the input directory.
 - If a new file is copied to that input directory, process it.
 - Processing means: contact a RESTful web service (also running
   locally, and responsive), send it the contents of the file, receive
   the response, and write the results as a new file into the output
   directory.
 - Batch interval: 30 seconds.
 - Checkpoint interval: 150 seconds.
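
Roughly, the driver is wired up like this (a simplified sketch, not my
actual code; the class name, the checkpoint path, and the
callRestService() helper are placeholders):

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class MyClass {
      public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("myApp");

        // Batch interval: 30 seconds.
        JavaStreamingContext jssc =
            new JavaStreamingContext(conf, Durations.seconds(30));
        jssc.checkpoint("/tmp/myApp-checkpoint"); // placeholder path

        // Listen to the input directory (args[0]); textFileStream()
        // picks up files newly copied into it. (My real code handles
        // whole files; this sketch just maps each record.)
        JavaDStream<String> input = jssc.textFileStream(args[0]);

        // Checkpoint interval: 150 seconds.
        input.checkpoint(Durations.seconds(150));

        // Send the contents to the local RESTful web service and
        // keep the response.
        JavaDStream<String> results = input.map(s -> callRestService(s));

        // Write the results as new files into the output directory
        // (args[1]).
        results.dstream().saveAsTextFiles(args[1] + "/result", "out");

        jssc.start();
        jssc.awaitTermination();
      }

      // Placeholder; the real REST call is omitted.
      private static String callRestService(String payload) {
        return payload;
      }
    }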

When I test the application locally with 1 or 2 files, it works as
expected. I run it like this:

        spark-submit --class myClass --verbose --master local[4] \
          --deploy-mode client myApp.jar /in file:///out

But then I noticed something strange when I copied 20 files to the input
directory: Spark Streaming detects all of the files, but it ends up
processing *only 16 files*. The remaining 4 are not processed at all.

I tried it with 19, 18, and then 17 files. Same result: only 16 files
end up in the output directory.

Then I tried copying exactly 16 files at once to the input directory,
and it processed all 16 of them. That's why I call 16 the magic number.

When I say it detects all of the files, I mean that when I copy 17
files, I see the following lines in the logs:

===============================================================================================================================
2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory: "1G"
2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves to
a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0)
2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to bind
to another address
2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable
2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
file:/tmp/receivedBlockMetadata
2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Reading from the logs:
file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
-------------------------------------------
Time: 1424086260000 ms
-------------------------------------------

2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
file:/tmp/receivedBlockMetadata older than 1424085960000:
2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Cleared log files in
file:/tmp/receivedBlockMetadata older than 1424085960000
2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
file:/tmp/receivedBlockMetadata older than 1424085960000:
2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Cleared log files in
file:/tmp/receivedBlockMetadata older than 1424085960000
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
file:/tmp/receivedBlockMetadata older than 1424085990000:
2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Cleared log files in
file:/tmp/receivedBlockMetadata older than 1424085990000

-------------------------------------------
Time: 1424086290000 ms
-------------------------------------------
===============================================================================================================================

In other words, it says "Total input paths to process : 1" 17 times. And
when I copy 20 files, it says that 20 times.

But it always ends up processing only 16 files at once; the remaining
ones are not processed at all.

However, if I first copy 16 files to the input directory, wait for the
Spark Streaming application to process them (by checking the output
directory and seeing that 16 files have been created properly), and only
then copy the 4 remaining files, those 4 files are also processed!

So now I'm in the weird situation of having to copy at most 16 files at
once, wait for them to be processed, and only after that copy at most 16
more, and so on; otherwise I lose the extra files, in the sense that
they are never processed. This is not acceptable for my use case.
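
Just to make that workaround concrete, it amounts to something like the
following (a rough, hypothetical sketch of the throttled copying, with
placeholder paths and a crude polling wait; not something I want to run
in production):

    import java.io.File;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class ThrottledCopy {
      public static void main(String[] args) throws Exception {
        Path staging = Paths.get(args[0]); // where new files arrive
        Path input   = Paths.get(args[1]); // Spark's input directory
        File output  = new File(args[2]);  // Spark's output directory

        // Assumes all three directories already exist.
        File[] pending = staging.toFile().listFiles();
        for (int i = 0; i < pending.length; i += 16) {
          int end = Math.min(i + 16, pending.length);
          int before = output.list().length;

          // Move at most 16 files into the input directory.
          for (int j = i; j < end; j++) {
            Files.move(pending[j].toPath(),
                       input.resolve(pending[j].getName()));
          }

          // Crude wait: poll until the output directory has grown by
          // the size of this chunk before moving the next one.
          while (output.list().length < before + (end - i)) {
            Thread.sleep(5000);
          }
        }
      }
    }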

I've also checked the parameter

   spark.streaming.receiver.maxRate

which is infinite by default. I've tried setting it to 10, for example,
and nothing changed.
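
For reference, one way to set it is via --conf on spark-submit (setting
it programmatically on the SparkConf should behave the same way):

        spark-submit --conf spark.streaming.receiver.maxRate=10 \
          --class myClass --verbose --master local[4] \
          --deploy-mode client myApp.jar /in file:///out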

Any ideas what might be causing this behavior, with its magic number of
16 files processed at once?


-- 
Emre Sevinç
