How are you deciding whether files are processed or not? It doesn't seem
possible to tell from this code; or maybe it just looks that way.

On Feb 16, 2015 12:51 PM, "Emre Sevinc" <emre.sev...@gmail.com> wrote:
> I've managed to solve this, but I still don't know exactly why my
> solution works.
>
> In my code I was trying to force Spark to produce output via:
>
>     jsonIn.print();
>
> jsonIn being a JavaDStream<String>.
>
> When I removed the line above and instead added the code below to force
> the output operation, and hence the execution:
>
>     jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
>       @Override
>       public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
>         // collect() materializes every partition, forcing the whole
>         // batch to be computed
>         stringJavaRDD.collect();
>         return null;
>       }
>     });
>
> it works as I expect, processing all of the 20 files I give it instead
> of stopping at 16.
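>
> My working hypothesis, which I have not verified against the Spark
> source, is that print() internally uses take(), which only computes as
> many partitions as are needed for the first few elements, so it may not
> reliably force a full evaluation of each batch. If that is right, then
> count() should work just as well as collect(), and unlike collect() it
> does not ship the whole batch back to the driver. A sketch of that
> variant (untested):
>
>     jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
>       @Override
>       public Void call(JavaRDD<String> rdd) throws Exception {
>         // count() touches every partition but returns only a long,
>         // so nothing is copied to the driver
>         rdd.count();
>         return null;
>       }
>     });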
>
> --
> Emre
>
>
> On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc <emre.sev...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have an application in Java that uses Spark Streaming 1.2.1 in the
>> following manner (a rough skeleton of the job follows the list):
>>
>> - Listen to the input directory.
>> - If a new file is copied to that input directory, process it.
>> - Processing means: contact a RESTful web service (also running
>>   locally, and responsive), send it the contents of the file, receive
>>   the response, and write the result as a new file into the output
>>   directory.
>> - Batch interval: 30 seconds.
>> - Checkpoint interval: 150 seconds.
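>>
>> This is a simplified sketch, not the actual code: postToWebService()
>> stands in for the real REST call, and the paths are placeholders (the
>> real job takes them as command-line arguments):
>>
>>     import org.apache.spark.SparkConf;
>>     import org.apache.spark.api.java.function.Function;
>>     import org.apache.spark.streaming.Duration;
>>     import org.apache.spark.streaming.api.java.JavaDStream;
>>     import org.apache.spark.streaming.api.java.JavaStreamingContext;
>>
>>     SparkConf conf = new SparkConf().setAppName("myClass");
>>
>>     // 30-second batch interval
>>     JavaStreamingContext ssc =
>>         new JavaStreamingContext(conf, new Duration(30 * 1000));
>>     ssc.checkpoint("/tmp/checkpoint");  // placeholder directory
>>
>>     // Watch the input directory; each batch holds the new files' lines
>>     JavaDStream<String> jsonIn = ssc.textFileStream("/in");
>>
>>     // 150-second checkpoint interval for the stream
>>     jsonIn.checkpoint(new Duration(150 * 1000));
>>
>>     // Send each record to the web service and keep its response
>>     JavaDStream<String> results =
>>         jsonIn.map(new Function<String, String>() {
>>           @Override
>>           public String call(String json) throws Exception {
>>             return postToWebService(json);  // illustrative helper
>>           }
>>         });
>>
>>     // Write the responses into the output directory
>>     results.dstream().saveAsTextFiles("file:///out/result", "txt");
>>
>>     ssc.start();
>>     ssc.awaitTermination();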
>>
>> When I test the application locally with 1 or 2 files, it works
>> perfectly fine, as expected. I run it like this:
>>
>>     spark-submit --class myClass --verbose --master local[4] \
>>       --deploy-mode client myApp.jar /in file:///out
>>
>> But then I noticed something strange when I copied 20 files to the
>> input directory: Spark Streaming detects all of the files, but it ends
>> up processing *only 16 files*, and the remaining 4 are not processed
>> at all.
>>
>> I've tried it with 19, 18, and then 17 files. Same result: only 16
>> files end up in the output directory.
>>
>> Then I tried copying exactly 16 files at once to the input directory,
>> and it processed all 16. That's why I call 16 the magic number.
>>
>> When I say it detects all of the files, I mean that when I copy 17
>> files I see the following lines in the logs:
>>
>> ===============================================================
>> 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: "1G"
>> 2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0)
>> 2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to bind to another address
>> 2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started
>> 2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>> 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from file:/tmp/receivedBlockMetadata
>> 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Reading from the logs:
>>   file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
>>   file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
>> -------------------------------------------
>> Time: 1424086260000 ms
>> -------------------------------------------
>> 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 1424085960000:
>> 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 1424085960000
>> [the two lines above appear twice]
>> 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
>> [... the line above appears 17 times in total: 11 times at 12:31:30, 6 times at 12:31:31 ...]
>> 2015-02-16 12:31:31 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 1424085990000:
>> 2015-02-16 12:31:31 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 1424085990000
>> -------------------------------------------
>> Time: 1424086290000 ms
>> -------------------------------------------
>> ===============================================================
>>
>> In other words, it prints "Total input paths to process : 1" 17 times.
>> And when I copy 20 files, it prints that 20 times.
>>
>> But it always ends up processing only 16 files at once; the remaining
>> ones are not processed at all.
>>
>> However, if I first copy 16 files to the input directory, wait for the
>> Spark Streaming application to process them (by checking the output
>> directory and seeing that 16 files have been created properly), and
>> then copy 4 more files, those 4 files are also processed!
>>
>> So now I'm in the weird situation of having to copy at most 16 files
>> at once, wait for them to be processed, and only then copy the next 16
>> at most, and so on; otherwise I lose the extra files, in the sense
>> that they are never processed. This is not acceptable for my use case.
>>
>> I've also checked the parameter
>>
>>     spark.streaming.receiver.maxRate
>>
>> and it is infinite by default. I've tried setting it to 10, for
>> example, and nothing changed.
>>
>> Any ideas what might be causing this situation, this magic number of
>> 16 files at once?
>>
>> --
>> Emre Sevinç
>
>
> --
> Emre Sevinc