Leonidas Fegaras created SPARK-5297:
---------------------------------------

             Summary: File Streams do not work with custom key/values
                 Key: SPARK-5297
                 URL: https://issues.apache.org/jira/browse/SPARK-5297
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.2.0
            Reporter: Leonidas Fegaras
            Priority: Minor
             Fix For: 1.2.0


The following code:
{code}
stream_context.<K,V,SequenceFileInputFormat<K,V>>fileStream(directory)
    .foreachRDD(new Function<JavaPairRDD<K,V>, Void>() {
        public Void call(JavaPairRDD<K,V> rdd) throws Exception {
            // Print every (key, value) pair from the new batch
            for (Tuple2<K,V> x : rdd.collect())
                System.out.println("# " + x._1 + " " + x._2);
            return null;
        }
    });
stream_context.start();
stream_context.awaitTermination();
{code}
compiles fine for custom (serializable) classes K and V, but when I drop a new Hadoop sequence file into the directory it fails with:
{quote}
15/01/17 09:13:59 ERROR scheduler.JobScheduler: Error generating jobs for time 1421507639000 ms
java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
        at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
        at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
        at scala.Option.orElse(Option.scala:257)
{quote}
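The cast appears to fail inside NewHadoopRDD.getPartitions when it instantiates the InputFormat class. A likely cause is Java type erasure: the three-type-parameter fileStream call above passes no Class objects, so at runtime Spark apparently ends up with java.lang.Object where F should be, and instantiating and casting it fails exactly as reported. A minimal standalone illustration of that suspected failure mode (not Spark code; ErasureDemo and its method are made-up names):
{code}
// Standalone illustration of the suspected cause (not Spark code; the class
// and method names are made up). Without an explicit Class<F> argument, the
// only class recoverable for an erased type parameter is java.lang.Object,
// and casting an Object instance to InputFormat fails at runtime.
import org.apache.hadoop.mapreduce.InputFormat;

public class ErasureDemo {
    static Object instantiate(Class<?> erased) throws Exception {
        Object instance = erased.newInstance();   // creates a plain Object
        return (InputFormat<?, ?>) instance;      // ClassCastException, as reported
    }

    public static void main(String[] args) throws Exception {
        instantiate(Object.class);  // throws: Object cannot be cast to InputFormat
    }
}
{code}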
The same classes K and V work fine in non-streaming Spark:
{code}
spark_context.newAPIHadoopFile(path, SequenceFileInputFormat.class, K.class, V.class, conf)
{code}
Streaming also works fine with TextFileInputFormat.
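
Passing the classes explicitly is presumably why the non-streaming newAPIHadoopFile call above works: Spark receives the runtime classes directly instead of deriving them from erased generics. A sketch of a possible workaround along the same lines, assuming a fileStream overload that accepts explicit Class arguments (such an overload is an assumption here and is not in the 1.2.0 Java API), with MyKey and MyValue as hypothetical stand-ins for the custom K and V:
{code}
// Hedged workaround sketch: pass the key/value/InputFormat classes explicitly,
// the way newAPIHadoopFile does, so they survive Java's type erasure.
// The fileStream(directory, kClass, vClass, fClass) overload is assumed,
// and MyKey/MyValue are hypothetical custom classes.
@SuppressWarnings("unchecked")
Class<SequenceFileInputFormat<MyKey, MyValue>> fClass =
    (Class<SequenceFileInputFormat<MyKey, MyValue>>) (Class<?>) SequenceFileInputFormat.class;
JavaPairInputDStream<MyKey, MyValue> stream =
    stream_context.fileStream(directory, MyKey.class, MyValue.class, fClass);
{code}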



