Re: Problem with File Streams

2015-01-17 Thread Akhil Das
Try:

JavaPairDStream<String, String> foo = ssc.<String, String,
    SequenceFileInputFormat>fileStream("/sigmoid/foo");
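
A fuller, compilable form of that suggestion is sketched below. The Text
key/value types, the 10-second batch interval, and the class/application
names are assumptions added for illustration; note that on Spark 1.2.0 this
form may still hit the type-erasure problem described in the quoted report
below.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SequenceFileStreamSketch {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("SequenceFileStreamSketch");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));

    // Monitor /sigmoid/foo for new sequence files; key, value and input-format
    // types are supplied as explicit generic arguments to fileStream.
    JavaPairDStream<Text, Text> foo =
        ssc.<Text, Text, SequenceFileInputFormat<Text, Text>>fileStream("/sigmoid/foo");

    foo.print();
    ssc.start();
    ssc.awaitTermination();
  }
}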


Thanks
Best Regards

On Sat, Jan 17, 2015 at 4:24 AM, Leonidas Fegaras fega...@cse.uta.edu
wrote:

  Dear Spark users,
 I have a problem using File Streams in Java on Spark 1.2.0. I can process
 hadoop files in local mode using:

 spark_context.newAPIHadoopFile(path,F.class,K.class,V.class,conf)

 where F extends
 org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>. But when I try
 to do the same thing in Spark Streaming using:

 stream_context.<K,V,F>fileStream(directory)
     .foreachRDD(new Function<JavaPairRDD<K,V>,Void>() {
         public Void call ( JavaPairRDD<K,V> rdd ) throws Exception {
             ...
         }
     });

 and drop a new text file in the directory, I get the following error:

 15/01/16 16:29:43 ERROR scheduler.JobScheduler: Error generating jobs for time 1421447383000 ms
 java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
     at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
     at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
     at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
     at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
     at scala.Option.orElse(Option.scala:257)

 I get the same error when I process Hadoop sequence files. I am sure my input
 format F extends org.apache.hadoop.mapreduce.InputFormat. Any ideas?
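
 For reference, the kind of batch call mentioned at the top of this message,
 fleshed out with standard Hadoop classes standing in for F, K, V (a sketch
 only; the LongWritable/Text types, TextInputFormat, and the input path are
 assumptions added for illustration):

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;

 public class NewAPIHadoopFileSketch {
   public static void main(String[] args) {
     SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("NewAPIHadoopFileSketch");
     JavaSparkContext spark_context = new JavaSparkContext(sparkConf);
     Configuration conf = new Configuration();

     // Batch read: the input-format, key and value classes are passed explicitly.
     JavaPairRDD<LongWritable, Text> data = spark_context.newAPIHadoopFile(
         "/sigmoid/foo", TextInputFormat.class, LongWritable.class, Text.class, conf);

     System.out.println(data.count());
     spark_context.stop();
   }
 }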
 Thank you
 Leonidas Fegaras




Re: Problem with File Streams

2015-01-17 Thread Leonidas Fegaras
My key/value classes are custom serializable classes. It looks like a
bug, so I filed it on JIRA as SPARK-5297.
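
SPARK-5297 was later resolved (in Spark 1.3.0) by adding fileStream
overloads on JavaStreamingContext that take the key, value and input-format
classes as explicit arguments, so nothing has to be recovered from erased
generics. A minimal sketch of that overload, assuming LongWritable/Text
records read with TextInputFormat, a 10-second batch interval, and the
/sigmoid/foo path from earlier in the thread (all illustrative):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class FileStreamWithExplicitClasses {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("FileStreamWithExplicitClasses");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));

    // Key, value and input-format classes are passed as runtime arguments,
    // so the stream no longer depends on generic type information.
    JavaPairInputDStream<LongWritable, Text> lines = ssc.fileStream(
        "/sigmoid/foo", LongWritable.class, Text.class, TextInputFormat.class);

    lines.print();
    ssc.start();
    ssc.awaitTermination();
  }
}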

Thanks
Leonidas
