Doesn't FileInputFormat require type parameters? Like so:

class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord]
  extends FileInputFormat[LW, RD]

I haven't verified this, but it could be related to the compile error you're getting.

On Thu, Mar 17, 2016 at 9:53 AM, Benyi Wang <bewang.t...@gmail.com> wrote:
> I would say change
>
> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends
> FileInputFormat
>
> to
>
> class RawDataInputFormat[LongWritable, RDRawDataRecord] extends
> FileInputFormat
>
>
> On Thu, Mar 17, 2016 at 9:48 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>>
>> Hi Tony,
>>
>> Is
>>
>> com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>
>> one of your own packages?
>>
>> It sounds like it is the one throwing the error.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> On 17 March 2016 at 15:21, Tony Liu <tony.liu0...@gmail.com> wrote:
>>>
>>> Hi,
>>> My HDFS file is stored with custom data structures. I want to read it
>>> with a SparkContext object, so I defined a formatting object:
>>>
>>> 1. code of RawDataInputFormat.scala
>>>
>>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>> import org.apache.hadoop.io.LongWritable
>>> import org.apache.hadoop.mapred._
>>>
>>> /**
>>>  * Created by Tony on 3/16/16.
>>>  */
>>> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord]
>>> extends FileInputFormat {
>>>
>>>   override def getRecordReader(split: InputSplit, job: JobConf, reporter:
>>> Reporter): RecordReader[LW, RD] = {
>>>     new RawReader(split, job, reporter)
>>>   }
>>>
>>> }
>>>
>>> 2. code of RawReader.scala
>>>
>>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>> import org.apache.hadoop.io.{LongWritable, SequenceFile}
>>> import org.apache.hadoop.mapred._
>>>
>>> /**
>>>  * Created by Tony on 3/17/16.
>>>  */
>>> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends
>>> RecordReader[LW, RD] {
>>>
>>>   var reader: SequenceFile.Reader = null
>>>   var currentPos: Long = 0L
>>>   var length: Long = 0L
>>>
>>>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
>>>     this()
>>>     val p = (split.asInstanceOf[FileSplit]).getPath
>>>     reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>>>   }
>>>
>>>   override def next(key: LW, value: RD): Boolean = {
>>>     val flag = reader.next(key, value)
>>>     currentPos = reader.getPosition()
>>>     flag
>>>   }
>>>
>>>   override def getProgress: Float = Math.min(1.0f, currentPos /
>>> length.toFloat)
>>>
>>>   override def getPos: Long = currentPos
>>>
>>>   override def createKey(): LongWritable = {
>>>     new LongWritable()
>>>   }
>>>
>>>   override def close(): Unit = {
>>>     reader.close()
>>>   }
>>>
>>>   override def createValue(): RDRawDataRecord = {
>>>     new RDRawDataRecord()
>>>   }
>>> }
>>>
>>> 3. code of RDRawDataRecord.scala
>>>
>>> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
>>> import java.io.DataInput;
>>> import java.io.DataOutput;
>>> import java.io.IOException;
>>> import org.apache.commons.lang.StringUtils;
>>> import org.apache.hadoop.io.Writable;
>>>
>>> public class RDRawDataRecord implements Writable {
>>>     private String smac;
>>>     private String dmac;
>>>     private int hrssi;
>>>     private int lrssi;
>>>     private long fstamp;
>>>     private long lstamp;
>>>     private long maxstamp;
>>>     private long minstamp;
>>>     private long stamp;
>>>
>>>     public void readFields(DataInput in) throws IOException {
>>>         this.smac = in.readUTF();
>>>         this.dmac = in.readUTF();
>>>         this.hrssi = in.readInt();
>>>         this.lrssi = in.readInt();
>>>         this.fstamp = in.readLong();
>>>         this.lstamp = in.readLong();
>>>         this.maxstamp = in.readLong();
>>>         this.minstamp = in.readLong();
>>>         this.stamp = in.readLong();
>>>     }
>>>
>>>     public void write(DataOutput out) throws IOException {
>>>         out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
>>>         out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
>>>         out.writeInt(this.hrssi);
>>>         out.writeInt(this.lrssi);
>>>         out.writeLong(this.fstamp);
>>>         out.writeLong(this.lstamp);
>>>         out.writeLong(this.maxstamp);
>>>         out.writeLong(this.minstamp);
>>>         out.writeLong(this.stamp);
>>>     }
>>>
>>>     /**
>>>
>>>     ignore getter setter
>>>
>>>     **/
>>>
>>> }
>>>
>>> Finally, I use this code to run it:
>>>
>>> val filePath =
>>> "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
>>> val conf = new SparkConf()
>>> conf.setMaster("local")
>>> conf.setAppName("demo")
>>> val sc = new SparkContext(conf)
>>> val file = sc.hadoopFile[LongWritable, RDRawDataRecord,
>>> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>>> file.foreach(v => {
>>>   println(v._2.getDmac) // Attribute of custom objects
>>> })
>>>
>>> I get an error that says:
>>>
>>> Error:(41, 19) type arguments
>>> [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
>>> conform to the bounds of none of the overloaded alternatives of
>>> value hadoopFile: [K, V, F <:
>>> org.apache.hadoop.mapred.InputFormat[K,V]](path: String)(implicit km:
>>> scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V], implicit
>>> fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)] <and> [K, V,
>>> F <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String, minPartitions:
>>> Int)(implicit km: scala.reflect.ClassTag[K], implicit vm:
>>> scala.reflect.ClassTag[V], implicit fm:
>>> scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
>>> val file = sc.hadoopFile[LongWritable, RDRawDataRecord,
>>> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>>> ^
>>>
>>>
>>> I also tried reading a text file with the SparkContext API
>>> 'sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://xxx......")',
>>> and it works.
>>> What does this error mean, and how can I fix it?
>>>
>>> Thank you for your help.
>>>
>>> --
>>> Tony
>>> :)
>>
>>
>
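
For what it's worth, here is a minimal sketch of the bound that the error message is complaining about. The hadoopFile signature in the error requires F <: InputFormat[K, V], so the input format class has to extend FileInputFormat with the same key and value types that are passed to hadoopFile. Everything in the sketch is a hypothetical stand-in (the BoundsSketch object, the empty LongWritable and RDRawDataRecord classes, and hadoopFileLike are not the real Hadoop or Spark types); they only mirror the shape of the signature so the example compiles without Hadoop or Spark on the classpath. I have not run this against the real classes.

```scala
// Stand-ins for the Hadoop/Spark types (assumptions, not the real classes).
object BoundsSketch {
  // Invariant in K and V, as a Java generic interface appears to Scala.
  trait InputFormat[K, V]
  abstract class FileInputFormat[K, V] extends InputFormat[K, V]

  class LongWritable
  class RDRawDataRecord

  // Same type-parameter shape as the hadoopFile signature quoted in the error:
  // F must be a subtype of InputFormat[K, V] for the chosen K and V.
  def hadoopFileLike[K, V, F <: InputFormat[K, V]](path: String): String =
    s"would read $path"

  // The key and value types are fixed in the parent type, so the compiler
  // can check F <: InputFormat[LongWritable, RDRawDataRecord].
  class RawDataInputFormat extends FileInputFormat[LongWritable, RDRawDataRecord]

  def main(args: Array[String]): Unit =
    println(
      hadoopFileLike[LongWritable, RDRawDataRecord, RawDataInputFormat]("/tmp/example")
    )
}
```

If the real class were declared the same way (non-generic, extending FileInputFormat[LongWritable, RDRawDataRecord]), the call would presumably become sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat](filePath). Keeping the type parameters on the class, as in my suggestion at the top, should also satisfy the bound as long as they are passed through to FileInputFormat.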