Sean,

Thanks a ton... This is exactly what I was looking for.

As mentioned in the code:

    // This horrible, separate declaration is necessary to appease the compiler
    @SuppressWarnings("unchecked")
    Class<? extends OutputFormat<?,?>> outputFormatClass =
        (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;
    writableDStream.saveAsNewAPIHadoopFiles(
        dataDirString + "/oryx",
        "data",
        keyWritableClass,
        messageWritableClass,
        outputFormatClass,
        streamingContext.sparkContext().hadoopConfiguration());

I was just having a hard time with the outputFormatClass parameter. The Scala code in org/apache/spark/streaming/api/java/JavaPairDStream.scala defines saveAsNewAPIHadoopFiles as follows:

    /**
     * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
     * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
     */
    def saveAsNewAPIHadoopFiles(
        prefix: String,
        suffix: String,
        keyClass: Class[_],
        valueClass: Class[_],
        outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
        conf: Configuration = new Configuration) {
      dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
    }

The problem is that Java has no way to write a class literal for a parameterized type. Say you are using TextOutputFormat with Text as the key class and IntWritable as the value class. You cannot pass

    TextOutputFormat<Text, IntWritable>.class

where Scala expects a Class[_ <: NewOutputFormat[_, _]], because of type erasure: parameterized types lose their type arguments when they are translated to bytecode during compilation.

Looks like adding this works:

    @SuppressWarnings("unchecked")
    Class<? extends OutputFormat<?,?>> outputFormatClass =
        (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;

Thanks again Sean...
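
P.S. For anyone who finds this thread later, here is a minimal, self-contained sketch of the double-cast trick with no Spark or Hadoop on the classpath. The nested OutputFormat and SequenceFileOutputFormat types and the describe() method are hypothetical stand-ins I made up to mirror the shape of the real parameter; they are not the Hadoop or Spark APIs.

```java
public class ClassLiteralCastDemo {

    // Hypothetical stand-ins for the Hadoop types, only to reproduce the generics shape.
    public interface OutputFormat<K, V> {}

    public static class SequenceFileOutputFormat<K, V> implements OutputFormat<K, V> {}

    // Stand-in for a method whose parameter looks like Scala's
    // Class[_ <: NewOutputFormat[_, _]] when seen from Java.
    public static String describe(Class<? extends OutputFormat<?, ?>> outputFormatClass) {
        return outputFormatClass.getSimpleName();
    }

    // Not valid Java: a class literal cannot carry type arguments.
    //   SequenceFileOutputFormat<String, Integer>.class

    // The workaround from the thread: widen the raw class literal to Class<?>,
    // then narrow it to the wildcard type with an unchecked cast.
    @SuppressWarnings("unchecked")
    public static Class<? extends OutputFormat<?, ?>> outputFormatClass() {
        return (Class<? extends OutputFormat<?, ?>>) (Class<?>) SequenceFileOutputFormat.class;
    }

    public static void main(String[] args) {
        System.out.println(describe(outputFormatClass()));
    }
}
```

The cast is unchecked, but the Class object itself carries no type arguments at runtime, so suppressing the warning on that one declaration seems reasonable to me.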

On Mon, Oct 6, 2014 at 12:23 PM, Sean Owen <so...@cloudera.com> wrote:

> Here's an example:
>
> https://github.com/OryxProject/oryx/blob/master/oryx-lambda/src/main/java/com/cloudera/oryx/lambda/BatchLayer.java#L131
>
> On Mon, Oct 6, 2014 at 7:39 PM, Abraham Jacob <abe.jac...@gmail.com> wrote:
> > Hi All,
> >
> > Would really appreciate from the community if anyone has implemented the
> > saveAsNewAPIHadoopFiles method in "Java" found in the
> > org.apache.spark.streaming.api.java.JavaPairDStream<K,V>
> >
> > Any code snippet or online link would be greatly appreciated.
> >
> > Regards,
> > Jacob

--
~