Hi,

To also process files that were already in the directory when the job
started, you can use fileStream instead of textFileStream. It has a
newFilesOnly parameter that controls whether to look only for new files
or to also pick up files that are already present.
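
A minimal sketch (assuming a Spark version whose Java API exposes the
fileStream overload with a path filter and a newFilesOnly flag; on older
versions the equivalent is the Scala StreamingContext.fileStream(directory,
filter, newFilesOnly)):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import scala.Tuple2;

JavaPairInputDStream<LongWritable, Text> input = ssc.fileStream(
    "path/to/your/folder/",
    LongWritable.class, Text.class, TextInputFormat.class,
    new Function<Path, Boolean>() {
      @Override
      public Boolean call(Path path) {
        return true; // accept every file the directory scan finds
      }
    },
    false); // newFilesOnly = false: also pick up files already present

// drop the byte-offset keys to get the same JavaDStream<String> that
// textFileStream would give you
JavaDStream<String> lines = input.map(
    new Function<Tuple2<LongWritable, Text>, String>() {
      @Override
      public String call(Tuple2<LongWritable, Text> kv) {
        return kv._2().toString();
      }
    });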

For deleting the processed files, one way is to get the list of all files
in the DStream. This can be done with the foreachRDD API on the DStream
returned by fileStream (or textFileStream).

Suppose the DStream is set up like this (ssc has to be final, or a field,
so the anonymous class below can capture it):

import org.apache.spark.Partition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.NewHadoopPartition;
import org.apache.spark.rdd.UnionPartition;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

JavaDStream<String> jpDstream = ssc.textFileStream("path/to/your/folder/");

jpDstream.print();

jpDstream.foreachRDD(
    new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        getContentHigh(rdd, ssc);
        return null;
      }
    }
);

public static void getContentHigh(JavaRDD<String> ds,
    JavaStreamingContext ssc) {

  // each file picked up in a batch becomes one partition of the
  // underlying union RDD
  Partition[] listPartitions = ds.rdd().partitions();
  int lenPartition = listPartitions.length; // number of files picked up

  for (int i = 0; i < lenPartition; i++) {
    UnionPartition upp = (UnionPartition) listPartitions[i];
    NewHadoopPartition npp = (NewHadoopPartition) upp.parentPartition();

    // the split's toString() looks like "<path>:<start>+<length>",
    // so strip everything after the last ':' to recover the path
    String fPath = npp.serializableHadoopSplit().value().toString();
    String name = fPath.substring(0, fPath.lastIndexOf(':'));

    // "name" is the path of the file picked up for processing. The
    // processing logic can live inside this loop; once it is done you
    // can delete the file using the path in "name" (see below).
  }
}
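
Once a file's batch is done, the delete itself can go through the Hadoop
FileSystem API so the same code works for local paths and HDFS alike. A
minimal sketch (error handling omitted; "name" is the path recovered in
the loop above):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

FileSystem fs = FileSystem.get(URI.create(name), new Configuration());
fs.delete(new Path(name), false); // false = non-recursive, single file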


Thanks.

On Fri, Jan 30, 2015 at 11:37 PM, ganterm [via Apache Spark User List] <
ml-node+s1001560n21444...@n3.nabble.com> wrote:

> We are running a Spark streaming job that retrieves files from a directory
> (using textFileStream).
> One concern we are having is the case where the job is down but files are
> still being added to the directory.
> Once the job starts up again, those files are not being picked up (since
> they are not new or changed while the job is running) but we would like
> them to be processed.
> Is there a solution for that? Is there a way to keep track what files have
> been processed and can we "force" older files to be picked up? Is there a
> way to delete the processed files?
>
> Thanks!
> Markus
>



