Thanks a ton for the help!
Is there a standardized way of converting an InternalRow into a Row?
I’ve tried this, but I’m getting an exception:
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val encoder = RowEncoder(df.schema)

val rows = readFile(pFile)
  .flatMap {
    case r: InternalRow => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  }
  .map(encoder.fromRow(_))
  .toList
java.lang.RuntimeException: Error while decoding:
java.lang.UnsupportedOperationException: Cannot evaluate expression: getcolumnbyordinal(0, IntegerType)
createexternalrow(getcolumnbyordinal(0, IntegerType), getcolumnbyordinal(1, IntegerType), getcolumnbyordinal(2, StringType).toString, StructField(pk,IntegerType,false), StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
  at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
  at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
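My best guess (untested) is that the encoder’s deserializer was never resolved against the schema, which would explain the getcolumnbyordinal error; resolving and binding the encoder before calling fromRow might fix it. A sketch of what I mean (boundEncoder/externalRows are just my names; readFile, pFile and df are from the POC below):

import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Guess: bind the deserializer expressions to the schema before decoding.
val boundEncoder = RowEncoder(df.schema).resolveAndBind()

val externalRows = readFile(pFile)
  .flatMap {
    case r: InternalRow => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  }
  .map(boundEncoder.fromRow(_))
  .toList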
From: Ryan Blue <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, March 21, 2019 at 3:32 PM
To: "Long, Andrew" <[email protected]>
Cc: "[email protected]" <[email protected]>, "[email protected]"
<[email protected]>, "[email protected]" <[email protected]>
Subject: Re: Manually reading parquet files.
You're getting InternalRow instances. They probably have the data you want, but
InternalRow's toString representation doesn't display that data in a readable form.
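For example (a quick, untested sketch against your test schema), you can read the values out positionally with InternalRow's typed getters:

// pk: Int, ordering: Int, col_a: String, per the schema in your test
rows.foreach { row =>
  val pk       = row.getInt(0)
  val ordering = row.getInt(1)
  val colA     = row.getUTF8String(2).toString // strings come back as UTF8String
  println(s"pk=$pk ordering=$ordering col_a=$colA")
}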
On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew <[email protected]> wrote:
Hello Friends,
I’m working on a performance improvement that reads additional parquet files in
the middle of a lambda, and I’m running into some issues. This is what I’d like
to do:
ds.mapPartitions(x => {
  // read a parquet file in and perform an operation with x
})
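Roughly, the end state I have in mind looks like this (untested sketch; `result` is just my name, readFile and pFile are built in the POC below, and my understanding is that the closure returned by buildReaderWithPartitionValues captures a broadcast of the Hadoop conf, so it should be callable on executors):

import spark.implicits._

// Hypothetical end state: call the driver-built reader function inside
// mapPartitions so each partition can pull in the extra parquet file.
val result = ds.mapPartitions { iter =>
  val extraRows = readFile(pFile) // executor-side read of the additional file
  // placeholder: combine extraRows with iter; passing iter through for now
  iter
}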
Here’s my current POC code, but I’m getting nonsense back from the row reader.
import java.nio.file.Files

import com.amazon.horizon.azulene.util.SparkFileUtils._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

val data = List(
  TestRow(1, 1, "asdf"),
  TestRow(2, 1, "asdf"),
  TestRow(3, 1, "asdf"),
  TestRow(4, 1, "asdf")
)

val df = spark.createDataFrame(data)

val folder = Files.createTempDirectory("azulene-test")
val folderPath = folder.toAbsolutePath.toString + "/"
df.write.mode("overwrite").parquet(folderPath)

val files = spark.fs.listStatus(folder.toUri)
val file = files(1) // skip the _SUCCESS file

val partitionSchema = StructType(Seq.empty)
val dataSchema = df.schema
val fileFormat = new ParquetFileFormat()

val path = file.getPath
val status = spark.fs.getFileStatus(path)

val pFile = new PartitionedFile(
  partitionValues = InternalRow.empty, // this should be empty for non-partitioned values
  filePath = path.toString,
  start = 0,
  length = status.getLen
)

val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
  fileFormat.buildReaderWithPartitionValues(
    sparkSession = spark,
    dataSchema = dataSchema,
    partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
    requiredSchema = dataSchema,
    filters = Seq.empty,
    options = Map.empty,
    hadoopConf = spark.sparkContext.hadoopConfiguration
    // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
  )

import scala.collection.JavaConverters._

val rows = readFile(pFile).flatMap {
  case r: InternalRow => Seq(r)
  // This doesn't work. vector mode is doing something screwy
  case b: ColumnarBatch => b.rowIterator().asScala
}.toList

println(rows)
// List([0,1,5b,2000000004,66647361])
// ?? this is wrong I think ????
Has anyone attempted something similar?
Cheers,
Andrew
--
Ryan Blue
Software Engineer
Netflix