Hi Markus,

To also process files that were already present in the directory, you can use fileStream instead of textFileStream. fileStream takes a newFilesOnly parameter; setting it to false tells Spark to pick up files that already exist in the directory when the streaming job starts, not just new ones.
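For finding out which file each partition came from, the Hadoop split string has the form "path:start+length", so the file path can be recovered by cutting at the last ':'. A minimal sketch of just that parsing step, in plain Java with no Spark dependency (the sample path is made up for illustration):

```java
public class SplitPathDemo {
    // A Hadoop FileSplit's toString() has the form "path:start+length".
    // Cutting at the *last* ':' keeps paths that contain a scheme and
    // port (e.g. "hdfs://host:9000/dir/file") intact.
    static String pathFromSplit(String split) {
        return split.substring(0, split.lastIndexOf(':'));
    }

    public static void main(String[] args) {
        // Hypothetical split string for illustration.
        String split = "hdfs://namenode:9000/data/in/part-0001:0+4096";
        System.out.println(pathFromSplit(split));
        // prints "hdfs://namenode:9000/data/in/part-0001"
    }
}
```

A plain split(":") would break on paths containing a scheme or port, which is why lastIndexOf is used here.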
For deleting the processed files, one way is to get the list of files backing each batch of the DStream. This can be done using the foreachRDD API of the DStream returned by fileStream (or textFileStream). Suppose the DStream is:

    JavaDStream<String> jpDstream = ssc.textFileStream("path/to/your/folder/");
    jpDstream.print();
    jpDstream.foreachRDD(
        new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rdd) throws Exception {
                getContentHigh(rdd, ssc);
                return null;
            }
        }
    );

Then inside getContentHigh you can walk the partitions of the RDD; each partition corresponds to one file the stream picked up:

    public static <U> void getContentHigh(JavaRDD<String> ds, JavaStreamingContext ssc) {
        // The number of partitions equals the number of files picked up in this batch.
        Partition[] listPartitions = ds.rdd().partitions();
        for (int i = 0; i < listPartitions.length; i++) {
            UnionPartition upp = (UnionPartition) listPartitions[i];
            NewHadoopPartition npp = (NewHadoopPartition) upp.parentPartition();
            // The split string has the form "path:start+length".
            String fPath = npp.serializableHadoopSplit().value().toString();
            String name = fPath.substring(0, fPath.lastIndexOf(':'));
            // "name" is the path of the file picked for processing. The
            // processing logic can go inside this loop; once done, you can
            // delete the file using the path in "name".
        }
    }

Thanks.

On Fri, Jan 30, 2015 at 11:37 PM, ganterm [via Apache Spark User List] <
ml-node+s1001560n21444...@n3.nabble.com> wrote:

> We are running a Spark streaming job that retrieves files from a directory
> (using textFileStream).
> One concern we are having is the case where the job is down but files are
> still being added to the directory.
> Once the job starts up again, those files are not being picked up (since
> they are not new or changed while the job is running) but we would like
> them to be processed.
> Is there a solution for that? Is there a way to keep track what files have
> been processed and can we "force" older files to be picked up? Is there a
> way to delete the processed files?
>
> Thanks!
> Markus