Sorry for the late reply. Yes, RDRawDataRecord is my own class. It is defined in
another Java project (a jar) that I pull in with Maven. My MapReduce program also
uses it and it works there.
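
In case it matters, the jar comes in as a normal dependency. Roughly, in sbt
notation (just for illustration; these coordinates are made up, the real ones are
in our pom):

// hypothetical coordinates for the jar that contains RDRawDataRecord
libraryDependencies += "com.kiisoo" % "aegis-bd-common" % "1.0.0"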

On Fri, Mar 18, 2016 at 12:48 AM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> Hi Tony,
>
> Is
>
> com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>
> One of your own packages?
>
> Sounds like it is the one throwing the error
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 17 March 2016 at 15:21, Tony Liu <tony.liu0...@gmail.com> wrote:
>
>> Hi,
>>    My HDFS file is stored with a custom data structure. I want to read it
>> with a SparkContext object, so I defined an input format for it:
>>
>> *1. code of RawDataInputFormat.scala*
>>
>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>> import org.apache.hadoop.io.LongWritable
>> import org.apache.hadoop.mapred._
>>
>> /**
>>   * Created by Tony on 3/16/16.
>>   */
>> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord]
>>   extends FileInputFormat {
>>
>>   override def getRecordReader(split: InputSplit, job: JobConf,
>>                                reporter: Reporter): RecordReader[LW, RD] = {
>>     new RawReader(split, job, reporter)
>>   }
>>
>> }
>>
>> *2. code of RawReader.scala*
>>
>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>> import org.apache.hadoop.io.{LongWritable, SequenceFile}
>> import org.apache.hadoop.mapred._
>>
>> /**
>>   * Created by Tony on 3/17/16.
>>   */
>> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord]
>>   extends RecordReader[LW, RD] {
>>
>>   var reader: SequenceFile.Reader = null
>>   var currentPos: Long = 0L
>>   var length: Long = 0L
>>
>>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
>>     this()
>>     val p = (split.asInstanceOf[FileSplit]).getPath
>>     reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>>   }
>>
>>   override def next(key: LW, value: RD): Boolean = {
>>     val flag = reader.next(key, value)
>>     currentPos = reader.getPosition()
>>     flag
>>   }
>>
>>   override def getProgress: Float = Math.min(1.0f, currentPos / length.toFloat)
>>
>>   override def getPos: Long = currentPos
>>
>>   override def createKey(): LongWritable = {
>>     new LongWritable()
>>   }
>>
>>   override def close(): Unit = {
>>     reader.close()
>>   }
>>
>>   override def createValue(): RDRawDataRecord = {
>>     new RDRawDataRecord()
>>   }
>> }
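>>
>> RawReader assumes the HDFS file is a Hadoop SequenceFile. For context, here is
>> a minimal sketch of how such a file could be written (this is only an
>> assumption about the writer side, not the actual code, and the path is made up):
>>
>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.fs.Path
>> import org.apache.hadoop.io.{LongWritable, SequenceFile}
>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>
>> val hconf = new Configuration()
>> val writer = SequenceFile.createWriter(hconf,
>>   SequenceFile.Writer.file(new Path("/wifi-raw-data/example")),   // hypothetical output path
>>   SequenceFile.Writer.keyClass(classOf[LongWritable]),
>>   SequenceFile.Writer.valueClass(classOf[RDRawDataRecord]))
>> writer.append(new LongWritable(1L), new RDRawDataRecord())        // one key/value pair
>> writer.close()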
>>
>> *3. code of RDRawDataRecord.java (the Java class from the other project)*
>>
>> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
>> import java.io.DataInput;
>> import java.io.DataOutput;
>> import java.io.IOException;
>> import org.apache.commons.lang.StringUtils;
>> import org.apache.hadoop.io.Writable;
>>
>> public class RDRawDataRecord implements Writable {
>>     private String smac;
>>     private String dmac;
>>     private int hrssi;
>>     private int lrssi;
>>     private long fstamp;
>>     private long lstamp;
>>     private long maxstamp;
>>     private long minstamp;
>>     private long stamp;
>>
>>     public void readFields(DataInput in) throws IOException {
>>         this.smac = in.readUTF();
>>         this.dmac = in.readUTF();
>>         this.hrssi = in.readInt();
>>         this.lrssi = in.readInt();
>>         this.fstamp = in.readLong();
>>         this.lstamp = in.readLong();
>>         this.maxstamp = in.readLong();
>>         this.minstamp = in.readLong();
>>         this.stamp = in.readLong();
>>     }
>>
>>     public void write(DataOutput out) throws IOException {
>>         out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
>>         out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
>>         out.writeInt(this.hrssi);
>>         out.writeInt(this.lrssi);
>>         out.writeLong(this.fstamp);
>>         out.writeLong(this.lstamp);
>>         out.writeLong(this.maxstamp);
>>         out.writeLong(this.minstamp);
>>         out.writeLong(this.stamp);
>>     }
>>
>>     /* getters and setters omitted */
>> }
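>>
>> The only contract this class has to satisfy is the Writable one: readFields()
>> must read the fields back in exactly the order write() emits them. A quick
>> round-trip sketch, just to illustrate that symmetry (not code from the real
>> project):
>>
>> import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>
>> val buf = new ByteArrayOutputStream()
>> new RDRawDataRecord().write(new DataOutputStream(buf))       // serialize a default record
>> val copy = new RDRawDataRecord()
>> copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray)))  // read it back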
>>
>> *Finally, I run it with this code:*
>>
>> val filePath = "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
>> val conf = new SparkConf()
>> conf.setMaster("local")
>> conf.setAppName("demo")
>> val sc = new SparkContext(conf)
>> val file = sc.hadoopFile[LongWritable, RDRawDataRecord,
>>   RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>> file.foreach(v => {
>>   println(v._2.getDmac) // Attribute of custom objects
>> })
>>
>> *I get an error, it says:*
>>
>> Error:(41, 19) type arguments 
>> [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
>>  conform to the bounds of none of the overloaded alternatives of
>>  value hadoopFile: [K, V, F <: 
>> org.apache.hadoop.mapred.InputFormat[K,V]](path: String)(implicit km: 
>> scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V], implicit 
>> fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)] <and> [K, V, 
>> F <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String, minPartitions: 
>> Int)(implicit km: scala.reflect.ClassTag[K], implicit vm: 
>> scala.reflect.ClassTag[V], implicit fm: 
>> scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
>>     val file = sc.hadoopFile[LongWritable, RDRawDataRecord, 
>> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>>                   ^
>>
>>
>>
>> *I also tried reading a plain text file with the SparkContext API
>> 'sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://xxx......")',
>> and that works.*
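>>
>> Spelled out with imports, that working call looks roughly like this (the
>> "hdfs://xxx......" path is just a placeholder, and 'textRdd' is only an
>> illustrative name):
>>
>> import org.apache.hadoop.io.{LongWritable, Text}
>> import org.apache.hadoop.mapred.TextInputFormat
>>
>> // reuses the same SparkContext 'sc' as above
>> val textRdd = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://xxx......")
>> textRdd.take(5).foreach(v => println(v._2.toString))   // prints the first few lines
>>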
>> *What does this error mean, and how can I fix it?*
>>
>> Thank you for helping me.
>>
>> --
>> Tony
>> :)
>>
>
>


-- 
:)
