Sorry for the late reply. Yes, RDRawDataRecord is my own object. It is defined in another Java project (a jar) that I pull in with Maven. My MapReduce program also uses it and works.

--
:)
On Fri, Mar 18, 2016 at 12:48 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi Tony,

Is

com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord

one of your own packages?

It sounds like it is the one throwing the error.

HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 17 March 2016 at 15:21, Tony Liu <tony.liu0...@gmail.com> wrote:

Hi,

My HDFS file is stored with custom data structures, and I want to read it with a SparkContext object, so I defined a formatting object:

*1. Code of RawDataInputFormat.scala*

import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapred._

/**
 * Created by Tony on 3/16/16.
 */
class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends FileInputFormat {

  override def getRecordReader(split: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LW, RD] = {
    new RawReader(split, job, reporter)
  }

}

*2. Code of RawReader.scala*

import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
import org.apache.hadoop.io.{LongWritable, SequenceFile}
import org.apache.hadoop.mapred._

/**
 * Created by Tony on 3/17/16.
 */
class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends RecordReader[LW, RD] {

  var reader: SequenceFile.Reader = null
  var currentPos: Long = 0L
  var length: Long = 0L

  def this(split: InputSplit, job: JobConf, reporter: Reporter) {
    this()
    val p = split.asInstanceOf[FileSplit].getPath
    reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
  }

  override def next(key: LW, value: RD): Boolean = {
    val flag = reader.next(key, value)
    currentPos = reader.getPosition()
    flag
  }

  override def getProgress: Float = Math.min(1.0f, currentPos / length.toFloat)

  override def getPos: Long = currentPos

  override def createKey(): LongWritable = {
    new LongWritable()
  }

  override def close(): Unit = {
    reader.close()
  }

  override def createValue(): RDRawDataRecord = {
    new RDRawDataRecord()
  }
}
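As a side note on the reader above: reader.next(key, value) will fail at runtime if the sequence file was not actually written with matching key/value classes, so it can be worth checking the file header first. A minimal standalone sketch of such a check (the object name InspectSeqFile and the path argument are made up for illustration):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.SequenceFile

// Prints the key/value class names recorded in a sequence file's header.
// These must match what createKey()/createValue() return in the reader.
object InspectSeqFile {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(args(0))))
    try {
      println("key class:   " + reader.getKeyClassName)
      println("value class: " + reader.getValueClassName)
    } finally {
      reader.close()
    }
  }
}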
*3. Code of RDRawDataRecord.java* (this class is Java; it lives in the other project)

import com.kiisoo.aegis.common.rawdata.RawDataRecord;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Writable;

public class RDRawDataRecord implements Writable {
    private String smac;
    private String dmac;
    private int hrssi;
    private int lrssi;
    private long fstamp;
    private long lstamp;
    private long maxstamp;
    private long minstamp;
    private long stamp;

    public void readFields(DataInput in) throws IOException {
        this.smac = in.readUTF();
        this.dmac = in.readUTF();
        this.hrssi = in.readInt();
        this.lrssi = in.readInt();
        this.fstamp = in.readLong();
        this.lstamp = in.readLong();
        this.maxstamp = in.readLong();
        this.minstamp = in.readLong();
        this.stamp = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(StringUtils.isNotBlank(this.smac) ? this.smac : "");
        out.writeUTF(StringUtils.isNotBlank(this.dmac) ? this.dmac : "");
        out.writeInt(this.hrssi);
        out.writeInt(this.lrssi);
        out.writeLong(this.fstamp);
        out.writeLong(this.lstamp);
        out.writeLong(this.maxstamp);
        out.writeLong(this.minstamp);
        out.writeLong(this.stamp);
    }

    /* getters and setters omitted */
}

*At last, I use this code to run it:*

val filePath = "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("demo")
val sc = new SparkContext(conf)
val file = sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
file.foreach(v => {
  println(v._2.getDmac) // attribute of the custom object
})

*I get an error; it says:*

Error:(41, 19) type arguments [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
conform to the bounds of none of the overloaded alternatives of value hadoopFile:
[K, V, F <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String)(implicit km: scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V], implicit fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
<and>
[K, V, F <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String, minPartitions: Int)(implicit km: scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V], implicit fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
    val file = sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
                  ^

*I also tried reading a text file with the SparkContext API 'sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://xxx......")', and that works.*

*What does this error mean, and how do I fix it?*

Thank you for helping me.

--
Tony
:)
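The compiler message means that RawDataInputFormat[LongWritable, RDRawDataRecord] fails the bound F <: InputFormat[K,V] on hadoopFile: the class extends the raw FileInputFormat with no type arguments, which Scala treats as FileInputFormat[_, _], so it is not an InputFormat[LongWritable, RDRawDataRecord]. A minimal sketch of one way to satisfy the bound, binding the key/value types concretely and dropping the type parameters (untested against the original jar, and assuming the input really is a SequenceFile keyed by LongWritable with RDRawDataRecord values):

import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
import org.apache.hadoop.io.{LongWritable, SequenceFile}
import org.apache.hadoop.mapred._

// Concrete key/value types, so that
// RawDataInputFormat <: InputFormat[LongWritable, RDRawDataRecord].
class RawDataInputFormat extends FileInputFormat[LongWritable, RDRawDataRecord] {

  override def getRecordReader(split: InputSplit, job: JobConf,
                               reporter: Reporter): RecordReader[LongWritable, RDRawDataRecord] =
    new RawReader(split.asInstanceOf[FileSplit], job)
}

class RawReader(split: FileSplit, job: JobConf)
    extends RecordReader[LongWritable, RDRawDataRecord] {

  private val reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(split.getPath))

  // End offset of the split, so getProgress can report a real fraction
  // (the original reader declared `length` but never assigned it).
  private val end: Long = split.getStart + split.getLength

  override def next(key: LongWritable, value: RDRawDataRecord): Boolean =
    reader.next(key, value)

  override def createKey(): LongWritable = new LongWritable()

  override def createValue(): RDRawDataRecord = new RDRawDataRecord()

  override def getPos: Long = reader.getPosition

  override def getProgress: Float =
    math.min(1.0f, getPos / math.max(1L, end).toFloat)

  override def close(): Unit = reader.close()
}

The call site then needs no type arguments on the format: sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat](filePath). Keeping the generics would also require parameterising the supertypes (extends FileInputFormat[LW, RD]) and returning LW/RD from createKey()/createValue(); nothing here seems to need that extra generality.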