[ 
https://issues.apache.org/jira/browse/SPARK-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14281899#comment-14281899
 ] 

Sean Owen commented on SPARK-5297:
----------------------------------

I don't see that this has anything to do with custom key / values? 

What the stack trace suggests is that type information got lost along the way 
from Java to Scala. The line that fails, in {{NewHadoopRDD.getPartitions}}, is 
{{val inputFormat = inputFormatClass.newInstance}}.

I won't claim to understand this enough to be sure, but I don't see how the 
type info gets into the underlying call to {{StreamingContext.fileStream}}. 
{{JavaStreamingContext}} says:

{code}
  def fileStream[K, V, F <: NewInputFormat[K, V]](
      directory: String): JavaPairInputDStream[K, V] = {
    implicit val cmk: ClassTag[K] =
      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
    implicit val cmv: ClassTag[V] =
      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
    implicit val cmf: ClassTag[F] =
      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
    ssc.fileStream[K, V, F](directory)
  }
{code}

By contrast, {{newAPIHadoopFile}} takes the input format, key, and value 
classes as explicit arguments, so the type information is carried through. 
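
To make the suspected failure mode concrete, here is a minimal, self-contained Java sketch. It does not use Spark or Hadoop: {{Format}}, {{SequenceFormat}}, and {{erasedClassOf}} are hypothetical stand-ins invented for illustration, and the sketch assumes the wrapper effectively hands a class object for {{Object}} to the code that later instantiates the input format. It is not a confirmed diagnosis of the actual Spark code path.

```java
public class ErasedClassDemo {
    /** Hypothetical stand-in for org.apache.hadoop.mapreduce.InputFormat. */
    public interface Format { }

    /** Hypothetical stand-in for a concrete format such as SequenceFileInputFormat. */
    public static class SequenceFormat implements Format {
        public SequenceFormat() { }
    }

    /**
     * Mimics a wrapper that has no real class for F and substitutes Object,
     * roughly like casting ClassTag[AnyRef] to ClassTag[F].
     */
    @SuppressWarnings("unchecked")
    public static <F extends Format> Class<F> erasedClassOf() {
        // Unchecked cast: this compiles, but the class object is still Object.class.
        return (Class<F>) (Class<?>) Object.class;
    }

    /** Instantiates the format reflectively from a stored class object. */
    public static <F extends Format> Format instantiate(Class<F> formatClass) throws Exception {
        // The compiler inserts a cast to Format on the result of newInstance();
        // that cast fails if the new object is a plain java.lang.Object.
        return formatClass.getDeclaredConstructor().newInstance();
    }

    /** Returns "ok" on success, or the simple name of the exception thrown. */
    public static <F extends Format> String tryInstantiate(Class<F> formatClass) {
        try {
            instantiate(formatClass);
            return "ok";
        } catch (Exception e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Explicit class: instantiation and the cast both succeed.
        System.out.println(tryInstantiate(SequenceFormat.class));
        // Erased class: an Object is created, then the cast to Format fails.
        Class<SequenceFormat> erased = erasedClassOf();
        System.out.println(tryInstantiate(erased));
    }
}
```

Running {{main}} prints {{ok}} for the explicit class and {{ClassCastException}} for the erased one. The unchecked cast is accepted at compile time, and the error only appears when the instance is created and used as a {{Format}}, which would be consistent with the reported code compiling fine and failing only once a file arrives.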

Is this something you can weigh in on [~tdas]?

> File Streams do not work with custom key/values
> -----------------------------------------------
>
>                 Key: SPARK-5297
>                 URL: https://issues.apache.org/jira/browse/SPARK-5297
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Leonidas Fegaras
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> The following code:
> {code}
> stream_context.<K,V,SequenceFileInputFormat<K,V>>fileStream(directory)
> .foreachRDD(new Function<JavaPairRDD<K,V>,Void>() {
>      public Void call ( JavaPairRDD<K,V> rdd ) throws Exception {
>          for ( Tuple2<K,V> x: rdd.collect() )
>              System.out.println("# "+x._1+" "+x._2);
>          return null;
>      }
>   });
> stream_context.start();
> stream_context.awaitTermination();
> {code}
> compiles fine for custom (serializable) classes K and V, but gives an error
> when I drop a new Hadoop sequence file in the directory:
> {quote}
> 15/01/17 09:13:59 ERROR scheduler.JobScheduler: Error generating jobs for time 1421507639000 ms
> java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
>       at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
>       at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
>       at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>       at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
>       at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
>       at scala.Option.orElse(Option.scala:257)
> {quote}
> The same classes K and V work fine for non-streaming Spark:
> {code}
> spark_context.newAPIHadoopFile(path,SequenceFileInputFormat.class,K.class,V.class,conf)
> {code}
> Streaming also works fine for TextFileInputFormat.


