Thanks a ton for the help! Is there a standardized way of converting an InternalRow back to a Row?

I've tried this, but I'm getting an exception:

val encoder = RowEncoder(df.schema)

val rows = readFile(pFile)
  .flatMap {
    case r: InternalRow => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  }
  .map(encoder.fromRow(_))
  .toList

java.lang.RuntimeException: Error while decoding: java.lang.UnsupportedOperationException: Cannot evaluate expression: getcolumnbyordinal(0, IntegerType)
createexternalrow(getcolumnbyordinal(0, IntegerType), getcolumnbyordinal(1, IntegerType), getcolumnbyordinal(2, StringType).toString, StructField(pk,IntegerType,false), StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
  at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
  at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
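My hunch from the "Cannot evaluate expression: getcolumnbyordinal" message is that the encoder's deserializer is still unresolved and needs to be resolved and bound before fromRow can evaluate it. A minimal sketch of what I'm planning to try next, assuming Spark 2.4's ExpressionEncoder API (untested):

import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.vectorized.ColumnarBatch

// resolveAndBind() should replace the unresolved GetColumnByOrdinal
// expressions in the deserializer with references bound to the schema,
// which is what fromRow needs in order to evaluate them.
val boundEncoder = RowEncoder(df.schema).resolveAndBind()

val externalRows = readFile(pFile)
  .flatMap {
    case r: InternalRow => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  }
  .map(boundEncoder.fromRow(_))
  .toList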
From: Ryan Blue <rb...@netflix.com.INVALID>
Reply-To: "rb...@netflix.com" <rb...@netflix.com>
Date: Thursday, March 21, 2019 at 3:32 PM
To: "Long, Andrew" <loand...@amazon.com.invalid>
Cc: "dev@spark.apache.org" <dev@spark.apache.org>, "u...@spark.apache.org" <u...@spark.apache.org>, "horizon-...@amazon.com" <horizon-...@amazon.com>
Subject: Re: Manually reading parquet files.

You're getting InternalRow instances. They probably have the data you want, but the toString representation of InternalRow doesn't match the data.

On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew <loand...@amazon.com.invalid> wrote:

Hello Friends,

I'm working on a performance improvement that reads additional parquet files in the middle of a lambda, and I'm running into some issues. This is what I'd like to do:

ds.mapPartitions(x => {
  // read a parquet file and perform an operation with x
})

Here's my current POC code, but I'm getting nonsense back from the row reader.

import com.amazon.horizon.azulene.util.SparkFileUtils._

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

val data = List(
  TestRow(1, 1, "asdf"),
  TestRow(2, 1, "asdf"),
  TestRow(3, 1, "asdf"),
  TestRow(4, 1, "asdf")
)

val df = spark.createDataFrame(data)

val folder = Files.createTempDirectory("azulene-test")
val folderPath = folder.toAbsolutePath.toString + "/"
df.write.mode("overwrite").parquet(folderPath)

val files = spark.fs.listStatus(folder.toUri)
val file = files(1) // skip the _SUCCESS file

val partitionSchema = StructType(Seq.empty)
val dataSchema = df.schema
val fileFormat = new ParquetFileFormat()

val path = file.getPath
val status = spark.fs.getFileStatus(path)

val pFile = new PartitionedFile(
  partitionValues = InternalRow.empty, // this should be empty for non-partitioned values
  filePath = path.toString,
  start = 0,
  length = status.getLen
)

val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
  fileFormat.buildReaderWithPartitionValues(
    sparkSession = spark,
    dataSchema = dataSchema,
    partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
    requiredSchema = dataSchema,
    filters = Seq.empty,
    options = Map.empty,
    hadoopConf = spark.sparkContext.hadoopConfiguration // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
  )

import scala.collection.JavaConverters._

val rows = readFile(pFile).flatMap {
  case r: InternalRow => Seq(r)
  // This doesn't work. Vectorized mode is doing something screwy
  case b: ColumnarBatch => b.rowIterator().asScala
}.toList

println(rows)
// List([0,1,5b,2000000004,66647361])
// ?? this is wrong, I think ??
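In case it helps, this is how I've been poking at the first row to check whether the values are actually in there — a sketch assuming the pk/ordering/col_a schema above (untested):

import org.apache.spark.sql.catalyst.InternalRow

// InternalRow keeps values in Catalyst's internal representation
// (e.g. strings as UTF8String), so fields have to be read with the
// typed getters at the right ordinal rather than via toString.
val first: InternalRow = rows.head
val pk       = first.getInt(0)                  // expecting 1
val ordering = first.getInt(1)                  // expecting 1
val colA     = first.getUTF8String(2).toString  // expecting "asdf"
println(s"pk=$pk ordering=$ordering col_a=$colA")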
Has anyone attempted something similar?

Cheers,

Andrew

--
Ryan Blue
Software Engineer
Netflix