Try `val enconder = RowEncoder(df.schema).resolveAndBind()` ?

> Thanks a ton for the help!
> Is there a standardized way of converting the internal row to rows?
> I’ve tried this but im getting an exception
> *val *enconder = *RowEncoder*(df.schema)
> *val *rows = readFile(pFile).flatMap(_ *match *{
>   *case *r: InternalRow => *Seq*(r)
>   *case *b: ColumnarBatch => b.rowIterator().asScala
> })
>   .map(enconder.fromRow(_))
>   .toList
> java.lang.RuntimeException: Error while decoding:
> java.lang.UnsupportedOperationException: Cannot evaluate expression:
> getcolumnbyordinal(0, IntegerType)
> createexternalrow(getcolumnbyordinal(0, IntegerType),
> getcolumnbyordinal(1, IntegerType), getcolumnbyordinal(2,
> StringType).toString, StructField(pk,IntegerType,false),
> StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
> You're getting InternalRow instances. They probably have the data you
> want, but the toString representation doesn't match the data for
> InternalRow.
> Hello Friends,
> I’m working on a performance improvement that reads additional parquet
> files in the middle of a lambda and I’m running into some issues.  This is
> what id like todo
> ds.mapPartitions(x=>{
>   //read parquet file in and perform an operation with x
> })
> Here’s my current POC code but I’m getting nonsense back from the row
> reader.
> *import *
> *spark*.*conf*.set("spark.sql.parquet.enableVectorizedReader","false")
> *val *data = *List*(
>   *TestRow*(1,1,"asdf"),
>   *TestRow*(2,1,"asdf"),
>   *TestRow*(3,1,"asdf"),
>   *TestRow*(4,1,"asdf")
> )
> *val *df = *spark*.createDataFrame(data)
> *val *folder = Files.*createTempDirectory*("azulene-test")
> *val *folderPath = folder.toAbsolutePath.toString + "/"
> df.write.mode("overwrite").parquet(folderPath)
> *val *files = *spark*.fs.listStatus(folder.toUri)
> *val *file = files(1)//skip _success file
> *val *partitionSchema = *StructType*(*Seq*.empty)
> *val *dataSchema = df.schema
> *val *fileFormat = *new *ParquetFileFormat()
> *val *path = file.getPath
> *val *status = *spark*.fs.getFileStatus(path)
> *val *pFile = *new *PartitionedFile(
>   partitionValues = InternalRow.*empty*,//This should be empty for non
> partitioned values
>   filePath = path.toString,
>   start = 0,
>   length = status.getLen
> )
> *val *readFile: (PartitionedFile) => Iterator[Any] =
> //Iterator[InternalRow]
>   fileFormat.buildReaderWithPartitionValues(
>     sparkSession = *spark*,
>     dataSchema = dataSchema,
>     partitionSchema = partitionSchema,//this should be empty for non
> partitioned feilds
>     requiredSchema = dataSchema,
>     filters = *Seq*.empty,
>     options = *Map*.*empty*,
>     hadoopConf = *spark*.sparkContext.hadoopConfiguration
> //relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
>   )
> *import *scala.collection.JavaConverters._
> *val *rows = readFile(pFile).flatMap(_ *match *{
>   *case *r: InternalRow => *Seq*(r)
>   // This doesn't work. vector mode is doing something screwy
>   *case *b: ColumnarBatch => b.rowIterator().asScala
> }).toList
> *println*(rows)
> //List([0,1,5b,2000000004,66647361])
> //??this is wrong I think????
> Has anyone attempted something similar?
> Cheers Andrew
