Try:

    JavaPairDStream<String, String> foo =
        ssc.<String, String, SequenceFileInputFormat<String, String>>fileStream("/sigmoid/foo");

(Note the explicit type argument has to be the parameterized SequenceFileInputFormat<String, String> from org.apache.hadoop.mapreduce.lib.input; the raw type does not satisfy the bound F extends InputFormat<K, V>.)
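In case it helps, here is a minimal, self-contained sketch of that call in context. This assumes Spark 1.2's Java streaming API with a local master, and uses the new-API TextInputFormat (which yields LongWritable/Text pairs); the path, app name, and batch interval are placeholders. It only shows the shape of the call, not a confirmed fix for the error quoted below:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class FileStreamSketch {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("FileStreamSketch");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(5000));

            // The explicit type arguments <K, V, F> must line up: F has to be a
            // subtype of org.apache.hadoop.mapreduce.InputFormat<K, V>.
            // TextInputFormat produces <LongWritable, Text> pairs, so K and V follow.
            JavaPairDStream<LongWritable, Text> lines =
                ssc.<LongWritable, Text, TextInputFormat>fileStream("/sigmoid/foo");

            lines.print();  // print a few records of each batch
            ssc.start();
            ssc.awaitTermination();
        }
    }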
Thanks
Best Regards

On Sat, Jan 17, 2015 at 4:24 AM, Leonidas Fegaras <fega...@cse.uta.edu> wrote:
> Dear Spark users,
> I have a problem using File Streams in Java on Spark 1.2.0. I can process
> Hadoop files in local mode using:
>
>     spark_context.newAPIHadoopFile(path, F.class, K.class, V.class, conf)
>
> where F extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>.
> But when I try to do the same thing in Spark Streaming using:
>
>     stream_context.<K,V,F>fileStream(directory)
>         .foreachRDD(new Function<JavaPairRDD<K,V>, Void>() {
>             public Void call(JavaPairRDD<K,V> rdd) throws Exception {
>                 ...
>             }
>         });
>
> and drop a new text file in the directory, I get the following error:
>
> 15/01/16 16:29:43 ERROR scheduler.JobScheduler: Error generating jobs for time 1421447383000 ms
> java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
>     at scala.Option.getOrElse(Option.scala:120)
>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
>     at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
>     at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
>     at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
>     at scala.Option.orElse(Option.scala:257)
>
> I get the same error when I process Hadoop sequence files. I am sure my
> input format F extends org.apache.hadoop.mapreduce.InputFormat. Any ideas?
> Thank you,
> Leonidas Fegaras
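For contrast, here is a minimal sketch of the batch-mode call that is reported to work, again assuming the new-API TextInputFormat; the path, master, and app name are placeholders. One plausible difference from the streaming path is that the input format, key, and value classes are passed explicitly as Class objects here, rather than being recovered from erased generic type arguments:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class NewAPIHadoopFileSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setMaster("local[2]").setAppName("NewAPIHadoopFileSketch"));

            // newAPIHadoopFile takes the InputFormat class first, then the key
            // and value classes, then a Hadoop Configuration.
            JavaPairRDD<LongWritable, Text> records = sc.newAPIHadoopFile(
                "/sigmoid/foo", TextInputFormat.class,
                LongWritable.class, Text.class, new Configuration());

            System.out.println(records.count());  // force evaluation
            sc.stop();
        }
    }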