Hello Akhil,

Thank you for taking the time to write such a detailed answer. I managed to
solve it in a very similar manner.
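
For the archives, here is a minimal sketch of what my solution roughly looks
like. The class name and the .json pattern are just examples of my own, and
I'm assuming checkpointing is enabled, which is why the filter also has to
be Serializable (that was the cause of the NotSerializableException below):

import java.io.Serializable;
import org.apache.hadoop.fs.Path;
import scala.runtime.AbstractFunction1;

// A named filter class. Extending AbstractFunction1 avoids implementing
// compose/andThen by hand, and implementing Serializable lets the DStream
// graph (which holds a reference to the filter) be written out during
// checkpointing.
public class JsonFileFilter extends AbstractFunction1<Path, Object>
    implements Serializable {
  @Override
  public Object apply(Path path) {
    // Keep only files whose names end with .json
    return path.getName().endsWith(".json");
  }
}

An instance of this class can then be passed to fileStream together with the
ClassTag arguments, exactly as in your Java example below.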

Kind regards,
Emre Sevinç


On Mon, Feb 2, 2015 at 8:22 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Hi Emre,
>
> This is how you do that in Scala:
>
> // fileStream(directory, filter: Path => Boolean, newFilesOnly)
> val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
>   "/home/akhld/sigmoid", (t: Path) => true, true)
>
> In Java you can do something like:
>
> jssc.ssc().<LongWritable, Text, SequenceFileInputFormat>fileStream(
>     "/home/akhld/sigmoid",
>     new AbstractFunction1<Path, Object>() {
>       @Override
>       public Boolean apply(Path input) {
>         // file filtering logic here
>         return true;
>       }
>     },
>     true,
>     ClassTag$.MODULE$.apply(LongWritable.class),
>     ClassTag$.MODULE$.apply(Text.class),
>     ClassTag$.MODULE$.apply(SequenceFileInputFormat.class));
>
> Thanks
> Best Regards
>
> On Mon, Feb 2, 2015 at 6:34 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:
>
>> Hello,
>>
>> I'm using Apache Spark Streaming 1.2.0 and trying to define a file filter
>> for file names when creating an InputDStream
>> <https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/dstream/InputDStream.html>
>> by invoking the fileStream
>> <https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29>
>> method. My code works perfectly when I don't use a file filter, e.g. when
>> I invoke the other fileStream method
>> <https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29>
>> that takes no filter argument.
>>
>> According to the documentation of the *fileStream* method, I can pass it a
>>
>>   scala.Function1<org.apache.hadoop.fs.Path,Object> filter
>>
>> but so far I have not been able to create a working fileFilter. My initial
>> attempts were:
>>
>> 1- Tried to implement it as:
>>
>> Function1<Path, Object> fileFilter = new Function1<Path, Object>() {
>>     @Override
>>     public Object apply(Path v1) {
>>       return true;
>>     }
>>
>>     @Override
>>     public <A> Function1<A, Object> compose(Function1<A, Path> g) {
>>       return Function1$class.compose(this, g);
>>     }
>>
>>     @Override
>>     public <A> Function1<Path, A> andThen(Function1<Object, A> g) {
>>       return Function1$class.andThen(this, g);
>>     }
>>   };
>>
>> But apparently my implementation of andThen is wrong, and I couldn't
>> figure out how to implement it correctly. The compiler complains that the
>> anonymous class:
>>
>>      is not abstract and does not override abstract method
>>      <A>andThen$mcVJ$sp(scala.Function1<scala.runtime.BoxedUnit,A>) in scala.Function1
>>
>> 2- Tried to implement it as:
>>
>> Function1<Path, Object> fileFilter = new AbstractFunction1<Path, Object>() {
>>     @Override
>>     public Object apply(Path v1) {
>>       return true;
>>     }
>>   };
>>
>> This one compiles, but when I run it I get an exception:
>>
>> 2015-02-02 13:42:50 ERROR OneForOneStrategy:66 - myModule$1
>> java.io.NotSerializableException: myModule$1
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:169)
>>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
>>     at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:164)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:483)
>>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>     at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:184)
>>     at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:263)
>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:74)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> Any ideas on how I can implement a *fileFilter* that I can pass to the
>> fileStream method, so that Spark Streaming processes only the file name
>> patterns I want?
>>
>>
>> --
>>
>> Emre Sevinç
>>
>>
>>
>


-- 
Emre Sevinc
