You're getting InternalRow instances. They probably have the data you want, but InternalRow's toString representation doesn't show the field values it holds.
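For example, a minimal sketch of decoding each InternalRow back to an external Row so println shows the field values (assuming Spark 2.4, where ExpressionEncoder still exposes fromRow; Spark 3.x replaced it with createDeserializer — "decodeRows" is a hypothetical helper, not part of the code below):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

// Decode InternalRows into external Rows so toString shows field values
// instead of raw UnsafeRow words. Sketch only, assuming Spark 2.4 APIs.
def decodeRows(schema: StructType, rows: Iterator[InternalRow]): Iterator[Row] = {
  val encoder = RowEncoder(schema).resolveAndBind() // bind to the schema's own attributes
  rows.map(encoder.fromRow)
}

Applied to the reader below, something like decodeRows(df.schema, ...) should print the original values rather than the hex dump.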
On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew <loand...@amazon.com.invalid> wrote:

> Hello Friends,
>
> I'm working on a performance improvement that reads additional parquet
> files in the middle of a lambda, and I'm running into some issues. This is
> what I'd like to do:
>
> ds.mapPartitions(x => {
>   // read parquet file in and perform an operation with x
> })
>
> Here's my current POC code, but I'm getting nonsense back from the row
> reader.
>
> import com.amazon.horizon.azulene.util.SparkFileUtils._
>
> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
>
> val data = List(
>   TestRow(1, 1, "asdf"),
>   TestRow(2, 1, "asdf"),
>   TestRow(3, 1, "asdf"),
>   TestRow(4, 1, "asdf")
> )
>
> val df = spark.createDataFrame(data)
>
> val folder = Files.createTempDirectory("azulene-test")
> val folderPath = folder.toAbsolutePath.toString + "/"
> df.write.mode("overwrite").parquet(folderPath)
>
> val files = spark.fs.listStatus(folder.toUri)
> val file = files(1) // skip the _SUCCESS file
>
> val partitionSchema = StructType(Seq.empty)
> val dataSchema = df.schema
> val fileFormat = new ParquetFileFormat()
>
> val path = file.getPath
> val status = spark.fs.getFileStatus(path)
>
> val pFile = new PartitionedFile(
>   partitionValues = InternalRow.empty, // this should be empty for non-partitioned values
>   filePath = path.toString,
>   start = 0,
>   length = status.getLen
> )
>
> val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
>   fileFormat.buildReaderWithPartitionValues(
>     sparkSession = spark,
>     dataSchema = dataSchema,
>     partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
>     requiredSchema = dataSchema,
>     filters = Seq.empty,
>     options = Map.empty,
>     hadoopConf = spark.sparkContext.hadoopConfiguration
>     // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
>   )
>
> import scala.collection.JavaConverters._
>
> val rows = readFile(pFile).flatMap(_ match {
>   case r: InternalRow => Seq(r)
>   // This doesn't work. Vector mode is doing something screwy.
>   case b: ColumnarBatch => b.rowIterator().asScala
> }).toList
>
> println(rows)
> // List([0,1,5b,2000000004,66647361])
> // ?? this is wrong I think ????
>
> Has anyone attempted something similar?
>
> Cheers Andrew

--
Ryan Blue
Software Engineer
Netflix
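For what it's worth, the printed output above is not garbage: 66647361, read as a little-endian word, is the ASCII bytes of "asdf" (0x61 0x73 0x64 0x66), so at least the string field's data is visibly there. A sketch of confirming this without an encoder, by reading each field through InternalRow's positional accessors (assuming rows is the List[InternalRow] from the code above and the TestRow layout is (Int, Int, String)):

// Pull fields out positionally; ordinals and types must match the schema.
// Assumes TestRow(Int, Int, String) as in the example above.
rows.foreach { r =>
  val first = r.getInt(0)
  val second = r.getInt(1)
  val third = r.getUTF8String(2).toString // UTF8String -> java.lang.String
  println(s"$first, $second, $third")
}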