Thank you very much for the detailed answer. I feel a little dumb asking,
but how would that work in Scala (we use Spark 1.0.2)?
I cannot figure it out. E.g. I am having trouble using UnionPartition
and NewHadoopPartition, and even ds.values() is not an option for me (in the
IDE). Do you have any Scala code that does something similar? Any help is
appreciated.
BTW: I am creating the dStream like this:
val ds = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), f,
true).map(_._2.toString)
Thanks,
Markus
On Tue, Feb 3, 2015 at 4:55 AM, Prannoy [via Apache Spark User List]
ml-node+s1001560n21478...@n3.nabble.com wrote:
Hi,
To process older files as well, you can use fileStream instead of
textFileStream. It has a parameter that tells the stream to also pick up
files already present in the directory.
For deleting the processed files, one way is to get the list of all files
in the dStream. This can be done by using the foreachRDD API of the dStream
received from fileStream (or textFileStream).
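For illustration, a minimal Scala sketch of that (untested; it assumes the Spark 1.x fileStream signature taking a directory, a path filter, and a newFilesOnly flag, and an already-created StreamingContext ssc):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// newFilesOnly = false tells the stream to also process files that were
// already present in the directory when the job started.
val ds = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "path/to/your/folder",
  (p: Path) => true,   // filter: accept every file
  newFilesOnly = false
).map(_._2.toString)
```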
Suppose the dStream is
JavaDStream<String> jpDstream = ssc
    .textFileStream("path/to/your/folder/");
jpDstream.print();
jpDstream.foreachRDD(
    new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> arg0) throws Exception {
        getContentHigh(arg0, ssc);
        return null;
      }
    }
);
public static void getContentHigh(JavaRDD<String> ds,
    JavaStreamingContext ssc) {
  Partition[] listPartitions = ds.rdd().partitions();
  int lenPartition = listPartitions.length; // this gives the number
  // of files the stream picked up
  for (int i = 0; i < lenPartition; i++) {
    UnionPartition upp = (UnionPartition) listPartitions[i];
    NewHadoopPartition npp = (NewHadoopPartition) upp.parentPartition();
    String fPath = npp.serializableHadoopSplit().value().toString();
    String[] nT = fPath.split(":");
    String name = nT[0]; // name is the path of the file picked up for
    // processing. The processing logic can go inside this loop; once done,
    // you can delete the file using the path in the variable name.
  }
}
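Since the question above asks for Scala, here is a rough Scala sketch of the same idea (untested; note that UnionPartition and NewHadoopPartition are private[spark] in some 1.x releases, so this may not compile outside an org.apache.spark package, and the "path:start+length" split format of the Hadoop split's toString is an assumption):

```scala
import org.apache.spark.rdd.{NewHadoopPartition, UnionPartition}

ds.foreachRDD { rdd =>
  rdd.partitions.foreach {
    case upp: UnionPartition[_] =>
      upp.parentPartition match {
        case npp: NewHadoopPartition =>
          // the split's toString is typically "path:start+length"
          val fPath = npp.serializableHadoopSplit.value.toString
          val name = fPath.split(":")(0) // path of the file picked up
          // process here, then delete the file at `name`,
          // e.g. via the Hadoop FileSystem API
        case _ => // not a Hadoop-backed partition; skip
      }
    case _ => // not a union partition; skip
  }
}
```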
Thanks.
On Fri, Jan 30, 2015 at 11:37 PM, ganterm [via Apache Spark User List]
wrote:
We are running a Spark streaming job that retrieves files from a
directory (using textFileStream).
One concern we are having is the case where the job is down but files are
still being added to the directory.
Once the job starts up again, those files are not being picked up (since
they are not new or changed while the job is running) but we would like
them to be processed.
Is there a solution for that? Is there a way to keep track what files
have been processed and can we force older files to be picked up? Is
there a way to delete the processed files?
Thanks!
Markus
--
If you reply to this email, your message will be added to the
discussion below:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tracking-deleting-processed-files-tp21444.html
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tracking-deleting-processed-files-tp21444p21504.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.