Try `val encoder = RowEncoder(df.schema).resolveAndBind()` ?
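
Something along these lines should then decode cleanly (untested sketch, reusing the df, readFile, and pFile from your snippet below):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.vectorized.ColumnarBatch
import scala.collection.JavaConverters._

// resolveAndBind resolves the getcolumnbyordinal expressions against the
// schema, which is what the UnsupportedOperationException is complaining about.
val encoder = RowEncoder(df.schema).resolveAndBind()

val rows: List[Row] = readFile(pFile).flatMap {
  case r: InternalRow => Seq(r)
  case b: ColumnarBatch => b.rowIterator().asScala
}.map(encoder.fromRow).toList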
On Thu, Mar 21, 2019 at 5:39 PM Long, Andrew <[email protected]>
wrote:
> Thanks a ton for the help!
>
>
>
> Is there a standard way of converting an InternalRow to a Row?
>
>
>
> I’ve tried this, but I’m getting an exception:
>
>
>
> val encoder = RowEncoder(df.schema)
> val rows = readFile(pFile).flatMap {
>     case r: InternalRow => Seq(r)
>     case b: ColumnarBatch => b.rowIterator().asScala
>   }
>   .map(encoder.fromRow)
>   .toList
>
>
>
> java.lang.RuntimeException: Error while decoding:
> java.lang.UnsupportedOperationException: Cannot evaluate expression:
> getcolumnbyordinal(0, IntegerType)
>
> createexternalrow(getcolumnbyordinal(0, IntegerType),
> getcolumnbyordinal(1, IntegerType), getcolumnbyordinal(2,
> StringType).toString, StructField(pk,IntegerType,false),
> StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))
>
>
>
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
>
> at
> com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
>
> at
> com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
>
>
>
> From: Ryan Blue <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, March 21, 2019 at 3:32 PM
> To: "Long, Andrew" <[email protected]>
> Cc: "[email protected]" <[email protected]>, "[email protected]" <[email protected]>, "[email protected]" <[email protected]>
> Subject: Re: Manually reading parquet files.
>
>
>
> You're getting InternalRow instances. They probably have the data you
> want, but InternalRow's toString doesn't render the values in a readable
> form.
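>
> To sanity-check that, you can read the fields straight off the InternalRows
> with the typed getters. Rough sketch against your pk/ordering/col_a schema
> (with the vectorized reader disabled, readFile only yields InternalRow):
>
> import org.apache.spark.sql.catalyst.InternalRow
>
> readFile(pFile).foreach {
>   case r: InternalRow =>
>     // pk, ordering, col_a
>     println((r.getInt(0), r.getInt(1), r.getUTF8String(2).toString))
>   case other => println(other) // e.g. a ColumnarBatch in vectorized mode
> }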
>
>
>
> On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew <[email protected]>
> wrote:
>
> Hello Friends,
>
>
>
> I’m working on a performance improvement that reads additional parquet
> files in the middle of a lambda, and I’m running into some issues. This is
> what I’d like to do:
>
>
>
> ds.mapPartitions(x => {
>   // read a parquet file and perform an operation with x
> })
>
>
>
>
>
> Here’s my current POC code, but I’m getting nonsense back from the row
> reader.
>
>
>
> import com.amazon.horizon.azulene.util.SparkFileUtils._
>
> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
>
> val data = List(
>   TestRow(1,1,"asdf"),
>   TestRow(2,1,"asdf"),
>   TestRow(3,1,"asdf"),
>   TestRow(4,1,"asdf")
> )
>
> val df = spark.createDataFrame(data)
>
> val folder = Files.createTempDirectory("azulene-test")
>
> val folderPath = folder.toAbsolutePath.toString + "/"
> df.write.mode("overwrite").parquet(folderPath)
>
> val files = spark.fs.listStatus(folder.toUri)
>
> val file = files(1) // skip the _SUCCESS file
>
> val partitionSchema = StructType(Seq.empty)
> val dataSchema = df.schema
> val fileFormat = new ParquetFileFormat()
>
> val path = file.getPath
>
> val status = spark.fs.getFileStatus(path)
>
> val pFile = new PartitionedFile(
>   partitionValues = InternalRow.empty, // this should be empty for non-partitioned values
>   filePath = path.toString,
>   start = 0,
>   length = status.getLen
> )
>
> val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
>   fileFormat.buildReaderWithPartitionValues(
>     sparkSession = spark,
>     dataSchema = dataSchema,
>     partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
>     requiredSchema = dataSchema,
>     filters = Seq.empty,
>     options = Map.empty,
>     hadoopConf = spark.sparkContext.hadoopConfiguration
>     // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
>   )
>
> import scala.collection.JavaConverters._
>
> val rows = readFile(pFile).flatMap {
>   case r: InternalRow => Seq(r)
>
>   // This doesn't work. Vectorized mode is doing something screwy.
>   case b: ColumnarBatch => b.rowIterator().asScala
> }.toList
>
> println(rows)
> // List([0,1,5b,2000000004,66647361])
> // ?? this is wrong I think ????
>
>
>
> Has anyone attempted something similar?
>
>
>
> Cheers, Andrew
>
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>