Yes, purely for performance.

On Thu, Mar 30, 2023, 3:01 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Is this purely for performance consideration?
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
>
> On Thu, 30 Mar 2023 at 19:56, praveen sinha <praveen.si...@gmail.com> wrote:
>
>> Hi,
>>
>> I have been trying to implement an InMemoryRelation based on Spark
>> ColumnarBatches. So far I have not been able to store the vectorised
>> ColumnarBatch in the relation. Is there a way to achieve this without
>> going through an intermediate representation such as Arrow, so that Spark
>> can do fast columnar aggregations in memory? The code so far, using just
>> the high-level APIs, is as follows:
>>
>> ```
>> // Load CSV into a DataFrame
>> val csvDF: DataFrame = context.sqlctx.read
>>   .format("com.databricks.spark.csv")
>>   .option("header", "true")
>>   .option("inferSchema", "true")
>>   .load(csvFile)
>>
>> // Create an in-memory relation using the schema from the CSV DataFrame
>> val relation = InMemoryRelation(
>>   useCompression = true,
>>   batchSize = 100,
>>   storageLevel = StorageLevel.MEMORY_ONLY,
>>   child = csvDF.queryExecution.sparkPlan, // Do I need to alter this to suggest columnar plans?
>>   tableName = Some("nyc_taxi"),
>>   optimizedPlan = csvDF.queryExecution.optimizedPlan
>> )
>>
>> // Create vectorized columnar batches
>> val rows = csvDF.collect()
>> import scala.collection.JavaConverters._
>> val vectorizedRows: ColumnarBatch =
>>   ColumnVectorUtils.toBatch(csvDF.schema, MemoryMode.ON_HEAP, rows.iterator.asJava)
>>
>> // Store the vectorized rows in the relation
>> //relation.store(vectorizedRows)
>> ```
>>
>> Obviously the last line is the one which is not an API. I need help
>> understanding whether this approach can work and, if it does, pointers on
>> how to implement it using low-level Spark constructs.
>>
>> Thanks and Regards,
>> Praveen
>>
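For reference, a minimal sketch of the supported route to Spark's columnar in-memory cache: `Dataset.persist` wraps the plan in an `InMemoryRelation` internally, so there is no need to construct one by hand or to inject a pre-built `ColumnarBatch`. The file path, column name, and local-mode session below are illustrative assumptions, not from the thread.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object ColumnarCacheSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("columnar-cache-sketch")
      .master("local[*]") // assumption: local run for illustration
      .getOrCreate()

    // The com.databricks.spark.csv package was folded into Spark itself
    // as the built-in "csv" format in Spark 2.x.
    val csvDF = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("nyc_taxi.csv") // hypothetical input path

    // persist() builds the InMemoryRelation; on first materialization the
    // cache manager serializes rows into columnar CachedBatches.
    csvDF.persist(StorageLevel.MEMORY_ONLY)
    csvDF.count() // forces the cache to be built

    // Subsequent aggregations are served from the columnar cache.
    csvDF.groupBy("vendor_id").count().show() // hypothetical column name

    spark.stop()
  }
}
```

Note that `ColumnVectorUtils`, `ColumnarBatch`, and `InMemoryRelation` live in Spark's internal `execution` packages with no stability guarantees, which is presumably why there is no public `store`-style API for pushing a hand-built batch into the relation.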