Hello Friends,
I’m working on a performance improvement that reads additional parquet files in
the middle of a lambda, and I’m running into some issues. This is what I’d like
to do:
ds.mapPartitions(x => {
  // read a parquet file and perform an operation with x
})
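In slightly more concrete terms, roughly the sketch below. The Event type, the side-file
path and the "id"/"label" columns are all placeholders, and the extra file is opened with
the plain parquet-hadoop reader rather than Spark's internal one, since SparkSession isn't
available inside the lambda:

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.example.GroupReadSupport
import spark.implicits._

case class Event(id: Int, payload: String)            // stand-in for my real row type
val ds = Seq(Event(1, "x"), Event(2, "y")).toDS()

val enriched = ds.mapPartitions { events =>
  // open the extra parquet file directly with parquet-hadoop on the executor
  val sideFile = new Path("/tmp/lookup.parquet/part-00000.parquet") // made-up path
  val reader = ParquetReader.builder(new GroupReadSupport(), sideFile).build()
  // materialize a small lookup table (the "id"/"label" columns are assumed)
  val lookup = Iterator.continually(reader.read())
    .takeWhile(_ != null)
    .map(g => g.getInteger("id", 0) -> g.getString("label", 0))
    .toMap
  reader.close()
  // the per-row operation against the side data
  events.map(e => (e.id, e.payload, lookup.getOrElse(e.id, "unknown")))
}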
Here’s my current POC code, but I’m getting nonsense back from the row reader.
import java.nio.file.Files

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.amazon.horizon.azulene.util.SparkFileUtils._ // internal util (source of the spark.fs helpers below)

case class TestRow(a: Int, b: Int, c: String) // defined something like this; the field names don't matter

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

val data = List(
  TestRow(1, 1, "asdf"),
  TestRow(2, 1, "asdf"),
  TestRow(3, 1, "asdf"),
  TestRow(4, 1, "asdf")
)
val df = spark.createDataFrame(data)

// write the test data out so there is a real parquet file to read back
val folder = Files.createTempDirectory("azulene-test")
val folderPath = folder.toAbsolutePath.toString + "/"
df.write.mode("overwrite").parquet(folderPath)

val files = spark.fs.listStatus(folder.toUri)
val file = files.filter(_.getPath.getName.endsWith(".parquet")).head // skip the _SUCCESS marker

val partitionSchema = StructType(Seq.empty)
val dataSchema = df.schema
val fileFormat = new ParquetFileFormat()

val path = file.getPath
val status = spark.fs.getFileStatus(path)

val pFile = new PartitionedFile(
  partitionValues = InternalRow.empty, // empty for non-partitioned data
  filePath = path.toString,
  start = 0,
  length = status.getLen
)

val readFile: (PartitionedFile) => Iterator[Any] = // really Iterator[InternalRow], but the vectorized path sneaks ColumnarBatch through
  fileFormat.buildReaderWithPartitionValues(
    sparkSession = spark,
    dataSchema = dataSchema,
    partitionSchema = partitionSchema, // empty for non-partitioned fields
    requiredSchema = dataSchema,
    filters = Seq.empty,
    options = Map.empty,
    hadoopConf = spark.sparkContext.hadoopConfiguration // or: relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
  )
import scala.collection.JavaConverters._

val rows = readFile(pFile).flatMap {
  case r: InternalRow   => Seq(r)
  // this branch doesn't work; vectorized mode is doing something screwy
  case b: ColumnarBatch => b.rowIterator().asScala
}.toList
println(rows)
// prints: List([0,1,5b,2000000004,66647361])
// ?? this looks wrong to me ??
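Or is that bracketed output just UnsafeRow's toString (which prints the raw field words in
hex) rather than genuinely corrupt data? My next sanity check was going to be pulling the
columns out with the typed getters and copying each row before buffering, along these lines
(ordinals 0/1/2 are assumed to line up with TestRow's int, int and string fields):

import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// re-read the file, copying every row because the reader may reuse the
// same mutable UnsafeRow instance between calls to next()
val copied = readFile(pFile).flatMap {
  case r: InternalRow   => Iterator.single(r)
  case b: ColumnarBatch => b.rowIterator().asScala
}.map(_.copy()).toList

// print the columns via the typed getters (ordinals assumed to match TestRow)
copied.foreach { r =>
  println((r.getInt(0), r.getInt(1), r.getUTF8String(2).toString))
}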
Has anyone attempted something similar?
Cheers,
Andrew