Sean,

Thanks a ton Sean... This is exactly what I was looking for.

As mentioned in the code -

    // This horrible, separate declaration is necessary to appease the compiler
    @SuppressWarnings("unchecked")
    Class<? extends OutputFormat<?,?>> outputFormatClass =
        (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;
    writableDStream.saveAsNewAPIHadoopFiles(
        dataDirString + "/oryx",
        "data",
        keyWritableClass,
        messageWritableClass,
        outputFormatClass,
        streamingContext.sparkContext().hadoopConfiguration());

I was just having a hard time with the outputFormatClass parameter.  The
Scala code in org/apache/spark/streaming/api/java/JavaPairDStream.scala
defines saveAsNewAPIHadoopFiles as follows -

  /**
   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
   * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
   */
  def saveAsNewAPIHadoopFiles(
      prefix: String,
      suffix: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = new Configuration) {
    dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
  }

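The problem is that a Java caller cannot simply pass a class literal for
the Scala parameter type Class[_ <: NewOutputFormat[_, _]], which Java sees
as Class<? extends OutputFormat<?,?>>. Say you are using TextOutputFormat
with Text as the key class and IntWritable as the value class:
TextOutputFormat<Text, IntWritable>.class is not legal Java because of
"type erasure". Parameterized types lose their type arguments when they are
translated to bytecode during compilation, so only the raw
TextOutputFormat.class exists, and its type, Class<TextOutputFormat>, is
not accepted where Class<? extends OutputFormat<?,?>> is expected.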
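
To make the erasure point concrete, here is a minimal self-contained
sketch. The OutputFormat and SequenceFileOutputFormat classes in it are
hypothetical stand-ins that I declared so it compiles without Hadoop on the
classpath, and describe() is a made-up method that only mirrors the shape
of the outputFormatClass parameter. None of it is the Spark or Hadoop API
itself.

```java
public class ClassLiteralCast {

    // Hypothetical stand-ins for Hadoop's generic OutputFormat hierarchy.
    public abstract static class OutputFormat<K, V> {}
    public static class SequenceFileOutputFormat<K, V> extends OutputFormat<K, V> {}

    // Hypothetical method with the same parameter shape that Java sees for
    // Scala's Class[_ <: NewOutputFormat[_, _]].
    public static String describe(Class<? extends OutputFormat<?, ?>> outputFormatClass) {
        return outputFormatClass.getSimpleName();
    }

    // The class literal has the raw type Class<SequenceFileOutputFormat>,
    // so it is first widened to Class<?> and then narrowed with an
    // unchecked cast to the wildcard type the method expects.
    @SuppressWarnings("unchecked")
    public static Class<? extends OutputFormat<?, ?>> sequenceFileFormat() {
        return (Class<? extends OutputFormat<?, ?>>) (Class<?>) SequenceFileOutputFormat.class;
    }

    public static void main(String[] args) {
        // prints "SequenceFileOutputFormat"
        System.out.println(describe(sequenceFileFormat()));
    }
}
```

As far as I can tell, calling describe(SequenceFileOutputFormat.class)
directly is rejected by javac for the reason above, which is why the cast
goes through Class<?> first. The cast is unchecked but harmless here, since
the class object is the same at runtime either way.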


Looks like adding this works -

    @SuppressWarnings("unchecked")
    Class<? extends OutputFormat<?,?>> outputFormatClass =
        (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;


Thanks again Sean...


On Mon, Oct 6, 2014 at 12:23 PM, Sean Owen <so...@cloudera.com> wrote:

> Here's an example:
>
>
> https://github.com/OryxProject/oryx/blob/master/oryx-lambda/src/main/java/com/cloudera/oryx/lambda/BatchLayer.java#L131
>
> On Mon, Oct 6, 2014 at 7:39 PM, Abraham Jacob <abe.jac...@gmail.com>
> wrote:
> > Hi All,
> >
> > Would really appreciate from the community if anyone has implemented the
> > saveAsNewAPIHadoopFiles method in "Java" found in the
> > org.apache.spark.streaming.api.java.JavaPairDStream<K,V>
> >
> > Any code snippet or online link would be greatly appreciated.
> >
> > Regards,
> > Jacob
> >
> >
>



-- 
~
