Try `val enconder = RowEncoder(df.schema).resolveAndBind()`?

On Thu, Mar 21, 2019 at 5:39 PM Long, Andrew <loand...@amazon.com.invalid> wrote:
> Thanks a ton for the help!
>
> Is there a standardized way of converting the internal row to rows?
>
> I’ve tried this but I’m getting an exception:
>
> val enconder = RowEncoder(df.schema)
> val rows = readFile(pFile).flatMap(_ match {
>     case r: InternalRow => Seq(r)
>     case b: ColumnarBatch => b.rowIterator().asScala
>   })
>   .map(enconder.fromRow(_))
>   .toList
>
> java.lang.RuntimeException: Error while decoding:
> java.lang.UnsupportedOperationException: Cannot evaluate expression:
> getcolumnbyordinal(0, IntegerType)
>
> createexternalrow(getcolumnbyordinal(0, IntegerType),
> getcolumnbyordinal(1, IntegerType), getcolumnbyordinal(2,
> StringType).toString, StructField(pk,IntegerType,false),
> StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))
>
>     at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
>     at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
>     at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
>
> From: Ryan Blue <rb...@netflix.com.INVALID>
> Reply-To: "rb...@netflix.com" <rb...@netflix.com>
> Date: Thursday, March 21, 2019 at 3:32 PM
> To: "Long, Andrew" <loand...@amazon.com.invalid>
> Cc: "dev@spark.apache.org" <dev@spark.apache.org>, "u...@spark.apache.org" <u...@spark.apache.org>, "horizon-...@amazon.com" <horizon-...@amazon.com>
> Subject: Re: Manually reading parquet files.
>
> You're getting InternalRow instances. They probably have the data you
> want, but the toString representation doesn't match the data for
> InternalRow.
>
> On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew <loand...@amazon.com.invalid> wrote:
>
> Hello Friends,
>
> I’m working on a performance improvement that reads additional parquet
> files in the middle of a lambda and I’m running into some issues. This is
> what I’d like to do:
>
> ds.mapPartitions(x=>{
>   //read parquet file in and perform an operation with x
> })
>
> Here’s my current POC code, but I’m getting nonsense back from the row
> reader.
>
> import com.amazon.horizon.azulene.util.SparkFileUtils._
>
> spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")
>
> val data = List(
>   TestRow(1,1,"asdf"),
>   TestRow(2,1,"asdf"),
>   TestRow(3,1,"asdf"),
>   TestRow(4,1,"asdf")
> )
>
> val df = spark.createDataFrame(data)
>
> val folder = Files.createTempDirectory("azulene-test")
>
> val folderPath = folder.toAbsolutePath.toString + "/"
> df.write.mode("overwrite").parquet(folderPath)
>
> val files = spark.fs.listStatus(folder.toUri)
>
> val file = files(1)//skip _success file
>
> val partitionSchema = StructType(Seq.empty)
> val dataSchema = df.schema
> val fileFormat = new ParquetFileFormat()
>
> val path = file.getPath
>
> val status = spark.fs.getFileStatus(path)
>
> val pFile = new PartitionedFile(
>   partitionValues = InternalRow.empty,//This should be empty for non-partitioned values
>   filePath = path.toString,
>   start = 0,
>   length = status.getLen
> )
>
> val readFile: (PartitionedFile) => Iterator[Any] = //Iterator[InternalRow]
>   fileFormat.buildReaderWithPartitionValues(
>     sparkSession = spark,
>     dataSchema = dataSchema,
>     partitionSchema = partitionSchema,//this should be empty for non-partitioned fields
>     requiredSchema = dataSchema,
>     filters = Seq.empty,
>     options = Map.empty,
>     hadoopConf = spark.sparkContext.hadoopConfiguration //relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
>   )
>
> import scala.collection.JavaConverters._
>
> val rows = readFile(pFile).flatMap(_ match {
>   case r: InternalRow => Seq(r)
>
>   // This doesn't work. vector mode is doing something screwy
>   case b: ColumnarBatch => b.rowIterator().asScala
> }).toList
>
> println(rows)
> //List([0,1,5b,2000000004,66647361])
> //??this is wrong I think????
>
> Has anyone attempted something similar?
>
> Cheers Andrew
>
> --
> Ryan Blue
> Software Engineer
> Netflix
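
For anyone finding this thread later, here is a minimal sketch of the suggestion at the top of the thread. It assumes Spark 2.4.x, where `ExpressionEncoder.fromRow` still exists (I believe Spark 3.x replaced it with a deserializer object, so treat this as version-specific). The object name `InternalRowDecoding`, the helper `toExternal`, and the hand-built sample row are hypothetical and only for illustration; the schema mirrors the one shown in the stack trace. The exception in the quoted message appears because an unresolved encoder still contains `getcolumnbyordinal` placeholders, which cannot be evaluated; `resolveAndBind()` binds them to the schema's ordinals first.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper object; not part of Spark or of the original code.
object InternalRowDecoding {

  // Same shape as the schema printed in the stack trace above.
  val schema: StructType = StructType(Seq(
    StructField("pk", IntegerType, nullable = false),
    StructField("ordering", IntegerType, nullable = false),
    StructField("col_a", StringType, nullable = true)
  ))

  // resolveAndBind() replaces the getcolumnbyordinal placeholders with
  // references bound to the schema, so fromRow can be evaluated.
  private val encoder = RowEncoder(schema).resolveAndBind()

  // Decode Catalyst InternalRows into external Rows (Spark 2.4.x API).
  def toExternal(rows: Iterator[InternalRow]): List[Row] =
    rows.map(r => encoder.fromRow(r)).toList

  // Hand-built sample; strings are stored internally as UTF8String.
  def sampleRows: Iterator[InternalRow] =
    Iterator(InternalRow(1, 1, UTF8String.fromString("asdf")))

  def main(args: Array[String]): Unit =
    toExternal(sampleRows).foreach(println)
}
```

In the POC above, the same bound encoder would take the place of `enconder` in `.map(enconder.fromRow(_))`.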