[ https://issues.apache.org/jira/browse/SPARK-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14340271#comment-14340271 ]
Sean Owen commented on SPARK-5297:
----------------------------------

Decided not to back port to 1.2 per tdas

> JavaStreamingContext.fileStream won't work because type info isn't propagated
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-5297
>                 URL: https://issues.apache.org/jira/browse/SPARK-5297
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Leonidas Fegaras
>            Assignee: Saisai Shao
>             Fix For: 1.3.0
>
>
> The following code:
> {code}
> stream_context.<K,V,SequenceFileInputFormat<K,V>>fileStream(directory)
>     .foreachRDD(new Function<JavaPairRDD<K,V>,Void>() {
>         public Void call ( JavaPairRDD<K,V> rdd ) throws Exception {
>             for ( Tuple2<K,V> x: rdd.collect() )
>                 System.out.println("# "+x._1+" "+x._2);
>             return null;
>         }
>     });
> stream_context.start();
> stream_context.awaitTermination();
> {code}
> for custom (serializable) classes K and V compiles fine, but gives an error
> when I drop a new Hadoop sequence file in the directory:
> {quote}
> 15/01/17 09:13:59 ERROR scheduler.JobScheduler: Error generating jobs for time 1421507639000 ms
> java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
>         at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
>         at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
>         at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
>         at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
>         at scala.Option.orElse(Option.scala:257)
> {quote}
> The same classes K and V work fine for non-streaming Spark:
> {code}
> spark_context.newAPIHadoopFile(path,F.class,K.class,SequenceFileInputFormat.class,conf)
> {code}
> Streaming also works fine for TextFileInputFormat.
> The issue is that class manifests are erased to Object in the Java file
> stream constructor, but those are relied on downstream when creating the
> Hadoop RDD that backs each batch of the file stream.
> https://github.com/apache/spark/blob/v1.2.0/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala#L263
> https://github.com/apache/spark/blob/v1.2.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L753
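
For anyone landing here on 1.2.x: below is a minimal, self-contained sketch of the API shape this fix introduces in 1.3.0, where JavaStreamingContext.fileStream takes explicit Class arguments so the key/value/InputFormat types survive Java's type erasure instead of collapsing to ClassTag[Object]. This is a sketch, not the exact code from the fix; Text/IntWritable stand in for the reporter's custom K and V classes, and the directory path is a placeholder.

{code}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class SequenceFileStreamExample {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("SequenceFileStreamExample");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(5000));

        // Passing the key/value/InputFormat classes explicitly lets the Scala
        // side build real ClassTags, so NewHadoopRDD no longer receives
        // ClassTag[Object] and the ClassCastException above is avoided.
        JavaPairInputDStream<Text, IntWritable> stream = jssc.fileStream(
            "hdfs:///input-dir",             // placeholder directory
            Text.class,                      // stand-in for the custom key class K
            IntWritable.class,               // stand-in for the custom value class V
            SequenceFileInputFormat.class);

        stream.foreachRDD(new Function<JavaPairRDD<Text, IntWritable>, Void>() {
            public Void call(JavaPairRDD<Text, IntWritable> rdd) {
                for (Tuple2<Text, IntWritable> x : rdd.collect())
                    System.out.println("# " + x._1() + " " + x._2());
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
{code}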